### Importing necessary libreries of pyspark

In [None]:
import sys
import pyspark.sql.functions as f

from functools import reduce
from pyspark.sql import Window
from pyspark.sql.functions import last, first
from pyspark.sql.functions import when, lit
from pyspark.sql.functions import isnan, when, count, col, isnull, mean
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer

### Import dataset from Data Lake Storage Gen 2 container to dataframe

In [2]:
# Provided container path of dataset file
path = "/HdiSamples/HdiSamples/all_anonymized_2015_11_2017_03.csv"

In [3]:
# Reading data using Apache Spark and storing it into Pyspark dataframe
df = spark.read.csv(path, header = True)

In [4]:
# Show top 5 records
df.show(5)

+-----+-------+-------+----------------+-------------------+------------+---------+----------+--------+------------+----------+----------+---------+--------------------+--------------------+---------+
|maker|  model|mileage|manufacture_year|engine_displacement|engine_power|body_type|color_slug|stk_year|transmission|door_count|seat_count|fuel_type|        date_created|      date_last_seen|price_eur|
+-----+-------+-------+----------------+-------------------+------------+---------+----------+--------+------------+----------+----------+---------+--------------------+--------------------+---------+
| ford| galaxy| 151000|            2011|               2000|         103|     null|      null|    None|         man|         5|         7|   diesel|2015-11-14 18:10:...|2016-01-27 20:40:...| 10584.75|
|skoda|octavia| 143476|            2012|               2000|          81|     null|      null|    None|         man|         5|         5|   diesel|2015-11-14 18:10:...|2016-01-27 20:40:...|  8882

In [5]:
df.columns

['maker', 'model', 'mileage', 'manufacture_year', 'engine_displacement', 'engine_power', 'body_type', 'color_slug', 'stk_year', 'transmission', 'door_count', 'seat_count', 'fuel_type', 'date_created', 'date_last_seen', 'price_eur']

In [6]:
df.describe()

DataFrame[summary: string, maker: string, model: string, mileage: string, manufacture_year: string, engine_displacement: string, engine_power: string, body_type: string, color_slug: string, stk_year: string, transmission: string, door_count: string, seat_count: string, fuel_type: string, date_created: string, date_last_seen: string, price_eur: string]

In [88]:
df.summary().show()

+-------+----------+-----------------+------------------+------------------+-------------------+-----------------+---------+----------+------------------+------------+------------------+------------------+---------+--------------------+--------------------+--------------------+
|summary|     maker|            model|           mileage|  manufacture_year|engine_displacement|     engine_power|body_type|color_slug|          stk_year|transmission|        door_count|        seat_count|fuel_type|        date_created|      date_last_seen|           price_eur|
+-------+----------+-----------------+------------------+------------------+-------------------+-----------------+---------+----------+------------------+------------+------------------+------------------+---------+--------------------+--------------------+--------------------+
|  count|   3033997|          2419551|           3190328|           3182334|            2809498|          2998035|  2429998|    209501|           1844756|     2811

### Data Cleaning and Pre-processing

In [331]:
# Printing null values in dataframe
print({col:df.filter(df[col].isNull()).count() for col in df.columns})

{'mileage': 362584, 'engine_power': 554877, 'engine_displacement': 743414, 'transmission': 741630, 'fuel_type': 1847606, 'color_slug': 3343411, 'manufacture_year': 370578, 'door_count': 614373, 'seat_count': 749489, 'body_type': 1122914, 'date_last_seen': 0, 'stk_year': 1708156, 'date_created': 0, 'model': 1133361, 'maker': 518915, 'price_eur': 0}

In [7]:
# Dropped columns not required for analysis or predictions
df_updated = df[['maker', 'model', 'mileage', 'manufacture_year', 'transmission',
       'door_count', 'seat_count', 'fuel_type', 'date_created',
       'date_last_seen', 'price_eur']]

In [8]:
df_updated.columns

['maker', 'model', 'mileage', 'manufacture_year', 'transmission', 'door_count', 'seat_count', 'fuel_type', 'date_created', 'date_last_seen', 'price_eur']

In [9]:
# Dropping null values from columns 'maker' and 'model'
df_updated = df_updated.dropna(subset=['maker', 'model'])

In [10]:
# Dropping records where manufacture_year is less than 1980
df_updated = df_updated.where(df_updated.manufacture_year >= 1980)

In [11]:
# Converting mileage column from string to integer
df_updated = df_updated.withColumn("mileage",df_updated.mileage.cast('int'))

In [12]:
# Dropping records with price less than 3000 euros and greater than 200000 euros
df_updated = df_updated.where(df_updated.price_eur <= 200000).where(df_updated.price_eur >= 3000)

