## Assignment 4 - Spark ML

## Learning Outcomes
In this assignment you will: 

-  Use ML pipelines
-  Improve a Random Forest model
-  Perform Hyperparameter tuning

**Question 1:** (5 marks)

In this module we identified a fairly significant link by combining the ML pipeline, a more sophisticated model, and better hyperparameter tuning. However, the results are still somewhat disappointing. We are working with very few features, and we have likely made some assumptions that are not quite valid (such as zip code shortening). Also, a wealthy zip code does not guarantee that a farmer's market is held in that same zip code. We might instead look at neighboring zip codes, or use a distance measure to predict whether a farmer's market exists within a certain radius (in miles) of a wealthy zip code.
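The distance-measure idea can be sketched in plain Python with the haversine formula. This is only an illustration: the function names, the Earth-radius constant, and the coordinates are assumptions, not part of the course notebook, and real zip code centroids and market locations would have to come from a separate data source.

```python
import math

EARTH_RADIUS_MILES = 3958.8  # mean Earth radius (approximation)

def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))

def has_market_within(zip_centroid, market_coords, radius_miles):
    """True if any market lies within radius_miles of the zip code centroid."""
    return any(
        haversine_miles(zip_centroid[0], zip_centroid[1], lat, lon) <= radius_miles
        for lat, lon in market_coords
    )

# Hypothetical coordinates, for illustration only.
centroid = (40.0, -75.0)
markets = [(40.0, -75.1)]
nearby = has_market_within(centroid, markets, radius_miles=10)
```

A boolean such as `nearby` could then replace the exact zip code match as the label or be added as a feature.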

That said, there are many other potential features and plenty of other random forest parameters to tune, so experiment with the above pipeline and see if you can improve it further!
    
You may use the same classifier we built in the notebook (command cells 65 to 82) in this module.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5915990090493625/2446126855165611/6085673883631125/latest.html


**Question 2** (7 marks)


Using the Apache Spark ML pipeline, build a model to predict the price of a diamond based on the available features.

Read the following notebook for details about the dataset.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5915990090493625/4396972618536508/6085673883631125/latest.html



Note: Please submit the **published** notebook link in a Word or PDF document. Do not submit HTML, IPython notebook, or archive (DBC) formats.


# Question 1

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .enableHiveSupport().getOrCreate()

df = spark.read.csv("joinedData.csv", sep=',', header=True, inferSchema=True)
df.toPandas().head()

Unnamed: 0,count,zip,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(total_income_amount),sum(taxable_interest_amount),sum(net_capital_gains),sum(biz_net_income)
0,,,463,8334,930,840,980,82700,460,825,4409
1,2.0,496.0,496,17856,3290,3250,4150,357071,1540,5704,10969
2,,,833,9996,14000,9490,19940,1497553,7747,12788,32485
3,1.0,1342.0,1342,40260,4800,3680,5450,475283,2919,6476,13496
4,1.0,1580.0,1580,9480,4600,3940,4650,516516,4243,10827,22441


In [2]:
from pyspark.sql.functions import *

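# Cast every column to double, then fill missing values in every column except zipcode
# with that column's rounded mean, computed over the rows that currently contain no nulls.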

for name in df.columns:
    df = df.withColumn(name, col(name).cast("double"))
    if name != "zipcode":
        df = df.na.fill(df.na.drop().agg(round(avg(name))).first()[0], [name])

df.toPandas().head()

Unnamed: 0,count,zip,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(total_income_amount),sum(taxable_interest_amount),sum(net_capital_gains),sum(biz_net_income)
0,2.0,4810.0,463.0,8334.0,930.0,840.0,980.0,82700.0,460.0,825.0,4409.0
1,2.0,496.0,496.0,17856.0,3290.0,3250.0,4150.0,357071.0,1540.0,5704.0,10969.0
2,2.0,4810.0,833.0,9996.0,14000.0,9490.0,19940.0,1497553.0,7747.0,12788.0,32485.0
3,1.0,1342.0,1342.0,40260.0,4800.0,3680.0,5450.0,475283.0,2919.0,6476.0,13496.0
4,1.0,1580.0,1580.0,9480.0,4600.0,3940.0,4650.0,516516.0,4243.0,10827.0,22441.0
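The imputation step can be illustrated in plain Python. This is a simplified sketch with hypothetical values: it averages the values observed in one column, whereas the cell above takes the mean over rows that have no nulls in any column (`df.na.drop()`), so the two can give different fill values. `round_half_up` is a helper introduced here to mimic Spark's half-up rounding for non-negative numbers.

```python
import math

def round_half_up(x):
    """Round a non-negative number to the nearest integer, with ties going up."""
    return math.floor(x + 0.5)

def fill_with_rounded_mean(values):
    """Replace None entries with the rounded mean of the observed entries."""
    observed = [v for v in values if v is not None]
    fill = float(round_half_up(sum(observed) / len(observed)))
    return [fill if v is None else v for v in values]

# Hypothetical column with two missing entries.
column = [None, 2.0, None, 1.0, 4.0]
filled = fill_with_rounded_mean(column)  # observed mean is 7/3, rounded to 2.0
```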


In [3]:
nonFeatureCols = ["zip", "zipcode", "count"]
featureCols = [item for item in df.columns if item not in nonFeatureCols]

from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

clean_df = assembler.transform(df)
clean_df.toPandas().head()

Unnamed: 0,count,zip,zipcode,sum(zipcode),sum(single_returns),sum(joint_returns),sum(numdep),sum(total_income_amount),sum(taxable_interest_amount),sum(net_capital_gains),sum(biz_net_income),features
0,2.0,4810.0,463.0,8334.0,930.0,840.0,980.0,82700.0,460.0,825.0,4409.0,"[8334.0, 930.0, 840.0, 980.0, 82700.0, 460.0, ..."
1,2.0,496.0,496.0,17856.0,3290.0,3250.0,4150.0,357071.0,1540.0,5704.0,10969.0,"[17856.0, 3290.0, 3250.0, 4150.0, 357071.0, 15..."
2,2.0,4810.0,833.0,9996.0,14000.0,9490.0,19940.0,1497553.0,7747.0,12788.0,32485.0,"[9996.0, 14000.0, 9490.0, 19940.0, 1497553.0, ..."
3,1.0,1342.0,1342.0,40260.0,4800.0,3680.0,5450.0,475283.0,2919.0,6476.0,13496.0,"[40260.0, 4800.0, 3680.0, 5450.0, 475283.0, 29..."
4,1.0,1580.0,1580.0,9480.0,4600.0,3940.0,4650.0,516516.0,4243.0,10827.0,22441.0,"[9480.0, 4600.0, 3940.0, 4650.0, 516516.0, 424..."


In [4]:
trainSet, testSet = clean_df.randomSplit([0.7, 0.3])

trainSet.cache()
testSet.cache()

DataFrame[count: double, zip: double, zipcode: double, sum(zipcode): double, sum(single_returns): double, sum(joint_returns): double, sum(numdep): double, sum(total_income_amount): double, sum(taxable_interest_amount): double, sum(net_capital_gains): double, sum(biz_net_income): double, features: vector]
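`randomSplit` assigns each row to a split by random sampling, so the 70/30 proportions are only approximate, and without a seed every run produces a different split (PySpark's `randomSplit` accepts an optional `seed` argument). The plain-Python sketch below illustrates that behaviour; `random_split` is a hypothetical helper written for this example, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one part, with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bounds, e.g. [0.7, 1.0]
    rng = random.Random(seed)
    parts = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last part also catches any floating-point shortfall in the bounds.
            if u < bound or i == len(bounds) - 1:
                parts[i].append(row)
                break
    return parts

train, test = random_split(list(range(1000)), [0.7, 0.3], seed=42)
```

Fixing the seed makes the split, and therefore the reported metrics, repeatable between runs.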

In [5]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

improved_rfModel = (RandomForestRegressor()
  .setLabelCol("count")
  .setFeaturesCol("features"))

# Search a larger hyperparameter space to improve the random forest regressor.

paramGrid = (ParamGridBuilder()
  .addGrid(improved_rfModel.maxDepth, [5, 10])
  .addGrid(improved_rfModel.numTrees, [20, 60])
  .addGrid(improved_rfModel.featureSubsetStrategy, ["auto", "all", "sqrt", "log2"])
  .addGrid(improved_rfModel.maxBins, [16, 32, 64])
  .build())

stages = [improved_rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("count")))

pipelineFitted = cv.fit(trainSet)
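To get a feel for the cost of this search, the grid can be enumerated in plain Python. The sketch assumes `CrossValidator` is left at its default of 3 folds, as in the cell above; the dictionary simply mirrors the `ParamGridBuilder` calls.

```python
from itertools import product

# Same values as the ParamGridBuilder above.
grid = {
    "maxDepth": [5, 10],
    "numTrees": [20, 60],
    "featureSubsetStrategy": ["auto", "all", "sqrt", "log2"],
    "maxBins": [16, 32, 64],
}

# One dict per hyperparameter combination.
param_maps = [dict(zip(grid, combo)) for combo in product(*grid.values())]

num_folds = 3  # CrossValidator default (assumed unchanged)
total_fits = len(param_maps) * num_folds
```

That gives 48 combinations and 144 model fits during cross-validation, before the best combination is refitted on the full training set, so each extra grid value multiplies the run time.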

In [6]:
holdout = (pipelineFitted.bestModel
  .transform(testSet)
  .selectExpr("prediction as raw_prediction", 
    "double(round(prediction)) as prediction", 
    "count", 
    """CASE double(round(prediction)) = count 
  WHEN true then 1
  ELSE 0
END as equal"""))
  
holdout.toPandas().head()

Unnamed: 0,raw_prediction,prediction,count,equal
0,1.47761,1.0,1.0,1
1,1.88917,2.0,1.0,0
2,1.823638,2.0,1.0,0
3,1.97963,2.0,1.0,0
4,1.768517,2.0,1.0,0


In [7]:
from pyspark.mllib.evaluation import RegressionMetrics

rm = RegressionMetrics(holdout.select("prediction", "count").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm.meanSquaredError)
print("MAE: ", rm.meanAbsoluteError)
print("RMSE: ", rm.rootMeanSquaredError)
print("R Squared: ", rm.r2)
print("Explained Variance: ", rm.explainedVariance, "\n")

MSE:  1.7727272727272732
MAE:  0.7337662337662337
RMSE:  1.331438046897892
R Squared:  0.07567496262421947
Explained Variance:  0.24827120931017038 
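For reference, the relationships between these metrics can be reproduced in a few lines of plain Python. The `(prediction, observation)` pairs below are hypothetical; the point is that the root mean squared error is the square root of the MSE (√1.7727 ≈ 1.3314 in the output above) and that R² compares the squared error with the variance of the observations.

```python
import math

def regression_metrics(pairs):
    """Compute MSE, MAE, RMSE and R^2 from (prediction, observation) pairs."""
    n = len(pairs)
    errors = [pred - obs for pred, obs in pairs]
    mse = sum(e * e for e in errors) / n
    mae = sum(abs(e) for e in errors) / n
    mean_obs = sum(obs for _, obs in pairs) / n
    ss_tot = sum((obs - mean_obs) ** 2 for _, obs in pairs)
    r2 = 1 - (mse * n) / ss_tot  # 1 - SS_err / SS_tot
    return {"mse": mse, "mae": mae, "rmse": math.sqrt(mse), "r2": r2}

# Hypothetical (prediction, observation) pairs.
pairs = [(1.0, 1.0), (2.0, 1.0), (2.0, 3.0), (4.0, 4.0)]
metrics = regression_metrics(pairs)
```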



In [8]:
print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

The Best Parameters:
--------------------
RandomForestRegressionModel (uid=RandomForestRegressor_d847c21c377b) with 60 trees


{Param(parent='RandomForestRegressor_d847c21c377b', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='RandomForestRegressor_d847c21c377b', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='RandomForestRegressor_d847c21c377b', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'auto',
 Param(parent='RandomForestRegressor_d847c21c377b', name='featuresCol', doc='features column name'): 'features',
 Param(parent='RandomForestRegressor_d847c21c377b', name=

In [9]:
holdout.selectExpr("sum(equal)/sum(1)").toPandas()

Unnamed: 0,(sum(equal) / sum(1))
0,0.496753
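The ratio `sum(equal)/sum(1)` is the share of rows whose rounded prediction equals the true count. The plain-Python sketch below applies the same calculation to the five hold-out rows displayed earlier; `round_half_up` is a helper introduced here to mimic Spark's half-up rounding for non-negative values.

```python
import math

def round_half_up(x):
    """Round a non-negative number to the nearest integer, with ties going up."""
    return float(math.floor(x + 0.5))

# (raw_prediction, count) for the five hold-out rows shown earlier.
rows = [
    (1.47761, 1.0),
    (1.88917, 1.0),
    (1.823638, 1.0),
    (1.97963, 1.0),
    (1.768517, 1.0),
]

equal = [1 if round_half_up(pred) == count else 0 for pred, count in rows]
exact_match_rate = sum(equal) / len(equal)
```

On these five rows only the first prediction rounds to the true count, so the rate is 0.2; over the whole test set the notebook reports about 0.497.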


# Question 2

In [10]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .enableHiveSupport().getOrCreate()

df = spark.read.csv("diamonds.csv", sep=',', header=True, inferSchema=True)
df.toPandas().head()

Unnamed: 0,_c0,carat,cut,color,clarity,depth,table,price,x,y,z
0,1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
1,2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
2,3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
3,4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
4,5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75


In [11]:
df.select('cut').distinct().show(), df.select('color').distinct().show(), df.select('clarity').distinct().show()

+---------+
|      cut|
+---------+
|  Premium|
|    Ideal|
|     Good|
|     Fair|
|Very Good|
+---------+

+-----+
|color|
+-----+
|    F|
|    E|
|    D|
|    J|
|    G|
|    I|
|    H|
+-----+

+-------+
|clarity|
+-------+
|   VVS2|
|    SI1|
|     IF|
|     I1|
|   VVS1|
|    VS2|
|    SI2|
|    VS1|
+-------+



(None, None, None)

In [12]:
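# Replace each distinct string value with a numeric code (its index in the distinct() result).
# The loop variable `name` avoids shadowing pyspark.sql.functions.col, which later cells use.

encode_col = ['cut', 'clarity', 'color']
for name in encode_col:
    for i, val in enumerate(list(df.select(name).distinct().toPandas()[name])):
        df = df.withColumn(name, when(df[name] != val, df[name]).otherwise(float(i)))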

df.toPandas().head()

Unnamed: 0,_c0,carat,cut,color,clarity,depth,table,price,x,y,z
0,1,0.23,1.0,1.0,6.0,61.5,55.0,326,3.95,3.98,2.43
1,2,0.21,0.0,1.0,1.0,59.8,61.0,326,3.89,3.84,2.31
2,3,0.23,2.0,1.0,7.0,56.9,65.0,327,4.05,4.07,2.31
3,4,0.29,0.0,5.0,5.0,62.4,58.0,334,4.2,4.23,2.63
4,5,0.31,2.0,3.0,6.0,63.3,58.0,335,4.34,4.35,2.75
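The codes above follow whatever order `distinct()` happens to return (here Premium = 0, Ideal = 1, Good = 2), so they carry no quality ordering. One alternative, sketched below in plain Python, is to assign codes by the grading order documented for the diamonds dataset, from worst to best. Whether this helps a random forest is an assumption that would need to be checked; the helper names are introduced only for this example.

```python
# Grading scales from worst to best.
CUT_ORDER = ["Fair", "Good", "Very Good", "Premium", "Ideal"]
COLOR_ORDER = ["J", "I", "H", "G", "F", "E", "D"]
CLARITY_ORDER = ["I1", "SI2", "SI1", "VS2", "VS1", "VVS2", "VVS1", "IF"]

def ordinal_map(order):
    """Map each label to its rank in the given worst-to-best order."""
    return {label: float(rank) for rank, label in enumerate(order)}

CUT_CODES = ordinal_map(CUT_ORDER)
COLOR_CODES = ordinal_map(COLOR_ORDER)
CLARITY_CODES = ordinal_map(CLARITY_ORDER)

def encode(cut, color, clarity):
    """Encode one diamond's categorical attributes as ordered numeric codes."""
    return CUT_CODES[cut], COLOR_CODES[color], CLARITY_CODES[clarity]

# First row of the dataset: Ideal cut, color E, clarity SI2.
first_row = encode("Ideal", "E", "SI2")
```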


In [13]:
# drop the first column.

df = df.drop('_c0')
df.toPandas().head()

Unnamed: 0,carat,cut,color,clarity,depth,table,price,x,y,z
0,0.23,1.0,1.0,6.0,61.5,55.0,326,3.95,3.98,2.43
1,0.21,0.0,1.0,1.0,59.8,61.0,326,3.89,3.84,2.31
2,0.23,2.0,1.0,7.0,56.9,65.0,327,4.05,4.07,2.31
3,0.29,0.0,5.0,5.0,62.4,58.0,334,4.2,4.23,2.63
4,0.31,2.0,3.0,6.0,63.3,58.0,335,4.34,4.35,2.75


In [14]:
# vectorize the features

from pyspark.sql.functions import *

for name in df.columns:
    df = df.withColumn(name, col(name).cast("double"))

featureCols = [item for item in df.columns if item != "price"]

from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

df = assembler.transform(df)
df.toPandas().head()

Unnamed: 0,carat,cut,color,clarity,depth,table,price,x,y,z,features
0,0.23,1.0,1.0,6.0,61.5,55.0,326.0,3.95,3.98,2.43,"[0.23, 1.0, 1.0, 6.0, 61.5, 55.0, 3.95, 3.98, ..."
1,0.21,0.0,1.0,1.0,59.8,61.0,326.0,3.89,3.84,2.31,"[0.21, 0.0, 1.0, 1.0, 59.8, 61.0, 3.89, 3.84, ..."
2,0.23,2.0,1.0,7.0,56.9,65.0,327.0,4.05,4.07,2.31,"[0.23, 2.0, 1.0, 7.0, 56.9, 65.0, 4.05, 4.07, ..."
3,0.29,0.0,5.0,5.0,62.4,58.0,334.0,4.2,4.23,2.63,"[0.29, 0.0, 5.0, 5.0, 62.4, 58.0, 4.2, 4.23, 2..."
4,0.31,2.0,3.0,6.0,63.3,58.0,335.0,4.34,4.35,2.75,"[0.31, 2.0, 3.0, 6.0, 63.3, 58.0, 4.34, 4.35, ..."


In [15]:
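# Rescale each feature to the [0, 1] range.

from pyspark.ml.feature import MinMaxScaler

mmScaler = MinMaxScaler(inputCol="features", outputCol="norm_features")
scalerModel = mmScaler.fit(df)
# transform() appends norm_features to each row, so no join on the vector column is needed;
# the select only keeps "features" as the first column.
new_df = (scalerModel.transform(df)
  .select("features", *[c for c in df.columns if c != "features"], "norm_features"))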
new_df.toPandas().head()

Unnamed: 0,features,carat,cut,color,clarity,depth,table,price,x,y,z,norm_features
0,"[0.23, 1.0, 1.0, 6.0, 61.5, 55.0, 3.95, 3.98, ...",0.23,1.0,1.0,6.0,61.5,55.0,326.0,3.95,3.98,2.43,"[0.02803738317757009, 0.25, 0.1666666666666666..."
1,"[0.21, 0.0, 1.0, 1.0, 59.8, 61.0, 3.89, 3.84, ...",0.21,0.0,1.0,1.0,59.8,61.0,326.0,3.89,3.84,2.31,"[0.009345794392523346, 0.0, 0.1666666666666666..."
2,"[0.23, 2.0, 1.0, 7.0, 56.9, 65.0, 4.05, 4.07, ...",0.23,2.0,1.0,7.0,56.9,65.0,327.0,4.05,4.07,2.31,"[0.02803738317757009, 0.5, 0.16666666666666666..."
3,"[0.29, 0.0, 5.0, 5.0, 62.4, 58.0, 4.2, 4.23, 2...",0.29,0.0,5.0,5.0,62.4,58.0,334.0,4.2,4.23,2.63,"[0.08411214953271025, 0.0, 0.8333333333333334,..."
4,"[0.31, 2.0, 3.0, 6.0, 63.3, 58.0, 4.34, 4.35, ...",0.31,2.0,3.0,6.0,63.3,58.0,335.0,4.34,4.35,2.75,"[0.10280373831775699, 0.5, 0.5, 0.857142857142..."
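The values in `norm_features` follow the usual min-max formula, `(x - min) / (max - min)`, applied per feature. The plain-Python sketch below checks it against the encoded categorical columns, whose ranges are known from the encoding step (cut 0 to 4, color 0 to 6, clarity 0 to 7). `min_max_scale` is a helper written for this example.

```python
def min_max_scale(value, col_min, col_max):
    """Rescale value to [0, 1] given the column's minimum and maximum."""
    if col_max == col_min:
        # MinMaxScaler maps a constant column to the middle of the target range.
        return 0.5
    return (value - col_min) / (col_max - col_min)

cut_scaled = min_max_scale(1.0, 0.0, 4.0)      # cut code 1.0
color_scaled = min_max_scale(5.0, 0.0, 6.0)    # color code 5.0
clarity_scaled = min_max_scale(6.0, 0.0, 7.0)  # clarity code 6.0
```

These reproduce the 0.25, 0.8333... and 0.857142... entries visible in the output above.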


In [16]:
# Split the dataset 70:30 into training and test sets.

trainSet, testSet = new_df.randomSplit([0.7, 0.3])

trainSet.cache()
testSet.cache()

DataFrame[features: vector, carat: double, cut: double, color: double, clarity: double, depth: double, table: double, price: double, x: double, y: double, z: double, norm_features: vector]

In [17]:
# Build a random forest regression model with a Pipeline to predict the price.

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

improved_rfModel = (RandomForestRegressor()
  .setLabelCol("price")
  .setFeaturesCol("norm_features"))

paramGrid = (ParamGridBuilder()
  .addGrid(improved_rfModel.maxDepth, [5, 10])
  .addGrid(improved_rfModel.numTrees, [20, 60])
  .addGrid(improved_rfModel.featureSubsetStrategy, ["auto", "all", "sqrt", "log2"])
  .addGrid(improved_rfModel.maxBins, [16, 32, 64])
  .build())

stages = [improved_rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("price")))

pipelineFitted = cv.fit(trainSet)
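Cross-validation evaluates each hyperparameter combination on several train/validation splits of the training set. The plain-Python sketch below shows the idea with three equal-sized folds. It is a simplification: Spark assigns rows to folds by random sampling, so its fold sizes are only approximately equal, and `k_fold_indices` is a hypothetical helper.

```python
import random

def k_fold_indices(n_rows, k, seed):
    """Yield (training, validation) index lists for k-fold cross-validation."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    folds = [indices[i::k] for i in range(k)]  # k disjoint folds covering every row
    for i in range(k):
        validation = folds[i]
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield training, validation

splits = list(k_fold_indices(n_rows=10, k=3, seed=0))
```

Each row is used for validation exactly once, and the score for a combination is the average of its validation metric over the folds.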

In [18]:
holdout = (pipelineFitted.bestModel
  .transform(testSet)
  .selectExpr("prediction as raw_prediction", 
    "double(round(prediction)) as prediction", 
    "price"))

holdout.toPandas().head()

Unnamed: 0,raw_prediction,prediction,price
0,388.3,388.0,404.0
1,368.4,368.0,337.0
2,380.9,381.0,402.0
3,383.05,383.0,402.0
4,399.0,399.0,357.0


In [19]:
from pyspark.mllib.evaluation import RegressionMetrics

rm = RegressionMetrics(holdout.select("prediction", "price").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm.meanSquaredError)
print("MAE: ", rm.meanAbsoluteError)
print("RMSE: ", rm.rootMeanSquaredError)
print("R Squared: ", rm.r2)
print("Explained Variance: ", rm.explainedVariance, "\n")

MSE:  8677.564935064935
MAE:  47.188311688311686
RMSE:  93.15344832621568
R Squared:  0.9895061525097696
Explained Variance:  807492.5309390291 
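The explained variance printed here is far larger than 1 because, as far as I understand the `RegressionMetrics` implementation, it is not a ratio: it is the regression sum of squares divided by the number of rows, so it is expressed in squared price units. R², by contrast, is unitless. The plain-Python sketch below contrasts the two on hypothetical prices; treat the explained-variance formula as an assumption to confirm against the Spark source.

```python
def r2_and_explained_variance(pairs):
    """Return (R^2, explained variance) from (prediction, observation) pairs."""
    n = len(pairs)
    mean_obs = sum(obs for _, obs in pairs) / n
    ss_err = sum((obs - pred) ** 2 for pred, obs in pairs)
    ss_tot = sum((obs - mean_obs) ** 2 for _, obs in pairs)
    ss_reg = sum((pred - mean_obs) ** 2 for pred, _ in pairs)
    r2 = 1 - ss_err / ss_tot          # unitless
    explained_variance = ss_reg / n   # squared units of the label (assumed definition)
    return r2, explained_variance

# Hypothetical (predicted price, actual price) pairs.
pairs = [(300.0, 310.0), (500.0, 480.0), (900.0, 910.0), (1500.0, 1500.0)]
r2, explained_variance = r2_and_explained_variance(pairs)
```

With prices in the hundreds or thousands, an explained variance in the hundreds of thousands is therefore expected and says little on its own; R² and RMSE are easier to interpret.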



In [20]:
print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

The Best Parameters:
--------------------
RandomForestRegressionModel (uid=RandomForestRegressor_f26e1880ab09) with 20 trees


{Param(parent='RandomForestRegressor_f26e1880ab09', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='RandomForestRegressor_f26e1880ab09', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='RandomForestRegressor_f26e1880ab09', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'all',
 Param(parent='RandomForestRegressor_f26e1880ab09', name='featuresCol', doc='features column name'): 'norm_features',
 Param(parent='RandomForestRegressor_f26e1880ab09', n