### CUTE - Spark

Given details of household, we need to predict the claim_amount of them

#### Setting PATH and ENV variables for Spark Libraries

In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

#### Creating Spark Context

In [2]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

configObj = SparkConf().setAppName("HouseHold Data Application").setMaster('local')
contextObj = SparkContext(conf = configObj)
sparkSessionObj = SparkSession(contextObj)

#### Importing dependant libraries

In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import isnan, when, count, col, countDistinct


In [4]:
## Reading data frame from hdfs location
## input csv file copied into my hdfs space using hdfs dfs -cp command
## 1. Create a dataframe for above csv data.using spark.read.format()
##   (First line in the dataset is header).
## 3. As the '?' describes the unknown data in each column, identify
## how many columns exists with '?' (NAs)
## 4. Replace all '?' values in the result data frame as NAs.


householdDF = sparkSessionObj.read.format("csv")\
                             .option("header","true")\
                             .option("nullValue","?")\
                             .option("ignoreTrailingWhiteSpace","true")\
                             .option("ignoreLeadingWhiteSpace","true")\
                             .option("inferSchema","true")\
                             .option("quote",'"')\
                             .load("hdfs:///user/1864B39/cute_dataset")

In [None]:
##2. Verify summary of the dataframe (how many rows and columns)
print("Number of Columns : {} ").format(len(householdDF.columns))
print("Number of Rows : {}").format(householdDF.count())

In [None]:
householdDF.show(5)

In [None]:
householdDF.dtypes

In [5]:
countNaDf = householdDF.select([count(when(isnan(c) | col(c).isNull() , c)).alias(c) for c in householdDF.columns]).show()

+------+------------+-------+-------------+----------+----------+-----------+--------------+----+------+----+------+------+----+------+----+----+-----+-----+-----+------+----+----+----+----+----+----+----+----+-----+------+------+------+------+------------+
|Row_ID|Household_ID|Vehicle|Calendar_Year|Model_Year|Blind_Make|Blind_Model|Blind_Submodel|Cat1|  Cat2|Cat3|  Cat4|  Cat5|Cat6|  Cat7|Cat8|Cat9|Cat10|Cat11|Cat12|OrdCat|Var1|Var2|Var3|Var4|Var5|Var6|Var7|Var8|NVCat|NVVar1|NVVar2|NVVar3|NVVar4|Claim_Amount|
+------+------------+-------+-------------+----------+----------+-----------+--------------+----+------+----+------+------+----+------+----+----+-----+-----+-----+------+----+----+----+----+----+----+----+----+-----+------+------+------+------+------------+
|     0|           0|      0|            0|         0|       653|        653|           653|1995|369621| 325|426569|426969|1995|543322| 278|   0|  314| 2316| 2134|   568|   0|   0|   0|   0|   0|   0|   0|   0|    0|     0|   

In [None]:
countNaDf.printSchema

In [None]:
#5. Remove all the columns where the no. of rows with '?' exceeds 35% for that column.
rowCount = householdDF.count()
threshold = ((rowCount*35)/100)



In [6]:
#6. Fill remaining Null Values with Zeroes in the entire dataset, if any.

householdDF = householdDF.fillna('0', householdDF.columns)
householdDF = householdDF.fillna(0, householdDF.columns)


In [7]:
householdDF.select([count(when(isnan(c) | col(c).isNull() , c)).alias(c) for c in householdDF.columns]).show()

+------+------------+-------+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+----+----+----+----+----+----+----+----+-----+------+------+------+------+------------+
|Row_ID|Household_ID|Vehicle|Calendar_Year|Model_Year|Blind_Make|Blind_Model|Blind_Submodel|Cat1|Cat2|Cat3|Cat4|Cat5|Cat6|Cat7|Cat8|Cat9|Cat10|Cat11|Cat12|OrdCat|Var1|Var2|Var3|Var4|Var5|Var6|Var7|Var8|NVCat|NVVar1|NVVar2|NVVar3|NVVar4|Claim_Amount|
+------+------------+-------+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+----+----+----+----+----+----+----+----+-----+------+------+------+------+------------+
|     0|           0|      0|            0|         0|         0|          0|             0|   0|   0|   0|   0|   0|   0|   0|   0|   0|    0|    0|    0|     0|   0|   0|   0|   0|   0|   0|   0|   0|    0|     0|     0|     0|     0|           0|


