In this notebook, I build regression models — a decision tree and a random forest, each tuned with cross-validation — that predict a diamond's price from its other attributes, using the Apache Spark ML Pipeline API.

Dataset documentation (column definitions for the ggplot2 `diamonds` data):

http://ggplot2.tidyverse.org/reference/diamonds.html

In [2]:
# Location of the ggplot2 diamonds CSV shipped with the Databricks sample datasets.
dataPath = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"

# Read the CSV, taking column names from the header row and letting Spark infer column types.
diamonds = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .options(header="true", inferSchema="true")
    .load(dataPath)
)

In [3]:
# Show the column names and the types Spark inferred from the CSV.
diamonds.printSchema()

In [4]:
# Preview the raw rows (show() prints the first 20 by default).
diamonds.show()

In [5]:
%fs ls /databricks-datasets/Rdatasets/data-001/csv/ggplot2/

path,name,size
dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv,diamonds.csv,3192560
dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/economics.csv,economics.csv,20731
dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/midwest.csv,midwest.csv,100539
dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/movies.csv,movies.csv,6000709
dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/mpg.csv,mpg.csv,17345
dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/msleep.csv,msleep.csv,7182
dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/presidential.csv,presidential.csv,512
dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/seals.csv,seals.csv,64016


In [6]:
# '_c0' is the unnamed id column from the CSV and carries no signal, so remove it.
df_no_id = diamonds.drop('_c0')

# Discard every row that has a null in any column.
df_no_na = df_no_id.na.drop()

In [7]:
# Keep the target (price) first, followed by every candidate feature column.
df = df_no_na.select([
    'price',
    'carat', 'cut', 'color', 'clarity',
    'depth', 'table', 'x', 'y', 'z',
])
df.show()

In [8]:
# Spark ML regressors and RegressionEvaluator read the target from a column named
# 'label' by default, so rename price rather than configuring labelCol everywhere.
df = df.withColumnRenamed(existing='price', new='label')

In [9]:
# Confirm the target column now appears as 'label'.
df.show()

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# One StringIndexer per categorical column, each emitting a numeric '<name>Index' column.
cutIndexer, colorIndexer, clarityIndexer = (
    StringIndexer(inputCol=source, outputCol=target)
    for source, target in (
        ('cut', 'cutIndex'),
        ('color', 'colorIndex'),
        ('clarity', 'clarityIndex'),
    )
)

# A Pipeline fits and applies its stages in order, which is the same as chaining
# fit/transform for each indexer by hand.
df = Pipeline(stages=[cutIndexer, colorIndexer, clarityIndexer]).fit(df).transform(df)

df.show()

In [11]:
# OneHotEncoderEstimator exists only in Spark 2.3/2.4. Spark 3.0 renamed it to
# OneHotEncoder (same inputCols/outputCols API) and removed the old name, so fall
# back to the new class when the old import fails.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:  # Spark >= 3.0
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

# Expand each category index into a sparse one-hot vector. With the default
# dropLast=True, the last category maps to the all-zeros vector. For example,
# cut has indices 0-4, so cut_OHE has size 4.
OHE = OneHotEncoderEstimator(inputCols=['cutIndex', 'colorIndex', 'clarityIndex'],
                             outputCols=['cut_OHE', 'color_OHE', 'clarity_OHE'])

df = OHE.fit(df).transform(df)

In [12]:
# Preview the index and one-hot columns added by the two previous cells.
df.show()

In [13]:
# Concatenate the six numeric measurements and the three one-hot vectors (in that
# order) into a single 'features_assem' vector column.
assembler = VectorAssembler(
    inputCols=['carat', 'depth', 'table', 'x', 'y', 'z']
              + ['cut_OHE', 'color_OHE', 'clarity_OHE'],
    outputCol='features_assem',
)

# NOTE(review): rows with nulls were already removed before indexing (df_no_na), so
# this is presumably a defensive no-op; kept so behaviour is unchanged.
df = df.na.drop()

In [14]:
from pyspark.ml.feature import MinMaxScaler

# Rescale every assembled feature into MinMaxScaler's default [0, 1] range.
# NOTE(review): this is fitted on the full dataset, so the min/max statistics include
# rows that later land in the test split. Tree models are also insensitive to
# monotonic rescaling, so this step may be unnecessary for them.
scaler = MinMaxScaler(
    inputCol="features_assem",
    outputCol="scaledFeatures",
)
pipeline = Pipeline(stages=[
    assembler,  # raw columns -> features_assem
    scaler,     # features_assem -> scaledFeatures
])
scalerModel = pipeline.fit(dataset=df)
scaled_df = scalerModel.transform(dataset=df)
display(scaled_df)

