### Importing libraries

In [1]:
import pyspark
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
import pandas as pd

### Loading Data

In [2]:
my_spark = SparkSession \
    .builder \
    .appName("603Project") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/603Project.603Project") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/603Project.603Project") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

In [3]:
SparkSession.builder.getOrCreate()

In [4]:
data = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [5]:
data.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- daysonmarket: integer (nullable = true)
 |-- engine_displacement: integer (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- exterior_color: string (nullable = true)
 |-- franchise_make: string (nullable = true)
 |-- front_legroom: double (nullable = true)
 |-- fuel_tank_volume: double (nullable = true)
 |-- height: double (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- length: double (nullable = true)
 |-- listing_color: string (nullable = true)
 |-- make_name: string (nullable = true)
 |-- maximum_seating: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- seller_rating: double (nullable = true)
 |-- torque: integer (nullable = true)
 |-- transmission_display: string (nullable = true)
 |-- wheel_system_display: string (nullable = true)


In [6]:
print((data.count(), len(data.columns)))

(69736, 23)


### Data Preprocessing

In [7]:
from pyspark.ml.feature import StringIndexer,StandardScaler, OneHotEncoder
si1 = StringIndexer(inputCol='body_type',outputCol='body_type_si')
si2 = StringIndexer(inputCol='city',outputCol='city_si')
si3 = StringIndexer(inputCol='engine_type',outputCol='engine_type_si')
si4 = StringIndexer(inputCol='exterior_color',outputCol='exterior_color_si')
si5 = StringIndexer(inputCol='franchise_make',outputCol='franchise_make_si')
si6 = StringIndexer(inputCol='make_name',outputCol='make_name_si')
si7 = StringIndexer(inputCol='transmission_display',outputCol='transmission_display_si')
si8 = StringIndexer(inputCol='wheel_system_display',outputCol='wheel_system_display_si')
si9 = StringIndexer(inputCol='maximum_seating',outputCol='maximum_seating_si')

ohe1 = OneHotEncoder(inputCol='body_type_si',outputCol='body_type_ohe')
ohe2 = OneHotEncoder(inputCol='city_si',outputCol='city_ohe')
ohe3 = OneHotEncoder(inputCol='engine_type_si',outputCol='engine_type_ohe')
ohe4 = OneHotEncoder(inputCol='exterior_color_si',outputCol='exterior_color_ohe')
ohe5 = OneHotEncoder(inputCol='franchise_make_si',outputCol='franchise_make_ohe')
ohe6 = OneHotEncoder(inputCol='make_name_si',outputCol='make_name_ohe')
ohe7 = OneHotEncoder(inputCol='transmission_display_si',outputCol='transmission_display_ohe')
ohe8 = OneHotEncoder(inputCol='wheel_system_display_si',outputCol='wheel_system_display_ohe')
ohe9 = OneHotEncoder(inputCol='maximum_seating_si',outputCol='maximum_seating_ohe')



In [24]:
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler(inputCols=['daysonmarket','engine_displacement','front_legroom','fuel_tank_volume','horsepower','mileage',
                               'seller_rating','torque','width'], outputCol='unscaled_features')

sc = StandardScaler(inputCol="unscaled_features", outputCol="num_features")

va2 = VectorAssembler(inputCols=['body_type_ohe','city_ohe','engine_type_ohe','exterior_color_ohe','franchise_make_ohe',
                               'make_name_ohe','transmission_display_ohe','wheel_system_display_ohe','maximum_seating_ohe'], outputCol='cat_features')

va_features = VectorAssembler(inputCols=['num_features','cat_features'],
  outputCol='final_features')

### Building Pipeline

In [25]:
from pyspark.ml import Pipeline
pipe = Pipeline(stages=[si1,si2,si3,si4,si5,si6,si7,si8,si9,ohe1,ohe2,ohe3,ohe4,ohe5,ohe6,ohe7,ohe8,ohe9,va2,va,sc,va_features])
model = pipe.fit(data)
transformed_data= model.transform(data)

In [26]:
transformed_data.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- daysonmarket: integer (nullable = true)
 |-- engine_displacement: integer (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- exterior_color: string (nullable = true)
 |-- franchise_make: string (nullable = true)
 |-- front_legroom: double (nullable = true)
 |-- fuel_tank_volume: double (nullable = true)
 |-- height: double (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- length: double (nullable = true)
 |-- listing_color: string (nullable = true)
 |-- make_name: string (nullable = true)
 |-- maximum_seating: string (nullable = true)
 |-- mileage: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- seller_rating: double (nullable = true)
 |-- torque: integer (nullable = true)
 |-- transmission_display: string (nullable = true)
 |-- wheel_system_display: string (nullable = true)


In [27]:
#spliting the data for training and testing
splits = transformed_data.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

### Modelling

#### LinearRegression

In [31]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'final_features', labelCol='price', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [32]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","price").show(5)

+------------------+-----+
|        prediction|price|
+------------------+-----+
|31478.177669868768|35075|
| 44228.47360741431|53645|
|24291.437412614057|22072|
|23343.430107790347|26548|
|24613.446830614328|23414|
+------------------+-----+
only showing top 5 rows



In [33]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 3896.910280
r2: 0.857426


#### DecisionTreeRegressor

In [34]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol = 'final_features', labelCol='price')
dt_model = dt.fit(train_df)

In [35]:
dt_predictions = dt_model.transform(test_df)
dt_predictions.select("prediction","price").show(5)

+------------------+-----+
|        prediction|price|
+------------------+-----+
|35783.018101761256|35075|
| 47076.66278493558|53645|
|21990.663392857143|22072|
|27797.510836363635|26548|
| 24867.91388888889|23414|
+------------------+-----+
only showing top 5 rows



In [36]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 3939.51


In [38]:
y_true = dt_predictions.select("price").toPandas()
y_pred = dt_predictions.select("prediction").toPandas()

import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))

r2_score: 0.8537751584336537


#### RandomForestRegressor

In [39]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol="final_features", labelCol='price')
rf_model = rf.fit(train_df)

In [40]:
rf_predictions = rf_model.transform(test_df)
rf_predictions.select("prediction","price").show(5)

+------------------+-----+
|        prediction|price|
+------------------+-----+
| 37717.96245297829|35075|
|  46823.6371795753|53645|
| 22280.20359377212|22072|
|  27563.3884496365|26548|
|24340.237769708292|23414|
+------------------+-----+
only showing top 5 rows



In [41]:
evaluator = RegressionEvaluator(
    labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(rf_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 3852.86


In [42]:
y_true = rf_predictions.select("price").toPandas()
y_pred = rf_predictions.select("prediction").toPandas()

import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred)
print('r2_score: {0}'.format(r2_score))

r2_score: 0.8601368476264593