In [None]:
householdDF.dtypes

In [8]:
#7. Derive a new column Vehicle_age = Current Year - Model year

householdDF = householdDF.withColumn("Vehicle_age", householdDF['Calendar_Year']-householdDF['Model_Year'])

In [None]:
householdDF.select('Vehicle_age').show(5)

In [10]:
#8. Extract all the non-zero records from the dataset(> 0) into a new dataframe and verify the number of rows.
nonZeroDf = householdDF.replace(0,float('nan'))
nonZeroDf = nonZeroDf.dropna(how="any")

In [11]:
nonZeroDf.count()

7234

In [12]:
#9. Remove the columns Row_ID, Household_ID and Vehicle from the original dataframe.
drop_column_list = ['Row_ID','Household_ID','Vehicle']
householdDF = householdDF.select([column for column in householdDF.columns if column not in drop_column_list])

In [13]:
householdDF.columns

['Calendar_Year',
 'Model_Year',
 'Blind_Make',
 'Blind_Model',
 'Blind_Submodel',
 'Cat1',
 'Cat2',
 'Cat3',
 'Cat4',
 'Cat5',
 'Cat6',
 'Cat7',
 'Cat8',
 'Cat9',
 'Cat10',
 'Cat11',
 'Cat12',
 'OrdCat',
 'Var1',
 'Var2',
 'Var3',
 'Var4',
 'Var5',
 'Var6',
 'Var7',
 'Var8',
 'NVCat',
 'NVVar1',
 'NVVar2',
 'NVVar3',
 'NVVar4',
 'Claim_Amount',
 'Vehicle_age']

In [14]:
#10. Create two new dataframes
#A. train_DF, For the data where Calendar_Year = 2005 and 2006
#B. test_DF, For the data where Calendar_Year = 2007

test_DF = householdDF.filter(householdDF['Calendar_Year'] == 2007)
train_DF = householdDF.filter(householdDF['Calendar_Year'] != 2007)

In [15]:

train_DF.select('Calendar_Year').distinct().show()

+-------------+
|Calendar_Year|
+-------------+
|         2006|
|         2005|
+-------------+



In [16]:
test_DF.select('Calendar_Year').distinct().show()

+-------------+
|Calendar_Year|
+-------------+
|         2007|
+-------------+



In [17]:
#11. Separate into Categorical and Continuous attributes.
householdDF.dtypes

[('Calendar_Year', 'int'),
 ('Model_Year', 'int'),
 ('Blind_Make', 'string'),
 ('Blind_Model', 'string'),
 ('Blind_Submodel', 'string'),
 ('Cat1', 'string'),
 ('Cat2', 'string'),
 ('Cat3', 'string'),
 ('Cat4', 'string'),
 ('Cat5', 'string'),
 ('Cat6', 'string'),
 ('Cat7', 'string'),
 ('Cat8', 'string'),
 ('Cat9', 'string'),
 ('Cat10', 'string'),
 ('Cat11', 'string'),
 ('Cat12', 'string'),
 ('OrdCat', 'int'),
 ('Var1', 'double'),
 ('Var2', 'double'),
 ('Var3', 'double'),
 ('Var4', 'double'),
 ('Var5', 'double'),
 ('Var6', 'double'),
 ('Var7', 'double'),
 ('Var8', 'double'),
 ('NVCat', 'string'),
 ('NVVar1', 'double'),
 ('NVVar2', 'double'),
 ('NVVar3', 'double'),
 ('NVVar4', 'double'),
 ('Claim_Amount', 'double'),
 ('Vehicle_age', 'int')]

