# Project Milestone III: Modeling

## Importing Packages

Here we will install our visualization packages to use for histograms, bar charts, and scatterplots

In [None]:
%%local
!emr/notebook-env/bin/pip install bokeh
!emr/notebook-env/bin/pip install matplotlib

We see that the packages were successfully installed. We need to restart the kernal to continue.

Here we import pyspark and pyspark sql to start the spark application to use for our notebook.

In [None]:
from pyspark.sql.functions import col,sum
from pyspark import SparkContext
import pyspark.sql.functions as f
import pyspark.ml.feature as feat
import numpy as np
import pyspark.ml.stat as st
import numpy as np
import pandas as pd
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import pyspark.ml.regression as rg
from pyspark.ml.regression import LinearRegression, LinearRegressionSummary
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
import pyspark.ml.evaluation as ev
import pyspark.ml.tuning as tune
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

We see that the Spark application was sucessfully started and we can use Spark now.

## Loading Data

We load our dataset from our s3 bucket into a dataframe.

In [None]:
diamonds_df = (
    spark.read.csv('s3://samuell-cis4567/Spark/diamonds/diamonds.csv', 
    header=True, 
    inferSchema=True))

We have successfully loaded our data into Pyspark dataframe. We will use this dataframe for our cleansing before analysis.

## Cleaning Data

### Checking and Removing Duplicates

In the following code, we want to check our dataset to see we have duplicate rows. Duplicate rows would be an issue and would result in us taking action to remove it. We do this by comparing the total count of all records and compare it with the distinct records in the dataset.

In [None]:
diamonds_df.count(), diamonds_df.distinct().count()

From the output, we see that there are no duplicate records as the counts are the same. So we do not have to take action to remove any duplicated records.

### Checking and Removing Missing Values

We run the following code to see if there are any missing values in any of our rows within our columns of our dataset. 

In [None]:
diamonds_df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in diamonds_df.columns)).show()

From the output, we see that there are no missing values. So, we do need to take action to clean our dataset to accomodate any rows that have missing values. 

## Outlier Analysis

### Carat Column

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the carat histogram. We see that there are outliers that are greater than the value 2.6. We want to filter on our dataframe to values less than 2.6, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.carat < 2.6)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

### Depth Column 

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the depth histogram. We see that there are outliers that are less than the value 50.2. We want to filter on our dataframe to values greater than 50.2, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.depth > 50.2)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the depth histogram. We see that there are outliers that are greater than the value 71.8. We want to filter on our dataframe to values less than 71.8, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.depth < 71.8)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

### Table Column

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the table histogram. We see that there are outliers that are less than the value 49. We want to filter on our dataframe to values greater than 49, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.table > 49)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the table histogram. We see that there are outliers that are greater than the value 70. We want to filter on our dataframe to values less than 70, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.table < 70)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

### Z Column

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the z histogram. We see that there are outliers that are less than or equal to the value 0. We want to filter on our dataframe to values greater than 0, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.z > 0)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the z histogram. We see that there are outliers that are greater than the value 6.36. We want to filter on our dataframe to values less than 6.36, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.z < 6.36)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

### Y Column

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the y histogram. We see that there are outliers that are less than or equal to the value 0. We want to filter on our dataframe to values greater than 0, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.y > 0)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the y histogram. We see that there are outliers that are greater than the value 10. We want to filter on our dataframe to values less than 10, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.y < 10)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

### X Column

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the x histogram. We see that there are outliers that are less than or equal to the value 0. We want to filter on our dataframe to values greater than 0, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.x > 0)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the x histogram. We see that there are outliers that are greater than the value 8.5. We want to filter on our dataframe to values less than 8.5, where most of our data is found for this column. 

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.x < 8.5)
)

By reassigning this filtered dataframe to our diamonds_df, this new dataframe will represent the filter we had previously specified. 

In [None]:
diamonds_df.count()

We begin cleansing our data by removing outliers. We do this by observing the visualization we made via the price histogram. We see that there are outliers that are greater than the value around 12,000. We want to filter on our dataframe to values less than 12,000, where most of our data is found for this column. 

## Price Column

In [None]:
diamonds_df = (
    diamonds_df
    .filter(diamonds_df.price < 12000)
)

In [None]:
diamonds_df.count()

After finalizing outlier analysis, we want to see that only 3% of our data has been removed. We take the count of our data to see how much data remains. We can see that less than 3% of the data has been filtered out.

## Normalization

For normalization, we attempted MinMax Scaler on our numerical data, both with the label (price) and without it. Through this we recevied poor model results including an R^2 score of 0. When we pursued a model with non normalized data as the following code convey, we were able to develop a model that proved reasonable.

## Dummy Variables