label,carat,cut,color,clarity,depth,table,x,y,z,cutIndex,colorIndex,clarityIndex,cut_OHE,color_OHE,clarity_OHE,features_assem,scaledFeatures
326,0.23,Ideal,E,SI2,61.5,55.0,3.95,3.98,2.43,0.0,1.0,2.0,"List(0, 4, List(0), List(1.0))","List(0, 6, List(1), List(1.0))","List(0, 7, List(2), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 6, 11, 18), List(0.23, 61.5, 55.0, 3.95, 3.98, 2.43, 1.0, 1.0, 1.0))","List(1, 23, List(), List(0.006237006237006237, 0.5138888888888888, 0.23076923076923078, 0.3677839851024209, 0.06757215619694397, 0.07641509433962264, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0))"
326,0.21,Premium,E,SI1,59.8,61.0,3.89,3.84,2.31,1.0,1.0,0.0,"List(0, 4, List(1), List(1.0))","List(0, 6, List(1), List(1.0))","List(0, 7, List(0), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 7, 11, 16), List(0.21, 59.8, 61.0, 3.89, 3.84, 2.31, 1.0, 1.0, 1.0))","List(1, 23, List(), List(0.002079002079002075, 0.46666666666666656, 0.34615384615384615, 0.3621973929236499, 0.06519524617996604, 0.07264150943396226, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
327,0.23,Good,E,VS1,56.9,65.0,4.05,4.07,2.31,3.0,1.0,3.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(1), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 9, 11, 19), List(0.23, 56.9, 65.0, 4.05, 4.07, 2.31, 1.0, 1.0, 1.0))","List(1, 23, List(), List(0.006237006237006237, 0.38611111111111107, 0.4230769230769231, 0.37709497206703907, 0.06910016977928693, 0.07264150943396226, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0))"
334,0.29,Premium,I,VS2,62.4,58.0,4.2,4.23,2.63,1.0,5.0,1.0,"List(0, 4, List(1), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 7, List(1), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 7, 15, 17), List(0.29, 62.4, 58.0, 4.2, 4.23, 2.63, 1.0, 1.0, 1.0))","List(1, 23, List(), List(0.018711018711018705, 0.5388888888888889, 0.28846153846153844, 0.3910614525139665, 0.07181663837011885, 0.08270440251572327, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
335,0.31,Good,J,SI2,63.3,58.0,4.34,4.35,2.75,3.0,6.0,2.0,"List(0, 4, List(3), List(1.0))","List(0, 6, List(), List())","List(0, 7, List(2), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 9, 18), List(0.31, 63.3, 58.0, 4.34, 4.35, 2.75, 1.0, 1.0))","List(1, 23, List(), List(0.022869022869022867, 0.5638888888888888, 0.28846153846153844, 0.404096834264432, 0.07385398981324277, 0.08647798742138364, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0))"
336,0.24,Very Good,J,VVS2,62.8,57.0,3.94,3.96,2.48,2.0,6.0,4.0,"List(0, 4, List(2), List(1.0))","List(0, 6, List(), List())","List(0, 7, List(4), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 8, 20), List(0.24, 62.8, 57.0, 3.94, 3.96, 2.48, 1.0, 1.0))","List(1, 23, List(), List(0.008316008316008313, 0.5499999999999999, 0.2692307692307692, 0.36685288640595903, 0.06723259762308999, 0.0779874213836478, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0))"
336,0.24,Very Good,I,VVS1,62.3,57.0,3.95,3.98,2.47,2.0,5.0,5.0,"List(0, 4, List(2), List(1.0))","List(0, 6, List(5), List(1.0))","List(0, 7, List(5), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 8, 15, 21), List(0.24, 62.3, 57.0, 3.95, 3.98, 2.47, 1.0, 1.0, 1.0))","List(1, 23, List(), List(0.008316008316008313, 0.536111111111111, 0.2692307692307692, 0.3677839851024209, 0.06757215619694397, 0.07767295597484278, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0))"
337,0.26,Very Good,H,SI1,61.9,55.0,4.07,4.11,2.53,2.0,3.0,0.0,"List(0, 4, List(2), List(1.0))","List(0, 6, List(3), List(1.0))","List(0, 7, List(0), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 8, 13, 16), List(0.26, 61.9, 55.0, 4.07, 4.11, 2.53, 1.0, 1.0, 1.0))","List(1, 23, List(), List(0.012474012474012475, 0.5249999999999999, 0.23076923076923078, 0.3789571694599628, 0.06977928692699491, 0.07955974842767295, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
337,0.22,Fair,E,VS2,65.1,61.0,3.87,3.78,2.49,4.0,1.0,1.0,"List(0, 4, List(), List())","List(0, 6, List(1), List(1.0))","List(0, 7, List(1), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 11, 17), List(0.22, 65.1, 61.0, 3.87, 3.78, 2.49, 1.0, 1.0))","List(1, 23, List(), List(0.0041580041580041565, 0.6138888888888887, 0.34615384615384615, 0.36033519553072624, 0.06417657045840407, 0.07830188679245284, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
338,0.23,Very Good,H,VS1,59.4,61.0,4.0,4.05,2.39,2.0,3.0,3.0,"List(0, 4, List(2), List(1.0))","List(0, 6, List(3), List(1.0))","List(0, 7, List(3), List(1.0))","List(0, 23, List(0, 1, 2, 3, 4, 5, 8, 13, 19), List(0.23, 59.4, 61.0, 4.0, 4.05, 2.39, 1.0, 1.0, 1.0))","List(1, 23, List(), List(0.006237006237006237, 0.4555555555555555, 0.34615384615384615, 0.37243947858473, 0.06876061120543293, 0.07515723270440251, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0))"


