In [174]:
# libraries
import warnings
import pandas as pd
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler
from pyspark.sql import SparkSession

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

pd.set_option('display.max_columns', None)
pd.set_option('display.float_format', lambda x: '%.2f' % x)

In [175]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark_df = spark.read.csv('us_dealers_car.csv', inferSchema=True, header=True)
spark_df.show(5)

+-------------+-----------------+-------+--------+----------+------+----------+-------------+--------+---------+------------+----------+------------+--------------------+-----------+------------+--------------------+--------------------+-------------+-----+-----+
|           id|              vin|  price|   miles|  stock_no|  year|      make|        model|    trim|body_type|vehicle_type|drivetrain|transmission|           fuel_type|engine_size|engine_block|         seller_name|              street|         city|state|  zip|
+-------------+-----------------+-------+--------+----------+------+----------+-------------+--------+---------+------------+----------+------------+--------------------+-----------+------------+--------------------+--------------------+-------------+-----+-----+
|38b2f52e-8f5d|1GCWGFCF3F1284719|20998.0|115879.0|W1T503168C|2015.0| Chevrolet|Express Cargo|Work Van|Cargo Van|       Truck|       RWD|   Automatic|      E85 / Unleaded|        4.8|           V|nissan ellico

In [176]:
print("Shape: ", (spark_df.count(), len(spark_df.columns)))

Shape:  (2000000, 21)


In [177]:
spark_df.printSchema() #types of Variables

root
 |-- id: string (nullable = true)
 |-- vin: string (nullable = true)
 |-- price: double (nullable = true)
 |-- miles: double (nullable = true)
 |-- stock_no: string (nullable = true)
 |-- year: double (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- trim: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- drivetrain: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- engine_size: double (nullable = true)
 |-- engine_block: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)



In [178]:
spark_df.describe(["price"]).show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|           1812275|
|   mean| 28092.85766564125|
| stddev|19380.882628866795|
|    min|               0.0|
|    max|         1495000.0|
+-------+------------------+



In [179]:
spark_df.groupby("model").count().show()

+--------------------+-----+
|               model|count|
+--------------------+-----+
|          458 Spider|  105|
|       G Convertible|   64|
|                 MDX| 8105|
|              Cirrus|    8|
|           GLE-Class| 4480|
|        California T|  119|
|               Astra|   27|
|             Contour|   16|
|             Outlook|  122|
|            Villager|   22|
|        458 Speciale|   17|
|           Econoline|   13|
|                 S60| 3361|
|          370Z Coupe|  356|
|          Spider 124|    1|
|                  SC|  405|
|                 718|  708|
|ProMaster Window Van|   11|
|                  Q7| 4818|
|                 DBS|   12|
+--------------------+-----+
only showing top 20 rows



In [180]:
columns_to_drop = ['zip', "state"]
spark_df = spark_df.drop(*columns_to_drop)

In [181]:
spark_df.show(5)

+-------------+-----------------+-------+--------+----------+------+----------+-------------+--------+---------+------------+----------+------------+--------------------+-----------+------------+--------------------+--------------------+-------------+
|           id|              vin|  price|   miles|  stock_no|  year|      make|        model|    trim|body_type|vehicle_type|drivetrain|transmission|           fuel_type|engine_size|engine_block|         seller_name|              street|         city|
+-------------+-----------------+-------+--------+----------+------+----------+-------------+--------+---------+------------+----------+------------+--------------------+-----------+------------+--------------------+--------------------+-------------+
|38b2f52e-8f5d|1GCWGFCF3F1284719|20998.0|115879.0|W1T503168C|2015.0| Chevrolet|Express Cargo|Work Van|Cargo Van|       Truck|       RWD|   Automatic|      E85 / Unleaded|        4.8|           V|nissan ellicott city|8569 Baltimore Na...|Ellicot

In [182]:
num_cols = [col[0] for col in spark_df.dtypes if col[1] != 'string']
spark_df.select(num_cols).describe().show() 

+-------+------------------+------------------+------------------+------------------+
|summary|             price|             miles|              year|       engine_size|
+-------+------------------+------------------+------------------+------------------+
|  count|           1812275|           1979367|           1999928|           1948312|
|   mean| 28092.85766564125| 54558.54030909882|2016.2874383477806|3.0564861788005566|
| stddev|19380.882628866795|46982.231870885385| 4.011167839438481| 1.349458236193614|
|    min|               0.0|               0.0|            1980.0|               0.6|
|    max|         1495000.0|         2975291.0|            2022.0|              30.0|
+-------+------------------+------------------+------------------+------------------+



In [183]:
cat_cols = [col[0] for col in spark_df.dtypes if col[1] == 'string']
spark_df.select(cat_cols).describe().show() 

