<a href="https://colab.research.google.com/github/mukamal/predicting-diamond-prices-gbm/blob/main/predicting_diamond_prices_gbm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Predicting diamond prices: GBM

# Diamonds Data

The diamonds.csv data set contains 10 columns:
- carat: Carat weight of the diamond
- cut: Cut quality of the diamond, in increasing order: Fair, Good, Very Good, Premium, Ideal
- color: Color of the diamond, with D being the best and J the worst
- clarity: How obvious inclusions are within the diamond, from best to worst: FL (flawless), IF, VVS1, and so on down to I3 (level 3 inclusions). See this web site for an exhaustive ranking of [clarity](https://4cs.gia.edu/en-us/diamond-clarity/?gclid=Cj0KCQjwnqH7BRDdARIsACTSAduMoc2KQbXkO94BxCfBNC5X8YyjAYcFpWThKQMW46cQj_3p0pZ0o84aAuagEALw_wcB). The web site has a sliding scale you can drag to see the relationship between clarity grades.
- depth: Total depth %: the height of a diamond, measured from the culet to the table, divided by its average girdle diameter
- table: Table %: the width of the diamond's table expressed as a percentage of its average diameter
- price: The price of the diamond
- x: Length (mm)
- y: Width (mm)
- z: Height (mm)
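As a sanity check of the depth definition, the sketch below recomputes depth % from the `x`, `y` and `z` columns. It assumes the average girdle diameter can be approximated by the mean of length `x` and width `y`; the helper `depth_percent` is hypothetical and not part of the data set or any library.

```python
def depth_percent(x_mm: float, y_mm: float, z_mm: float) -> float:
    """Approximate total depth %: height (z) divided by the average diameter.

    Assumption: the average girdle diameter is approximated by mean(x, y).
    """
    average_diameter = (x_mm + y_mm) / 2
    return 100 * z_mm / average_diameter


# Second row of the data set: x=3.89, y=3.84, z=2.31, recorded depth 59.8
print(round(depth_percent(3.89, 3.84, 2.31), 1))  # 59.8
```

For the first row (x=3.95, y=3.98, z=2.43) the same formula gives about 61.3 against a recorded 61.5, so treat it as an approximation rather than the exact grading formula.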

In [None]:
%%bash
# Install pyspark
# (if pyspark is already installed, pip prints a message saying so)
pip install pyspark

# Download the data file from GitHub if it does not already exist in the Colab environment
if [[ ! -f ./diamonds.csv ]]; then
   # save it in this Colab environment instance
   wget https://raw.githubusercontent.com/mukamal/data/main/diamonds.csv
fi

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=f367796f5857e0bc9542795e41c40a8c1492beafb80c2f84ceb61667b34dd793
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


--2021-04-14 21:47:20--  https://raw.githubusercontent.com/wewilli1/ist718_data/master/diamonds.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3192560 (3.0M) [text/plain]
Saving to: ‘diamonds.csv’


Reading the diamonds.csv file into a Spark data frame named `diamonds_df` and performing the feature engineering needed to train decision trees.

In [None]:


# import statements
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.ml import feature, Pipeline

spark = SparkSession.builder.getOrCreate()

# With header=true and no schema, every column is read as a string
diamonds_df = spark.read.format("csv").option("header", "true").load("diamonds.csv")

# Cast the numeric columns to double
for col_name in ['carat', 'depth', 'table', 'price', 'x', 'y', 'z']:
    diamonds_df = diamonds_df.withColumn(col_name, diamonds_df[col_name].cast(DoubleType()))

# Ordinal encodings: each label's index is its position in the list,
# so listing grades from worst to best makes a larger index mean a better grade
cut_i = feature.StringIndexerModel.from_labels(['Fair', 'Good', 'Very Good', 'Premium', 'Ideal'],
                                               inputCol="cut",
                                               outputCol="cut_idx")
col_i = feature.StringIndexerModel.from_labels(['J', 'I', 'H', 'G', 'F', 'E', 'D'],
                                               inputCol="color",
                                               outputCol="color_idx")
clar_i = feature.StringIndexerModel.from_labels(['I1', 'SI2', 'SI1', 'VS2', 'VS1', 'VVS2', 'VVS1', 'IF'],
                                                inputCol="clarity",
                                                outputCol="clarity_idx")

# The indexers are already-fitted models, so Pipeline.fit() passes them through unchanged
feature_engineering_pipe = Pipeline(stages=[cut_i, col_i, clar_i])
diamonds_df_xformed = feature_engineering_pipe.fit(diamonds_df).transform(diamonds_df)

# Replace the string columns with their numeric encodings
diamonds_df_xformed = diamonds_df_xformed.drop('color', 'cut', 'clarity')\
                        .withColumnRenamed("color_idx", "color")\
                        .withColumnRenamed("cut_idx", "cut")\
                        .withColumnRenamed("clarity_idx", "clarity")

# drop rows with NAs/nulls and remove the unnamed index column (read as _c0)
diamonds_df_xformed = diamonds_df_xformed.dropna().drop('_c0')


In [None]:
display(diamonds_df_xformed.limit(5).toPandas())

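|   | carat | depth | table | price | x | y | z | cut | color | clarity |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.23 | 61.5 | 55.0 | 326.0 | 3.95 | 3.98 | 2.43 | 4.0 | 5.0 | 1.0 |
| 1 | 0.21 | 59.8 | 61.0 | 326.0 | 3.89 | 3.84 | 2.31 | 3.0 | 5.0 | 2.0 |
| 2 | 0.23 | 56.9 | 65.0 | 327.0 | 4.05 | 4.07 | 2.31 | 1.0 | 5.0 | 4.0 |
| 3 | 0.29 | 62.4 | 58.0 | 334.0 | 4.2 | 4.23 | 2.63 | 3.0 | 1.0 | 3.0 |
| 4 | 0.31 | 63.3 | 58.0 | 335.0 | 4.34 | 4.35 | 2.75 | 1.0 | 0.0 | 1.0 |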
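`StringIndexerModel.from_labels` gives each label its position in the list it receives, so the encoded columns above can be decoded by hand. A plain-Python sketch of the same mapping (no Spark needed; `ordinal_index` is a hypothetical helper), checked against row 0 above, whose `cut`, `color` and `clarity` values 4.0, 5.0 and 1.0 decode to Ideal, E and SI2:

```python
# Label orders copied from the StringIndexerModel.from_labels calls above (worst grade first)
CUT_LABELS = ['Fair', 'Good', 'Very Good', 'Premium', 'Ideal']
COLOR_LABELS = ['J', 'I', 'H', 'G', 'F', 'E', 'D']
CLARITY_LABELS = ['I1', 'SI2', 'SI1', 'VS2', 'VS1', 'VVS2', 'VVS1', 'IF']


def ordinal_index(labels):
    """Map each label to its list position, as a float like StringIndexer outputs."""
    return {label: float(i) for i, label in enumerate(labels)}


cut_idx = ordinal_index(CUT_LABELS)
color_idx = ordinal_index(COLOR_LABELS)
clarity_idx = ordinal_index(CLARITY_LABELS)

print(cut_idx['Ideal'], color_idx['E'], clarity_idx['SI2'])  # 4.0 5.0 1.0
```

Because the order follows the quality grades, a tree split such as `cut <= 2.0` separates the lower grades (Fair, Good, Very Good) from the higher ones, which an alphabetical or frequency-based index would not guarantee.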


The following cells create a random forest regressor model, tune it with a grid search, and use the best model for inference. The goal is to see whether it improves on the linear regression score from an earlier project.



Creating a random forest regressor model and tuning it with a 3-fold cross-validated grid search in the cell below.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Set to True to rerun this (slow) grid search
enable_grid_search = False

# 60/30/10 train/validation/test split (validation_df is not used below:
# CrossValidator makes its own 3-fold split of training_df)
training_df, validation_df, testing_df = diamonds_df_xformed.randomSplit([0.6, 0.3, 0.1])

# Combine the feature columns into the single vector column Spark ML expects
va = VectorAssembler().setInputCols(['carat', 'table', 'depth', 'x', 'y', 'z', 'cut', 'color', 'clarity']).setOutputCol('features')

if enable_grid_search:
  rf = RandomForestRegressor().setLabelCol("price")

  # 4 x 4 = 16 parameter combinations
  paramGrid = ParamGridBuilder() \
      .addGrid(rf.numTrees, [10, 20, 30, 50]) \
      .addGrid(rf.maxDepth, [5, 10, 15, 20]) \
      .build()

  # Candidates are scored by RegressionEvaluator's default metric (RMSE)
  crossval = CrossValidator(estimator=Pipeline(stages=[va, rf]),
                            estimatorParamMaps=paramGrid,
                            evaluator=RegressionEvaluator().setLabelCol("price"),
                            numFolds=3)

  final_model_fitted = crossval.fit(training_df)
  # Model summary: best random forest stage and the average RMSE of each combination
  print(final_model_fitted.bestModel.stages[1])
  print(final_model_fitted.avgMetrics)
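To gauge the cost of the search, the sketch below enumerates the same parameter grid in plain Python. It mirrors, but does not call, `ParamGridBuilder`: the grid is the Cartesian product of the listed values, and `CrossValidator` fits each combination once per fold and then refits the best combination on the whole training set.

```python
from itertools import product

# Values copied from the random forest ParamGridBuilder above
num_trees_values = [10, 20, 30, 50]
max_depth_values = [5, 10, 15, 20]
num_folds = 3

# One parameter map per combination (Cartesian product of the value lists)
param_maps = [{'numTrees': n, 'maxDepth': d}
              for n, d in product(num_trees_values, max_depth_values)]

# One fit per (combination, fold), plus a final refit of the best combination
fits = len(param_maps) * num_folds + 1
print(len(param_maps), fits)  # 16 49
```

Each of those fits trains a whole forest of `numTrees` trees, which is why the search is disabled by default and the best values are hard-coded afterwards.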


In [None]:
# print('numTrees - ', final_model_fitted.bestModel.stages[1].getNumTrees)
# print('maxDepth - ', final_model_fitted.bestModel.stages[1].getOrDefault('maxDepth'))
#numTrees -  30
#maxDepth -  15

Creating a pipeline named `best_pipe` that hard-codes the tuning parameters of the best model found by the grid search above.

In [None]:


from pyspark.sql import functions as fn

# Train a random forest with the best parameters found by the grid search
rf_best = RandomForestRegressor(numTrees=30, maxDepth=15).setLabelCol("price")
best_pipe = Pipeline(stages=[va, rf_best]).fit(training_df)

# Mean squared error between actual and predicted price
mse = fn.mean((fn.col('price') - fn.col('prediction'))**2).alias('mse')

print("Best Model train mse:")
best_pipe.transform(training_df).select(mse).show()

print("Best Model test mse:")
best_pipe.transform(testing_df).select(mse).show()


Best Model train mse:
+-----------------+
|              mse|
+-----------------+
|173085.3883443526|
+-----------------+

Best Model test mse:
+------------------+
|               mse|
+------------------+
|390652.25492702663|
+------------------+
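`RegressionEvaluator`, used by the cross-validation above, scores models by RMSE by default, which is the square root of MSE. Converting the two MSE values printed above to RMSE puts the error back in the units of `price`:

```python
import math

# MSE values copied from the best_pipe output above
train_mse = 173085.3883443526
test_mse = 390652.25492702663

# RMSE is in the same units as price
train_rmse = math.sqrt(train_mse)
test_rmse = math.sqrt(test_mse)
print(f"train RMSE = {train_rmse:.0f}, test RMSE = {test_rmse:.0f}")  # train RMSE = 416, test RMSE = 625
```

The gap between train and test error is easier to judge in these units than as squared errors.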



Using the `best_pipe` pipeline, creating a pandas data frame named `rf_feature_importance` with 2 columns: `feature` and `importance`.

The `feature` column holds the feature name and the `importance` column holds the feature importance score as determined by the random forest model.

The feature importances are sorted from high to low, so the most important feature is in the first row of the data frame.

In [None]:
import pandas as pd

# The last pipeline stage is the fitted random forest
rf_model = best_pipe.stages[-1]

# Same order as the VectorAssembler input columns
feature_names = ['carat', 'table', 'depth', 'x', 'y', 'z', 'cut', 'color', 'clarity']
rf_feature_importance = pd.DataFrame(list(zip(feature_names, rf_model.featureImportances.toArray())),
                                     columns=['feature', 'importance'])\
                          .sort_values('importance', ascending=False)

In [None]:
display(rf_feature_importance)

|   | feature | importance |
|---|---|---|
| 4 | y | 0.348216 |
| 0 | carat | 0.242932 |
| 3 | x | 0.186059 |
| 5 | z | 0.118419 |
| 8 | clarity | 0.0559 |
| 7 | color | 0.031585 |
| 2 | depth | 0.006332 |
| 1 | table | 0.005977 |
| 6 | cut | 0.00458 |
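Spark normalizes a random forest's `featureImportances` so they sum to 1. A plain-Python check on the printed values, which also ranks them from high to low and totals the share held by the size-related features (`carat`, `x`, `y`, `z`); that grouping is this sketch's own choice, not something Spark reports:

```python
# Importances copied from the random forest output above
importances = {
    'carat': 0.242932, 'table': 0.005977, 'depth': 0.006332,
    'x': 0.186059, 'y': 0.348216, 'z': 0.118419,
    'cut': 0.00458, 'color': 0.031585, 'clarity': 0.0559,
}

# Rank from most to least important
ranked = sorted(importances.items(), key=lambda kv: kv[1], reverse=True)

# Should be 1, up to the rounding of the printed values
total = sum(importances.values())

# Share of the total importance carried by the size-related features
size_share = sum(importances[f] for f in ['carat', 'x', 'y', 'z'])
print(ranked[0][0], round(total, 4), round(size_share, 3))  # y 1.0 0.896
```

In this run, the size-related features account for roughly 90% of the total importance, while `cut`, `table` and `depth` contribute well under 1% each.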


In [None]:

# Uncomment to print the split rules of the first tree in the forest
#print(rf_model.trees[0].toDebugString)



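In the first tree of the forest (`rf_model.trees[0]`), the root node splits on `carat`, at the threshold 0.995. `carat` is one of the most important features overall (second only to `y` in the importance table), but the root split of an individual tree does not have to use the top-ranked feature.

Random forests reduce the correlation between their trees in two ways: each tree is trained on a bootstrap sample of the rows, and at each tree node only a random subset of the features is considered for the split (controlled in Spark by `featureSubsetStrategy`).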
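Spark's `featureSubsetStrategy` defaults to `"auto"`, which for a regression forest with more than one tree means `"onethird"`: each node considers a third of the features, here 3 of the 9 assembled columns. The plain-Python sketch below imitates that per-node sampling; `candidate_features` is a hypothetical helper, not Spark's code.

```python
import math
import random

FEATURES = ['carat', 'table', 'depth', 'x', 'y', 'z', 'cut', 'color', 'clarity']


def candidate_features(features, rng):
    """Draw the features one tree node may split on under a 'onethird' strategy."""
    k = math.ceil(len(features) / 3)
    return rng.sample(features, k)


rng = random.Random(0)
# Each node draws its own subset, so different nodes (and trees) see different features
for node in range(3):
    print(node, candidate_features(FEATURES, rng))
```

A node whose subset happens to exclude the strongest features must split on a weaker one, which is what keeps the trees from all looking alike.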

## Comparing MSE scores

### MSE scores from linear regression


Best model train MSE: **1500074**

Best model test MSE: **1551313**




### MSE scores from random forest regression

Best model train MSE: **163749**

Best model test MSE: **340683**




The random forest model scored better: its test MSE is roughly 4.5 times lower than that of linear regression (340683 vs. 1551313).
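Because MSE squares the errors, the ratio between two MSE values is larger than the ratio between the corresponding RMSEs, which are in price units. A quick calculation with the test MSE values above:

```python
import math

# Test MSE values copied from the comparison above
linear_regression_mse = 1551313
random_forest_mse = 340683

ratio = linear_regression_mse / random_forest_mse
rmse_ratio = math.sqrt(ratio)  # ratio of the typical errors in price units
print(f"MSE ratio = {ratio:.2f}, RMSE ratio = {rmse_ratio:.2f}")  # MSE ratio = 4.55, RMSE ratio = 2.13
```

So the random forest's typical prediction error, measured in price units, is about 2.1 times smaller.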




In [None]:
from pyspark.ml.regression import GBTRegressor

# Set to True to rerun this (slow) grid search
enable_grid_search = False
if enable_grid_search:
  gbt = GBTRegressor(labelCol='price')
  # 3 x 3 x 2 = 18 parameter combinations
  paramGrid = ParamGridBuilder() \
      .addGrid(gbt.maxIter, [10, 20, 30]) \
      .addGrid(gbt.maxDepth, [5, 10, 15]) \
      .addGrid(gbt.stepSize, [.1, .05]) \
      .build()

  crossval = CrossValidator(estimator=Pipeline(stages=[va, gbt]),
                            estimatorParamMaps=paramGrid,
                            evaluator=RegressionEvaluator().setLabelCol("price"),
                            numFolds=3)

  final_model_fitted = crossval.fit(training_df)
  # Model summary: best GBT stage and the average RMSE of each combination
  print(final_model_fitted.bestModel.stages[1])
  print(final_model_fitted.avgMetrics)

In [None]:
# print('maxIter - ', final_model_fitted.bestModel.stages[-1].getOrDefault('maxIter'))
# print('maxDepth - ', final_model_fitted.bestModel.stages[-1].getOrDefault('maxDepth'))
# print('stepSize - ', final_model_fitted.bestModel.stages[-1].getOrDefault('stepSize'))

# maxIter -  30
# maxDepth -  5
# stepSize -  0.1
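The GBT grid tunes `maxIter` (the number of boosting rounds, one tree per round) and `stepSize` (the learning rate that shrinks each tree's contribution). The plain-Python sketch below shows how the two interact, using gradient boosting for squared error with one-split trees (stumps) on a single feature. It is a simplification, not Spark's implementation: it starts from the mean price rather than following Spark's initialization, and `fit_stump`, `fit_gbm` and `mean_squared_error` are hypothetical helpers. The toy data are the carat and price values of the five rows displayed earlier.

```python
def fit_stump(xs, residuals):
    """Fit a one-split regression tree (stump) to the residuals by least squares."""
    best = None
    # Every distinct value except the largest leaves both sides of the split non-empty
    for threshold in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        sse = (sum((r - left_mean) ** 2 for r in left)
               + sum((r - right_mean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, left_mean, right_mean)
    if best is None:
        # All xs are identical, so no split exists: predict the mean residual
        mean_residual = sum(residuals) / len(residuals)
        return lambda x: mean_residual
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean


def fit_gbm(xs, ys, max_iter=30, step_size=0.1):
    """Boost stumps for squared error; max_iter and step_size play the roles of maxIter and stepSize."""
    base = sum(ys) / len(ys)  # simplification: start from the mean label
    preds = [base] * len(ys)
    stumps = []
    for _ in range(max_iter):
        # For squared error the residuals are (proportional to) the negative gradient
        residuals = [y - p for y, p in zip(ys, preds)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        # Each new tree's contribution is shrunk by the step size (learning rate)
        preds = [p + step_size * stump(x) for p, x in zip(preds, xs)]
    return lambda x: base + step_size * sum(s(x) for s in stumps)


def mean_squared_error(ys, preds):
    return sum((y - p) ** 2 for y, p in zip(ys, preds)) / len(ys)


# carat and price of the five rows displayed earlier
carats = [0.23, 0.21, 0.23, 0.29, 0.31]
prices = [326.0, 326.0, 327.0, 334.0, 335.0]

# Training MSE for an increasing number of boosting rounds
for n in (1, 10, 30):
    model = fit_gbm(carats, prices, max_iter=n, step_size=0.1)
    print(n, round(mean_squared_error(prices, [model(c) for c in carats]), 2))
```

Training MSE falls as `max_iter` grows, and a smaller `step_size` needs more rounds for the same reduction, which is why the two are tuned together. Pushing training error down this way can also overfit, which is why the grid search scores candidates on held-out folds.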

In [None]:
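# Hand-set parameters; note they differ from the grid-search best above (maxDepth 5, stepSize 0.1)
gbt_best = GBTRegressor(maxIter=30, maxDepth=8, minInstancesPerNode=10, stepSize=.125, maxBins=40).setLabelCol("price")

best_pipe_2 = Pipeline(stages=[va, gbt_best]).fit(training_df)

# Mean squared error between actual and predicted price
mse = fn.mean((fn.col('price') - fn.col('prediction'))**2).alias('mse')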

print("Best Model train mse:")
best_pipe_2.transform(training_df).select(mse).show()

print("Best Model test mse:")
best_pipe_2.transform(testing_df).select(mse).show()


Best Model train mse:
+------------------+
|               mse|
+------------------+
|233323.36696081207|
+------------------+

Best Model test mse:
+-----------------+
|              mse|
+-----------------+
|421725.1665759154|
+-----------------+



In [None]:
# Create compare_1_df: MSE of each model (values hard-coded), sorted from lowest to highest
compare_1_df = pd.DataFrame([['Linear Regression', 1551313],
                             ['Random Forest', 409917],
                             ['GBT', 452699]],
                            columns=['Model', 'MSE']).sort_values('MSE')

In [None]:
display(compare_1_df)

|   | Model | MSE |
|---|---|---|
| 1 | Random Forest | 409917 |
| 2 | GBT | 452699 |
| 0 | Linear Regression | 1551313 |