In [15]:
# Split the *unscaled* rows first, then fit the assembler + scaler on the training
# portion only. Splitting the pre-scaled frame would let the test rows' min/max leak
# into the feature scaling. The fixed seed makes the split, and therefore every
# metric reported below, reproducible across runs.
training_raw, test_raw = df.randomSplit([0.7, 0.3], seed=42)
featureModel = Pipeline(stages=[assembler, scaler]).fit(training_raw)
training = featureModel.transform(training_raw)
test = featureModel.transform(test_raw)

# Both sets are reused by every cross-validation fit, so keep them in memory.
training.cache()
test.cache()

In [16]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Decision-tree regressor on the scaled feature vector. labelCol is left at its
# default, 'label', which is the renamed price column.
dt = DecisionTreeRegressor(featuresCol="scaledFeatures")

# Feature engineering already ran upstream, so the pipeline holds only the tree.
pipeline = Pipeline(stages=[dt])

# 5 depths x 4 bin counts = 20 candidate settings. Spark's tree learners reject
# maxDepth above 30, so 30 is the deepest value allowed here.
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15, 20, 30])
    .addGrid(dt.maxBins, [10, 20, 30, 50])
    .build()
)

In [17]:
# 3-fold cross-validation over paramGrid, scored with RegressionEvaluator's default
# metric (RMSE). Without a seed, the fold assignment and hence the chosen
# hyper-parameters change from run to run, so fix it.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(),
                    numFolds=3,
                    seed=42)
# The best setting is refitted on all of `training`, then applied to the held-out rows.
cvModel = cv.fit(training)
predictions = cvModel.transform(test)

In [18]:
# Score the test-set predictions with several regression metrics. A single evaluator
# is reused: passing a param map to evaluate() overrides metricName for that call only.
evaluator = RegressionEvaluator(labelCol = "label", predictionCol= "prediction", metricName = "rmse")
metric_names = ("rmse", "r2", "mae", "mse")
test_metrics = {name: evaluator.evaluate(predictions, {evaluator.metricName: name})
                for name in metric_names}
rmse, r2, mae, mse = (test_metrics[name] for name in metric_names)


print("RMSE on test data = ", rmse)
print("R_squared on test data = ", r2)
print("Mean Absolute Error (MAE) on test data = ", mae)
print("Mean Squared Error (MSE) on test data = ", mse)

# Side-by-side sample of actual vs. predicted prices.
predictions.select("label", "prediction").show()

In [19]:
from pyspark.ml.regression import RandomForestRegressor

# Random-forest regressor reading the same 'label' target and scaled feature vector.
rf = RandomForestRegressor(labelCol='label', featuresCol='scaledFeatures')

# Feature engineering already ran upstream, so the pipeline holds only the forest.
pipeline = Pipeline(stages=[rf])

# 2 depths x 2 forest sizes x 4 bin counts = 16 candidate settings.
paramGrid = (ParamGridBuilder()
            .addGrid(rf.maxDepth, [5, 10])
            .addGrid(rf.numTrees, [10, 20])
            .addGrid(rf.maxBins, [10, 20, 30, 50])
            .build())

In [20]:
# 3-fold cross-validation over paramGrid, scored with RegressionEvaluator's default
# metric (RMSE). Without a seed, the fold assignment and hence the chosen
# hyper-parameters change from run to run, so fix it.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(),
                    numFolds=3,
                    seed=42)
# The best setting is refitted on all of `training`, then applied to the held-out rows.
cvModel = cv.fit(training)
predictions = cvModel.transform(test)

In [21]:
# Score the test-set predictions with several regression metrics. A single evaluator
# is reused: passing a param map to evaluate() overrides metricName for that call only.
evaluator = RegressionEvaluator(labelCol = "label", predictionCol= "prediction", metricName = "rmse")
metric_names = ("rmse", "r2", "mae", "mse")
test_metrics = {name: evaluator.evaluate(predictions, {evaluator.metricName: name})
                for name in metric_names}
rmse, r2, mae, mse = (test_metrics[name] for name in metric_names)


print("RMSE on test data = ", rmse)
print("R_squared on test data = ", r2)
print("Mean Absolute Error (MAE) on test data = ", mae)
print("Mean Squared Error (MSE) on test data = ", mse)

# Side-by-side sample of actual vs. predicted prices.
predictions.select("label", "prediction").show()