In [13]:
# Dropping records with mileage less than 500 and greater than 500000
df_updated = df_updated.where(df_updated.mileage <= 500000).where(df_updated.mileage >= 500)

In [14]:
# Windowing : grouping data, sorted by price between 0 to the last records
window = Window.orderBy('price_eur').rowsBetween(-sys.maxsize, 0)

In [23]:
# Forward filling missing records of fuel type using windowing approach
filled_column = last(df_updated['fuel_type'], ignorenulls=True).over(window)

df_updated = df_updated.withColumn('fuel_type', filled_column)

In [26]:
print({col:df_updated.filter(df_updated[col].isNull()).count() for col in ['fuel_type']})

{'fuel_type': 0}

In [25]:
df_updated = df_updated.dropna(subset=['fuel_type'])

In [17]:
# Forward filling missing records of door count using windowing approach
filled_column = last(df_updated['door_count'], ignorenulls=True).over(window)

df_updated = df_updated.withColumn('door_count', filled_column)

In [27]:
print({col:df_updated.filter(df_updated[col].isNull()).count() for col in ['door_count']})

{'door_count': 0}

In [19]:
# Forward filling missing records of seat count using windowing approach
filled_column = last(df_updated['seat_count'], ignorenulls=True).over(window)

df_updated = df_updated.withColumn('seat_count', filled_column)

In [28]:
print({col:df_updated.filter(df_updated[col].isNull()).count() for col in ['seat_count']})

{'seat_count': 0}

In [21]:
# Forward filling missing records of transmission count using windowing approach
filled_column = last(df_updated['transmission'], ignorenulls=True).over(window)

df_updated = df_updated.withColumn('transmission', filled_column)

In [29]:
print({col:df_updated.filter(df_updated[col].isNull()).count() for col in ['transmission']})

{'transmission': 0}

In [30]:
# Updating door count 0 with max value 4
df_updated = df_updated.withColumn('door_count', when(df_updated.door_count == 0, lit('4')).otherwise(df_updated.door_count))

In [31]:
# Updating door count none value with 4 
df_updated = df_updated.withColumn('door_count', when(df_updated.door_count == None, lit('4')).otherwise(df_updated.door_count))

In [32]:
# Updating seat count 0, none with max value 4
df_updated = df_updated.withColumn('seat_count', when(df_updated.door_count == 0, lit('5')).otherwise(df_updated.door_count))
df_updated = df_updated.withColumn('seat_count', when(df_updated.door_count == None, lit('5')).otherwise(df_updated.door_count))

In [34]:
# Filling null mileage to the average mileage value
df_mean = df_updated.fillna(value=0, subset=['mileage']).select(mean(col('mileage')).alias('avg')).collect()
avg = df_mean[0]['avg']

In [35]:
df_updated = df_updated.fillna(value=avg, subset=['mileage'])

In [36]:
# df_updated = df_updated.dropna(subset=['mileage'])

+-----+-----+-------+----------------+------------+----------+----------+---------+------------+--------------+---------+
|maker|model|mileage|manufacture_year|transmission|door_count|seat_count|fuel_type|date_created|date_last_seen|price_eur|
+-----+-----+-------+----------------+------------+----------+----------+---------+------------+--------------+---------+
|    0|    0|      0|               0|           0|         0|         0|        0|           0|             0|        0|
+-----+-----+-------+----------------+------------+----------+----------+---------+------------+--------------+---------+

In [37]:
# Printing null values in all the columns after cleaning the data
print({col:df_updated.filter(df_updated[col].isNull()).count() for col in df_updated.columns})

{'mileage': 0, 'transmission': 0, 'fuel_type': 0, 'manufacture_year': 0, 'door_count': 0, 'seat_count': 0, 'date_last_seen': 0, 'date_created': 0, 'model': 0, 'maker': 0, 'price_eur': 0}

In [38]:
# Printing the summary of cleaned table
df_updated.summary().show()

+-------+----------+------------------+-----------------+------------------+------------+------------------+------------------+---------+--------------------+--------------------+------------------+
|summary|     maker|             model|          mileage|  manufacture_year|transmission|        door_count|        seat_count|fuel_type|        date_created|      date_last_seen|         price_eur|
+-------+----------+------------------+-----------------+------------------+------------+------------------+------------------+---------+--------------------+--------------------+------------------+
|  count|   1139628|           1139628|          1139628|           1139628|     1139628|           1139628|           1139628|  1139628|             1139628|             1139628|           1139628|
|   mean|      null| 621.9709091880485|83797.17307314316|2010.3906327327866|        null| 4.041675602258918| 4.041675602258918|     null|                null|                null|14338.538034121024|
| std

