In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import isnan ,when,count,col,lit
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("car").getOrCreate()

In [4]:
schema = StructType([StructField("make",StringType(),True),StructField("model",StringType(),True),StructField("year",IntegerType(),True),StructField("EngineFuelType",StringType(),True),StructField("EngineHp",IntegerType(),True),StructField("EngineCylinders",IntegerType(),True),StructField("TransmissionType",StringType(),True),StructField("Driveen_wheels",StringType(),True),StructField("NOofdoors",IntegerType(),True),StructField("MarketCategory",StringType(),True),StructField("VehicleSize",StringType(),True),StructField("Vehicletype",StringType(),True),StructField("HighwayMpg",IntegerType(),True),StructField("city_mpg",IntegerType(),True),StructField("Popularity",IntegerType(),True),StructField("msrp",IntegerType(),True)])

In [5]:
df = spark.read.csv("newd/cardata.csv",inferSchema=True,header=True,schema=schema)

In [6]:
df.head(1)

[Row(make='FIAT', model='124 Spider', year=2017, EngineFuelType='premium unleaded (recommended)', EngineHp=160, EngineCylinders=4, TransmissionType='MANUAL', Driveen_wheels='rear wheel drive', NOofdoors=2, MarketCategory='Performance', VehicleSize='Compact', Vehicletype='Convertible', HighwayMpg=35, city_mpg=26, Popularity=819, msrp=28195)]

In [7]:
df.columns

['make',
 'model',
 'year',
 'EngineFuelType',
 'EngineHp',
 'EngineCylinders',
 'TransmissionType',
 'Driveen_wheels',
 'NOofdoors',
 'MarketCategory',
 'VehicleSize',
 'Vehicletype',
 'HighwayMpg',
 'city_mpg',
 'Popularity',
 'msrp']

In [8]:
df.printSchema()
df.describe().toPandas().transpose()

root
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- EngineFuelType: string (nullable = true)
 |-- EngineHp: integer (nullable = true)
 |-- EngineCylinders: integer (nullable = true)
 |-- TransmissionType: string (nullable = true)
 |-- Driveen_wheels: string (nullable = true)
 |-- NOofdoors: integer (nullable = true)
 |-- MarketCategory: string (nullable = true)
 |-- VehicleSize: string (nullable = true)
 |-- Vehicletype: string (nullable = true)
 |-- HighwayMpg: integer (nullable = true)
 |-- city_mpg: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- msrp: integer (nullable = true)



Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
make,11880,,,Acura,Volvo
model,11880,767.8436781609196,1511.4508757153594,124 Spider,xD
year,11880,2010.4026094276094,7.56446225730375,1990,2017
EngineFuelType,11877,,,diesel,regular unleaded
EngineHp,11811,249.46769960206586,109.29410966315454,55,1001
EngineCylinders,11850,5.628101265822785,1.782882058448863,0,16
TransmissionType,11880,,,AUTOMATED_MANUAL,UNKNOWN
Driveen_wheels,11880,,,all wheel drive,rear wheel drive
NOofdoors,11874,3.4376789624389423,0.8804751303515581,2,4


In [9]:
def replace(column,val):
    return when(column != val,column).otherwise(lit(None))
df =df.withColumn("MarketCategory",replace(col("MarketCategory"),"N/A"))

In [10]:
df.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+----+-----+----+--------------+--------+---------------+----------------+--------------+---------+--------------+-----------+-----------+----------+--------+----------+----+
|make|model|year|EngineFuelType|EngineHp|EngineCylinders|TransmissionType|Driveen_wheels|NOofdoors|MarketCategory|VehicleSize|Vehicletype|HighwayMpg|city_mpg|Popularity|msrp|
+----+-----+----+--------------+--------+---------------+----------------+--------------+---------+--------------+-----------+-----------+----------+--------+----------+----+
|   0|    0|   0|             3|      69|             30|               0|             0|        6|          3742|          0|          0|         0|       0|         0|   0|
+----+-----+----+--------------+--------+---------------+----------------+--------------+---------+--------------+-----------+-----------+----------+--------+----------+----+



In [11]:
df=df.drop("MarketCategory")
df=df.na.drop()

In [12]:

print((df.count(),len(df.columns)))

(11778, 15)


In [13]:
df.columns

['make',
 'model',
 'year',
 'EngineFuelType',
 'EngineHp',
 'EngineCylinders',
 'TransmissionType',
 'Driveen_wheels',
 'NOofdoors',
 'VehicleSize',
 'Vehicletype',
 'HighwayMpg',
 'city_mpg',
 'Popularity',
 'msrp']

In [14]:
assembler = VectorAssembler(inputCols=['year','EngineHp','EngineCylinders','NOofdoors','HighwayMpg','city_mpg' ,'Popularity'],outputCol="Attributes")

In [15]:
regressor=RandomForestRegressor(featuresCol="Attributes",labelCol="msrp")

In [16]:
pipeline=Pipeline(stages=[assembler,regressor])
pipeline.write().overwrite().save("pipeline")

In [17]:
pipelinemodel=Pipeline.load("pipeline")
paramGrid=ParamGridBuilder().addGrid(regressor.numTrees,[100,500]).build()
crossval = CrossValidator(estimator=pipelinemodel,estimatorParamMaps = paramGrid,evaluator=RegressionEvaluator(labelCol="msrp"),numFolds=3)

In [18]:
train_data,test_data = df.randomSplit([0.8,0.2],seed=123)
cvModel=crossval.fit(train_data)

In [20]:
bestmodel=cvModel.bestModel
for x in range(len(bestmodel.stages)):
    print(bestmodel.stages[x])

VectorAssembler_843c7c29e9b3
RandomForestRegressionModel: uid=RandomForestRegressor_b393616dd75e, numTrees=500, numFeatures=7


In [21]:
pred = cvModel.transform(test_data)
pred.select("msrp","prediction").show()

+-----+------------------+
| msrp|        prediction|
+-----+------------------+
|28030|33214.206219739295|
|30550|    36891.75057271|
|29350|28098.204956215657|
|27900|27267.860104435338|
|34890|27267.860104435338|
|32990|27267.860104435338|
| 2827|  4668.51519318538|
| 3000|  4668.51519318538|
| 3086|4655.9728155731955|
| 3130|4655.9728155731955|
| 3012| 4681.514079213043|
| 3622| 5844.247925996545|
|22300| 24967.96425218967|
|19400|23006.943039377857|
| 2042|  6179.37802980033|
| 2144| 4758.015797692419|
|49440| 38983.24376457492|
|52640| 38983.24376457492|
|47440| 39196.52461570135|
|58400|39135.002878013074|
+-----+------------------+
only showing top 20 rows



In [23]:
eval = RegressionEvaluator(labelCol="msrp")
rmse = eval.evaluate(pred)
mse = eval.evaluate(pred,{eval.metricName:"mse"})
mae = eval.evaluate(pred,{eval.metricName:"mae"})
r2 = eval.evaluate(pred,{eval.metricName:"r2"})

In [26]:
print(rmse)
print("RMSE : %.3f" %rmse)
print("MSE : %.3f" %mse)
print("MAE : %.3f" %mae)
print("R2 : %.3f" %r2)

17033.043073253917
RMSE : 17033.043
MSE : 290124556.335
MAE : 8661.564
R2 : 0.885
