## Springboard MLE Capstone
### Used Car Price Predictor - Scaling up with PySpark ML
##### Author - Joyjit Chowdhury, Springboard - MLE


## 1:  Objective
This notebook scales up my Capstone Project, Used Car Price Prediction, with PySpark ML.
Details are below:

- **ML Problem Class** :  _Regression_
- **Inputs**   :  _Used car attributes - **year, make, model, trim, odometer, state, colorexterior, colorinterior, accidenthist, owner, usage**_ <br/>
    **Example**:

            "year" : 2014,
            "make" : "toyota",
            "model" : "corolla",
            "trim" : "le plus",
            "odometer" : 20700,
            "state" :  "AZ",
            "colorexterior" : "blue",
            "colorinterior" : "black",
            "accidenthist" : "n",
            "owner" : 5,
            "usage" : "personal"
            
            
- **Output**  :  _Price of the used car (predicted in thousands, because `price` is divided by 1000 during standardization)_



## 2:  High Level Design

- **Implementation**   :  _Spark ML regressors with PySpark: Linear Regression, Gradient-Boosted Tree (GBT) Regressor and Random Forest Regressor_

- **Model selection**  :  _The three regressors are compared on a 75/25 train/test split using MAE, RMSE and R²; the best performer (GBT) is then tuned with 2-fold cross-validation over `maxDepth`_

- **Loss Function** : _Squared error (the `GBTRegressor` default, `lossType="squared"`)_
- **Input Feature Augmentation**:
  - Mandatory input features: year, make, model, trim, odometer, state, colorexterior, colorinterior
  - Augmented features: ReliabilityRank, CostOfLivingRank, PercentSales, AvgDaysToTurn, ReviewScore, AvgMPG, LuxurySportsOrHybrid, drivetrain, bodytype
- **Feature Encoding**:
  - Numeric features: ReliabilityRank, CostOfLivingRank, PercentSales, AvgDaysToTurn, ReviewScore, AvgMPG (column `mpg`), age, odo
  - Categorical features (one-hot encoded): owner, usage, LuxurySportsOrHybrid, drivetrain, accidenthist, colorexterior, colorinterior, bodytype
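To make the categorical encoding concrete, here is a plain-Python sketch (not Spark code) of what a `StringIndexer` + `OneHotEncoder` pair does to one column, assuming Spark's defaults: `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), and `OneHotEncoder` drops the last category (`dropLast=True`), so a column with n distinct labels contributes n - 1 slots to the feature vector. Breaking frequency ties alphabetically is an assumption of this sketch, and the helper names `string_index` and `one_hot` are hypothetical.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, n_labels, drop_last=True):
    """Dense one-hot vector; with drop_last the last index maps to all zeros."""
    width = n_labels - 1 if drop_last else n_labels
    vec = [0.0] * width
    if index < width:
        vec[index] = 1.0
    return vec


usage = ["PERSONAL", "PERSONAL", "FLEET", "PERSONAL"]
labels = string_index(usage)
print(labels)                                    # {'PERSONAL': 0, 'FLEET': 1}
print(one_hot(labels["PERSONAL"], len(labels)))  # [1.0]
print(one_hot(labels["FLEET"], len(labels)))     # [0.0]
```

With `dropLast=True` a two-valued column such as `usage` occupies a single slot, and rows holding the last (least frequent) label have no non-zero entry for that column.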

## 3:  Data Loading and Preparation

```python
# Note: `round` here is the Spark column function and shadows Python's built-in round
from pyspark.sql.functions import trim, col, when, round, upper
from datetime import datetime
from operator import add
from functools import reduce
```

```
%fs ls /FileStore/tables
```

| path | name | size (bytes) |
|---|---|---|
| dbfs:/FileStore/tables/car_category.csv | car_category.csv | 152727 |
| dbfs:/FileStore/tables/car_ratings.csv | car_ratings.csv | 7441 |
| dbfs:/FileStore/tables/car_reliability_rankings.csv | car_reliability_rankings.csv | 499 |
| dbfs:/FileStore/tables/car_sales.csv | car_sales.csv | 1559 |
| dbfs:/FileStore/tables/cardata_for_DL.csv | cardata_for_DL.csv | 687719 |
| dbfs:/FileStore/tables/statewise_economic_indicators.csv | statewise_economic_indicators.csv | 950 |
| dbfs:/FileStore/tables/used_car_time_to_turn.csv | used_car_time_to_turn.csv | 1942 |


### Step 1: Standardize and Augment Data

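```python
def get_df_from_csv(file_location):
  """Read a CSV file with a header row into a DataFrame, inferring column types.

  `spark` is the SparkSession that Databricks provides in every notebook.
  """
  df = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("sep", ",") \
    .load(file_location)

  return df
```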


```python
df_base_car_data = get_df_from_csv("/FileStore/tables/cardata_for_DL.csv")
display(df_base_car_data.limit(10))
```

| year | make | model | trim | odometer | state | colorexterior | colorinterior | accidenthist | owner | usage | price |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 2015 | BMW | I3 | 60 AH | 21493 | WA | SILVER | GRAY | Y | 2 | PERSONAL | 15991 |
| 2018 | CHEVROLET | EQUINOX | LS WITH 1LS FWD | 37071 | CA | BLACK | GRAY | N | 1 | PERSONAL | 14899 |
| 2019 | SUBARU | IMPREZA | 2.0I PREMIUM 5-DOOR CVT | 15914 | TX | RED | BEIGE | N | 1 | PERSONAL | 19220 |
| 2019 | DODGE | GRAND CARAVAN | SXT | 42070 | FL | GRAY | BLACK | N | 1 | PERSONAL | 12993 |
| 2020 | TOYOTA | COROLLA | LE CVT | 18725 | TX | WHITE | BLACK | N | 1 | PERSONAL | 13800 |
| 2017 | HONDA | CIVIC | EX-L COUPE CVT | 186240 | NJ | BLACK | BLACK | Y | 1 | PERSONAL | 9626 |
| 2019 | NISSAN | SENTRA | SV CVT | 35555 | TX | RED | BLACK | N | 2 | PERSONAL | 10899 |
| 2019 | NISSAN | SENTRA | S CVT | 15305 | FL | SILVER | BLACK | N | 1 | FLEET | 8991 |
| 2020 | TOYOTA | COROLLA | LE CVT | 10267 | CA | WHITE | UNKNOWN | N | 1 | PERSONAL | 13999 |
| 2019 | NISSAN | SENTRA | SV CVT | 18600 | AZ | GRAY | BLACK | Y | 0 | FLEET | 9900 |


```python
def standardizeData(df):
  """Trim strings, derive age/odo/drivetrain, bucket colors and owners, and scale price."""

  # Trim whitespace from every string column
  stringColumns = [name for name, dtype in df.dtypes if dtype.startswith('string')]
  for colname in stringColumns:
      df = df.withColumn(colname, trim(col(colname)))

  # Colors kept as-is; every other color is bucketed into "OTHER"
  exteriorColors = ["WHITE", "BLACK", "SILVER", "GRAY", "BLUE", "RED"]
  interiorColors = ["BLACK", "GRAY", "BEIGE"]

  # Perform data conversions:
  #   age   = current calendar year - model year (so it shifts if the notebook is re-run later)
  #   odo   = odometer in thousands; price = price in thousands
  #   owner = 0, 1, or 2 (2 means two or more owners, or missing)
  df = df.withColumn("age", datetime.now().year - col("year")) \
         .withColumn("odo", round(col("odometer") / 1000, 2)) \
         .drop("odometer") \
         .withColumn("drivetrain", when(col("trim").like("%AWD%"), "AWD") \
                                  .when(col("trim").like("%RWD%"), "RWD") \
                                  .otherwise("FWD")) \
         .withColumn("colorexterior", when(col("colorexterior").isin(exteriorColors), col("colorexterior")) \
                                     .otherwise("OTHER")) \
         .withColumn("colorinterior", when(col("colorinterior").isin(interiorColors), col("colorinterior")) \
                                     .otherwise("OTHER")) \
         .withColumn("owner", when(col("owner") == 0, 0) \
                             .when(col("owner") == 1, 1) \
                             .otherwise(2)) \
         .withColumn("price", round(col("price") / 1000, 2))
  return df
```
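Because `standardizeData` is built entirely from column expressions, it can help to check its row-level rules on a few hand-written records. The plain-Python function below, `standardize_record` (a hypothetical helper, not part of the notebook), mirrors those rules for one record as a dict. It assumes the caller passes `current_year` instead of reading `datetime.now()`, and it approximates SQL `LIKE '%AWD%'` with a substring test.

```python
EXTERIOR_COLORS = {"WHITE", "BLACK", "SILVER", "GRAY", "BLUE", "RED"}
INTERIOR_COLORS = {"BLACK", "GRAY", "BEIGE"}


def standardize_record(rec, current_year):
    """Plain-Python mirror of standardizeData's per-row rules for one record (a dict)."""
    # Trim whitespace from every string field
    r = {k: v.strip() if isinstance(v, str) else v for k, v in rec.items()}

    # Drivetrain parsed from the trim text (substring test in place of SQL LIKE)
    trim_text = r.get("trim") or ""
    if "AWD" in trim_text:
        drivetrain = "AWD"
    elif "RWD" in trim_text:
        drivetrain = "RWD"
    else:
        drivetrain = "FWD"

    # Drop the raw odometer reading and add the derived/bucketed fields.
    # Python's round() rounds half to even, unlike Spark's HALF_UP round(),
    # so values exactly on a .xx5 boundary can differ.
    out = {k: v for k, v in r.items() if k != "odometer"}
    out.update(
        age=current_year - r["year"],
        odo=round(r["odometer"] / 1000, 2),
        drivetrain=drivetrain,
        colorexterior=r["colorexterior"] if r["colorexterior"] in EXTERIOR_COLORS else "OTHER",
        colorinterior=r["colorinterior"] if r["colorinterior"] in INTERIOR_COLORS else "OTHER",
        owner=r["owner"] if r["owner"] in (0, 1) else 2,
        price=round(r["price"] / 1000, 2),
    )
    return out


row = {"year": 2015, "make": "BMW", "model": "I3", "trim": "60 AH", "odometer": 21493,
       "state": "WA", "colorexterior": "SILVER", "colorinterior": "GRAY",
       "accidenthist": "Y", "owner": 2, "usage": "PERSONAL", "price": 15991}
print(standardize_record(row, current_year=2021))
```

The example is the first raw record shown above. With `current_year=2021`, which is consistent with the ages in the Step 1 output, it yields age 6, odo 21.49 and price 15.99, matching the corresponding augmented row.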

```python
def augmentData(df_base):
    """Join the base car data with the reference tables and return the augmented DataFrame."""

    # Register the base data as a temp view
    df_base.createOrReplaceTempView('car_base')

    # Load all augmentation files
    df_car_category = get_df_from_csv("/FileStore/tables/car_category.csv")
    df_car_ratings = get_df_from_csv("/FileStore/tables/car_ratings.csv")
    df_car_reliability_rankings = get_df_from_csv("/FileStore/tables/car_reliability_rankings.csv")
    df_car_sales = get_df_from_csv("/FileStore/tables/car_sales.csv")
    df_statewise_economic_indicators = get_df_from_csv("/FileStore/tables/statewise_economic_indicators.csv")
    df_used_car_time_to_turn = get_df_from_csv("/FileStore/tables/used_car_time_to_turn.csv")

    # Keep the needed columns and register each table as a temp view
    df_car_reliability_rankings = df_car_reliability_rankings.select('Make', 'ReliabilityRank')
    df_car_reliability_rankings.createOrReplaceTempView('car_rel')

    df_statewise_economic_indicators = df_statewise_economic_indicators.select('State', 'CostOfLivingRank')
    df_statewise_economic_indicators.createOrReplaceTempView('car_state')

    df_car_sales = df_car_sales.drop('TotalSales')
    df_car_sales.createOrReplaceTempView('car_sales')

    # AvgDaysToTurn = row-wise sum of all double-typed columns / (number of those columns - 1).
    # NOTE: a plain mean would divide by len(numeric_col_list); confirm the "- 1" is intended.
    numeric_col_list = [name for name, dtype in df_used_car_time_to_turn.dtypes if dtype.startswith('double')]
    df_used_car_time_to_turn = df_used_car_time_to_turn \
        .withColumn('AvgDaysToTurn', round(reduce(add, [col(x) for x in numeric_col_list]) / (len(numeric_col_list) - 1), 2)) \
        .withColumn('Make', upper(col('Make'))) \
        .select('Make', 'AvgDaysToTurn')
    df_used_car_time_to_turn.createOrReplaceTempView('car_ttt')

    df_car_ratings.createOrReplaceTempView('car_rat')
    df_car_category.createOrReplaceTempView('car_cat')

    # Build the augmented DataFrame with inner joins (a base row is kept only if it
    # matches every reference table), then drop the raw identifier columns
    df_car_final = spark.sql('''
                                select car_base.*,
                                       car_cat.Category as bodytype,
                                       car_rel.ReliabilityRank as reliabilityrank,
                                       car_state.CostOfLivingRank as costoflivingrank,
                                       round(car_sales.PercentSales,2) as percentsales,
                                       car_ttt.AvgDaysToTurn as avgdaystoturn,
                                       car_rat.ReviewScore as reviewscore,
                                       car_rat.MPG as mpg,
                                       car_rat.LuxurySportsOrHybrid as luxurysportshybrid
                                from car_base
                                INNER JOIN car_cat
                                ON car_base.year = car_cat.Year and car_base.make = car_cat.Make and car_base.model = car_cat.Model
                                INNER JOIN car_rel
                                ON car_base.make = car_rel.Make
                                INNER JOIN car_state
                                ON car_base.state = car_state.State
                                INNER JOIN car_sales
                                ON  car_base.make = car_sales.Make
                                INNER JOIN car_ttt
                                ON  car_base.make = car_ttt.Make
                                INNER JOIN car_rat
                                ON  car_base.make || ' ' || car_base.model = car_rat.MakeModel
                                '''
                          ).drop("make", "model", "year", "trim", "state").distinct()

    return df_car_final
```
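The `AvgDaysToTurn` column is a row-wise reduction over every double-typed column of `used_car_time_to_turn.csv`. The plain-Python sketch below puts that reduction next to a plain arithmetic mean, so the effect of the `len(numeric_col_list) - 1` divisor in `augmentData` is easy to see. The column names `P1` to `P3`, their values and the helper `avg_days_to_turn` are made up for illustration.

```python
from functools import reduce
from operator import add


def avg_days_to_turn(row, numeric_cols, divisor_offset=0):
    """Sum of numeric_cols divided by (len(numeric_cols) - divisor_offset), rounded to 2 places.

    divisor_offset=0 gives the arithmetic mean; divisor_offset=1 reproduces the
    (len - 1) divisor used in augmentData.
    """
    total = reduce(add, [row[c] for c in numeric_cols])
    return round(total / (len(numeric_cols) - divisor_offset), 2)


# Hypothetical row: one make with three per-period "days to turn" values
row = {"Make": "toyota", "P1": 40.0, "P2": 50.0, "P3": 60.0}
cols = ["P1", "P2", "P3"]
print(avg_days_to_turn(row, cols))                    # 50.0
print(avg_days_to_turn(row, cols, divisor_offset=1))  # 75.0
```

If every double column in the file is a per-period value, only the offset-0 version is a true average; the `- 1` is right only if one of those columns should be excluded from the count.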

```python
df = standardizeData(df_base_car_data)
df = augmentData(df)
display(df.limit(10))
```

| colorexterior | colorinterior | accidenthist | owner | usage | price | age | odo | drivetrain | bodytype | reliabilityrank | costoflivingrank | percentsales | avgdaystoturn | reviewscore | mpg | luxurysportshybrid |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| WHITE | OTHER | N | 1 | PERSONAL | 14.0 | 1 | 10.27 | FWD | SEDAN | 5 | 49 | 12.19 | 46.83 | 7.9 | 32.0 | N |
| BLACK | BLACK | Y | 1 | FLEET | 10.79 | 2 | 24.69 | FWD | SEDAN | 10 | 27 | 13.83 | 77.58 | 8.2 | 26.0 | N |
| BLACK | BLACK | Y | 1 | PERSONAL | 9.63 | 4 | 186.24 | FWD | SEDAN | 18 | 43 | 8.34 | 52.75 | 8.2 | 34.0 | N |
| RED | BEIGE | N | 1 | PERSONAL | 19.22 | 2 | 15.91 | FWD | SEDAN | 23 | 14 | 4.13 | 25.17 | 7.7 | 33.0 | N |
| BLACK | GRAY | N | 1 | PERSONAL | 14.9 | 3 | 37.07 | FWD | SUV | 9 | 49 | 11.69 | 84.75 | 8.4 | 29.0 | N |
| GRAY | BLACK | Y | 0 | FLEET | 9.9 | 2 | 18.6 | FWD | SEDAN | 16 | 29 | 6.29 | 82.58 | 7.2 | 33.0 | N |
| SILVER | BLACK | N | 1 | FLEET | 8.99 | 2 | 15.31 | FWD | SEDAN | 16 | 27 | 6.29 | 82.58 | 7.2 | 33.0 | N |
| SILVER | GRAY | Y | 2 | PERSONAL | 15.99 | 6 | 21.49 | FWD | HATCHBACK | 8 | 39 | 1.78 | 80.17 | 7.5 | 113.0 | Y |
| WHITE | BLACK | N | 1 | PERSONAL | 13.8 | 1 | 18.73 | FWD | SEDAN | 5 | 14 | 12.19 | 46.83 | 7.9 | 32.0 | N |
| RED | BLACK | N | 2 | PERSONAL | 10.9 | 2 | 35.56 | FWD | SEDAN | 16 | 14 | 6.29 | 82.58 | 7.2 | 33.0 | N |
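Every join in `augmentData` is an `INNER JOIN`, so a listing whose make, model, year or state is missing from any reference table silently disappears from `df`. Comparing `df_base_car_data.count()` with `df.count()` shows how many rows were lost (the final `distinct()` also removes duplicates), and in Spark a `left_anti` join, e.g. `df_base.join(lookup, on=<join columns>, how="left_anti")`, lists the unmatched rows. The plain-Python sketch below shows the same check on toy data for the `make || ' ' || model` key used against `car_ratings`; the helper `unmatched_keys` and the `ACME ROADSTER` record are hypothetical.

```python
def unmatched_keys(base_rows, lookup_keys, key_fn):
    """Return the keys of base rows that an INNER JOIN against lookup_keys would drop."""
    lookup = set(lookup_keys)
    return sorted({key_fn(r) for r in base_rows if key_fn(r) not in lookup})


def make_model(r):
    """Same key as the SQL condition car_base.make || ' ' || car_base.model."""
    return f"{r['make']} {r['model']}"


base = [
    {"make": "TOYOTA", "model": "COROLLA"},
    {"make": "NISSAN", "model": "SENTRA"},
    {"make": "ACME", "model": "ROADSTER"},  # hypothetical make/model with no rating
]
ratings_keys = ["TOYOTA COROLLA", "NISSAN SENTRA"]  # hypothetical car_ratings.MakeModel values

print(unmatched_keys(base, ratings_keys, make_model))  # ['ACME ROADSTER']
```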


### Step 2: Encode Data for the ML Model

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler


categoricalColumns = ["colorexterior", "colorinterior", "accidenthist", "owner", "usage", "drivetrain", "bodytype", "luxurysportshybrid"]
numericCols = ["age", "odo", "reliabilityrank", "costoflivingrank", "percentsales", "avgdaystoturn", "reviewscore", "mpg"]

stages = []  # stages in our Pipeline

for categoricalCol in categoricalColumns:
    # Category indexing with StringIndexer (by default the most frequent label gets index 0)
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert the indices into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages. These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

# Concatenate the one-hot vectors and the numeric columns into a single "features" vector
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
```

```python
# Create a Pipeline
pipeline = Pipeline(stages=stages)

# Fit the indexers/encoders and run the feature transformations
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)

# Keep the assembled feature vector plus the original columns
selectedcols = ["features"] + df.columns
dataset = dataset.select(selectedcols)
display(dataset.limit(10))
```

```text
features,colorexterior,colorinterior,accidenthist,owner,usage,price,age,odo,drivetrain,bodytype,reliabilityrank,costoflivingrank,percentsales,avgdaystoturn,reviewscore,mpg,luxurysportshybrid
"Map(vectorType -> sparse, length -> 31, indices -> List(1, 6, 9, 10, 12, 13, 14, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 30.22, 13.0, 11.0, 4.16, 85.67, 8.1, 31.0))",BLACK,BLACK,N,1,PERSONAL,13.48,2,30.22,FWD,SEDAN,13,11,4.16,85.67,8.1,31.0,N
"Map(vectorType -> sparse, length -> 31, indices -> List(0, 8, 9, 10, 12, 13, 14, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 28.02, 5.0, 49.0, 12.19, 46.83, 8.7, 35.0))",WHITE,OTHER,N,1,PERSONAL,16.62,2,28.02,FWD,SEDAN,5,49,12.19,46.83,8.7,35.0,N
"Map(vectorType -> sparse, length -> 31, indices -> List(3, 6, 9, 10, 12, 13, 15, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 29.6, 16.0, 23.0, 6.29, 82.58, 7.4, 29.5))",GRAY,BLACK,N,1,PERSONAL,17.92,2,29.6,FWD,SUV,16,23,6.29,82.58,7.4,29.5,N
"Map(vectorType -> sparse, length -> 31, indices -> List(0, 8, 9, 10, 12, 13, 15, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 30.39, 26.0, 39.0, 5.53, 80.42, 7.0, 27.5))",WHITE,OTHER,N,1,PERSONAL,22.0,2,30.39,FWD,SUV,26,39,5.53,80.42,7.0,27.5,N
"Map(vectorType -> sparse, length -> 31, indices -> List(0, 6, 9, 10, 12, 13, 14, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 29.5, 13.0, 31.0, 4.16, 85.67, 8.1, 31.0))",WHITE,BLACK,N,1,PERSONAL,12.5,2,29.5,FWD,SEDAN,13,31,4.16,85.67,8.1,31.0,N
"Map(vectorType -> sparse, length -> 31, indices -> List(0, 6, 9, 10, 12, 13, 14, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 28.66, 16.0, 29.0, 6.29, 82.58, 7.2, 33.0))",WHITE,BLACK,N,1,PERSONAL,12.45,2,28.66,FWD,SEDAN,16,29,6.29,82.58,7.2,33.0,N
"Map(vectorType -> sparse, length -> 31, indices -> List(1, 6, 9, 11, 12, 13, 14, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 7.0, 63.0, 9.0, 26.0, 11.69, 84.75, 8.5, 33.0))",BLACK,BLACK,N,2,PERSONAL,8.49,7,63.0,FWD,SEDAN,9,26,11.69,84.75,8.5,33.0,N
"Map(vectorType -> sparse, length -> 31, indices -> List(3, 6, 9, 10, 12, 15, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 6.54, 9.0, 17.0, 11.69, 84.75, 8.4, 29.0))",GRAY,BLACK,N,1,PERSONAL,22.0,1,6.54,AWD,SUV,9,17,11.69,84.75,8.4,29.0,N
"Map(vectorType -> sparse, length -> 31, indices -> List(3, 6, 9, 10, 12, 13, 14, 22, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 8.0, 100.26, 9.0, 15.0, 11.69, 84.75, 8.5, 33.0))",GRAY,BLACK,N,1,PERSONAL,6.0,8,100.26,FWD,SEDAN,9,15,11.69,84.75,8.5,33.0,N
"Map(vectorType -> sparse, length -> 31, indices -> List(2, 7, 9, 10, 12, 14, 23, 24, 25, 26, 27, 28, 29, 30), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 4.0, 24.9, 24.0, 10.0, 0.65, 74.0, 7.4, 26.5))",SILVER,GRAY,N,1,PERSONAL,22.99,4,24.9,AWD,SEDAN,24,10,0.65,74.0,7.4,26.5,Y
```

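The `features` column is displayed as a sparse vector: `length` is the total number of slots, and only the non-zero slots are listed, as parallel `indices` and `values`. The plain-Python helper below, `sparse_to_dense` (hypothetical; for a real vector, `pyspark.ml.linalg.SparseVector.toArray()` does the same job), expands the first row above into a dense list. Because `VectorAssembler` concatenates its inputs in order, the one-hot slots of the eight categorical columns come first and the last eight entries are the numeric columns in `numericCols` order.

```python
def sparse_to_dense(length, indices, values):
    """Expand a sparse vector (length, indices, values) into a dense list of floats."""
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    dense = [0.0] * length
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense


# First row of the features output above
indices = [1, 6, 9, 10, 12, 13, 14, 22, 23, 24, 25, 26, 27, 28, 29, 30]
values = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 30.22, 13.0, 11.0, 4.16, 85.67, 8.1, 31.0]
dense = sparse_to_dense(31, indices, values)

numeric_cols = ["age", "odo", "reliabilityrank", "costoflivingrank",
                "percentsales", "avgdaystoturn", "reviewscore", "mpg"]
print(dict(zip(numeric_cols, dense[-len(numeric_cols):])))
# {'age': 2.0, 'odo': 30.22, 'reliabilityrank': 13.0, 'costoflivingrank': 11.0,
#  'percentsales': 4.16, 'avgdaystoturn': 85.67, 'reviewscore': 8.1, 'mpg': 31.0}
```

The decoded numeric values match the plain columns of the same row (age 2, odo 30.22, ..., mpg 31.0).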

```python
# Randomly split the data into training (75%) and test (25%) sets; fix the seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.75, 0.25], seed=100)
print(trainingData.count())
print(testData.count())
```
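`randomSplit` does not cut the data at exactly 75/25: each row is assigned to a split by a seeded random draw, so the two counts above only approximate the weights, and the same seed reproduces the same split as long as the data and its partitioning do not change. The plain-Python sketch below illustrates the idea with one uniform draw per row. It is a simplified model, not Spark's per-partition sampling implementation, and `random_split` is a hypothetical name.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a split by comparing one seeded uniform draw with cumulative weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total          # normalize weights, as randomSplit does
        bounds.append(acc)
    rng = random.Random(seed)     # same seed -> same sequence of draws -> same split
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the final bound
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(1000), [0.75, 0.25], seed=100)
print(len(train), len(test))  # roughly 750 / 250, not exactly
```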

```python
trainingData.show(5)
```

### Step 3: Explore Models (Linear Regression, Gradient-Boosted Tree Regression, Random Forest Regression)

```python
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
```


```python
def trainAndTestModel(modeltype, trainingData, testData, featuresCol, labelCol):
  """Fit one regressor ("LR", "GB" or "RF") on trainingData and print MAE, RMSE and R2 on testData."""

  print(f"\nTesting Summary for {modeltype}:")

  if modeltype == "LR":
    model = LinearRegression(featuresCol=featuresCol, labelCol=labelCol, maxIter=10, regParam=0.3, elasticNetParam=0.8)
  elif modeltype == "GB":
    model = GBTRegressor(featuresCol=featuresCol, labelCol=labelCol, maxIter=10)
  elif modeltype == "RF":
    model = RandomForestRegressor(featuresCol=featuresCol, labelCol=labelCol)
  else:
    raise ValueError(f"Unknown model type {modeltype!r}; expected 'LR', 'GB' or 'RF'")

  model_fit = model.fit(trainingData)

  # Training metrics are left disabled: `.summary` is not available on every model type used here
  # trainingSummary = model_fit.summary
  # print(f"Training Summary for {modeltype}:")
  # print(f"MAE : {trainingSummary.meanAbsoluteError:.2f}")
  # print(f"RMSE: {trainingSummary.rootMeanSquaredError:.2f}")
  # print(f"R2 on train data: {trainingSummary.r2:.2f}")

  # Score the held-out test set and report each metric (price is in thousands)
  predictions = model_fit.transform(testData)
  for metric in ["mae", "rmse", "r2"]:
    evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=labelCol, metricName=metric)
    print(f"{metric} on test data = {evaluator.evaluate(predictions):.2f}")
```
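`RegressionEvaluator` computes each metric from the `(label, prediction)` pairs. For reference, the plain-Python sketch below spells out the three metrics used here: MAE is the mean absolute error, RMSE is the square root of the mean squared error, and R² is 1 - SS_res / SS_tot. Because `price` was divided by 1000 in `standardizeData`, MAE and RMSE come out in thousands (an RMSE of 1.5 means about 1,500 in the original price units). The function name `regression_metrics` is hypothetical.

```python
import math


def regression_metrics(labels, predictions):
    """Return (mae, rmse, r2) for paired labels and predictions."""
    n = len(labels)
    if n == 0 or n != len(predictions):
        raise ValueError("labels and predictions must be non-empty and the same length")
    errors = [p - y for y, p in zip(labels, predictions)]
    mae = sum(abs(e) for e in errors) / n
    ss_res = sum(e * e for e in errors)          # residual sum of squares
    rmse = math.sqrt(ss_res / n)
    mean_y = sum(labels) / n
    ss_tot = sum((y - mean_y) ** 2 for y in labels)  # total sum of squares
    r2 = 1 - ss_res / ss_tot
    return mae, rmse, r2


# Prices in thousands, as in the notebook
print(regression_metrics([10.0, 12.0, 14.0], [11.0, 12.0, 13.0]))
```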

```python
for modeltype in ["LR", "GB", "RF"]:
  trainAndTestModel(modeltype, trainingData, testData, "features", "price")
```

#### From the results above, the GBT regressor performs best on the test set.
#### Next, tune the GBT regressor over a small hyperparameter grid

```python
# Tune hyperparameters for a GBT regressor
gbt = GBTRegressor(featuresCol="features", labelCol="price")

# Parameter grid for cross-validation (only maxDepth is searched here)
gbParamGrid = ParamGridBuilder() \
            .addGrid(gbt.maxDepth, [3, 5]) \
            .build()

# Evaluator: RMSE on the price label (smaller is better)
evaluator = RegressionEvaluator(predictionCol=gbt.getPredictionCol(),
                                labelCol=gbt.getLabelCol(),
                                metricName="rmse")

# 2-fold cross-validation over the grid
gbtCv = CrossValidator(estimator=gbt,
                       evaluator=evaluator,
                       estimatorParamMaps=gbParamGrid,
                       numFolds=2)

# Run cross-validation; bestModel is refit on all of trainingData
gbtCvModel = gbtCv.fit(trainingData)

# Get the best model's parameters as a {name: value} dict
gbtCvModelParams = gbtCvModel.bestModel.extractParamMap()
gbtCvModelParamsDict = {param.name: value for param, value in gbtCvModelParams.items()}
gbtCvModelParamsDict
```
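`CrossValidator` fits one model per (parameter combination, fold) pair, averages the evaluator's metric over the folds for each combination, and keeps the combination with the best average (the lowest RMSE here, since RMSE is a smaller-is-better metric); the best combination is then refit on all of `trainingData`. With `maxDepth` in `[3, 5]` and `numFolds=2`, that is four fits plus the final refit. The plain-Python sketch below shows only the selection logic. The toy model (the training mean plus a constant offset `c`), the fold assignment by index modulo k (Spark assigns folds randomly by default) and the names `grid_search_cv`, `toy_fit` and `toy_predict` are all assumptions for illustration.

```python
import math


def rmse(labels, preds):
    """Root mean squared error (smaller is better)."""
    return math.sqrt(sum((p - y) ** 2 for y, p in zip(labels, preds)) / len(labels))


def grid_search_cv(data, param_grid, k, fit, predict, metric, larger_is_better=False):
    """k-fold grid search: return (best_param, {param: metric averaged over the k folds})."""
    # Assumption: fold f holds every k-th element starting at position f
    folds = [[x for i, x in enumerate(data) if i % k == f] for f in range(k)]
    averages = {}
    for param in param_grid:
        scores = []
        for f in range(k):
            train = [x for g in range(k) if g != f for x in folds[g]]
            model = fit(train, param)
            scores.append(metric(folds[f], [predict(model, x) for x in folds[f]]))
        averages[param] = sum(scores) / k
    pick = max if larger_is_better else min
    best = pick(averages, key=averages.get)
    return best, averages


def toy_fit(train, c):
    """Toy 'model': the training mean shifted by the hyperparameter c."""
    return sum(train) / len(train) + c


def toy_predict(model, x):
    """The toy model predicts the same constant for every row."""
    return model


best, averages = grid_search_cv([1.0, 2.0, 3.0, 4.0], [-1.0, 0.0, 1.0], k=2,
                                fit=toy_fit, predict=toy_predict, metric=rmse)
print(best)  # 0.0
print(averages)
```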

```python
# Score the test set with the tuned (best) model
gbtCvPredictions = gbtCvModel.transform(testData)

display(gbtCvPredictions)
```
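`display(gbtCvPredictions)` shows per-row predictions. To summarize them, the `(price, prediction)` pairs of a modest test set can be pulled to the driver with `gbtCvPredictions.select("price", "prediction").collect()` and inspected in plain Python. The sketch below uses a hypothetical `error_report` helper and made-up pairs. It converts the errors from thousands back to the original price units and reports the mean absolute error and the share of cars predicted within a relative tolerance. The 10% tolerance is an arbitrary illustrative choice.

```python
def error_report(pairs, tolerance=0.10, scale=1000):
    """Summarize (price, prediction) pairs given in thousands.

    Returns the mean absolute error in original units and the fraction of rows
    whose prediction is within `tolerance` (relative) of the actual price.
    """
    if not pairs:
        raise ValueError("no (price, prediction) pairs given")
    abs_errors = [abs(pred - price) * scale for price, pred in pairs]
    within = sum(1 for price, pred in pairs if abs(pred - price) <= tolerance * abs(price))
    return sum(abs_errors) / len(pairs), within / len(pairs)


# Made-up (price, prediction) pairs in thousands
pairs = [(15.99, 15.2), (9.9, 11.4), (13.8, 13.5), (22.0, 19.0)]
mae_original_units, share_within = error_report(pairs)
print(f"MAE: {mae_original_units:,.0f}  within 10%: {share_within:.0%}")
```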