## Regression in PySpark
- This notebook walks through developing a machine learning model in a `PySpark` environment.
- The dataset is [Sarah Gets a Diamond](http://store.darden.virginia.edu/sarah-gets-a-diamond), published by Darden Business Publishing, University of Virginia. I deliberately removed values at random from 30 rows in each of two columns (`Carat Weight` and `Symmetry`) so that different PySpark data transformation techniques can be demonstrated.
- A regression model is developed on this dataset to predict the price (Y variable) of a diamond from its physical attributes/features (X variables).
- PySpark data transformation, imputation, and aggregation techniques are applied to clean, impute, pre-process, and summarize the model development data.
- Regression model development and tuning are performed within the PySpark framework.

### 1. Installing Spark and the JDK to run a Spark session on a local computer

In [None]:
# !ls

In [None]:
# !apt-get update
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
# !tar xf spark-2.3.1-bin-hadoop2.7.tgz
# !pip install -q findspark

In [None]:
# !ls

In [None]:
# importing necessary libraries to start a spark session

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

In [None]:
# importing libraries necessary for data transformation, pre-processing, ML model development, etc.

import pandas as pd
import numpy as np

from pyspark.sql.functions import desc, isnan, when, count, col, countDistinct, regexp_replace
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator 

### 2. Data import

In [None]:
#importing model development data as a spark dataframe

df = spark.read.csv('/content/RegData-Missing.csv', header=True, inferSchema=True)

In [None]:
print('Data Shape : {} Rows - {} Columns'.format(df.count(), len(df.columns)))
print('\nTop 5 rows')
df.show(5)  # show() prints the table itself and returns None, so it is not wrapped in print()
print('\nBottom 5 rows')
df.orderBy(desc('ID')).show(5)

Data Shape : 6000 Rows - 9 Columns

Top 5 rows
+---+------------+-----+-----+-------+------+--------+------+-----+
| ID|Carat Weight|  Cut|Color|Clarity|Polish|Symmetry|Report|Price|
+---+------------+-----+-----+-------+------+--------+------+-----+
|  1|         1.1|Ideal|    H|    SI1|    VG|      EX|   GIA| 5169|
|  2|        0.83|Ideal|    H|    VS1|    ID|      ID|  AGSL| 3470|
|  3|        0.85|Ideal|    H|    SI1|    EX|      EX|   GIA| 3183|
|  4|        0.91|Ideal|    E|    SI1|    VG|      VG|   GIA| 4370|
|  5|        0.83|Ideal|    G|    SI1|    EX|      EX|   GIA| 3171|
+---+------------+-----+-----+-------+------+--------+------+-----+
only showing top 5 rows

Bottom 5 rows
+----+------------+---------------+-----+-------+------+--------+------+-----+
|  ID|Carat Weight|            Cut|Color|Clarity|Polish|Symmetry|Report|Price|
+----+------------+---------------+-----+-------+------+--------+------+-----+
|6000|        2.19|          Ideal|    E|    VS1|    EX|   

In [None]:
print('All columns in the dataset with their datatypes')
print(df.dtypes)

All columns in the dataset with their datatypes
[('ID', 'int'), ('Carat Weight', 'double'), ('Cut', 'string'), ('Color', 'string'), ('Clarity', 'string'), ('Polish', 'string'), ('Symmetry', 'string'), ('Report', 'string'), ('Price', 'int')]


In [None]:
# replacing all the whitespaces with '_' in all the column names

column_names = [column.replace(' ', '_') for column in df.columns]
df = df.toDF(*column_names)
df.show(2)

+---+------------+-----+-----+-------+------+--------+------+-----+
| ID|Carat_Weight|  Cut|Color|Clarity|Polish|Symmetry|Report|Price|
+---+------------+-----+-----+-------+------+--------+------+-----+
|  1|         1.1|Ideal|    H|    SI1|    VG|      EX|   GIA| 5169|
|  2|        0.83|Ideal|    H|    VS1|    ID|      ID|  AGSL| 3470|
+---+------------+-----+-----+-------+------+--------+------+-----+
only showing top 2 rows


### 3. Summary Statistics and Data Imputation on model development data

In [None]:
# collecting all the numerical and categorical variables and saving them in two different lists

categorical_variables = [item[0] for item in df.dtypes if item[1].startswith('string')]
print('All categorical variables')
print(categorical_variables)
numerical_variables = [item[0] for item in df.dtypes if item[1].startswith('double') or item[1].startswith('int')]
print('All numeric variables')
print(numerical_variables)

All categorical variables
['Cut', 'Color', 'Clarity', 'Polish', 'Symmetry', 'Report']
All numeric variables
['ID', 'Carat_Weight', 'Price']


In [None]:
# counting null/nan/missing values in all the columns

dict_1 = dict(df.dtypes)
df_info = pd.DataFrame(dict_1.items(), columns=['Column', 'DataType'])
null_count = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()
null_count = list(null_count.iloc[0,])
df_info['NullCount'] = null_count
print(df_info)

         Column DataType  NullCount
0            ID      int          0
1  Carat_Weight   double         30
2           Cut   string          0
3         Color   string          0
4       Clarity   string          0
5        Polish   string          0
6      Symmetry   string         30
7        Report   string          0
8         Price      int          0
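
The count above treats a value as missing when it is either null or NaN. As a rough plain-Python illustration of that rule (the toy rows and the helper names `is_missing` and `count_missing` are hypothetical, not part of the notebook or of PySpark):

```python
import math

def is_missing(value):
    """True for None and for float NaN, mirroring isNull() | isnan()."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def count_missing(rows, columns):
    """Return {column: number of missing entries} for a list of dict rows."""
    return {c: sum(1 for row in rows if is_missing(row.get(c))) for c in columns}

# Hypothetical toy rows for illustration only, not taken from the diamond data
rows = [
    {'Carat_Weight': 1.1, 'Symmetry': 'EX'},
    {'Carat_Weight': None, 'Symmetry': 'VG'},
    {'Carat_Weight': float('nan'), 'Symmetry': None},
]
missing = count_missing(rows, ['Carat_Weight', 'Symmetry'])
print(missing)  # {'Carat_Weight': 2, 'Symmetry': 1}
```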


In [None]:
# summary statistics on all the numerical columns

df.select([column for column in numerical_variables if column != 'ID']).describe().show()

+-------+------------------+------------------+
|summary|      Carat_Weight|             Price|
+-------+------------------+------------------+
|  count|              5970|              6000|
|   mean|1.3346180904522886|11791.579333333333|
| stddev|0.4757117705068891|10184.350050741188|
|    min|              0.75|              2184|
|    max|              2.91|            101561|
+-------+------------------+------------------+


In [None]:
# counting the number of different sub-levels for all the categorical columns

df.agg(*(countDistinct(col(c)).alias(c) for c in categorical_variables)).show()

+---+-----+-------+------+--------+------+
|Cut|Color|Clarity|Polish|Symmetry|Report|
+---+-----+-------+------+--------+------+
|  5|    6|      7|     4|       4|     2|
+---+-----+-------+------+--------+------+


In [None]:
# counting the frequency of each sub-level for all the categorical columns : null values identified

for column in categorical_variables:
  df.groupBy(column).count().orderBy('count').show()

+---------------+-----+
|            Cut|count|
+---------------+-----+
|           Fair|  129|
|Signature-Ideal|  253|
|           Good|  708|
|      Very Good| 2428|
|          Ideal| 2482|
+---------------+-----+

+-----+-----+
|Color|count|
+-----+-----+
|    D|  661|
|    E|  778|
|    I|  968|
|    F| 1013|
|    H| 1079|
|    G| 1501|
+-----+-----+

+-------+-----+
|Clarity|count|
+-------+-----+
|     FL|    4|
|     IF|  219|
|   VVS1|  285|
|   VVS2|  666|
|    VS1| 1192|
|    VS2| 1575|
|    SI1| 2059|
+-------+-----+

+------+-----+
|Polish|count|
+------+-----+
|     G|  571|
|    ID|  595|
|    VG| 2409|
|    EX| 2425|
+------+-----+

+--------+-----+
|Symmetry|count|
+--------+-----+
|    null|   30|
|      ID|  606|
|       G|  915|
|      EX| 2048|
|      VG| 2401|
+--------+-----+

+------+-----+
|Report|count|
+------+-----+
|  AGSL|  734|
|   GIA| 5266|
+------+-----+



In [None]:
# replacing whitespace and hyphen characters with '_' in the sub-level values of all the categorical variables

for column in categorical_variables:
  df = df.withColumn(column, regexp_replace(column, '[\\s-]', '_'))

for column in categorical_variables:
  df.select(column).distinct().show()

+---------------+
|            Cut|
+---------------+
|      Very_Good|
|Signature_Ideal|
|          Ideal|
|           Good|
|           Fair|
+---------------+

+-----+
|Color|
+-----+
|    F|
|    E|
|    D|
|    G|
|    I|
|    H|
+-----+

+-------+
|Clarity|
+-------+
|   VVS2|
|    SI1|
|     IF|
|   VVS1|
|    VS2|
|    VS1|
|     FL|
+-------+

+------+
|Polish|
+------+
|    EX|
|    VG|
|    ID|
|     G|
+------+

+--------+
|Symmetry|
+--------+
|      EX|
|      VG|
|    null|
|      ID|
|       G|
+--------+

+------+
|Report|
+------+
|   GIA|
|  AGSL|
+------+



In [None]:
# checking all the rows where Carat_Weight value is missing

df.filter(col('Carat_Weight').isNull()).show()

+----+------------+---------------+-----+-------+------+--------+------+-----+
|  ID|Carat_Weight|            Cut|Color|Clarity|Polish|Symmetry|Report|Price|
+----+------------+---------------+-----+-------+------+--------+------+-----+
| 121|        null|           Good|    H|    SI1|    EX|      EX|   GIA| 4563|
| 405|        null|           Good|    I|    SI1|    EX|       G|   GIA|15743|
| 591|        null|           Fair|    D|    SI1|    EX|      EX|   GIA| 4537|
| 640|        null|Signature_Ideal|    I|    SI1|    ID|      ID|  AGSL|11110|
| 816|        null|      Very_Good|    H|    VS2|    EX|      VG|   GIA| 5571|
| 878|        null|      Very_Good|    I|    SI1|     G|    null|   GIA| 4664|
|1162|        null|      Very_Good|    I|    VS2|    VG|      VG|   GIA|15895|
|1376|        null|      Very_Good|    I|    SI1|    VG|       G|   GIA|14819|
|1395|        null|          Ideal|    F|    SI1|    EX|    null|   GIA| 4054|
|1543|        null|          Ideal|    F|    VS2|   

In [None]:
# imputing the missing Carat_Weight values using PySpark's Imputer (default strategy: mean)
imputer = Imputer(inputCols=['Carat_Weight'], outputCols=['Carat_Weight'])
df_impute = imputer.fit(df).transform(df)

# checking all the rows where Carat_Weight value is missing : all missing values imputed
df_impute.filter(col('Carat_Weight').isNull()).show()

+---+------------+---+-----+-------+------+--------+------+-----+
| ID|Carat_Weight|Cut|Color|Clarity|Polish|Symmetry|Report|Price|
+---+------------+---+-----+-------+------+--------+------+-----+
+---+------------+---+-----+-------+------+--------+------+-----+
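
`Imputer` uses the column mean by default, which is why the imputed rows later show `1.3346180904522886`, the `Carat_Weight` mean from the summary statistics. A minimal plain-Python sketch of the same idea follows; the helper `impute_mean` and the toy column are hypothetical and only illustrate the arithmetic, not Spark's implementation:

```python
import math

def impute_mean(values):
    """Replace None/NaN entries with the mean of the observed entries."""
    observed = [v for v in values if v is not None and not math.isnan(v)]
    mean = sum(observed) / len(observed)
    filled = [mean if v is None or math.isnan(v) else v for v in values]
    return filled, mean

# Hypothetical toy column, not taken from the diamond data
carat = [1.0, None, 2.0, float('nan'), 3.0]
imputed, fill_value = impute_mean(carat)
print(fill_value)  # 2.0
print(imputed)     # [1.0, 2.0, 2.0, 2.0, 3.0]
```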


In [None]:
# checking all the rows where Symmetry value is missing

df_impute.filter(col('Symmetry').isNull()).show()

+----+------------------+---------+-----+-------+------+--------+------+-----+
|  ID|      Carat_Weight|      Cut|Color|Clarity|Polish|Symmetry|Report|Price|
+----+------------------+---------+-----+-------+------+--------+------+-----+
|  71|              1.02|    Ideal|    G|    VS2|    EX|    null|   GIA| 6264|
| 141|              2.04|    Ideal|    I|    SI1|    EX|    null|   GIA|16718|
| 191|              1.02|Very_Good|    G|   VVS2|     G|    null|   GIA| 8013|
| 244|              1.07|    Ideal|    F|     IF|    EX|    null|   GIA|11327|
| 309|              1.15|    Ideal|    H|    SI1|    VG|    null|   GIA| 5529|
| 878|1.3346180904522886|Very_Good|    I|    SI1|     G|    null|   GIA| 4664|
| 975|              1.51|     Good|    F|   VVS2|    EX|    null|   GIA|17203|
|1395|1.3346180904522886|    Ideal|    F|    SI1|    EX|    null|   GIA| 4054|
|1951|1.3346180904522886|Very_Good|    H|    SI1|     G|    null|   GIA| 3800|
|2252|               1.7|     Fair|    H|    VS2|   

In [None]:
# imputing all missing Symmetry values with mode Symmetry value : VG in this case

# nulls are excluded so that the mode is always an observed level
temp_df = df_impute.filter(col('Symmetry').isNotNull()).groupBy('Symmetry').count()
# first() fetches only the top row, so the sort runs once instead of twice
mode_variable, mode_count_value = temp_df.orderBy(desc('count')).first()
print('Mode Value : {}\n'.format(mode_variable), 'Mode Count : {}'.format(mode_count_value))
df_impute = df_impute.fillna({'Symmetry': mode_variable})
df_impute.filter(col('Symmetry').isNull()).show()

Mode Value : VG
 Mode Count : 2401
+---+------------+---+-----+-------+------+--------+------+-----+
| ID|Carat_Weight|Cut|Color|Clarity|Polish|Symmetry|Report|Price|
+---+------------+---+-----+-------+------+--------+------+-----+
+---+------------+---+-----+-------+------+--------+------+-----+
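
Mode imputation for a categorical column boils down to counting the observed levels and filling the gaps with the most frequent one. A small plain-Python sketch, assuming a hypothetical helper `impute_mode` and toy values rather than the real column:

```python
from collections import Counter

def impute_mode(values):
    """Fill None entries with the most frequent non-null value."""
    counts = Counter(v for v in values if v is not None)
    mode_value, mode_count = counts.most_common(1)[0]
    filled = [mode_value if v is None else v for v in values]
    return filled, mode_value, mode_count

# Hypothetical toy column, not taken from the diamond data
symmetry = ['VG', 'EX', None, 'VG', 'G', None]
filled, mode_value, mode_count = impute_mode(symmetry)
print(mode_value, mode_count)  # VG 2
print(filled)                  # ['VG', 'EX', 'VG', 'VG', 'G', 'VG']
```

Excluding nulls before counting matters: if the missing group were the largest one, the "mode" would otherwise be null and nothing would get filled.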


### 4. One-Hot Encoding categorical variables

In [None]:
# creating numeric labels for categorical data using StringIndexer

df_string_indexer = df_impute
indexed_columns = [c+'_Index' for c in categorical_variables]

for column, index in zip(categorical_variables, indexed_columns):
  string_indexer = StringIndexer(inputCol=column, outputCol=index).fit(df_string_indexer)
  df_string_indexer = string_indexer.transform(df_string_indexer)

df_string_indexer.take(2)

[Row(ID=1, Carat_Weight=1.1, Cut='Ideal', Color='H', Clarity='SI1', Polish='VG', Symmetry='EX', Report='GIA', Price=5169, Cut_Index=0.0, Color_Index=1.0, Clarity_Index=0.0, Polish_Index=1.0, Symmetry_Index=1.0, Report_Index=0.0),
 Row(ID=2, Carat_Weight=0.83, Cut='Ideal', Color='H', Clarity='VS1', Polish='ID', Symmetry='ID', Report='AGSL', Price=3470, Cut_Index=0.0, Color_Index=1.0, Clarity_Index=2.0, Polish_Index=2.0, Symmetry_Index=3.0, Report_Index=1.0)]
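
By default `StringIndexer` assigns indices by descending label frequency, so the most common level gets `0.0`. The sketch below reproduces that ordering in plain Python for `Symmetry`, using the level counts shown earlier with the 30 imputed rows added to `VG`. The helper `frequency_index` is hypothetical, and the alphabetical tie-break is an assumption made only to keep the sketch deterministic:

```python
from collections import Counter

def frequency_index(values):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(values)
    # Assumption: ties are broken alphabetically to keep the result deterministic
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Symmetry counts from the frequency table, with 30 imputed rows added to VG
symmetry_counts = {'VG': 2401 + 30, 'EX': 2048, 'G': 915, 'ID': 606}
values = [label for label, n in symmetry_counts.items() for _ in range(n)]
mapping = frequency_index(values)
print(mapping)  # {'VG': 0.0, 'EX': 1.0, 'G': 2.0, 'ID': 3.0}
```

This agrees with the two rows above, where `Symmetry='EX'` maps to `Symmetry_Index=1.0` and `Symmetry='ID'` maps to `3.0`.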

In [None]:
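# creating one hot encoded data using numeric labels for categorical data
# note: in Spark 2.3.x OneHotEncoder is a plain transformer; from Spark 3.0 it is an estimator and needs .fit(df) before .transform(df)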

df_ohe = df_string_indexer
ohe_columns = [c+'_classVec' for c in categorical_variables]

for index, ohe_column in zip(indexed_columns, ohe_columns):
  oh_encoder = OneHotEncoder(dropLast=True, inputCol=index, outputCol=ohe_column)
  df_ohe = oh_encoder.transform(df_ohe)
  
df_ohe.take(2)

[Row(ID=1, Carat_Weight=1.1, Cut='Ideal', Color='H', Clarity='SI1', Polish='VG', Symmetry='EX', Report='GIA', Price=5169, Cut_Index=0.0, Color_Index=1.0, Clarity_Index=0.0, Polish_Index=1.0, Symmetry_Index=1.0, Report_Index=0.0, Cut_classVec=SparseVector(4, {0: 1.0}), Color_classVec=SparseVector(5, {1: 1.0}), Clarity_classVec=SparseVector(6, {0: 1.0}), Polish_classVec=SparseVector(3, {1: 1.0}), Symmetry_classVec=SparseVector(3, {1: 1.0}), Report_classVec=SparseVector(1, {0: 1.0})),
 Row(ID=2, Carat_Weight=0.83, Cut='Ideal', Color='H', Clarity='VS1', Polish='ID', Symmetry='ID', Report='AGSL', Price=3470, Cut_Index=0.0, Color_Index=1.0, Clarity_Index=2.0, Polish_Index=2.0, Symmetry_Index=3.0, Report_Index=1.0, Cut_classVec=SparseVector(4, {0: 1.0}), Color_classVec=SparseVector(5, {1: 1.0}), Clarity_classVec=SparseVector(6, {2: 1.0}), Polish_classVec=SparseVector(3, {2: 1.0}), Symmetry_classVec=SparseVector(3, {}), Report_classVec=SparseVector(1, {}))]
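
With `dropLast=True`, a column with `n` levels becomes a vector of length `n - 1`, and the last index is encoded as all zeros. That is why `Symmetry_Index=3.0` appears above as `SparseVector(3, {})`. A dense plain-Python sketch of the rule, using a hypothetical helper `one_hot_drop_last`:

```python
def one_hot_drop_last(index, num_categories):
    """One-hot encode an index, dropping the slot for the last category."""
    size = num_categories - 1
    vector = [0.0] * size
    if index < size:          # the last category stays all zeros
        vector[index] = 1.0
    return vector

# Symmetry has 4 levels, so each encoded vector has 3 slots
print(one_hot_drop_last(1, 4))  # [0.0, 1.0, 0.0]
print(one_hot_drop_last(3, 4))  # [0.0, 0.0, 0.0]
```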

In [None]:
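# Collecting all numerical and one-hot-encoded data as sparse vectors
# Saving the results in column : features
# Column 'features' serves as X variable, column 'Price' as target/Y variable
# Caution: numerical_variables still contains 'Price', so the target is assembled into
# 'features' (index 23 in the output below) and leaks into the model

model_inputs = ohe_columns + numerical_variables
model_inputs.remove('ID')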

vector_assembler = VectorAssembler(inputCols=model_inputs, outputCol='features')
df_model = vector_assembler.transform(df_ohe)
df_model.take(2) 

[Row(ID=1, Carat_Weight=1.1, Cut='Ideal', Color='H', Clarity='SI1', Polish='VG', Symmetry='EX', Report='GIA', Price=5169, Cut_Index=0.0, Color_Index=1.0, Clarity_Index=0.0, Polish_Index=1.0, Symmetry_Index=1.0, Report_Index=0.0, Cut_classVec=SparseVector(4, {0: 1.0}), Color_classVec=SparseVector(5, {1: 1.0}), Clarity_classVec=SparseVector(6, {0: 1.0}), Polish_classVec=SparseVector(3, {1: 1.0}), Symmetry_classVec=SparseVector(3, {1: 1.0}), Report_classVec=SparseVector(1, {0: 1.0}), features=SparseVector(24, {0: 1.0, 5: 1.0, 9: 1.0, 16: 1.0, 19: 1.0, 21: 1.0, 22: 1.1, 23: 5169.0})),
 Row(ID=2, Carat_Weight=0.83, Cut='Ideal', Color='H', Clarity='VS1', Polish='ID', Symmetry='ID', Report='AGSL', Price=3470, Cut_Index=0.0, Color_Index=1.0, Clarity_Index=2.0, Polish_Index=2.0, Symmetry_Index=3.0, Report_Index=1.0, Cut_classVec=SparseVector(4, {0: 1.0}), Color_classVec=SparseVector(5, {1: 1.0}), Clarity_classVec=SparseVector(6, {2: 1.0}), Polish_classVec=SparseVector(3, {2: 1.0}), Symmetry_cla
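
In the assembled vectors above, index 23 holds the row's own `Price` (for example `23: 5169.0` for `ID=1`), because `Price` is still part of `numerical_variables` when the input list is built. One possible way to build the list without the identifier and the target is sketched below; `label_column`, `id_column`, and `excluded` are names introduced here for illustration. The MAE values reported in this notebook were obtained with `Price` included, so they would change if this list were used.

```python
categorical_variables = ['Cut', 'Color', 'Clarity', 'Polish', 'Symmetry', 'Report']
numerical_variables = ['ID', 'Carat_Weight', 'Price']

# Hypothetical names for the columns that must never be used as features
label_column = 'Price'
id_column = 'ID'
excluded = {id_column, label_column}

ohe_columns = [c + '_classVec' for c in categorical_variables]
model_inputs = ohe_columns + [c for c in numerical_variables if c not in excluded]
print(model_inputs)
```

The resulting list could then be passed as `inputCols` to `VectorAssembler` in place of the original one.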

### 5. Model Development

In [None]:
# Splitting model development data into train-test data

df_train, df_test = df_model.randomSplit([0.8, 0.2], seed=21)
df_train.count(), df_test.count()

(4821, 1179)
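
The split is 4821/1179 rather than exactly 4800/1200 because `randomSplit` assigns rows at random, so the weights are expected proportions rather than exact ones. The following plain-Python analogy (a hypothetical `random_split` helper, not Spark's implementation) shows the same effect:

```python
import random

def random_split(rows, train_fraction, seed):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(6000))  # stand-in for the 6000 diamond rows
train, test = random_split(rows, 0.8, seed=21)
# Sizes land near 4800/1200 but are generally not exact
print(len(train), len(test))
```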

In [None]:
# fitting base Gradient Boosting regressor model
gbt = GBTRegressor(labelCol='Price')
gbt_evaluator = RegressionEvaluator(metricName='mae', labelCol='Price',  predictionCol='prediction')
gbt_model = gbt.fit(df_train)

# making model predictions on test-data
gbt_predictions = gbt_model.transform(df_test)
gbt_predictions.select('ID', 'features', 'Price', 'prediction').take(3)

[Row(ID=2, features=SparseVector(24, {0: 1.0, 5: 1.0, 11: 1.0, 17: 1.0, 22: 0.83, 23: 3470.0}), Price=3470, prediction=3252.8526907974706),
 Row(ID=8, features=SparseVector(24, {6: 1.0, 9: 1.0, 16: 1.0, 18: 1.0, 21: 1.0, 22: 1.5, 23: 10450.0}), Price=10450, prediction=10714.605063965304),
 Row(ID=9, features=SparseVector(24, {0: 1.0, 5: 1.0, 9: 1.0, 16: 1.0, 18: 1.0, 21: 1.0, 22: 2.11, 23: 18609.0}), Price=18609, prediction=17847.173270856776)]

In [None]:
# evaluating model predictions on test-data using Mean Absolute Error metric
mae = round((gbt_evaluator.evaluate(gbt_predictions)),2)
print('Test MAE = {}'.format(mae))

Test MAE = 452.33
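
Mean Absolute Error is the average of the absolute differences between the actual and predicted prices. As a small worked example, the sketch below applies a hypothetical `mean_absolute_error` helper to the three test rows displayed above; it covers only those three rows, so it does not reproduce the full test-set figure.

```python
def mean_absolute_error(labels, predictions):
    """Average absolute difference between labels and predictions."""
    return sum(abs(y - p) for y, p in zip(labels, predictions)) / len(labels)

# Price and prediction for the three test rows shown above (IDs 2, 8, 9)
prices = [3470, 10450, 18609]
predictions = [3252.8526907974706, 10714.605063965304, 17847.173270856776]
mae_three_rows = mean_absolute_error(prices, predictions)
print(round(mae_three_rows, 2))  # 414.53
```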


### 6. Hyperparameter Tuning

In [None]:
# defining model hyperparameter grid
parameter_grid = (ParamGridBuilder()
                  .addGrid(gbt.maxDepth, [3,4,5,6])
                  .addGrid(gbt.maxIter, [10, 20])
                  .build())

# defining MAE as metric to evaluate model performance
gbt_evaluator = RegressionEvaluator(metricName='mae', labelCol=gbt.getLabelCol(),
                                    predictionCol=gbt.getPredictionCol())
cv = CrossValidator(estimator=gbt, evaluator=gbt_evaluator,
                    estimatorParamMaps=parameter_grid, numFolds=3, parallelism=4)

# fitting model on train data and calculating predictions on test data
gbt_cv_model = cv.fit(df_train)
gbt_cv_predictions = gbt_cv_model.transform(df_test)
gbt_cv_predictions.select('ID', 'features', 'Price', 'prediction').take(3)

[Row(ID=2, features=SparseVector(24, {0: 1.0, 5: 1.0, 11: 1.0, 17: 1.0, 22: 0.83, 23: 3470.0}), Price=3470, prediction=3327.4049558629195),
 Row(ID=8, features=SparseVector(24, {6: 1.0, 9: 1.0, 16: 1.0, 18: 1.0, 21: 1.0, 22: 1.5, 23: 10450.0}), Price=10450, prediction=10710.636207282394),
 Row(ID=9, features=SparseVector(24, {0: 1.0, 5: 1.0, 9: 1.0, 16: 1.0, 18: 1.0, 21: 1.0, 22: 2.11, 23: 18609.0}), Price=18609, prediction=17684.093385105298)]

In [None]:
# evaluating performance of tuned Gradient Boosting model
mae = round((gbt_evaluator.evaluate(gbt_cv_predictions)),2)
print('Test MAE = {}'.format(mae))

Test MAE = 416.0
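
To get a feel for the cost of the search, it helps to count the model fits. The grid has 4 `maxDepth` values and 2 `maxIter` values, and `CrossValidator` fits every combination once per fold before refitting the best one on the full training data. A quick plain-Python tally (variable names here are illustrative only):

```python
from itertools import product

max_depth_values = [3, 4, 5, 6]
max_iter_values = [10, 20]
num_folds = 3

# Every combination of the two hyperparameters is one candidate model
param_maps = [{'maxDepth': d, 'maxIter': i}
              for d, i in product(max_depth_values, max_iter_values)]

fits_during_cv = len(param_maps) * num_folds
total_fits = fits_during_cv + 1  # plus the final refit of the best candidate
print(len(param_maps), fits_during_cv, total_fits)  # 8 24 25
```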
