In [1]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('Used Car Price Prediction') \
    .getOrCreate()

from google.colab import drive
drive.mount('/content/drive')

In [4]:
sc = spark.sparkContext

import pandas as pd

df = (spark.read
          .format("csv")
          .option('header', 'true')
          .option('encoding', 'utf-8')
          .option('sep', '\t')
          .load('/content/drive/MyDrive/Cloud Computing/UsedCars.csv'))

In [5]:
df = (spark.read
          .format("csv")
          .option('header', 'true')
          .option('encoding', 'utf-8')
          .option('sep', '\t')
          .load('UsedCars.csv'))

In [6]:
#df = pd.read_csv('/content/drive/MyDrive/Cloud Computing/UsedCars.csv',sep='\t',encoding='utf-8')

In [7]:
df.head(10)

[Row(_c0='27', id='7316814884', region='auburn', price='33590', year='2014', manufacturer='gmc', model='sierra 1500 crew cab slt', condition='good', odometer='57923.0', size='na', type='pickup', state='al', post_year='2021', post_month='05', post_date='04'),
 Row(_c0='28', id='7316814758', region='auburn', price='22590', year='2010', manufacturer='chevrolet', model='silverado 1500', condition='good', odometer='71229.0', size='na', type='pickup', state='al', post_year='2021', post_month='05', post_date='04'),
 Row(_c0='29', id='7316814989', region='auburn', price='39590', year='2020', manufacturer='chevrolet', model='silverado 1500 crew', condition='good', odometer='19160.0', size='na', type='pickup', state='al', post_year='2021', post_month='05', post_date='04'),
 Row(_c0='30', id='7316743432', region='auburn', price='30990', year='2017', manufacturer='toyota', model='tundra double cab sr', condition='good', odometer='41124.0', size='na', type='pickup', state='al', post_year='2021', po

In [8]:
features = ['region', 'year', 'price','manufacturer', 'model', 'condition', 'odometer', 'size', 'type', 'state']

In [9]:
df = df[features]

In [10]:
df.head(10)

[Row(region='auburn', year='2014', price='33590', manufacturer='gmc', model='sierra 1500 crew cab slt', condition='good', odometer='57923.0', size='na', type='pickup', state='al'),
 Row(region='auburn', year='2010', price='22590', manufacturer='chevrolet', model='silverado 1500', condition='good', odometer='71229.0', size='na', type='pickup', state='al'),
 Row(region='auburn', year='2020', price='39590', manufacturer='chevrolet', model='silverado 1500 crew', condition='good', odometer='19160.0', size='na', type='pickup', state='al'),
 Row(region='auburn', year='2017', price='30990', manufacturer='toyota', model='tundra double cab sr', condition='good', odometer='41124.0', size='na', type='pickup', state='al'),
 Row(region='auburn', year='2013', price='15000', manufacturer='ford', model='f-150 xlt', condition='excellent', odometer='128000.0', size='full-size', type='truck', state='al'),
 Row(region='auburn', year='2012', price='27990', manufacturer='gmc', model='sierra 2500 hd extended 

In [11]:
df.dtypes

[('region', 'string'),
 ('year', 'string'),
 ('price', 'string'),
 ('manufacturer', 'string'),
 ('model', 'string'),
 ('condition', 'string'),
 ('odometer', 'string'),
 ('size', 'string'),
 ('type', 'string'),
 ('state', 'string')]

In [12]:
df = df.withColumn("year", df.year.cast('float'))
df = df.withColumn("price", df.price.cast('float'))
df = df.withColumn("odometer", df.odometer.cast('float'))

In [13]:
df.limit(5).toPandas()

Unnamed: 0,region,year,price,manufacturer,model,condition,odometer,size,type,state
0,auburn,2014.0,33590.0,gmc,sierra 1500 crew cab slt,good,57923.0,na,pickup,al
1,auburn,2010.0,22590.0,chevrolet,silverado 1500,good,71229.0,na,pickup,al
2,auburn,2020.0,39590.0,chevrolet,silverado 1500 crew,good,19160.0,na,pickup,al
3,auburn,2017.0,30990.0,toyota,tundra double cab sr,good,41124.0,na,pickup,al
4,auburn,2013.0,15000.0,ford,f-150 xlt,excellent,128000.0,full-size,truck,al


In [14]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
#from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [15]:
categorical_variables = ['region', 'manufacturer', 'model', 'condition', 'size', 'type', 'state',]

In [16]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]

In [17]:
encoder = OneHotEncoder(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

In [18]:
assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(),
    outputCol="categorical-features"
)

In [19]:
pipeline = Pipeline(stages=indexers + [encoder, assembler])

In [20]:
transformed_df = pipeline.fit(df).transform(df)

In [21]:
transformed_df.printSchema()

root
 |-- region: string (nullable = true)
 |-- year: float (nullable = true)
 |-- price: float (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- odometer: float (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- state: string (nullable = true)
 |-- region-index: double (nullable = false)
 |-- manufacturer-index: double (nullable = false)
 |-- model-index: double (nullable = false)
 |-- condition-index: double (nullable = false)
 |-- size-index: double (nullable = false)
 |-- type-index: double (nullable = false)
 |-- state-index: double (nullable = false)
 |-- state-index-encoded: vector (nullable = true)
 |-- size-index-encoded: vector (nullable = true)
 |-- region-index-encoded: vector (nullable = true)
 |-- condition-index-encoded: vector (nullable = true)
 |-- model-index-encoded: vector (nullable = true)
 |-- type-index-encoded: vector (nullab

In [22]:
continuous_variables = ['year', 'odometer', 'price']

In [23]:
assembler = VectorAssembler(
    inputCols=['categorical-features', *continuous_variables],
    outputCol='features'
)

In [24]:
transformed_df = assembler.transform(transformed_df)

In [25]:
transformed_df.limit(5).toPandas()['features'][0]

SparseVector(21261, {322: 1.0, 410: 1.0, 631: 1.0, 21186: 1.0, 21191: 1.0, 21198: 1.0, 21236: 1.0, 21258: 2014.0, 21259: 57923.0, 21260: 33590.0})

In [26]:
import six
for i in transformed_df.columns:
  if not( isinstance(transformed_df.select(i).take(1)[0][0], six.string_types)):
     print( "Correlation to Price for ", i, transformed_df.stat.corr('price',i))
  if(i == 'state-index'):
    break

Correlation to Price for  year 0.22806958104753666
Correlation to Price for  price 1.0
Correlation to Price for  odometer -0.18729715868031527
Correlation to Price for  region-index 0.10811338120753715
Correlation to Price for  manufacturer-index 0.04824781890100327
Correlation to Price for  model-index -0.07348688646658944
Correlation to Price for  condition-index -0.2639807586187341
Correlation to Price for  size-index -0.3073259747801099
Correlation to Price for  type-index 0.10625641346185531
Correlation to Price for  state-index 0.004667732192301333


In [27]:
transformed_df.limit(5).toPandas()

Unnamed: 0,region,year,price,manufacturer,model,condition,odometer,size,type,state,...,state-index,state-index-encoded,size-index-encoded,region-index-encoded,condition-index-encoded,model-index-encoded,type-index-encoded,manufacturer-index-encoded,categorical-features,features
0,auburn,2014.0,33590.0,gmc,sierra 1500 crew cab slt,good,57923.0,na,pickup,al,...,28.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,auburn,2010.0,22590.0,chevrolet,silverado 1500,good,71229.0,na,pickup,al,...,28.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,auburn,2020.0,39590.0,chevrolet,silverado 1500 crew,good,19160.0,na,pickup,al,...,28.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,auburn,2017.0,30990.0,toyota,tundra double cab sr,good,41124.0,na,pickup,al,...,28.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,auburn,2013.0,15000.0,ford,f-150 xlt,excellent,128000.0,full-size,truck,al,...,28.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 1.0, 0.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [28]:
finalized_data=transformed_df.select("features","price")
finalized_data.show()

+--------------------+-------+
|            features|  price|
+--------------------+-------+
|(21261,[322,410,6...|33590.0|
|(21261,[322,404,4...|22590.0|
|(21261,[322,404,4...|39590.0|
|(21261,[322,405,5...|30990.0|
|(21261,[322,403,5...|15000.0|
|(21261,[322,410,7...|27990.0|
|(21261,[322,404,5...|34590.0|
|(21261,[322,405,4...|35000.0|
|(21261,[322,404,5...|29990.0|
|(21261,[322,404,5...|38590.0|
|(21261,[322,408,5...| 4500.0|
|(21261,[322,408,5...|32990.0|
|(21261,[322,404,4...|24590.0|
|(21261,[322,404,6...|30990.0|
|(21261,[322,405,5...|27990.0|
|(21261,[322,404,4...|37990.0|
|(21261,[322,405,5...|33590.0|
|(21261,[322,403,5...|30990.0|
|(21261,[322,407,6...|27990.0|
|(21261,[322,408,6...|    0.0|
+--------------------+-------+
only showing top 20 rows



In [29]:
# split data into training data and testing data
trainTest = finalized_data.randomSplit([0.8, 0.2], seed=2000)
trainingDF = trainTest[0]
testDF = trainTest[1]

GBT Regression Model

In [30]:
#Model Creation
#from __future__ import print_function
#from pyspark.ml.regression import GBTRegressor

In [31]:
#create our Model
#dtr = GBTRegressor().setFeaturesCol("features").setLabelCol("price")


In [32]:
# Train the model using training data
#model = dtr.fit(trainingDF)

In [33]:
# Generate predictions using model for all features
#fullPredictions = model.transform(testDF).cache()

Linear Regression Model

In [34]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='price', maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [35]:
lr_model = lr.fit(trainingDF)

In [36]:
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.0,-176.75977374271986,-18.131509413806175,-62.0810238019007,-41.751813231070905,-166.50848890239652,-106.42722240508716,122.16731769147826,-37.98974143433074,-119.48909576233632,-160.7577832056846,0.0,45.28506248803394,-8.700037015331244,133.21914045266934,164.78150768240835,7.280171062413567,-18.415794779970618,51.311406858978906,-231.28363642989515,-29.899226473654373,-56.38489641850598,-76.38976113688938,0.0,79.43249896099694,-42.15429891585186,94.631047350708,24.034235526665533,90.257962621914,-35.617313230581075,102.1459927235077,-12.029406784841818,103.75970746221716,72.50113495501928,-59.92892093690276,7.764532781711676,16.22293543587935,-132.1434812168354,16.30472532680261,0.0,42.80481050118091,-15.498172028192972,-0.0,-0.0,-39.30502326794075,216.1296253082189,-37.6040031695259,-107.66717435682376,-77.76179342575828,19.519082606231915,-77.70310700307539,-40.478613551828246,-10.701837625191438,-58.0158785361647,46.79992701384611,-102.65738398770851,103.88365710

In [37]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 288.955004
r2: 0.999444


In [38]:
lr_predictions = lr_model.transform(testDF)

In [40]:
lr_predictions.select("prediction","price","features").show(5)

+------------------+-------+--------------------+
|        prediction|  price|            features|
+------------------+-------+--------------------+
|10393.715219553676|10100.0|(21261,[7,403,445...|
|31518.123556480525|31500.0|(21261,[7,403,445...|
| 38173.66464204977|38000.0|(21261,[7,403,445...|
| 6394.638132423119| 6000.0|(21261,[7,403,456...|
|17974.100707025485|17985.0|(21261,[7,403,472...|
+------------------+-------+--------------------+
only showing top 5 rows



In [41]:
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="price",metricName="r2")

In [44]:
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

R Squared (R2) on test data = 0.999324


In [46]:
test_result = lr_model.evaluate(testDF)

In [47]:
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 317.655


In [48]:
print("numIterations: %d" % trainingSummary.totalIterations)

numIterations: 11


In [49]:
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))

objectiveHistory: [0.49999748153967594, 0.4260917852463403, 0.1608965524401659, 0.04454354462922632, 0.03252708918881358, 0.014338742872914425, 0.007988887698257615, 0.002639503743218479, 0.0015126765784443752, 0.0008205335813152221, 0.000354314821407525]


In [50]:
trainingSummary.residuals.show()

+--------------------+
|           residuals|
+--------------------+
|  124.86432828630495|
|   62.00969871394045|
|  -42.22384079369658|
|  352.93984366556106|
|  189.21634243859444|
| -245.38488975878863|
|  -285.9966391924827|
|    437.354289073206|
|   344.2596387440426|
|  383.52839993043744|
|  276.91605843801517|
|-0.16194361132511403|
| -63.538596217360464|
| -119.88347362434433|
|   -73.4240459480643|
|  159.30304564982362|
|  2249.5687175982894|
|  -331.4510662064422|
|   847.2829736244312|
|  -336.0064895923133|
+--------------------+
only showing top 20 rows



In [51]:
predictions = lr_model.transform(testDF)

In [52]:
predictions.select("prediction","price","features").show()

+------------------+-------+--------------------+
|        prediction|  price|            features|
+------------------+-------+--------------------+
|10393.715219553676|10100.0|(21261,[7,403,445...|
|31518.123556480525|31500.0|(21261,[7,403,445...|
| 38173.66464204977|38000.0|(21261,[7,403,445...|
| 6394.638132423119| 6000.0|(21261,[7,403,456...|
|17974.100707025485|17985.0|(21261,[7,403,472...|
|  540.191920999976| 1000.0|(21261,[7,403,502...|
| 7807.218956329249| 7999.0|(21261,[7,403,506...|
|19156.909624224005|18998.0|(21261,[7,403,518...|
|27778.436034772007|28000.0|(21261,[7,403,626...|
|30425.228454275973|29999.0|(21261,[7,403,644...|
|32451.081581998165|32000.0|(21261,[7,403,644...|
|3626.1857708167227| 4000.0|(21261,[7,403,706...|
|13511.161432138237|12900.0|(21261,[7,403,749...|
|17420.134096918322|16771.0|(21261,[7,403,749...|
|22678.601419457133|22590.0|(21261,[7,403,842...|
| 9463.501212391406| 9500.0|(21261,[7,403,920...|
| 4311.554325083518| 4500.0|(21261,[7,403,105...|
