In [1]:
import os
# Respect a SPARK_HOME the user has already exported; only fall back to the local install path.
os.environ.setdefault("SPARK_HOME", "/usr/local/spark")
import findspark
# findspark locates Spark (via SPARK_HOME) and puts its Python libraries on sys.path,
# so it must run before `import pyspark`.
findspark.init()
import pyspark
from pyspark.sql import Row
# NOTE(review): later cells rely on these wildcard imports (IntegerType, FloatType, regexp_replace, ...).
# `from pyspark.sql.functions import *` also shadows Python builtins such as sum/min/max/round/abs;
# prefer explicit imports when refactoring.
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Build the notebook's SparkSession (Hive support enabled); getOrCreate() reuses an
# already-running session instead of starting a second one.
session_builder = SparkSession.builder.appName("Car").enableHiveSupport()
spark = session_builder.getOrCreate()

In [3]:
# Load car listings, using the header row as column names. samplingRatio only affects schema
# inference, which is not enabled here, so every column is read as a string.
carDF = spark.read.csv("Car_sales_details_p3.csv", header=True, samplingRatio=0.001)

In [4]:
# Load the per-sale "sold" status. samplingRatio is inert without schema inference; all columns load as strings.
salesDF = spark.read.csv("sales_Status_p3.csv", header=True, samplingRatio=0.001)

In [5]:
# Load the state -> region lookup. samplingRatio is inert without schema inference; all columns load as strings.
stateDF = spark.read.csv("State_region_mapping_p3.csv", header=True, samplingRatio=0.001)

In [6]:
# Show carDF's columns and types; every column is a string because the CSV was read without inferSchema.
carDF.printSchema()

root
 |-- Sales_ID: string (nullable = true)
 |-- name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- selling_price: string (nullable = true)
 |-- km_driven: string (nullable = true)
 |-- State or Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- max_power: string (nullable = true)
 |-- seats: string (nullable = true)



In [7]:
# Show salesDF's columns: the Sales_ID join key and the `sold` flag.
salesDF.printSchema()

root
 |-- Sales_ID: string (nullable = true)
 |-- sold: string (nullable = true)



In [8]:
# Show stateDF's columns: a Region for each `State or Province`.
stateDF.printSchema()

root
 |-- Region: string (nullable = true)
 |-- State or Province: string (nullable = true)



In [9]:
# Rename the sales key so the join with carDF does not yield two columns both named Sales_ID.
salesDF = salesDF.withColumnRenamed(existing="Sales_ID", new="Sale_ID")

In [10]:
# Attach the sold flag to each car; an inner join keeps only sales IDs present in both files.
inner_join = carDF.join(salesDF, on=carDF["Sales_ID"] == salesDF["Sale_ID"], how="inner")
inner_join.show()

+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+-------+----+
|Sales_ID|                name|year|selling_price|km_driven|   State or Province|         City|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|seats|Sale_ID|sold|
+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+-------+----+
|       1|Maruti Swift Dzir...|2014|       450000|   145500|District of Columbia|   Washington|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|    5|      1|   Y|
|       2|Skoda Rapid 1.5 T...|2014|       370000|   120000|            New York|New York City|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 CC|103.52 bhp|    5|      2|   Y|
|       3|Honda City 2017-2...|2006|       158000|

In [11]:
# The join kept both key columns; drop the duplicate Sale_ID that came from salesDF.
inner_join=inner_join.drop('Sale_ID')

In [12]:
# Spot-check the first two joined rows.
inner_join.show(2)

+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+----+
|Sales_ID|                name|year|selling_price|km_driven|   State or Province|         City|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|seats|sold|
+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+----+
|       1|Maruti Swift Dzir...|2014|       450000|   145500|District of Columbia|   Washington|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|    5|   Y|
|       2|Skoda Rapid 1.5 T...|2014|       370000|   120000|            New York|New York City|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 CC|103.52 bhp|    5|   Y|
+--------+--------------------+----+-------------+---------+--------------------+---------

In [13]:
# Give the state column a space-free name so it no longer collides with stateDF's
# "State or Province" column in the region join.
inner_join = inner_join.withColumnRenamed(existing="State or Province", new="State_or_province")

In [14]:
# Attach each car's Region by matching its state against the state -> region lookup.
FinalDF = inner_join.join(
    stateDF,
    on=inner_join["State_or_province"] == stateDF["State or Province"],
    how="inner",
)
FinalDF.show()

+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+----+-------+--------------------+
|Sales_ID|                name|year|selling_price|km_driven|   State_or_province|         City|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|seats|sold| Region|   State or Province|
+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+----+-------+--------------------+
|       1|Maruti Swift Dzir...|2014|       450000|   145500|District of Columbia|   Washington|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|    5|   Y|   East|District of Columbia|
|       2|Skoda Rapid 1.5 T...|2014|       370000|   120000|            New York|New York City|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 