In [158]:
# Storing dataframe as a table back to the Azure data lake storage container for visualization in power bi
# df_updated.write.saveAsTable("cleaned_car_data_final")

### Feature Selection for Machine Learning Model

In [39]:
# Converting price_eur to float datatype
df_updated = df_updated.withColumn("price_eur",df_updated.price_eur.cast('float'))

In [40]:
# Converting manufacture_year to int datatype
df_updated = df_updated.withColumn("manufacture_year",df_updated.manufacture_year.cast('int'))

In [41]:
# Converting door_count to int datatype
df_updated = df_updated.withColumn("door_count",df_updated.door_count.cast('int'))

In [42]:
# Converting seat_count to int datatype
df_updated = df_updated.withColumn("seat_count",df_updated.seat_count.cast('int'))

In [43]:
df_updated

DataFrame[maker: string, model: string, mileage: int, manufacture_year: int, transmission: string, door_count: int, seat_count: int, fuel_type: string, date_created: string, date_last_seen: string, price_eur: float]

In [44]:
# Selecting feature relevent for prediction of price
pred_df = df_updated[['maker', 'model', 'mileage', 'manufacture_year', 'transmission',
       'door_count', 'seat_count', 'fuel_type', 'price_eur']]

### Label Encoding

In [46]:
# Label Encoding of transmission 
indexer = StringIndexer(inputCol="transmission", outputCol="transmission_encoded") 
pred_df = indexer.fit(pred_df).transform(pred_df) 

In [47]:
# Label Encoding of maker 
indexer = StringIndexer(inputCol="maker", outputCol="maker_encoded") 
pred_df = indexer.fit(pred_df).transform(pred_df) 
pred_df

DataFrame[maker: string, model: string, mileage: int, manufacture_year: int, transmission: string, door_count: int, seat_count: int, fuel_type: string, price_eur: float, transmission_encoded: double, maker_encoded: double]

In [48]:
# Label Encoding of model 
indexer = StringIndexer(inputCol="model", outputCol="model_encoded") 
pred_df = indexer.fit(pred_df).transform(pred_df) 
pred_df

DataFrame[maker: string, model: string, mileage: int, manufacture_year: int, transmission: string, door_count: int, seat_count: int, fuel_type: string, price_eur: float, transmission_encoded: double, maker_encoded: double, model_encoded: double]

In [49]:
# Label Encoding of fuel_type 
indexer = StringIndexer(inputCol="fuel_type", outputCol="fuel_type_encoded") 
pred_df = indexer.fit(pred_df).transform(pred_df) 
pred_df

DataFrame[maker: string, model: string, mileage: int, manufacture_year: int, transmission: string, door_count: int, seat_count: int, fuel_type: string, price_eur: float, transmission_encoded: double, maker_encoded: double, model_encoded: double, fuel_type_encoded: double]

In [50]:
pred_df = pred_df[['maker_encoded', 'model_encoded', 'mileage', 'manufacture_year', 
                   'transmission_encoded', 'door_count', 'seat_count', 'fuel_type_encoded', 'price_eur']]

In [51]:
# Printing schema for prediction df
pred_df.printSchema()

root
 |-- maker_encoded: double (nullable = false)
 |-- model_encoded: double (nullable = false)
 |-- mileage: integer (nullable = true)
 |-- manufacture_year: integer (nullable = true)
 |-- transmission_encoded: double (nullable = false)
 |-- door_count: integer (nullable = true)
 |-- seat_count: integer (nullable = true)
 |-- fuel_type_encoded: double (nullable = false)
 |-- price_eur: float (nullable = true)

### Machine Learning Model Implementation

In [76]:
# Assembling all the feature columns into one vector into features
assembler = VectorAssembler(
    inputCols=['maker_encoded', 'model_encoded', 'mileage', 'manufacture_year', 
                   'transmission_encoded', 'door_count', 'seat_count', 'fuel_type_encoded'],
    outputCol="features",
    handleInvalid="skip")

In [77]:
# Storing assembled records of pred_df to output
output = assembler.transform(pred_df)

In [78]:
output.show(3)

