#### Installing required libraries and declaring them

In [None]:
!pip -q install pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import Evaluator, RegressionEvaluator
import os

#### Fetching Data from Kaggle

In [None]:
%cd '/content/drive/MyDrive/bigData'

#procedure to download kaggle datasets to google drive
os.environ['KAGGLE_USERNAME'] = "lalit871" # username from the json file
os.environ['KAGGLE_KEY'] = "ccd5bff8c4fc8368e3269b44bfaa2c86" # key from the json file

!kaggle datasets download -d nehalbirla/vehicle-dataset-from-cardekho

/content/drive/MyDrive/bigData
Downloading vehicle-dataset-from-cardekho.zip to /content/drive/MyDrive/bigData
  0% 0.00/227k [00:00<?, ?B/s]
100% 227k/227k [00:00<00:00, 15.2MB/s]


In [None]:
!unzip -q vehicle-dataset-from-cardekho.zip

In [None]:
dataPath = '/content/drive/MyDrive/bigData/Cardetailsv3.csv'

### Tasks
#### Read the dataset into a PySpark Dataframe. 

In [None]:
sc = SparkSession.builder.master('local').appName('RegressionCarDekho').getOrCreate() #starting a spark session

+--------------------+----+-------------+---------+------+-----------+------------+------------+----------+-------+----------+--------------------+-----+
|                name|year|selling_price|km_driven|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|              torque|seats|
+--------------------+----+-------------+---------+------+-----------+------------+------------+----------+-------+----------+--------------------+-----+
|Maruti Swift Dzir...|2014|       450000|   145500|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|      190Nm@ 2000rpm|    5|
|Skoda Rapid 1.5 T...|2014|       370000|   120000|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 CC|103.52 bhp| 250Nm@ 1500-2500rpm|    5|
|Honda City 2017-2...|2006|       158000|   140000|Petrol| Individual|      Manual| Third Owner| 17.7 kmpl|1497 CC|    78 bhp|12.7@ 2,700(kgm@ ...|    5|
|Hyundai i20 Sport...|2010|       225000|   127000|Diesel| Individual|      

In [None]:
df.printSchema() # schema of the dataset

root
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- max_power: string (nullable = true)
 |-- torque: string (nullable = true)
 |-- seats: integer (nullable = true)



In [None]:
df.describe().show() #summary of the dataset to check for extreme values along with mean and standard deviation

# Here we can also see that 8128 is the total number of rows and columns having values less than that number have missing values which we will impute in the below cells.

+-------+--------------------+------------------+-----------------+------------------+------+----------------+------------+-----------+--------+-------+---------+--------------------+----------------+
|summary|                name|              year|    selling_price|         km_driven|  fuel|     seller_type|transmission|      owner| mileage| engine|max_power|              torque|           seats|
+-------+--------------------+------------------+-----------------+------------------+------+----------------+------------+-----------+--------+-------+---------+--------------------+----------------+
|  count|                8128|              8128|             8128|              8128|  8128|            8128|        8128|       8128|    7907|   7907|     7913|                7906|            7907|
|   mean|                null|2013.8040108267717|638271.8077017716| 69819.51082677166|  null|            null|        null|       null|    null|   null|      0.0|                null|5.41671936259

#### Extracting data from the columns CAR BRAND, MILEAGE, ENGINE, MAX_POWER, NAME, and TORQUE using regex.

In [None]:
# \d+ -> for 1 or more digit followed by \.? 0 or 1 dot and \d{1,2} 1 or 2 decimal values
# for car brand extract first occurence of words which is brand name

new_df = df.withColumn('mileage',regexp_extract('mileage', r'(\d+\.?\d{1,2})',1))

new_df = new_df.withColumn('engine',regexp_extract('engine', r'(\d+)',1))

new_df = new_df.withColumn('max_power',regexp_extract('max_power', r'(\d+\.?\d{1,2})',1))

new_df = new_df.withColumn('name',regexp_extract('name', r'(\w+)',1))

new_df = new_df.withColumn('torque',regexp_extract('torque', r'(\d+\.?\d{1,2})',1))

new_df = new_df.withColumn('num_years', 2021 - new_df['year']) # creating num_years column
new_df = new_df.drop('year')
new_df.show(5)

+-------+-------------+---------+------+-----------+------------+------------+-------+------+---------+------+-----+---------+
|   name|selling_price|km_driven|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|torque|seats|num_years|
+-------+-------------+---------+------+-----------+------------+------------+-------+------+---------+------+-----+---------+
| Maruti|       450000|   145500|Diesel| Individual|      Manual| First Owner|   23.4|  1248|       74|   190|    5|        7|
|  Skoda|       370000|   120000|Diesel| Individual|      Manual|Second Owner|  21.14|  1498|   103.52|   250|    5|        7|
|  Honda|       158000|   140000|Petrol| Individual|      Manual| Third Owner|   17.7|  1497|       78|  12.7|    5|       15|
|Hyundai|       225000|   127000|Diesel| Individual|      Manual| First Owner|   23.0|  1396|       90|  22.4|    5|       11|
| Maruti|       130000|   120000|Petrol| Individual|      Manual| First Owner|   16.1|  1298|     88.2|  11.5| 

In [None]:
# changing the datatype of the extracted columns to integer and float
new_df_integer = new_df.withColumn('mileage', new_df['mileage'].cast(FloatType()))
new_df_integer = new_df_integer.withColumn('engine',new_df_integer['engine'].cast(IntegerType()))
new_df_integer = new_df_integer.withColumn('max_power', new_df_integer['max_power'].cast(FloatType()))
new_df_integer = new_df_integer.withColumn('torque', new_df_integer['torque'].cast(FloatType()))

#### Imputation of the missing values by calculating average for each brand and then using coalesce to substitute values for each of the imputed columns

In [None]:
# average for each brand name corresponding to the column to be imputed
avgMileageByBrand = new_df_integer.groupby('name').agg(round(mean('mileage') , 2).alias('avgMileage'))
avgEngineByBrand = new_df_integer.groupby('name').agg(round(mean('engine')).alias('avgEngine'))
avgMaxPowerByBrand = new_df_integer.groupby('name').agg(round(mean('max_power') , 2).alias('avgMaxPower'))
avgSeatsByBrand = new_df_integer.groupby('name').agg(round(mean('seats')).alias('avgSeats'))
avgTorqueByBrand = new_df_integer.groupby('name').agg(round(mean('torque')).alias('avgTorque'))

In [None]:
# joining the above created average dataframes to the main dataframe 
new_df_integer = new_df_integer.join(avgMileageByBrand, on='name')
new_df_integer = new_df_integer.join(avgEngineByBrand, on='name')
new_df_integer = new_df_integer.join(avgMaxPowerByBrand, on='name')
new_df_integer = new_df_integer.join(avgSeatsByBrand, on='name')
new_df_integer = new_df_integer.join(avgTorqueByBrand, on='name')

In [None]:
# using coalesce to replace missing values using the joined average column
new_df_integer = new_df_integer.withColumn('imputed_mileage', coalesce('mileage', 'avgMileage'))
new_df_integer = new_df_integer.withColumn('imputed_engine', coalesce('engine', 'avgEngine'))
new_df_integer = new_df_integer.withColumn('imputed_max_power', coalesce('max_power', 'avgMaxPower'))
new_df_integer = new_df_integer.withColumn('imputed_seats', coalesce('seats', 'avgSeats'))
new_df_integer = new_df_integer.withColumn('imputed_torque', coalesce('torque', 'avgTorque'))

new_df_integer = new_df_integer.drop('avgMileage','avgEngine','avgMaxPower','avgSeats','avgTorque','mileage','engine','max_power','torque','seats')

In [None]:
#rounding few columns
new_df_integer = new_df_integer.withColumn('imputed_mileage',round('imputed_mileage', 2))\
                                .withColumn('imputed_max_power', round('imputed_max_power',2))\
                                .withColumn('imputed_torque', round('imputed_torque',2))
new_df_integer.show(5)

+-------+-------------+---------+------+-----------+------------+------------+---------+---------------+--------------+-----------------+-------------+--------------+
|   name|selling_price|km_driven|  fuel|seller_type|transmission|       owner|num_years|imputed_mileage|imputed_engine|imputed_max_power|imputed_seats|imputed_torque|
+-------+-------------+---------+------+-----------+------------+------------+---------+---------------+--------------+-----------------+-------------+--------------+
| Maruti|       450000|   145500|Diesel| Individual|      Manual| First Owner|        7|           23.4|        1248.0|             74.0|          5.0|         190.0|
|  Skoda|       370000|   120000|Diesel| Individual|      Manual|Second Owner|        7|          21.14|        1498.0|           103.52|          5.0|         250.0|
|  Honda|       158000|   140000|Petrol| Individual|      Manual| Third Owner|       15|           17.7|        1497.0|             78.0|          5.0|          12.7

In [None]:
# after imputation only one row of data has null value which is actually a car brand with only 1 entry with missing columns from mileage to torque because of which average for that brand came out to be null.
new_df_integer.select([count(when(col(c).isNull(), c)).alias(c) for c in new_df_integer.columns]).show()

+----+-------------+---------+----+-----------+------------+-----+---------+---------------+--------------+-----------------+-------------+--------------+
|name|selling_price|km_driven|fuel|seller_type|transmission|owner|num_years|imputed_mileage|imputed_engine|imputed_max_power|imputed_seats|imputed_torque|
+----+-------------+---------+----+-----------+------------+-----+---------+---------------+--------------+-----------------+-------------+--------------+
|   0|            0|        0|   0|          0|           0|    0|        0|              1|             1|                1|            1|             1|
+----+-------------+---------+----+-----------+------------+-----+---------+---------------+--------------+-----------------+-------------+--------------+



#### Outlier Removal for better results

In [None]:
# creating a function to remove outliers
def filter_outliers(df, colmn):
    q1,q3 = df.approxQuantile(colmn,[0.25,0.75],0.001) # declaring percentiles for q1 and q3
    lower_bound = q1 - (q3-q1)*3 # actual formula is q1-(q3-q1)*1.5 but it would result in loss of huge number of rows
    upper_bound = q3 + (q3-q1)*6 # same as above
    df = df.filter((df[colmn]>=lower_bound) & (df[colmn]<=upper_bound)) # filtering based on lower bound and upper bound values
    return df

In [None]:
#before removing outliers
new_df_integer.select(['imputed_mileage','imputed_engine','imputed_max_power','imputed_torque','imputed_seats']).describe().show()

+-------+-----------------+------------------+-----------------+------------------+------------------+
|summary|  imputed_mileage|    imputed_engine|imputed_max_power|    imputed_torque|     imputed_seats|
+-------+-----------------+------------------+-----------------+------------------+------------------+
|  count|             8127|              8127|             8127|              8127|              8127|
|   mean|19.41686723268112|1458.6098191214471|91.48425987449174|168.01285591239048| 5.413313645871785|
| stddev|4.001466486649435| 502.0660817536921|35.39817510971568|  96.2852009276265|0.9514862309505531|
|    min|              0.0|             624.0|             32.8|               4.8|               2.0|
|    max|             42.0|            3604.0|            400.0|             789.0|              14.0|
+-------+-----------------+------------------+-----------------+------------------+------------------+



In [None]:
#removing outliers
df_without_outlier = filter_outliers(new_df_integer, 'imputed_mileage')
df_without_outlier = filter_outliers(df_without_outlier, 'imputed_seats')
df_without_outlier = filter_outliers(df_without_outlier, 'imputed_engine')
df_without_outlier = filter_outliers(df_without_outlier, 'imputed_max_power')

In [None]:
#after removing outliers
df_without_outlier.select(['imputed_mileage','imputed_engine','imputed_max_power','imputed_torque','imputed_seats']).describe().show()

+-------+------------------+------------------+-----------------+------------------+-------------+
|summary|   imputed_mileage|    imputed_engine|imputed_max_power|    imputed_torque|imputed_seats|
+-------+------------------+------------------+-----------------+------------------+-------------+
|  count|              6387|              6387|             6387|              6387|         6387|
|   mean|20.449699389384424|1307.0831376232973|87.67088304368134|152.76508532957592|          5.0|
| stddev|3.3733729377218094|320.79340327931146|31.79191223849369| 84.65307678785562|          0.0|
|    min|               9.0|             793.0|             32.8|               5.7|          5.0|
|    max|              33.0|            2997.0|            235.0|             789.0|          5.0|
+-------+------------------+------------------+-----------------+------------------+-------------+



In [None]:
# converting categorical data to numerical
stringIndexer  = StringIndexer(inputCols=['name','fuel','seller_type','transmission','owner'], outputCols=['catName','catFuel','catSellerType','catTransmission','catOwner']) 
indexedDataset = stringIndexer.fit(df_without_outlier).transform(df_without_outlier)
indexedDataset.show(5)

+-------+-------------+---------+------+-----------+------------+------------+---------+---------------+--------------+-----------------+-------------+--------------+--------+-------------+-------+---------------+-------+
|   name|selling_price|km_driven|  fuel|seller_type|transmission|       owner|num_years|imputed_mileage|imputed_engine|imputed_max_power|imputed_seats|imputed_torque|catOwner|catSellerType|catName|catTransmission|catFuel|
+-------+-------------+---------+------+-----------+------------+------------+---------+---------------+--------------+-----------------+-------------+--------------+--------+-------------+-------+---------------+-------+
| Maruti|       450000|   145500|Diesel| Individual|      Manual| First Owner|        7|           23.4|        1248.0|             74.0|          5.0|         190.0|     0.0|          0.0|    0.0|            0.0|    1.0|
|  Skoda|       370000|   120000|Diesel| Individual|      Manual|Second Owner|        7|          21.14|        

In [None]:
#columns to be used in vector assembler
dfCols = ['catName','num_years','km_driven','catFuel','catSellerType','catTransmission','catOwner','imputed_mileage','imputed_engine','imputed_max_power','imputed_torque','imputed_seats']

In [None]:
# vector assembler
assembler = VectorAssembler(inputCols = dfCols, outputCol = 'features')
output = assembler.transform(indexedDataset)
feature_dataframe = output.select("features",'selling_price')
feature_dataframe.show(5)

+--------------------+-------------+
|            features|selling_price|
+--------------------+-------------+
|[0.0,7.0,145500.0...|       450000|
|[10.0,7.0,120000....|       370000|
|[3.0,15.0,140000....|       158000|
|[1.0,11.0,127000....|       225000|
|[0.0,14.0,120000....|       130000|
+--------------------+-------------+
only showing top 5 rows



In [None]:
#Standadization of the features column
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)

scalerModel = scaler.fit(feature_dataframe)
scaledData = scalerModel.transform(feature_dataframe)
scaledData.show(5)

+--------------------+-------------+--------------------+
|            features|selling_price|      scaledFeatures|
+--------------------+-------------+--------------------+
|[0.0,7.0,145500.0...|       450000|[-0.7016970859979...|
|[10.0,7.0,120000....|       370000|[1.54681750775526...|
|[3.0,15.0,140000....|       158000|[-0.0271427078719...|
|[1.0,11.0,127000....|       225000|[-0.4768456266226...|
|[0.0,14.0,120000....|       130000|[-0.7016970859979...|
+--------------------+-------------+--------------------+
only showing top 5 rows



In [None]:
#splitting the standardized data into 80:20 ratio for training and testing
train_data,test_data = scaledData.randomSplit([0.80,0.20])

# linear regression model
regressor = LinearRegression(featuresCol = 'scaledFeatures', labelCol = 'selling_price')

# learn model 
regressor = regressor.fit(train_data)

#predict and evaluate on testdata
pred = regressor.evaluate(test_data)

pred.predictions.show(10)

+--------------------+-------------+--------------------+------------------+
|            features|selling_price|      scaledFeatures|        prediction|
+--------------------+-------------+--------------------+------------------+
|[0.0,1.0,1000.0,0...|       654000|[-0.7016970859979...| 939304.3242026168|
|[0.0,1.0,1500.0,0...|       730000|[-0.7016970859979...| 594298.9822846572|
|[0.0,1.0,5000.0,0...|       630000|[-0.7016970859979...| 587605.7703828275|
|[0.0,1.0,16000.0,...|       370000|[-0.7016970859979...| 547857.1521283562|
|[0.0,1.0,20000.0,...|       700000|[-0.7016970859979...| 752268.6303282027|
|[0.0,2.0,5000.0,0...|       675000|[-0.7016970859979...| 561892.6704852635|
|[0.0,2.0,5000.0,0...|       540000|[-0.7016970859979...| 893407.0025349311|
|[0.0,2.0,5621.0,0...|       650000|[-0.7016970859979...|1106751.7669317108|
|[0.0,2.0,5621.0,0...|       650000|[-0.7016970859979...|1106751.7669317108|
|[0.0,2.0,5621.0,0...|       650000|[-0.7016970859979...|1106751.7669317108|

In [None]:
#coefficient of the regression model
coeff = regressor.coefficients

#X and Y intercept
intr = regressor.intercept

print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)


The coefficient of the model is : DenseVector([102638.0798, -116301.792, -43133.2881, 51443.1467, 44964.1361, 118948.8134, 15208.6954, 63929.3204, -211494.5517, 638040.4351, 32997.4028, 0.0])
The Intercept of the model is : 606634.388699


In [None]:
eval = RegressionEvaluator(labelCol="selling_price", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print(f'RMSE : {rmse:.3f}')

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print(f'MSE : {mse:.3f}')

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print(f'MAE : {mae:.3f}')

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print(f'r2 : {r2:.3f}')

RMSE : 453444.148
MSE : 205611594914.906
MAE : 263405.342
r2 : 0.697


#### Without Standardization of the features column

In [None]:
# features dataframe without standardization
feature_dataframe.show(10)

+--------------------+-------------+
|            features|selling_price|
+--------------------+-------------+
|[0.0,7.0,145500.0...|       450000|
|[10.0,7.0,120000....|       370000|
|[3.0,15.0,140000....|       158000|
|[1.0,11.0,127000....|       225000|
|[0.0,14.0,120000....|       130000|
|[1.0,4.0,45000.0,...|       440000|
|[0.0,14.0,175000....|        96000|
|[8.0,10.0,90000.0...|       350000|
|[4.0,8.0,169000.0...|       200000|
|[5.0,7.0,68000.0,...|       500000|
+--------------------+-------------+
only showing top 10 rows



In [None]:
#splitting the non-standardized data into 80:20 ratio for training and testing
train_data,test_data = feature_dataframe.randomSplit([0.8,0.2])

# linear regression model
regressor = LinearRegression(featuresCol = 'features', labelCol = 'selling_price')

# learn model 
regressor = regressor.fit(train_data)

#predict and evaluate on testdata
pred = regressor.evaluate(test_data)

pred.predictions.show(10)

+--------------------+-------------+------------------+
|            features|selling_price|        prediction|
+--------------------+-------------+------------------+
|[0.0,1.0,1000.0,0...|       445000| 591420.1809926161|
|[0.0,1.0,1000.0,0...|       445000| 591420.1809926161|
|[0.0,1.0,5000.0,2...|       399000| 392616.9053684158|
|[0.0,1.0,13000.0,...|       475000|420264.58008255885|
|[0.0,1.0,15000.0,...|       370000|160828.29293910175|
|[0.0,1.0,20000.0,...|       700000| 736262.6075484161|
|[0.0,1.0,70000.0,...|       580000| 694295.5069488916|
|[0.0,2.0,3000.0,0...|       760000| 557214.3272356153|
|[0.0,2.0,5000.0,0...|       675000| 558094.1343081724|
|[0.0,2.0,5000.0,0...|       711000| 587816.0485835006|
+--------------------+-------------+------------------+
only showing top 10 rows



In [None]:
#coefficient of the regression model
coeff = regressor.coefficients

#X and Y intercept
intr = regressor.intercept

print ("The coefficient of the model is : %a" %coeff)
print ("The Intercept of the model is : %f" %intr)

The coefficient of the model is : DenseVector([22570.5707, -29968.6786, -0.8393, 107366.6524, 102030.2805, 350433.9491, 14069.6771, 12792.4555, -721.4982, 20084.3867, 590.8225, 0.0])
The Intercept of the model is : -495132.236358


In [None]:
eval = RegressionEvaluator(labelCol="selling_price", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print(f'RMSE : {rmse:.3f}')

# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print(f'MSE : {mse:.3f}')

# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print(f'MAE : {mae:.3f}')

# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print(f'r2 : {r2:.3f}')

RMSE : 427859.435
MSE : 183063696189.960
MAE : 261860.673
r2 : 0.721


From the above results it seems the results for non-standardized data are better than the standardized data.
But in general cases standardized data have better results as the spectrum of the data lies between -1 and 1 which will then help to keep the coefficients of the feature variables to remain unbiased while fitting the model.