In [15]:
# Drop stateDF's copy of the state key; the equivalent State_or_province column is kept.
FinalDF=FinalDF.drop('State or Province')

In [16]:
# Confirm the merged schema: car details, the sold flag and Region.
FinalDF.printSchema()

root
 |-- Sales_ID: string (nullable = true)
 |-- name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- selling_price: string (nullable = true)
 |-- km_driven: string (nullable = true)
 |-- State_or_province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- seller_type: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- owner: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- max_power: string (nullable = true)
 |-- seats: string (nullable = true)
 |-- sold: string (nullable = true)
 |-- Region: string (nullable = true)



In [17]:
from pyspark.sql.functions import isnan, when, count, col
# Count missing values per column. Empty CSV fields load as null, and isnan() is false for
# null, so nulls must be checked explicitly alongside NaN.
FinalDF.select(
    [count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in FinalDF.columns]
).show()

+--------+----+----+-------------+---------+-----------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+
|Sales_ID|name|year|selling_price|km_driven|State_or_province|City|fuel|seller_type|transmission|owner|mileage|engine|max_power|seats|sold|Region|
+--------+----+----+-------------+---------+-----------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+
|       0|   0|   0|            0|        0|                0|   0|   0|          0|           0|    0|      0|     0|        0|    0|   0|     0|
+--------+----+----+-------------+---------+-----------------+----+----+-----------+------------+-----+-------+------+---------+-----+----+------+



Every column reports a count of 0, so no missing values were detected.

Preprocessing

In [18]:
# Strip unit suffixes so these columns can be cast to numbers later. The regex replacements
# are applied one at a time, in this order.
# NOTE(review): mileage mixes "kmpl" and "km/kg" rows; removing the unit merges two different
# units into one column without conversion — confirm this is acceptable for the model.
unit_suffixes = [
    ("engine", " CC"),
    ("mileage", " kmpl"),
    ("mileage", " km/kg"),
    ("max_power", " bhp"),
]
carData = FinalDF
for column_name, suffix in unit_suffixes:
    carData = carData.withColumn(column_name, regexp_replace(column_name, suffix, ""))

In [19]:
# Check that the unit suffixes are gone from mileage, engine and max_power.
carData.show(5)

+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+
|Sales_ID|                name|year|selling_price|km_driven|   State_or_province|         City|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold| Region|
+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+
|       1|Maruti Swift Dzir...|2014|       450000|   145500|District of Columbia|   Washington|Diesel| Individual|      Manual| First Owner|   23.4|  1248|       74|    5|   Y|   East|
|       2|Skoda Rapid 1.5 T...|2014|       370000|   120000|            New York|New York City|Diesel| Individual|      Manual|Second Owner|  21.14|  1498|   103.52|    5|   Y|   East|
|       3|Honda City 2017-2...|2006|       158000|   140000|            Ill

In [20]:
from pyspark.ml.feature import StringIndexer

# Categorical columns to convert into numeric indices with StringIndexer.
# NOTE(review): `seats` is a count; indexing it orders values by frequency rather than by size.
strs = [
    "name",
    "State_or_province",
    "City",
    "fuel",
    "seller_type",
    "transmission",
    "owner",
    "seats",
    "sold",
    "Region",
]


In [21]:
# Add a `<column>_index` column for every categorical column in `strs`.
# NOTE(review): each indexer is fitted on the full dataset (before the train/test split)
# and the fitted models are not kept, so new data cannot be encoded the same way later.
for column_name in strs:
    output_name = column_name + "_index"
    # Skip columns that are already indexed: re-running this cell would otherwise fail
    # because the output column already exists.
    if output_name in carData.columns:
        continue
    indexer = StringIndexer(inputCol=column_name, outputCol=output_name)
    carData = indexer.fit(carData).transform(carData)

In [22]:
# Inspect the new *_index columns next to the original categorical columns.
carData.show()

+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+----------+-----------------------+----------+----------+-----------------+------------------+-----------+-----------+----------+------------+
|Sales_ID|                name|year|selling_price|km_driven|   State_or_province|         City|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold| Region|name_index|State_or_province_index|City_index|fuel_index|seller_type_index|transmission_index|owner_index|seats_index|sold_index|Region_index|
+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+----------+-----------------------+----------+----------+-----------------+------------------+-----------+-----------+----------+------------+
|       1|Maruti Sw

In [23]:
# Drop the row ID and the raw categorical columns now that their *_index versions exist.
raw_columns = [
    "Sales_ID",
    "name",
    "State_or_province",
    "City",
    "fuel",
    "seller_type",
    "transmission",
    "owner",
    "seats",
    "sold",
    "Region",
]
carData = carData.drop(*raw_columns)
carData.printSchema()