In [18]:
numericalVars=['Var1','Var2','Var3','Var4','Var5','Var6','Var7','Var8','NVVar1','NVVar2','NVVar3','NVVar4','Claim_Amount','Vehicle_age','Calendar_Year','Model_Year']
categoricalVars = [col for col in householdDF.columns if col not in numericalVars]

In [19]:
print(categoricalVars)

['Blind_Make', 'Blind_Model', 'Blind_Submodel', 'Cat1', 'Cat2', 'Cat3', 'Cat4', 'Cat5', 'Cat6', 'Cat7', 'Cat8', 'Cat9', 'Cat10', 'Cat11', 'Cat12', 'OrdCat', 'NVCat']


In [40]:
#12. Encode Categorical attributes. (Dummy)
#13. Create a vector for all the predictor variable
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
indexers_Cat = [StringIndexer(inputCol=cat_Var_Name, outputCol="{0}_index".format(cat_Var_Name),handleInvalid="skip") for cat_Var_Name in categoricalVars ]
encoders_Cat = [OneHotEncoder(inputCol=indexer.getOutputCol(),outputCol="{0}_vec".format(indexer.getInputCol())) for indexer in indexers_Cat]
assembler_Cat = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders_Cat],outputCol="cat_features")
assembler = VectorAssembler(inputCols=["cat_features"], outputCol="features")

In [41]:
preprocessing_Stages = indexers_Cat+encoders_Cat+[assembler_Cat]+[assembler]
print(preprocessing_Stages)