In our dataset, we have some categorical variables that need to converted to dummy variables in order to transform and fit it in our model. In the following code, we will print the schema of our dataframe in order to double check which of our columns are string times, hence need to converted into a dummy variable.

In [None]:
diamonds_df.printSchema()

From this output, we see the columns that we need to convert into dummy variables: cut, color, and clarity. Since they are categorical variables, we need to design code that will fulfill this conversion. 

In the following code we assign the names of those columns that we need to convert by putting it in a list. We will implement a for loop that will look at the distinct values in each of the columns and add each new column for every distinct value for each of the defined categorical columns.

In [None]:
categorical_columns=["cut", "color", "clarity"]
expressions=[]
for column in categorical_columns:
    expression = diamonds_df.select(column).distinct().rdd.flatMap(lambda x: x).collect()
    types_expr = [f.when(f.col(column) == ty, 1).otherwise(0).alias("e_{}_{}".format(column, ty)) for ty in expression]
    expressions += types_expr
diamonds_dummy = diamonds_df.select("_c0", *categorical_columns, *expressions)

After running the previous cell, we have our new dataframe containing our dummy variables. In the following code, we want to see our new dataframe and take note of the current columns.

In [None]:
diamonds_dummy.show(3)

Looking our the first three rows of our dataframe, we see that the code successfully created dummy variables for our categorical variables. However, we need to add in our numerical columns and also remove the original variables (cut, color, clarity) while keeping its new dummy variable counterparts. 

In the following code, we want to start creating our dataframe with just the dummy variables and the numerical columns. To begin, we will drop the original categorical variables from the dummy variable dataframe. This ensures that when we create the new dataframe, we do not run into a duplicate column error (since we are going to merge this dummy dataframe with the original dataframe).

In [None]:
diamonds_dummy = diamonds_dummy.drop("cut", "color", "clarity")
diamonds_dummy.columns

Looking at the output we see that the categorical variables were removed and the dummy variables along with the unique ID column are left.

In the following code, we perform a join -- our original dataset with the categorical variables and all the numerical columns and the dummy variable dataset. We do this join on the unique ID column, which we will then drop for our subsequent modeling steps. We will reassign this new dataframe to diamond_df, where we will then drop the categorical variables in order to only include the dummies.

In [None]:
diamonds_df = diamonds_dummy.join(diamonds_df, on=["_c0"]).drop("_c0")
diamonds_df = diamonds_df.drop("cut", "color", "clarity")

In the following code, we take another look at the columns in the final dataset we will be using. Here we use n dummies. 

In [None]:
diamonds_df.columns

From the output, we have all our n dummy variables for each of the previous categorical columns and additionally the numeric variables. 

## Correlation Matrix 

In order to We need to install S3 package needed for saving correlation output to S3.

In [None]:
%%local
!emr/notebook-env/bin/pip install s3fs

In [None]:
sc.install_pypi_package('s3fs')

We see that the packages were successfully installed. We need to restart the kernal to continue.

In order to begin our process of building a Machine Learning model, we need to combine and merge all our dependent variables into a single column. To do this, we need to use VectorAssembler. VectorAssembler is a feature transformer that merges multiple columns into a vector column. With this code, we use VectorAssembler on all of our columns. Following this, we create the correlation matric using our VectorAssembler. Finally, we export our correlation matrix to our S3 bucket, which we will view as a .csv.

In [None]:
#%%spark -o corr

features_and_label = (
    feat.VectorAssembler(
        inputCols=diamonds_df.columns, 
        outputCol='features'
    )
)

corr = st.Correlation.corr(
    features_and_label.transform(diamonds_df), 
    'features', 
    'pearson'
)

print(str(corr.collect()[0][0]))
corr_pd = corr.toPandas()
output_np = np.array(corr_pd.iloc[0, 0].values).reshape(
    (corr_pd.iloc[0, 0].numRows, corr_pd.iloc[0, 0].numCols))

#Change the following path to a path in your own S3 bucket
spark.createDataFrame(pd.DataFrame(output_np)).repartition(1).write.format('csv').option('header',True
                ).mode('overwrite').option('sep',',').save('s3://jnlinao-bucket-1/Spark/Exports/corr')

Here we define our final_df which we will use in our modeling. We select all columns that will be used.

In [None]:
final_df = diamonds_df.select("price", "carat","depth", "table", "x", "y", "z",
                             'e_cut_Fair', 'e_cut_Premium', 'e_cut_Good', 'e_cut_Ideal', 
                              'e_cut_Very Good', 'e_color_D', 'e_color_G', 'e_color_J', 
                              'e_color_H', 'e_color_F', 'e_color_I', 'e_color_E', 'e_clarity_VVS2', 
                              'e_clarity_VS2', 'e_clarity_SI1', 'e_clarity_I1', 'e_clarity_SI2', 
                              'e_clarity_IF', 'e_clarity_VVS1', 'e_clarity_VS1')