root
 |-- year: string (nullable = true)
 |-- selling_price: string (nullable = true)
 |-- km_driven: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- max_power: string (nullable = true)
 |-- name_index: double (nullable = true)
 |-- State_or_province_index: double (nullable = true)
 |-- City_index: double (nullable = true)
 |-- fuel_index: double (nullable = true)
 |-- seller_type_index: double (nullable = true)
 |-- transmission_index: double (nullable = true)
 |-- owner_index: double (nullable = true)
 |-- seats_index: double (nullable = true)
 |-- sold_index: double (nullable = true)
 |-- Region_index: double (nullable = true)



In [24]:
from pyspark.sql.functions import col

In [25]:
# Cast the cleaned spec columns from string to numeric types. Text that cannot be parsed
# becomes null (or raises, when Spark's ANSI mode is enabled).
numeric_casts = {
    "year": IntegerType(),
    "mileage": FloatType(),
    "max_power": FloatType(),
    "engine": IntegerType(),
}
for column_name, target_type in numeric_casts.items():
    carData = carData.withColumn(column_name, col(column_name).cast(target_type))

In [26]:
# Cast distance driven and the price label from string to integer.
for column_name in ["km_driven", "selling_price"]:
    carData = carData.withColumn(column_name, col(column_name).cast(IntegerType()))

In [27]:
# Confirm the casts: spec columns are now integer/float, and the *_index columns are double.
carData.printSchema()

