## Assignment 4 - Spark ML

## Learning Outcomes
In this assignment, you will:

- Use ML pipelines
- Improve a Random Forest model
- Perform hyperparameter tuning

**Question 1:** (5 marks)

In this module, by leveraging the ML pipeline, a more sophisticated model, and better hyperparameter tuning, we identified a fairly significant link between the tax data of a zip code and the number of farmer's markets in it. However, these results are still somewhat disappointing. We are working with very few features, and we have likely made some assumptions that are not quite valid (such as shortening the zip codes). Also, the fact that a wealthy zip code exists does not mean that a farmer's market will be held in that same zip code. In fact, we might want to look at neighboring zip codes, or use a distance measure to predict whether a farmer's market exists within a certain radius (in miles) of a wealthy zip code.
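The distance-based idea above can be sketched with the haversine (great-circle) formula. This is a minimal plain-Python sketch, not Spark code: it assumes we already have latitude/longitude pairs for a zip code centroid and for each market, and the helper names `haversine_miles` and `markets_within_radius`, the coordinates, and the 10-mile radius are all hypothetical. It also assumes a spherical Earth with a mean radius of 3,958.8 miles.

```python
import math

EARTH_RADIUS_MILES = 3958.8  # assumption: spherical Earth, mean radius


def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))


def markets_within_radius(center, markets, radius_miles):
    """Return the names of markets whose (lat, lon) lies within radius_miles of center."""
    lat0, lon0 = center
    return [name for name, (lat, lon) in markets.items()
            if haversine_miles(lat0, lon0, lat, lon) <= radius_miles]


# Hypothetical coordinates, for illustration only
center = (40.0, -75.0)
markets = {"near": (40.05, -75.0), "far": (41.0, -75.0)}
print(markets_within_radius(center, markets, radius_miles=10))
```

A "market within N miles" flag computed this way could serve as an alternative label to the per-zip market count, but that is only one possible design.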

That said, there are many other potential features and plenty of other random forest parameters to tune, so experiment with the pipeline above and see if you can improve it further!
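`ParamGridBuilder` builds the Cartesian product of the values given for each parameter, so the cost of tuning grows multiplicatively. The plain-Python sketch below mimics that product for a few real `RandomForestRegressor` parameters (`maxDepth`, `numTrees`, `minInstancesPerNode`, `featureSubsetStrategy`); the candidate values are arbitrary examples, and the fit count assumes `CrossValidator` trains every combination once per fold plus one final refit of the best combination.

```python
from itertools import product

# Hypothetical candidate values; the keys are RandomForestRegressor parameter names
grid = {
    "maxDepth": [5, 10, 15],
    "numTrees": [20, 60],
    "minInstancesPerNode": [1, 5],
    "featureSubsetStrategy": ["sqrt", "onethird"],
}

# Cartesian product of the candidate values: one dict per parameter combination
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

num_folds = 3
# Each combination is fitted once per fold, plus one refit of the best combination
total_fits = len(param_maps) * num_folds + 1

print(len(param_maps), total_fits)
```

Adding one more value to any parameter multiplies the number of combinations, so it is usually cheaper to tune a few parameters coarsely first.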
    
You may use the same random forest model that we built in this module's notebook (command cells 65 to 82):

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5915990090493625/2446126855165611/6085673883631125/latest.html

**Question 2:** (7 marks)


Using the Apache Spark ML pipeline, build a model to predict the price of a diamond based on the available features.

See the following notebook for details about the dataset:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5915990090493625/4396972618536508/6085673883631125/latest.html


Note: Please submit the **published** notebook link in a Word or PDF document. Do not submit HTML, IPython notebook, or archive (DBC) formats.

**Question 1 - Solution:**

In [0]:
# Read the data
taxes2013 = (spark.read
  .option("header", "true")
  .csv("dbfs:/databricks-datasets/data.gov/irs_zip_code_data/data-001/2013_soi_zipcode_agi.csv"))

markets = (spark.read
  .option("header", "true")
  .csv("dbfs:/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/market_data.csv"))

In [0]:
# Register Spark SQL temporary views
taxes2013.createOrReplaceTempView("taxes2013")
markets.createOrReplaceTempView("markets")

In [0]:
%sql 
DROP TABLE IF EXISTS cleaned_taxes;

CREATE TABLE cleaned_taxes AS
SELECT state, int(zipcode / 10) as zipcode, 
  int(mars1) as single_returns, 
  int(mars2) as joint_returns, 
  int(numdep) as numdep, 
  double(A02650) as total_income_amount,
  double(A00300) as taxable_interest_amount,
  double(a01000) as net_capital_gains,
  double(a00900) as biz_net_income
FROM taxes2013
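The `int(zipcode / 10)` expression drops the last digit of each zip code, so neighboring zip codes collapse into one group. A plain-Python sketch of the same arithmetic (the helper name `shorten_zip` is hypothetical) also shows that a leading zero disappears, because the string is converted to a number before dividing:

```python
def shorten_zip(zipcode: str) -> int:
    """Drop the last digit of a zip code, mirroring int(zipcode / 10) for non-negative values."""
    return int(float(zipcode) / 10)  # int() truncates toward zero, like a cast to INT


print(shorten_zip("94110"), shorten_zip("94117"), shorten_zip("01001"))
```

This is the "zip code shortening" assumption mentioned in the question: two different zip codes become the same key for both the tax data and the market counts.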

In [0]:
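# Cache the cleaned table and aggregate the tax data per (shortened) zip code
spark.catalog.cacheTable("cleaned_taxes")
cleanedTaxes = spark.sql("SELECT * FROM cleaned_taxes")
summedTaxes = cleanedTaxes.groupBy("zipcode").sum()  # sums every numeric column, including zipcode itself
# Shorten the market zip codes the same way and count the markets per shortened zip code
cleanedMarkets = (markets
  .selectExpr("*", "int(zip / 10) as zipcode")
  .groupBy("zipcode")
  .count()
  .selectExpr("double(count) as count", "zipcode as zip"))
# The outer join keeps zip codes that appear in only one of the two datasets
joined = cleanedMarkets.join(summedTaxes, cleanedMarkets.zip == summedTaxes.zipcode, "outer")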

In [0]:
display(joined)

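| count | zip | zipcode | sum(zipcode) | sum(single_returns) | sum(joint_returns) | sum(numdep) | sum(total_income_amount) | sum(taxable_interest_amount) | sum(net_capital_gains) | sum(biz_net_income) |
|---|---|---|---|---|---|---|---|---|---|---|
| 1009.0 | | | | | | | | | | |
| 1.0 | 0.0 | 0.0 | 0.0 | 66430180.0 | 52885400.0 | 96500590.0 | 9274122025.0 | 82710640.0 | 399567789.0 | 310024683.0 |
| 1.0 | 3.0 | | | | | | | | | |
| 4.0 | 60.0 | | | | | | | | | |
| 1.0 | 61.0 | | | | | | | | | |
| 2.0 | 62.0 | | | | | | | | | |
| 1.0 | 63.0 | | | | | | | | | |
| 1.0 | 65.0 | | | | | | | | | |
| 4.0 | 66.0 | | | | | | | | | |
| 4.0 | 67.0 | | | | | | | | | |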


In [0]:
prepped = joined.na.fill(0)
display(prepped)

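| count | zip | zipcode | sum(zipcode) | sum(single_returns) | sum(joint_returns) | sum(numdep) | sum(total_income_amount) | sum(taxable_interest_amount) | sum(net_capital_gains) | sum(biz_net_income) |
|---|---|---|---|---|---|---|---|---|---|---|
| 1009.0 | 0 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1.0 | 0 | 0 | 0 | 66430180 | 52885400 | 96500590 | 9274122025.0 | 82710640.0 | 399567789.0 | 310024683.0 |
| 1.0 | 3 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4.0 | 60 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1.0 | 61 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 2.0 | 62 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1.0 | 63 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1.0 | 65 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4.0 | 66 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4.0 | 67 | 0 | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 0.0 | 0.0 |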

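The outer join keeps zip codes that appear in only one dataset and leaves the missing side as null; `na.fill(0)` then replaces those nulls with 0 in the numeric columns. This is also why the 1009 markets whose zip is null turn into a row with all-zero tax features in the output above. A small plain-Python sketch of the same behavior, using hypothetical dictionaries in place of the two DataFrames:

```python
# Hypothetical per-zip data standing in for cleanedMarkets and summedTaxes
market_counts = {9411: 3.0, 9412: 1.0}  # shortened zip -> number of markets
tax_sums = {9411: 1200.0, 9413: 800.0}  # shortened zip -> summed income amount

# Full outer join: every key from either side, None where one side has no row
joined = {z: (market_counts.get(z), tax_sums.get(z))
          for z in sorted(set(market_counts) | set(tax_sums))}

# Equivalent of na.fill(0): replace the missing values with 0
prepped = {z: tuple(0 if v is None else v for v in row) for z, row in joined.items()}

print(joined)
print(prepped)
```

Filling with 0 treats "no data" as "zero income" or "zero markets", which is itself an assumption worth revisiting when trying to improve the model.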

In [0]:
nonFeatureCols = ["zip", "zipcode", "count", "sum(zipcode)"]  # the label (count), the join keys, and sum(zipcode) are not features
featureCols = [item for item in prepped.columns if item not in nonFeatureCols]

# VectorAssembler assembles all of these columns into one single vector
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(prepped)

In [0]:
training, test = finalPrep.randomSplit([0.7, 0.3])
training.cache()
test.cache()
print(training.count()) 
print(test.count())
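`randomSplit([0.7, 0.3])` assigns each row independently, so the two printed counts are only approximately 70% and 30% of the data, and they change from run to run unless a `seed` is passed. The sketch below is a simplified plain-Python version of per-row weighted assignment (the function `random_split` is hypothetical and is not Spark's exact sampling implementation):

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split by a single uniform draw against cumulative weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # upper bound of each split on [0, 1)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose upper bound exceeds the draw; fall back to the last split
        idx = next((i for i, b in enumerate(bounds) if u < b), len(weights) - 1)
        splits[idx].append(row)
    return splits


training, test = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(training), len(test))
```

Passing a fixed seed (as `randomSplit` also allows) makes the split, and therefore the reported metrics, reproducible.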

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline
#Let's choose a model
rfModel = (RandomForestRegressor()
  .setLabelCol("count")
  .setFeaturesCol("features"))

# Our ParamMaps for the estimator
paramGrid = (ParamGridBuilder()
  # Original grid: 2 x 2 = 4 parameter combinations (5,20), (5,60), (10,20), (10,60)
  #.addGrid(rfModel.maxDepth, [5, 10])
  #.addGrid(rfModel.numTrees, [20, 60])
  .addGrid(rfModel.maxDepth, [5, 10])
  .addGrid(rfModel.numTrees, [10])  # 2 x 1 = 2 combinations: (maxDepth=5, numTrees=10) and (maxDepth=10, numTrees=10)
  .build())

stages = [rfModel]

pipeline = Pipeline().setStages(stages)
# This is our hyperparameter tuning
cv = (CrossValidator()
  .setEstimator(pipeline)  # Our pipeline to tune
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("count"))  # our metric (RMSE by default)
  .setNumFolds(3))  # CrossValidator will generate 3 (training, validation) dataset pairs, so each combination is trained 3 times

pipelineFitted = cv.fit(training)
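With `setNumFolds(3)`, `CrossValidator` partitions the training data into 3 folds; for each fold it fits on the other two and evaluates on the held-out one, then averages the metric per parameter combination. The plain-Python sketch below shows how such folds are formed (the function `k_fold_pairs` is hypothetical, and its round-robin fold assignment is a simplification; Spark assigns rows to folds randomly):

```python
def k_fold_pairs(rows, k):
    """Return k (train, validation) pairs; every row is validated exactly once."""
    folds = [rows[i::k] for i in range(k)]  # round-robin fold assignment (simplification)
    pairs = []
    for i in range(k):
        train = [r for j, fold in enumerate(folds) if j != i for r in fold]
        pairs.append((train, folds[i]))
    return pairs


rows = list(range(9))
pairs = k_fold_pairs(rows, k=3)

num_param_maps = 2  # maxDepth in [5, 10] x numTrees in [10]
# One fit per fold per combination, plus the final refit of the best combination
print(len(pairs), num_param_maps * len(pairs) + 1)
```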

In [0]:
print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

In [0]:
holdout2 = (pipelineFitted.bestModel
  .transform(test)
  .selectExpr("prediction as raw_prediction", 
    "double(round(prediction)) as prediction", 
    "count", 
    """CASE double(round(prediction)) = count 
  WHEN true then 1
  ELSE 0
END as equal"""))
display(holdout2)

| raw_prediction | prediction | count | equal |
|---|---|---|---|
| 1.3040003983806805 | 1.0 | 0.0 | 0 |
| 0.4798063095617413 | 0.0 | 0.0 | 1 |
| 0.158797659274846 | 0.0 | 0.0 | 1 |
| 0.3349223902636997 | 0.0 | 0.0 | 1 |
| 2.077986427615381 | 2.0 | 0.0 | 0 |
| 1.261387850149703 | 1.0 | 1.0 | 1 |
| 1.3217392278594624 | 1.0 | 1.0 | 1 |
| 0.8986297798899907 | 1.0 | 2.0 | 0 |
| 1.8585082431346824 | 2.0 | 2.0 | 1 |
| 0.9467728896601764 | 1.0 | 2.0 | 0 |


In [0]:
from pyspark.mllib.evaluation import RegressionMetrics
# RegressionMetrics expects an RDD of (prediction, observation) pairs
rm2 = RegressionMetrics(holdout2.select("prediction", "count").rdd.map(lambda x: (x[0], x[1])))

print("MSE: ", rm2.meanSquaredError)
print("MAE: ", rm2.meanAbsoluteError)
print("RMSE: ", rm2.rootMeanSquaredError)
print("R Squared: ", rm2.r2)
print("Explained Variance: ", rm2.explainedVariance, "\n")
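MSE, MAE, RMSE, and R² follow their standard definitions (R² = 1 - SS_err / SS_tot). As a plain-Python check, the sketch below computes them on the ten (rounded prediction, count) pairs displayed above. Those ten rows are only a sample of the test set, so these numbers are not the ones the notebook prints for the full holdout set.

```python
import math

# (prediction, count) pairs copied from the ten rows of holdout2 displayed above
pairs = [(1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (2.0, 0.0),
         (1.0, 1.0), (1.0, 1.0), (1.0, 2.0), (2.0, 2.0), (1.0, 2.0)]

n = len(pairs)
errors = [p - y for p, y in pairs]
mse = sum(e * e for e in errors) / n
mae = sum(abs(e) for e in errors) / n
rmse = math.sqrt(mse)

mean_y = sum(y for _, y in pairs) / n
ss_tot = sum((y - mean_y) ** 2 for _, y in pairs)
r2 = 1 - sum(e * e for e in errors) / ss_tot  # 1 - SS_err / SS_tot

print(mse, mae, rmse, r2)
```

On this sample the R² is close to 0, meaning the rounded predictions explain little of the variation in the market counts.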

In [0]:
display(holdout2.selectExpr("sum(equal)/sum(1)"))

| (sum(equal) / sum(1)) |
|---|
| 0.3442717328727691 |

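The `equal` column and `sum(equal)/sum(1)` measure how often the rounded prediction exactly matches the count. The plain-Python sketch below recomputes this on the ten displayed rows. It rounds half up, as Spark SQL's `round` does; Python's built-in `round` rounds halves to even instead (`round(0.5) == 0`), so the helper `round_half_up` (hypothetical) is used.

```python
import math

# (raw_prediction, count) pairs copied from the ten rows of holdout2 displayed above
rows = [(1.3040003983806805, 0.0), (0.4798063095617413, 0.0), (0.158797659274846, 0.0),
        (0.3349223902636997, 0.0), (2.077986427615381, 0.0), (1.261387850149703, 1.0),
        (1.3217392278594624, 1.0), (0.8986297798899907, 2.0), (1.8585082431346824, 2.0),
        (0.9467728896601764, 2.0)]


def round_half_up(x: float) -> float:
    """Round to the nearest integer, with .5 rounded up (valid for non-negative x)."""
    return float(math.floor(x + 0.5))


equal = [1 if round_half_up(raw) == count else 0 for raw, count in rows]
accuracy = sum(equal) / len(equal)
print(equal, accuracy)
```

The sample gives 0.6, while the full test set gives about 0.34; ten rows are far too few to judge the model.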

**Question 2 - Solution:**

In [0]:
# Let's get the Diamonds dataset (the built-in CSV reader replaces the old com.databricks.spark.csv package)
dataPath = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"
diamonds_df = (spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(dataPath))

In [0]:
#Let's display the dataset
display(diamonds_df)

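| _c0 | carat | cut | color | clarity | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 0.23 | Ideal | E | SI2 | 61.5 | 55.0 | 326 | 3.95 | 3.98 | 2.43 |
| 2 | 0.21 | Premium | E | SI1 | 59.8 | 61.0 | 326 | 3.89 | 3.84 | 2.31 |
| 3 | 0.23 | Good | E | VS1 | 56.9 | 65.0 | 327 | 4.05 | 4.07 | 2.31 |
| 4 | 0.29 | Premium | I | VS2 | 62.4 | 58.0 | 334 | 4.2 | 4.23 | 2.63 |
| 5 | 0.31 | Good | J | SI2 | 63.3 | 58.0 | 335 | 4.34 | 4.35 | 2.75 |
| 6 | 0.24 | Very Good | J | VVS2 | 62.8 | 57.0 | 336 | 3.94 | 3.96 | 2.48 |
| 7 | 0.24 | Very Good | I | VVS1 | 62.3 | 57.0 | 336 | 3.95 | 3.98 | 2.47 |
| 8 | 0.26 | Very Good | H | SI1 | 61.9 | 55.0 | 337 | 4.07 | 4.11 | 2.53 |
| 9 | 0.22 | Fair | E | VS2 | 65.1 | 61.0 | 337 | 3.87 | 3.78 | 2.49 |
| 10 | 0.23 | Very Good | H | VS1 | 59.4 | 61.0 | 338 | 4.0 | 4.05 | 2.39 |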


In [0]:
#Some data exploration
diamonds_df.toPandas().info()

In [0]:
# Let's drop the _c0 column (a row number, not a feature)
diamonds_df = diamonds_df.drop("_c0")

In [0]:
# Let's check out the 3 categorical variables: cut, color, and clarity
diamonds_df.groupBy('cut').count().orderBy('count', ascending=False).show()
diamonds_df.groupBy('color').count().orderBy('count', ascending=False).show()
diamonds_df.groupBy('clarity').count().orderBy('count', ascending=False).show()

In [0]:
# Handle our categorical variables: index each string column as a numeric column
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in ["cut", "color", "clarity"]]
pipeline = Pipeline(stages=indexers)
# Fit the indexers and transform the dataset to feed our ML model
prep_diamonds = pipeline.fit(diamonds_df).transform(diamonds_df)

display(prep_diamonds)

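| carat | cut | color | clarity | depth | table | price | x | y | z | cut_index | color_index | clarity_index |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0.23 | Ideal | E | SI2 | 61.5 | 55.0 | 326 | 3.95 | 3.98 | 2.43 | 0.0 | 1.0 | 2.0 |
| 0.21 | Premium | E | SI1 | 59.8 | 61.0 | 326 | 3.89 | 3.84 | 2.31 | 1.0 | 1.0 | 0.0 |
| 0.23 | Good | E | VS1 | 56.9 | 65.0 | 327 | 4.05 | 4.07 | 2.31 | 3.0 | 1.0 | 3.0 |
| 0.29 | Premium | I | VS2 | 62.4 | 58.0 | 334 | 4.2 | 4.23 | 2.63 | 1.0 | 5.0 | 1.0 |
| 0.31 | Good | J | SI2 | 63.3 | 58.0 | 335 | 4.34 | 4.35 | 2.75 | 3.0 | 6.0 | 2.0 |
| 0.24 | Very Good | J | VVS2 | 62.8 | 57.0 | 336 | 3.94 | 3.96 | 2.48 | 2.0 | 6.0 | 4.0 |
| 0.24 | Very Good | I | VVS1 | 62.3 | 57.0 | 336 | 3.95 | 3.98 | 2.47 | 2.0 | 5.0 | 5.0 |
| 0.26 | Very Good | H | SI1 | 61.9 | 55.0 | 337 | 4.07 | 4.11 | 2.53 | 2.0 | 3.0 | 0.0 |
| 0.22 | Fair | E | VS2 | 65.1 | 61.0 | 337 | 3.87 | 3.78 | 2.49 | 4.0 | 1.0 | 1.0 |
| 0.23 | Very Good | H | VS1 | 59.4 | 61.0 | 338 | 4.0 | 4.05 | 2.39 | 2.0 | 3.0 | 3.0 |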

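By default (`stringOrderType="frequencyDesc"`), `StringIndexer` gives index 0 to the most frequent label, 1 to the next most frequent, and so on, which is why `Ideal` gets `cut_index` 0.0 above. The indices therefore reflect frequency, not the quality order of cut, color, or clarity. The plain-Python sketch below reproduces that ordering; the helper `string_index` and the sample labels are hypothetical, and ties are broken alphabetically, as recent Spark versions document for equal frequencies.

```python
from collections import Counter


def string_index(values):
    """Map labels to indices by descending frequency, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical sample of cut labels
sample = ["Ideal", "Premium", "Ideal", "Good", "Premium", "Ideal", "Fair", "Good"]
print(string_index(sample))
```

Tree-based models such as the random forest can still split on these indices, but a linear model would wrongly treat them as ordered quantities.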

In [0]:
# Feature selection: drop the label (price), the raw string columns, and the x, y, z dimensions
nonFeatureCols = ["price","cut","color","clarity","x","y","z"]
featureCols = [item for item in prep_diamonds.columns if item not in nonFeatureCols]
print(featureCols)

In [0]:
# VectorAssembler assembles all of these columns into one single vector
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

final_diamonds = assembler.transform(prep_diamonds)
final_diamonds.count()
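For each row, `VectorAssembler` concatenates the values of its input columns, in the order of `featureCols`, into a single feature vector. The plain-Python sketch below applies the same column selection to the first row of `prep_diamonds` shown above and builds that row's feature values as a list (a stand-in for Spark's vector type):

```python
# Column names and first row of prep_diamonds, copied from the output displayed above
columns = ["carat", "cut", "color", "clarity", "depth", "table", "price",
           "x", "y", "z", "cut_index", "color_index", "clarity_index"]
first_row = [0.23, "Ideal", "E", "SI2", 61.5, 55.0, 326, 3.95, 3.98, 2.43, 0.0, 1.0, 2.0]

nonFeatureCols = ["price", "cut", "color", "clarity", "x", "y", "z"]
featureCols = [c for c in columns if c not in nonFeatureCols]

# What VectorAssembler does per row: concatenate the input columns, in order, into one vector
row = dict(zip(columns, first_row))
features = [float(row[c]) for c in featureCols]
print(featureCols)
print(features)
```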

In [0]:
#Preparing our dataset for ML
training, test = final_diamonds.randomSplit([0.7, 0.3])
training.cache()
test.cache()
print(training.count()) 
print(test.count())

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline
#Let's choose a model
rfModel = (RandomForestRegressor()
  .setLabelCol("price")
  .setFeaturesCol("features"))

# Our ParamMaps for the estimator
paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [5, 10])
  .addGrid(rfModel.numTrees, [20, 60])  # 2 x 2 = 4 combinations: (5,20), (5,60), (10,20), (10,60)
  .build())

stages = [rfModel]

pipeline = Pipeline().setStages(stages)
# This is our hyperparameter tuning
cv = (CrossValidator()
  .setEstimator(pipeline)  # Our pipeline to tune
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("price"))  # our metric (RMSE by default)
  .setNumFolds(3))  # 3 (training, validation) pairs: 4 combinations x 3 folds = 12 fits, plus a final refit of the best one

pipelineFitted = cv.fit(training)

In [0]:
holdout_diamond = pipelineFitted.transform(test).select("prediction", "price")
holdout_diamond = holdout_diamond.withColumn("price", holdout_diamond["price"].cast("double"))
display(holdout_diamond)

| prediction | price |
|---|---|
| 933.8179359246858 | 367.0 |
| 882.5704683371563 | 367.0 |
| 877.2868767586152 | 367.0 |
| 951.4718732452342 | 367.0 |
| 881.5235810324119 | 386.0 |
| 1005.3445538598776 | 386.0 |
| 1009.509481577842 | 342.0 |
| 1038.9368172010695 | 470.0 |
| 1103.5867820807805 | 369.0 |
| 832.3242615898853 | 373.0 |


In [0]:
from pyspark.mllib.evaluation import RegressionMetrics

# RegressionMetrics expects an RDD of (prediction, observation) pairs
rm = RegressionMetrics(holdout_diamond.rdd.map(lambda x: (x[0], x[1])))
print("MSE: ", rm.meanSquaredError)
print("MAE: ", rm.meanAbsoluteError)
print("RMSE: ", rm.rootMeanSquaredError)
print("R Squared: ", rm.r2)
print("Explained Variance: ", rm.explainedVariance, "\n")