In the below cell, we use vector assembler to define our input and output columns. Our input column is price and our output columns are all the features or other columns. Next, we cast the price column to a float type using the previous vector assembler and select all the columns. Next, we create a linear regression object using pyspark.ml.regression as rg and fit it to our previous price_dataset. Finally, we view the coefficients.

In [None]:
vectorAssembler = feat.VectorAssembler(
    inputCols = final_df.columns[1:]
    , outputCol= 'features')

price_dataset = (
    vectorAssembler
    .transform(final_df)
    .withColumn(
        'label'
        , f.col('Price').cast('float'))
    .select('label', 'features')
)

lr_obj = rg.LinearRegression(
    maxIter=10
    , regParam=0.01
    , elasticNetParam=1.00)
lr_model = lr_obj.fit(price_dataset)


lr_model.coefficients

We examine the model's summary by using from pyspark.ml.regression import LinearRegression, LinearRegressionSummary and assigning it to an object called summary. We then print the r^2, RMSE, MAE.

In [None]:
lr_model.coefficients
summary = lr_model.summary

print(
    summary.r2
    , summary.rootMeanSquaredError
    , summary.meanAbsoluteError
)

First, we create the same vector assembler from the previous cell. Next, we create a linear regression object by using GeneralizedLinearRegression. We set the labelCol to our dependent variable price. We define link to identity because we do not transform price. Next, we create a pipeline of the previous two objects: vectorAssembler and lr_obj. Finally, we run the pipeline on our final_df and show the actual prices and predictions.

In [None]:
vectorAssembler = feat.VectorAssembler(
    inputCols = final_df.columns[1:]
    , outputCol='features')

lr_obj = rg.GeneralizedLinearRegression(
    labelCol='price'
    , maxIter=10
    , regParam=0.01
    , link='identity'
    , linkPredictionCol="p"
)

pip = Pipeline(stages=[vectorAssembler, lr_obj])

(
    pip
    .fit(final_df)
    .transform(final_df)
    .select('price', 'prediction')
    .show(50)
)

In this cell, we define a model object that fits the pipeline to our final_df.

In [None]:
model = pip.fit(final_df)

In this cell, we transform the final_df and assign it to object prediction.

In [None]:
prediction = model.transform(final_df)

In this cell, we select the price and prediction columns and show 50 records.

In [None]:
prediction.select("price", "prediction").show(50)

In this cell, we utilize from pyspark.ml.evaluation import RegressionEvaluator to create a RegressionEvaluator object called eval. We define our labelCol as the price column and our predictionCol as the prediction column from the previous prediction object. We then display RMSE, MSE, MAE, and r^2 by evaluating the predictions respective towards a certain metric. We also format the metrics to 3 decimal spaces. 