root
 |-- year: integer (nullable = true)
 |-- selling_price: integer (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- mileage: float (nullable = true)
 |-- engine: integer (nullable = true)
 |-- max_power: float (nullable = true)
 |-- name_index: double (nullable = true)
 |-- State_or_province_index: double (nullable = true)
 |-- City_index: double (nullable = true)
 |-- fuel_index: double (nullable = true)
 |-- seller_type_index: double (nullable = true)
 |-- transmission_index: double (nullable = true)
 |-- owner_index: double (nullable = true)
 |-- seats_index: double (nullable = true)
 |-- sold_index: double (nullable = true)
 |-- Region_index: double (nullable = true)



In [28]:
# Preview the fully numeric frame before assembling feature vectors.
carData.show(5)

+----+-------------+---------+-------+------+---------+----------+-----------------------+----------+----------+-----------------+------------------+-----------+-----------+----------+------------+
|year|selling_price|km_driven|mileage|engine|max_power|name_index|State_or_province_index|City_index|fuel_index|seller_type_index|transmission_index|owner_index|seats_index|sold_index|Region_index|
+----+-------------+---------+-------+------+---------+----------+-----------------------+----------+----------+-----------------+------------------+-----------+-----------+----------+------------+
|2014|       450000|   145500|   23.4|  1248|     74.0|       0.0|                   33.0|       5.0|       0.0|              0.0|               0.0|        0.0|        0.0|       1.0|         2.0|
|2014|       370000|   120000|  21.14|  1498|   103.52|     770.0|                    2.0|       0.0|       0.0|              0.0|               0.0|        1.0|        0.0|       1.0|         2.0|
|2006|    

In [29]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
# List the available columns; every one except the selling_price label is used as a feature.
carData.columns

['year',
 'selling_price',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'name_index',
 'State_or_province_index',
 'City_index',
 'fuel_index',
 'seller_type_index',
 'transmission_index',
 'owner_index',
 'seats_index',
 'sold_index',
 'Region_index']

In [30]:
# Every numeric / indexed column except the selling_price label becomes a model input.
feature_cols = [
    "year",
    "km_driven",
    "mileage",
    "engine",
    "max_power",
    "name_index",
    "State_or_province_index",
    "City_index",
    "fuel_index",
    "seller_type_index",
    "transmission_index",
    "owner_index",
    "seats_index",
    "sold_index",
    "Region_index",
]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [31]:
# Append the assembled "features" vector column to carData.
output = assembler.transform(carData)

In [32]:
# Keep only the feature vector and the label for modelling.
out = output.select(["features", "selling_price"])
out.show()

+--------------------+-------------+
|            features|selling_price|
+--------------------+-------------+
|[2014.0,145500.0,...|       450000|
|[2014.0,120000.0,...|       370000|
|[2006.0,140000.0,...|       158000|
|[2010.0,127000.0,...|       225000|
|[2007.0,120000.0,...|       130000|
|[2017.0,45000.0,2...|       440000|
|[2007.0,175000.0,...|        96000|
|[2001.0,5000.0,16...|        45000|
|[2011.0,90000.0,2...|       350000|
|[2013.0,169000.0,...|       200000|
|[2014.0,68000.0,1...|       500000|
|[2005.0,100000.0,...|        92000|
|[2009.0,140000.0,...|       280000|
|[2009.0,90000.0,1...|       180000|
|[2016.0,40000.0,1...|       400000|
|[2016.0,70000.0,2...|       778000|
|[2012.0,53000.0,2...|       500000|
|[2002.0,80000.0,1...|       150000|
|[2016.0,100000.0,...|       680000|
|[2011.0,100000.0,...|       174000|
+--------------------+-------------+
only showing top 20 rows



In [33]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressionModel, RandomForestRegressor

In [34]:
# StandardScaler with its defaults spelled out: divide each feature by its standard
# deviation, with no mean-centering.
# NOTE(review): tree ensembles such as the random forest trained below are insensitive to
# positive per-feature scaling, so this step likely has no effect on the model.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledfeatures",
    withMean=False,
    withStd=True,
)

In [35]:
# NOTE(review): the scaler is fitted on the whole dataset before the train/test split,
# so test rows also contribute to the scaling statistics.
scaler_model = scaler.fit(out)
scale = scaler_model.transform(out)
scaled_data = scale.select("scaledfeatures", "selling_price")

In [36]:
# Preview the scaled feature vectors (each feature divided by its standard deviation).
scaled_data.show(5)

+--------------------+-------------+
|      scaledfeatures|selling_price|
+--------------------+-------------+
|[521.262631612101...|       450000|
|[521.262631612101...|       370000|
|[519.192074982062...|       158000|
|[520.227353297081...|       225000|
|[519.450894560817...|       130000|
+--------------------+-------------+
only showing top 5 rows



In [61]:
# 80/20 train/test split. A fixed seed makes the split, and therefore the metrics below,
# reproducible across runs (assuming the input rows and partitioning are stable).
(train, test) = scaled_data.randomSplit([0.8, 0.2], seed=42)

In [62]:
# Random forest predicting selling_price from the scaled feature vector. A fixed seed makes
# the forest's random row/feature sampling repeatable, so retraining yields the same model.
rf = RandomForestRegressor(
    featuresCol="scaledfeatures",
    labelCol="selling_price",
    numTrees=150,
    seed=42,
)

In [63]:
# Train the random forest on the training split.
random_forest=rf.fit(train)

In [64]:
# Score the held-out test split; this appends a "prediction" column.
test_results = random_forest.transform(test)
test_results.show()

+--------------------+-------------+------------------+
|      scaledfeatures|selling_price|        prediction|
+--------------------+-------------+------------------+
|(15,[0,1,2,3,4,5,...|       355000| 336096.9543359037|
|(15,[0,1,2,3,4,5,...|       220000| 288111.8414381165|
|(15,[0,1,2,3,4,5,...|       215000| 284750.2934464828|
|(15,[0,1,2,3,4,5,...|       270000|370746.54979874415|
|(15,[0,1,2,3,4,5,...|       204999|284066.73966344824|
|(15,[0,1,2,3,4,5,...|       370000|376673.82850198325|
|(15,[0,1,2,3,4,5,...|       425000|424049.58456539834|
|(15,[0,1,2,3,4,5,...|       200000| 357862.9519864981|
|(15,[0,1,2,3,4,5,...|       295000| 423737.1572032895|
|(15,[0,1,2,3,4,5,...|       229999| 302947.0804613739|
|(15,[0,1,2,3,4,5,...|       290000| 330742.6363601765|
|(15,[0,1,2,3,4,5,...|       229999|308238.18206543056|
|(15,[0,1,2,3,4,5,...|       200000| 367453.4590237305|
|(15,[0,1,2,3,4,5,...|       615000| 443070.7224159461|
|(15,[0,1,2,3,4,5,...|       295000| 363692.6016

RMSE (root mean squared error) on the test set

In [65]:
# Root mean squared error between selling_price and the model's prediction.
evaluator1 = RegressionEvaluator(
    labelCol="selling_price",
    predictionCol="prediction",
    metricName="rmse",
)

In [66]:
# RMSE on the test split, in the same units as selling_price.
rmse = evaluator1.evaluate(test_results)
rmse

181559.60487681124

$R^2$ score (coefficient of determination) on the test set

In [67]:
# Coefficient of determination (R^2) between selling_price and the model's prediction.
evaluator2 = RegressionEvaluator(
    labelCol="selling_price",
    predictionCol="prediction",
    metricName="r2",
)

In [68]:
# R^2 on the test split (1.0 would be a perfect fit).
r2 = evaluator2.evaluate(test_results)
r2

0.9469430070765242

In [69]:
# Mean squared error between selling_price and the model's prediction.
evaluator3 = RegressionEvaluator(
    labelCol="selling_price",
    predictionCol="prediction",
    metricName="mse",
)

In [70]:
# MSE on the test split (the square of the RMSE above).
mse = evaluator3.evaluate(test_results)
mse

32963890123.023823