+-------------+-------------+-------+----------------+--------------------+----------+----------+-----------------+---------+--------------------+
|maker_encoded|model_encoded|mileage|manufacture_year|transmission_encoded|door_count|seat_count|fuel_type_encoded|price_eur|            features|
+-------------+-------------+-------+----------------+--------------------+----------+----------+-----------------+---------+--------------------+
|         12.0|        137.0|   9944|            2014|                 0.0|         4|         4|              1.0|  10000.0|[12.0,137.0,9944....|
|          3.0|          4.0| 115000|            2008|                 0.0|         5|         5|              0.0|  10000.0|[3.0,4.0,115000.0...|
|         18.0|        107.0| 115000|            2005|                 0.0|         5|         5|              0.0|  10000.0|[18.0,107.0,11500...|
+-------------+-------------+-------+----------------+--------------------+----------+----------+-----------------+---

In [79]:
output.select("features").show(3)

+--------------------+
|            features|
+--------------------+
|[12.0,137.0,9944....|
|[3.0,4.0,115000.0...|
|[18.0,107.0,11500...|
+--------------------+
only showing top 3 rows

### 1. Linear Regression

In [80]:
# Selecting features and price_eur from output to final_data
final_data = output.select("features",'price_eur')

In [81]:
# Split final_data to 80% and 20%
train, test = final_data.randomSplit([0.8, 0.2], seed=12345)

In [82]:
train.show(1)

+--------------------+---------+
|            features|price_eur|
+--------------------+---------+
|(8,[2,3,5,6],[500...|  15990.0|
+--------------------+---------+
only showing top 1 row

In [84]:
# Creating linear regression object
lr = LinearRegression(labelCol='price_eur')

In [85]:
# Train linear regression model using train data
lrModel = lr.fit(train)

In [87]:
# Evaluate the model performance using test data
test_results = lrModel.evaluate(test)

In [89]:
print("Linear Regression Accuracy Measures")

print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("r2: %f" % test_results.r2)

RMSE: 10525.0850039
r2: 0.326811

In [90]:
# Prediction on entire dataset
predictions_lr = lrModel.transform(final_data)

In [91]:
predictions_lr.select('price_eur', 'prediction').show(3)

+---------+------------------+
|price_eur|        prediction|
+---------+------------------+
|  10000.0|17860.684870653553|
|  10000.0|  7544.08580135298|
|  10000.0| 7061.939970151987|
+---------+------------------+
only showing top 3 rows

## 2. Random Forest

In [97]:
# converting final data price_eur to label
final_data = final_data.withColumn("label", final_data.price_eur)

In [99]:
# Split the data into 80% and 20%
(rf_train_data, rf_test_data) = final_data.randomSplit([0.8, 0.2], seed = 111)

In [109]:
# Creating Random forest regressor
rf = RandomForestRegressor(labelCol="label", featuresCol="features", maxDepth = 10, maxBins = 1000)

In [110]:
# Traing the model using train data
rf_model = rf.fit(rf_train_data)

In [118]:
# Transforming test data for evaluating performance mattrics
rf_predictions = rf_model.transform(rf_test_data)

In [119]:
# Creating object of Regression Evaluator of RMSE(Root Mean Square Error) for calculating RMSE
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(rf_predictions)

In [128]:
# Creating object of Regression Evaluator of R2 (Accurace)
evaluator_r2 = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator_r2.evaluate(rf_predictions)

In [129]:
print("Randon Forest Accuracy Measures")
print("RMSE: {}".format(rmse))
print("r2: %f" % r2)

Randon Forest Accuracy Measures
RMSE: 4391.79845519
r2: 0.881959

In [150]:
# Prediction on entire dataset
predictions_rf = rf_model.transform(final_data)

In [151]:
predictions_rf.select('price_eur', 'prediction').show(3)

+---------+-----------------+
|price_eur|       prediction|
+---------+-----------------+
|  10000.0| 11803.0385993194|
|  10000.0|6180.610780433815|
|  10000.0|6335.474590564414|
+---------+-----------------+
only showing top 3 rows

### 3. Gradient Boosting Model

In [137]:
# Creating Random forest regressor
gb = GBTRegressor(labelCol="label", featuresCol="features", maxDepth = 10, maxBins = 1000)

In [138]:
# Creating gradient boosting model
gb_model = gb.fit(rf_train_data)

In [141]:
# Transforming test data for evaluating performance mattrics
gb_predictions = gb_model.transform(rf_test_data)

In [144]:
# Using evaluator for r2 to calculate RMSE score of prediction
gb_rmse = evaluator.evaluate(gb_predictions)

In [142]:
# Using evaluator for r2 to calculate R2 score of prediction
gb_r2 = evaluator_r2.evaluate(gb_predictions)

In [145]:
print("Gradient Boosting Accuracy Measures")
print("RMSE: {}".format(gb_rmse))
print("r2: %f" % gb_r2)

Gradient Boosting Accuracy Measures
RMSE: 4049.56469666
r2: 0.867984

In [154]:
# Prediction on entire dataset
predictions_gb = gb_model.transform(final_data)

In [155]:
predictions_gb.select('price_eur', 'prediction').show(3)

+---------+------------------+
|price_eur|        prediction|
+---------+------------------+
|  10000.0|10599.217424851886|
|  10000.0| 5929.010313645407|
|  10000.0| 5922.548893972206|
+---------+------------------+
only showing top 3 rows