+-------+-------------------+--------------------+-----------+-------+-----------------+------------------+---------+------------+----------+------------+--------------------+------------+-----------------+-----------------+---------+
|summary|                 id|                 vin|   stock_no|   make|            model|              trim|body_type|vehicle_type|drivetrain|transmission|           fuel_type|engine_block|      seller_name|           street|     city|
+-------+-------------------+--------------------+-----------+-------+-----------------+------------------+---------+------------+----------+------------+--------------------+------------+-----------------+-----------------+---------+
|  count|            2000000|             2000000|    1949729|2000000|          1995985|           1992372|  1986086|     1980801|   1992172|     1993295|             1977164|     1947295|          1996814|          1991191|  1996686|
|   mean|0.06842204211774171|4.892689445029021...|   Infinit

In [184]:
from pyspark.sql.functions import when, count, col
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).toPandas().T

Unnamed: 0,0
id,0
vin,0
price,187725
miles,20633
stock_no,50271
year,72
make,0
model,4015
trim,7628
body_type,13914


In [185]:
spark_df = spark_df.drop("stock_no", "engine_size", "engine_block","street", "city")

In [186]:
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).toPandas().T

Unnamed: 0,0
id,0
vin,0
price,187725
miles,20633
year,72
make,0
model,4015
trim,7628
body_type,13914
vehicle_type,19199


In [187]:
spark_df = spark_df.na.drop()

In [188]:
spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns]).toPandas().T  
#our dataframe is ready.

Unnamed: 0,0
id,0
vin,0
price,0
miles,0
year,0
make,0
model,0
trim,0
body_type,0
vehicle_type,0


In [201]:
#RENAMING COLUMNS
spark_df = spark_df.withColumnRenamed("price", "target")

In [202]:
#Filling na's with their mean.
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols = ["target","miles", "year"],
    outputCols = ["target", "miles_imputed", "year_imputed"]
).setStrategy("mean")

In [203]:
spark_df =imputer.fit(spark_df).transform(spark_df)

In [204]:
#Preprocessing
spark_df = spark_df.drop("id", "vin")

In [205]:
spark_df.show()

+--------+--------+------+-------------+---------------+--------+---------+------------+----------+------------+--------------------+--------------------+-------------+------------+
|  target|   miles|  year|         make|          model|    trim|body_type|vehicle_type|drivetrain|transmission|           fuel_type|         seller_name|miles_imputed|year_imputed|
+--------+--------+------+-------------+---------------+--------+---------+------------+----------+------------+--------------------+--------------------+-------------+------------+
| 20998.0|115879.0|2015.0|    Chevrolet|  Express Cargo|Work Van|Cargo Van|       Truck|       RWD|   Automatic|      E85 / Unleaded|nissan ellicott city|     115879.0|      2015.0|
| 27921.0|  7339.0|2018.0|          BMW|             i3|       s|Hatchback|         Car|       RWD|   Automatic|Electric / Premiu...|hendrick honda po...|       7339.0|      2018.0|
| 11055.0| 39798.0|2018.0|   Mitsubishi|      Mirage G4|      SE|    Sedan|         Car|  

In [230]:
train_string_columns = []

for col, dtype in spark_df.dtypes:
    if dtype == 'string':
        train_string_columns.append(col)

In [231]:
train_df = spark_df

In [232]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+'_index', handleInvalid='keep').fit(train_df) for column in train_string_columns]


pipeline = Pipeline(stages=indexers)
train_indexed = pipeline.fit(train_df).transform(train_df)

In [234]:
def get_dtype(df,colname):
    return [dtype for name, dtype in df.dtypes if name == colname][0]

num_cols_train = []
for col in train_indexed.columns:
    if get_dtype(train_indexed,col) != 'string':
        num_cols_train.append(str(col))
        
train_indexed = train_indexed.select(num_cols_train)

In [235]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = train_indexed.drop("target").columns, outputCol = 'features').setHandleInvalid("keep")

train_vector = vectorAssembler.transform(train_indexed)

In [236]:
# Train_test_split

splits = train_vector.randomSplit([0.7, 0.3])
train = splits[0]
val = splits[1]

In [249]:
#Regression Model

from pyspark.ml.regression import LinearRegression
from  pyspark.sql.functions import abs
lr = LinearRegression(featuresCol = 'features', labelCol='target', maxIter=10, 
                      regParam=0.8, elasticNetParam=0.1) # It is always a good idea to play with hyperparameters.
lr_model = lr.fit(train)

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

lr_predictions = lr_model.transform(val)
lr_predictions = lr_predictions.withColumn("prediction",abs(lr_predictions["prediction"]))
lr_predictions.select("prediction","target","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="target",metricName="r2")
print("R Squared (R2) on val data = %g" % lr_evaluator.evaluate(lr_predictions))

RMSE: 15301.242008
r2: 0.317105
+------------------+------+--------------------+
|        prediction|target|            features|
+------------------+------+--------------------+
|11686.686157552525|   998|[179900.0,2003.0,...|
|11684.864682183601|   998|[179900.0,2003.0,...|
|11681.555241020862|   998|[179900.0,2003.0,...|
| 4456.187117821071|  1195|[241929.0,2004.0,...|
| 32658.36297209887|  1320|[276648.0,1998.0,...|
+------------------+------+--------------------+
only showing top 5 rows

R Squared (R2) on val data = 0.324406


In [None]:
# >>>>>> R Squared (R2) on val data = 0.324406