[StringIndexer_44988a34e8300edca8ba, StringIndexer_4db28ff09e79d7645b89, StringIndexer_402f95c4c941351f502f, StringIndexer_42c2a1d32a66eaca0a75, StringIndexer_462ba77ad893b87ec6ba, StringIndexer_434e8bcf5dffa33cda71, StringIndexer_45378d2622136f8056ee, StringIndexer_4461a1c28f9f26f654ab, StringIndexer_4d60bbedec1c0158e0bf, StringIndexer_41518119ce6e59edbee1, StringIndexer_4024a3fad6cd574153a7, StringIndexer_4a42bfd50407b0726eaa, StringIndexer_47389aa7f93ba30b7c48, StringIndexer_4dfd921ff3792fd3be20, StringIndexer_4520b9dbed11c8fc1094, StringIndexer_4ec399bd663b42bd5559, StringIndexer_46b1be93653d04c61032, OneHotEncoder_45888101dd4885e50743, OneHotEncoder_4fff8debe58498eab088, OneHotEncoder_4445b7bed0f74188c44c, OneHotEncoder_43db874e285d6e2b6f80, OneHotEncoder_4ea8a352e5dd72074708, OneHotEncoder_42c18d9a2226c05e2ab0, OneHotEncoder_47a28e39acecee0dde1e, OneHotEncoder_4035983bda0c149a1822, OneHotEncoder_49b182e078d8106c3009, OneHotEncoder_4a6286e72bb1c39b65cc, OneHotEncoder_4cdaa6f3d1656

In [42]:
##14. Set the target variable (label)

##15. Train model –
##Build at least 2 models.
## Model1 - Linear Regression
from pyspark.ml.regression import LinearRegression


In [43]:
linearR = LinearRegression(maxIter=10,labelCol="Claim_Amount",featuresCol="features")

In [44]:
from pyspark.ml import Pipeline

lr_Pipeline = Pipeline(stages=preprocessing_Stages + [linearR])

lr_Pipeline_model = lr_Pipeline.fit(train_DF)

In [45]:
print("Coefficients: " + str(lr_Pipeline_model.stages[-1].coefficients)) # -1 since am printing the last vector
print("Intercept: " + str(lr_Pipeline_model.stages[-1].intercept))

Coefficients: [-0.0477867932949,0.0260239220273,0.0506469530345,-0.0181470523768,0.0853623134364,0.0567083721909,-0.113368793774,0.043315643046,0.0871728484432,-0.0848561601337,0.0948850948856,-0.157836341768,-0.12367950232,0.0229552557317,0.0255331340436,0.167405641127,0.530508415807,-0.0899984290923,-0.291631836339,0.01121834859,-0.119353842626,0.0166094265667,-0.102088235215,-0.0917457259738,-0.00195731104093,-0.173048564404,-0.273423165656,-0.0887009223891,-0.33176107942,-0.0785698330652,0.125305885322,0.0462245989467,0.233812420052,0.217211809229,0.231963265583,0.00999335349364,-0.481345644897,-0.400987516339,-0.112018230534,0.194020942004,-0.474960844431,0.401425108839,-0.458601719598,-0.000121868571522,0.173464246348,-0.22795338368,-0.134969733389,-0.421955555727,-0.487484765057,-0.35894148496,-0.485226735688,-0.47243511402,-0.419431574867,-0.265478699072,-0.612913422526,-0.48211584135,4.23281644499,-0.434599672062,-0.580107848457,-0.37642049467,-0.281418070448,-0.402111444516,-

In [52]:
## Model2 - Decision Tree
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(labelCol="Claim_Amount",featuresCol="features")
dt_Pipeline = Pipeline(stages=preprocessing_Stages+[dt]) 
dt_Pipeline_model = dt_Pipeline.fit(train_DF)

In [46]:
##16. Display the model summary.
# Linear model summary

linearModelSummary = lr_Pipeline_model.stages[-1].summary
print(linearModelSummary.objectiveHistory)
print(linearModelSummary.r2)

[0.49999999999999994, 0.49904813083246663, 0.4989053081095105, 0.4988708487762159, 0.4988167535959395, 0.49881392458775886, 0.4988109757404175, 0.4988105192491251, 0.49880829637137275, 0.49880680296347474, 0.4988062718444935]
0.00238745631102


In [47]:
##17. Make Predictions on test data (test_DF) using both the models.

## Linear Model predictions
train_predictions_lr = lr_Pipeline_model.transform(train_DF)
test_predictions_lr = lr_Pipeline_model.transform(test_DF)


In [34]:
test_predictions_lr.show(2)

+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+---------+---------+----------+---------+---------+---------+---------+----------+-----+----------+----------+----------+----------+------------+-----------+----------------+-----------------+--------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+------------+-----------+--------------+------------------+------------------+--------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------------+--------------------+------------------+
|Calendar_Year|Model_Year|Blind_Make|Blind_Model|Blind_Submodel|Cat1|Cat2|Cat3|Cat4|Cat5|Cat6|Cat7|Cat8|Cat9|Cat10|Cat11|Cat12|OrdCat|     Var1|     Var2|      Var3|     Var4|     

In [53]:
## Decision Tree predictions
train_predictions_dt = dt_Pipeline_model.transform(train_DF)
test_predictions_dt = dt_Pipeline_model.transform(test_DF)

In [54]:
test_predictions_dt.show(2)

+-------------+----------+----------+-----------+--------------+----+----+----+----+----+----+----+----+----+-----+-----+-----+------+---------+---------+----------+---------+---------+---------+---------+----------+-----+----------+----------+----------+----------+------------+-----------+----------------+-----------------+--------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+------------+-----------+--------------+------------------+------------------+--------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------+--------------------+--------------------+------------------+
|Calendar_Year|Model_Year|Blind_Make|Blind_Model|Blind_Submodel|Cat1|Cat2|Cat3|Cat4|Cat5|Cat6|Cat7|Cat8|Cat9|Cat10|Cat11|Cat12|OrdCat|     Var1|     Var2|      Var3|     Var4|     

In [50]:
##19. Compute both train and test error metrics for all the models.
#linear model
##20. Calculate Root Mean Squared Error (RMSE) and R-Square values.

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="Claim_Amount",predictionCol="prediction",metricName="rmse")

predictionAndLabels_train_lr = train_predictions_lr.select("prediction", "Claim_Amount")
train_accuracy_lr = evaluator.evaluate(predictionAndLabels_train_lr)

print("Train rmse  = " + str(train_accuracy_lr))




Train rmse  = 42.0608687779


In [51]:
predictionAndLabels_test_lr = test_predictions_lr.select("prediction", "Claim_Amount")
test_accuracy_lr = evaluator.evaluate(predictionAndLabels_test_lr)

print("Test rmse = " + str(test_accuracy_lr))

Test rmse = 30.2006155698


In [55]:
# decision tree model
predictionAndLabels_train_dt = train_predictions_dt.select("prediction", "Claim_Amount")
train_accuracy_dt = evaluator.evaluate(train_predictions_dt)

print("Train rmse  = " + str(train_accuracy_dt))

predictionAndLabels_test_dt = test_predictions_dt.select("prediction", "Claim_Amount")
test_accuracy_dt = evaluator.evaluate(test_predictions_dt)

print("Test rmse = " + str(test_accuracy_dt))

Train rmse  = 41.8892482702
Test rmse = 30.5654793429


In [56]:
#18. Apply Cross validation technique and tune your models.
## Linear model cv
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [57]:
paramGrid = ParamGridBuilder()\
    .addGrid(linearR.regParam, [0.1,0.2,]) \
    .addGrid(linearR.elasticNetParam, [0.5])\
    .addGrid(linearR.maxIter,[10])\
    .build()
    
lr_crossval = CrossValidator(estimator=lr_Pipeline,
                             estimatorParamMaps=paramGrid,
                             evaluator=RegressionEvaluator(labelCol="Claim_Amount"),
                             numFolds=2)     

In [60]:
# Run cross-validation, and choose the best set of parameters.
lr_crossval_Model = lr_crossval.fit(train_DF)

In [61]:
train_predictions_lrcv = lr_crossval_Model.transform(train_DF)
test_predictions_lrcv = lr_crossval_Model.transform(test_DF)

In [62]:
##20. Calculate Root Mean Squared Error (RMSE) and R-Square values.

predictionAndLabels_train_lrcv = train_predictions_lrcv.select("prediction", "Claim_Amount")
train_accuracycv = evaluator.evaluate(train_predictions_lrcv)
print("Train set rmse  = " + str(train_accuracycv))

predictionAndLabels_test_lrcv = test_predictions_lrcv.select("prediction", "Claim_Amount")
test_accuracycv = evaluator.evaluate(test_predictions_lrcv)
print("Test set rmse = " + str(test_accuracycv))

Train set rmse  = 42.0794443352
Test set rmse = 30.1346587524


In [None]:
## Tuning DT Model
paramGridDT = ParamGridBuilder()\
    .addGrid(dt.maxDepth, [1,6,10]) \
    .build()
    
dt_crossval = CrossValidator(estimator=dt_Pipeline,
                             estimatorParamMaps=paramGrid,
                             evaluator=evaluator,
                             numFolds=2,parallelism=3)   

In [None]:
# Run cross-validation, and choose the best set of parameters.
dt_crossval_Model = dt_crossval.fit(train_DF)

In [None]:
train_predictions_dtcv = dt_crossval_Model.transform(train_DF)
test_predictions_dtcv = dt_crossval_Model.transform(test_DF)

In [None]:
predictionAndLabels_train_dtcv = train_predictions_dtcv.select("prediction", "Claim_Amount")
train_accuracydtcv = evaluator.evaluate(train_predictions_dtcv)
print("Train set accuracy  = " + str(train_accuracydtcv))

predictionAndLabels_test_dtcv = test_predictions_dtcv.select("prediction", "Claim_Amount")
test_accuracydtcv = evaluator.evaluate(test_predictions_dtcv)
print("Test set accuracy = " + str(test_accuracydtcv))