In [None]:
eval = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(prediction)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(prediction, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(prediction, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

#r2 - coefficient of determination
r2 = eval.evaluate(prediction, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

In this cell, we create a gradient boosted tree object using import pyspark.ml.regression as rg. Our labelCol is our dependent variable price. Next, we create a pipeline object using the previous vector assembler and the new gbt_obj we created in this cell. We then fit and transform the pipeline to our final_df, select the price and prediction columns, and assign it to an object called results. Finally, we create an evaluator object by using import pyspark.ml.evaluation as ev and assign our dependent variable price to the labelCol. We then show r^2 by evaluating our results object.

In [None]:
gbt_obj = rg.GBTRegressor(
    labelCol='price'
    , minInstancesPerNode=10
    , minInfoGain=0.1
)

pip = Pipeline(stages=[vectorAssembler, gbt_obj])

results = (
    pip
    .fit(final_df)
    .transform(final_df)
    .select('price', 'prediction')
)

evaluator = ev.RegressionEvaluator(labelCol='price')
evaluator.evaluate(results, {evaluator.metricName: 'r2'})

In this cell, we create a gradient boosted tree object using import pyspark.ml.regression as rg. Our labelCol is our dependent variable price. Next, we create a pipeline object using the previous vector assembler and the new gbt_obj we created in this cell. We then fit and transform the pipeline to our final_df, select the price and prediction columns, and assign it to an object called results. Finally, we create an evaluator object by using import pyspark.ml.evaluation as ev and assign our dependent variable price to the labelCol. We then show RMSE by evaluating our results object.

In [None]:
gbt_obj = rg.GBTRegressor(
    labelCol='price'
    , minInstancesPerNode=10
    , minInfoGain=0.1
)

pip = Pipeline(stages=[vectorAssembler, gbt_obj])

results = (
    pip
    .fit(final_df)
    .transform(final_df)
    .select('price', 'prediction')
)

evaluator = ev.RegressionEvaluator(labelCol='price')
evaluator.evaluate(results, {evaluator.metricName: 'rmse'})

In this cell, we create a gradient boosted tree object using import pyspark.ml.regression as rg. Our labelCol is our dependent variable price. Next, we create a pipeline object using the previous vector assembler and the new gbt_obj we created in this cell. We then fit and transform the pipeline to our final_df, select the price and prediction columns, and assign it to an object called results. Finally, we create an evaluator object by using import pyspark.ml.evaluation as ev and assign our dependent variable price to the labelCol. We then show MSE by evaluating our results object.

In [None]:
gbt_obj = rg.GBTRegressor(
    labelCol='price'
    , minInstancesPerNode=10
    , minInfoGain=0.1
)

pip = Pipeline(stages=[vectorAssembler, gbt_obj])

results = (
    pip
    .fit(final_df)
    .transform(final_df)
    .select('price', 'prediction')
)

evaluator = ev.RegressionEvaluator(labelCol='price')
evaluator.evaluate(results, {evaluator.metricName: 'mse'})

In this cell, we create a gradient boosted tree object using import pyspark.ml.regression as rg. Our labelCol is our dependent variable price. Next, we create a pipeline object using the previous vector assembler and the new gbt_obj we created in this cell. We then fit and transform the pipeline to our final_df, select the price and prediction columns, and assign it to an object called results. Finally, we create an evaluator object by using import pyspark.ml.evaluation as ev and assign our dependent variable price to the labelCol. We then show MAE by evaluating our results object.

In [None]:
gbt_obj = rg.GBTRegressor(
    labelCol='price'
    , minInstancesPerNode=10
    , minInfoGain=0.1
)

pip = Pipeline(stages=[vectorAssembler, gbt_obj])

results = (
    pip
    .fit(final_df)
    .transform(final_df)
    .select('price', 'prediction')
)

evaluator = ev.RegressionEvaluator(labelCol='price')
evaluator.evaluate(results, {evaluator.metricName: 'mae'})

## Hypertuning

In the following cell, we split our data into train and test with 70% of the dataset being in the training dataset and the rest of the 30% in the test set. Doing this will fit our models to the training data which it will later be validated on the test set.

In [None]:
(diamonds_train, diamonds_test) = final_df.randomSplit([.7, .3])

After splitting our data, we will use the training data to train the classifiers with the different parameters in the pipeline. Then later on the test set results will provide the model evaluation metrics from which we can identify the best model.

In the next cell, we develop our pipelines for VectorAssembler, ChiSqSelector, and the Linear Regression Model. We are defining the parameters for each of these objects so that we can later fit this into our pipeline. However for the Linear Regression Model, we are going to build a Grid Search for the parameters RegParam and ElasticNetParam. We will run this Grid Search with different values for these parameters. At the end of the cell, we will fit the pipeline containing VectorAssembler and the Linear Regression Object to the training data. This data transformation will then be transformed with the Grid Search Cross Validator and the Regression Validator.

In [None]:
vectorAssembler = feat.VectorAssembler(
    inputCols = final_df.columns[1:]
    , outputCol='features')


selector = feat.ChiSqSelector(
    labelCol='price'
    , numTopFeatures=5
    , outputCol='selected')

lr_obj = rg.LinearRegression(
    labelCol='price'
    , featuresCol=selector.getOutputCol()
)


linReg_grid = (
    tune.ParamGridBuilder()
    .addGrid(lr_obj.regParam
            , [0.01, 0.1]
        )
    .addGrid(lr_obj.elasticNetParam
            , [1.0, 0.5]
        )
    .build()
)

linReg_ev = ev.RegressionEvaluator(labelCol='price', predictionCol='prediction')

cross_v = tune.CrossValidator(
    estimator=lr_obj
    , estimatorParamMaps=linReg_grid
    , evaluator=linReg_ev
)

pipeline = Pipeline(stages=[vectorAssembler, selector])
data_trans = pipeline.fit(diamonds_train)

linReg_modelTest = cross_v.fit(
    data_trans.transform(diamonds_train)
)

We acknowledge the model results for performance using the varied parameters for Elastic Net Param and Get Param. We use this to determine and identify them.

In [None]:
# measure performance of best model
data_trans_test = data_trans.transform(diamonds_test)
results = linReg_modelTest.transform(data_trans_test)

print(linReg_ev.evaluate(results, {linReg_ev.metricName: 'weightedPrecision'}))
print(linReg_ev.evaluate(results, {linReg_ev.metricName: 'weightedRecall'}))
print(linReg_ev.evaluate(results, {linReg_ev.metricName: 'accuracy'}))
print('Best params, regParam: %s, elasticNetParam: %s' 
      %(linReg_modelTest.bestModel._java_obj.getRegParam(),
      linReg_modelTest.bestModel._java_obj.getElasticNetParam()))