# Checkpoint 3.1 - Data Analysis using Big Data Tools

### Capstone Project Batch: B Group: 3 

* Capstone Project Domain: Automotive Industry
* Batch B
* Group 3
* Members: Tanisha Jain, Priya Bhardwaj, Aman Bhardwaj and Satyam Sharma

### Tasks:

Big Data technologies such as HDFS, Hive and PySpark need to be used as the historical data grows in size. As part of this task, the following activities need to be carried out:
1. Develop a PySpark application to load data into Spark DataFrames and save it into Hive tables on a Hadoop cluster in an optimized format.
2. Perform profiling of the data through PySpark and ensure that it is migrated correctly wherever the source is an RDBMS.
3. Write PySpark routines to cleanse the data, handle missing values, and apply the data transformations identified in task 1.1, again making sure that the data is written into Hive tables in an efficient format.
4. If the predictive model identified in task 2.4 is available in Spark MLlib, develop a PySpark application to implement the model and evaluate it with appropriate metrics.

Ensure that best practices are followed and that the design and code take advantage of Spark's features.


Note: The tasks have been performed in the following order:

1. Data preprocessing (Task 3)
2. Saving in Hive tables (Task 1)
3. Data profiling (Task 2)
4. Building ML models (Task 4)

In [1]:
import findspark
findspark.init('/usr/local/spark')
import pyspark

In [2]:
pyspark.__version__

'2.2.2'

In [3]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("Capstone B3").enableHiveSupport().getOrCreate()


In [4]:
spark

In [6]:
carData=(spark.read
          .format('csv')
           .option('header','true')
           .load("file:///home/hduser/Downloads/sharedfolder/mergedCarSales.csv")
          )

In [7]:
type(carData)

pyspark.sql.dataframe.DataFrame

In [8]:
carData.columns

['_c0',
 'Sales_ID',
 'name',
 'year',
 'selling_price',
 'km_driven',
 'State or Province',
 'City',
 'fuel',
 'seller_type',
 'transmission',
 'owner',
 'mileage',
 'engine',
 'max_power',
 'seats',
 'sold',
 'region',
 'brand']

In [9]:
carData.show()

+---+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+----+-------+--------+
|_c0|Sales_ID|                name|year|selling_price|km_driven|   State or Province|         City|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|seats|sold| region|   brand|
+---+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+----+-------+--------+
|  0|       1|Maruti Swift Dzir...|2014|       450000|   145500|District of Columbia|   Washington|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|    5|   Y|   East|  Maruti|
|  1|       2|Skoda Rapid 1.5 T...|2014|       370000|   120000|            New York|New York City|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 CC|103.52 bhp|    5|   Y|   

In [10]:
carData.count()

7906

#### Task 3
Write PySpark routines to cleanse the data, handle missing values, and apply the data transformations identified in task 1.1, again making sure that the data is written into Hive tables in an efficient format.

1. Categorical columns have been converted into numerical columns.
2. The `year` column has been preprocessed to give the difference in years between the current year and the value in `year` (`yearDiff`).
3. A few columns (`_c0`, `name`, `Sales_ID`, `City`) have been dropped.
4. A few numerical columns (`mileage`, `engine`, `max_power`) have had their unit suffixes removed and have been cast to numeric types.

In [11]:
carData.show()

+---+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+----+-------+--------+
|_c0|Sales_ID|                name|year|selling_price|km_driven|   State or Province|         City|  fuel|seller_type|transmission|       owner|   mileage| engine| max_power|seats|sold| region|   brand|
+---+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+----------+-------+----------+-----+----+-------+--------+
|  0|       1|Maruti Swift Dzir...|2014|       450000|   145500|District of Columbia|   Washington|Diesel| Individual|      Manual| First Owner| 23.4 kmpl|1248 CC|    74 bhp|    5|   Y|   East|  Maruti|
|  1|       2|Skoda Rapid 1.5 T...|2014|       370000|   120000|            New York|New York City|Diesel| Individual|      Manual|Second Owner|21.14 kmpl|1498 CC|103.52 bhp|    5|   Y|   

In [12]:
from pyspark.sql.functions import split
# Keep only the numeric part of mileage, e.g. "23.4 kmpl" -> "23.4" (still a string at this point)
carData=carData.withColumn("mileage", split("mileage", " ").getItem(0))


In [13]:
carData.show(4)

+---+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+-------+-------+----------+-----+----+-------+-------+
|_c0|Sales_ID|                name|year|selling_price|km_driven|   State or Province|         City|  fuel|seller_type|transmission|       owner|mileage| engine| max_power|seats|sold| region|  brand|
+---+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+-------+-------+----------+-----+----+-------+-------+
|  0|       1|Maruti Swift Dzir...|2014|       450000|   145500|District of Columbia|   Washington|Diesel| Individual|      Manual| First Owner|   23.4|1248 CC|    74 bhp|    5|   Y|   East| Maruti|
|  1|       2|Skoda Rapid 1.5 T...|2014|       370000|   120000|            New York|New York City|Diesel| Individual|      Manual|Second Owner|  21.14|1498 CC|103.52 bhp|    5|   Y|   East|  Skoda|
|  2|

In [14]:
# Strip the unit suffixes: "1248 CC" -> "1248" and "74 bhp" -> "74"
carData=carData.withColumn("engine", split("engine", " ").getItem(0))
carData=carData.withColumn("max_power", split("max_power", " ").getItem(0))
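
The `split(...).getItem(0)` calls above keep the first space-separated token of each value. As a minimal plain-Python sketch of the same idea, the hypothetical helper `leading_number` below extracts the leading number with a regular expression and returns `None` when no number is present. The input `" bhp"` is an assumed example of a value with a missing number, not a value confirmed to exist in this dataset. In Spark, a similar pattern could be applied with `regexp_extract` from `pyspark.sql.functions`.

```python
import re

# Optional sign, digits, and an optional decimal part at the start of the text.
_LEADING_NUMBER = re.compile(r"^\s*([-+]?\d+(?:\.\d+)?)")

def leading_number(text):
    """Return the leading numeric part of text as a float, or None if absent."""
    if text is None:
        return None
    match = _LEADING_NUMBER.match(text)
    return float(match.group(1)) if match else None

print(leading_number("23.4 kmpl"))   # 23.4
print(leading_number("1248 CC"))     # 1248.0
print(leading_number("103.52 bhp"))  # 103.52
print(leading_number(" bhp"))        # None
```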

In [15]:
carData.show()

+---+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+--------+
|_c0|Sales_ID|                name|year|selling_price|km_driven|   State or Province|         City|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold| region|   brand|
+---+--------+--------------------+----+-------------+---------+--------------------+-------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+--------+
|  0|       1|Maruti Swift Dzir...|2014|       450000|   145500|District of Columbia|   Washington|Diesel| Individual|      Manual| First Owner|   23.4|  1248|       74|    5|   Y|   East|  Maruti|
|  1|       2|Skoda Rapid 1.5 T...|2014|       370000|   120000|            New York|New York City|Diesel| Individual|      Manual|Second Owner|  21.14|  1498|   103.52|    5|   Y|   East|   Skoda|
|  2|     

In [16]:
carData.dtypes

[('_c0', 'string'),
 ('Sales_ID', 'string'),
 ('name', 'string'),
 ('year', 'string'),
 ('selling_price', 'string'),
 ('km_driven', 'string'),
 ('State or Province', 'string'),
 ('City', 'string'),
 ('fuel', 'string'),
 ('seller_type', 'string'),
 ('transmission', 'string'),
 ('owner', 'string'),
 ('mileage', 'string'),
 ('engine', 'string'),
 ('max_power', 'string'),
 ('seats', 'string'),
 ('sold', 'string'),
 ('region', 'string'),
 ('brand', 'string')]

In [17]:
cleansedData=carData

In [18]:
# drop() accepts several column names, so the four columns can be removed in one call
cleansedData=cleansedData.drop('_c0','name','Sales_ID','City')

In [19]:
cleansedData.show(4)

+----+-------------+---------+--------------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+-------+
|year|selling_price|km_driven|   State or Province|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold| region|  brand|
+----+-------------+---------+--------------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+-------+
|2014|       450000|   145500|District of Columbia|Diesel| Individual|      Manual| First Owner|   23.4|  1248|       74|    5|   Y|   East| Maruti|
|2014|       370000|   120000|            New York|Diesel| Individual|      Manual|Second Owner|  21.14|  1498|   103.52|    5|   Y|   East|  Skoda|
|2006|       158000|   140000|            Illinois|Petrol| Individual|      Manual| Third Owner|   17.7|  1497|       78|    5|   Y|Central|  Honda|
|2010|       225000|   127000|            Illinois|Diesel| Individual|      Manual| First Owner|   23.0|  

In [20]:
cleansedData.count()

7906

In [21]:
# Drop the rows whose mileage is recorded as 0
cleansedData=cleansedData.filter(cleansedData['mileage']!=0)

In [22]:
cleansedData.count()

7889

In [23]:
cleansedData.columns

['year',
 'selling_price',
 'km_driven',
 'State or Province',
 'fuel',
 'seller_type',
 'transmission',
 'owner',
 'mileage',
 'engine',
 'max_power',
 'seats',
 'sold',
 'region',
 'brand']

In [24]:
from pyspark.ml.feature import StringIndexer

In [25]:
cleansedData=StringIndexer(
inputCol='owner',
outputCol='ownerLabel',
handleInvalid='keep').fit(cleansedData).transform(cleansedData)

In [26]:
cleansedData.show(4)

+----+-------------+---------+--------------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+-------+----------+
|year|selling_price|km_driven|   State or Province|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold| region|  brand|ownerLabel|
+----+-------------+---------+--------------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+-------+----------+
|2014|       450000|   145500|District of Columbia|Diesel| Individual|      Manual| First Owner|   23.4|  1248|       74|    5|   Y|   East| Maruti|       0.0|
|2014|       370000|   120000|            New York|Diesel| Individual|      Manual|Second Owner|  21.14|  1498|   103.52|    5|   Y|   East|  Skoda|       1.0|
|2006|       158000|   140000|            Illinois|Petrol| Individual|      Manual| Third Owner|   17.7|  1497|       78|    5|   Y|Central|  Honda|       2.0|
|2010|       225000|   127000|          
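
By default, `StringIndexer` orders labels by descending frequency, so the most frequent label receives index 0.0, and `handleInvalid='keep'` places unseen labels in one extra index after the known ones. The plain-Python sketch below imitates only the frequency ordering; `frequency_index` and the sample `owners` list are illustrative assumptions, not part of the notebook.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    ordered = [label for label, _ in counts.most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}

# Made-up sample with no ties in frequency.
owners = ["First Owner", "Second Owner", "First Owner",
          "Third Owner", "Second Owner", "First Owner"]
print(frequency_index(owners))
# {'First Owner': 0.0, 'Second Owner': 1.0, 'Third Owner': 2.0}
```

The resulting indices are codes for categories rather than measured quantities, which is worth keeping in mind when they are fed to a linear model.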

In [27]:
cleansedData=StringIndexer(
inputCol='region',
outputCol='regionLabel',
handleInvalid='keep').fit(cleansedData).transform(cleansedData)

In [28]:
cleansedData=StringIndexer(
inputCol='brand',
outputCol='brandLabel',
handleInvalid='keep').fit(cleansedData).transform(cleansedData)

In [29]:
cleansedData=StringIndexer(
inputCol='fuel',
outputCol='fuelLabel',
handleInvalid='keep').fit(cleansedData).transform(cleansedData)

In [30]:
cleansedData=StringIndexer(
inputCol='sold',
outputCol='soldLabel',
handleInvalid='keep').fit(cleansedData).transform(cleansedData)

In [31]:
cleansedData=StringIndexer(
inputCol='transmission',
outputCol='transmissionLabel',
handleInvalid='keep').fit(cleansedData).transform(cleansedData)

In [32]:
cleansedData=StringIndexer(
inputCol='seller_type',
outputCol='sellerLabel',
handleInvalid='keep').fit(cleansedData).transform(cleansedData)

In [33]:
cleansedData=StringIndexer(
inputCol='State or Province',
outputCol='stateLabel',
handleInvalid='keep').fit(cleansedData).transform(cleansedData)
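
The eight indexing cells above differ only in their column names. One possible way to express them once is a `Pipeline` of `StringIndexer` stages, sketched below under the assumption that the same input and output names are wanted. `CATEGORICAL_COLUMNS` and `index_categoricals` are hypothetical names introduced for this sketch.

```python
# Source column -> indexed column, using the names from the cells above.
CATEGORICAL_COLUMNS = {
    'owner': 'ownerLabel',
    'region': 'regionLabel',
    'brand': 'brandLabel',
    'fuel': 'fuelLabel',
    'sold': 'soldLabel',
    'transmission': 'transmissionLabel',
    'seller_type': 'sellerLabel',
    'State or Province': 'stateLabel',
}

def index_categoricals(df, columns=CATEGORICAL_COLUMNS):
    """Fit one StringIndexer per column and apply them all in a single Pipeline."""
    # Imported here so the mapping above can be inspected without a Spark install.
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer

    stages = [
        StringIndexer(inputCol=src, outputCol=dst, handleInvalid='keep')
        for src, dst in columns.items()
    ]
    return Pipeline(stages=stages).fit(df).transform(df)

# Usage (requires the active SparkSession from this notebook):
# cleansedData = index_categoricals(cleansedData)
```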

In [34]:
cleansedData.show(4)

+----+-------------+---------+--------------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+-------+----------+-----------+----------+---------+---------+-----------------+-----------+----------+
|year|selling_price|km_driven|   State or Province|  fuel|seller_type|transmission|       owner|mileage|engine|max_power|seats|sold| region|  brand|ownerLabel|regionLabel|brandLabel|fuelLabel|soldLabel|transmissionLabel|sellerLabel|stateLabel|
+----+-------------+---------+--------------------+------+-----------+------------+------------+-------+------+---------+-----+----+-------+-------+----------+-----------+----------+---------+---------+-----------------+-----------+----------+
|2014|       450000|   145500|District of Columbia|Diesel| Individual|      Manual| First Owner|   23.4|  1248|       74|    5|   Y|   East| Maruti|       0.0|        2.0|       0.0|      0.0|      1.0|              0.0|        0.0|      33.0|
|2014|       370000|   1

In [35]:

# Drop the original string columns now that their indexed versions exist
cleansedData=cleansedData.drop('region','sold','brand','fuel','owner',
                               'seller_type','transmission','State or Province')

cleansedData.show(4)

+----+-------------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+
|year|selling_price|km_driven|mileage|engine|max_power|seats|ownerLabel|regionLabel|brandLabel|fuelLabel|soldLabel|transmissionLabel|sellerLabel|stateLabel|
+----+-------------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+
|2014|       450000|   145500|   23.4|  1248|       74|    5|       0.0|        2.0|       0.0|      0.0|      1.0|              0.0|        0.0|      33.0|
|2014|       370000|   120000|  21.14|  1498|   103.52|    5|       1.0|        2.0|      11.0|      0.0|      1.0|              0.0|        0.0|       2.0|
|2006|       158000|   140000|   17.7|  1497|       78|    5|       2.0|        0.0|       4.0|      1.0|      1.0|              0.0|        0.0|       3.0|
|2010|       225000|   127000|   23.0|  1396|       90|   

In [36]:
cleansedData.dtypes

[('year', 'string'),
 ('selling_price', 'string'),
 ('km_driven', 'string'),
 ('mileage', 'string'),
 ('engine', 'string'),
 ('max_power', 'string'),
 ('seats', 'string'),
 ('ownerLabel', 'double'),
 ('regionLabel', 'double'),
 ('brandLabel', 'double'),
 ('fuelLabel', 'double'),
 ('soldLabel', 'double'),
 ('transmissionLabel', 'double'),
 ('sellerLabel', 'double'),
 ('stateLabel', 'double')]

In [37]:
from pyspark.sql.functions import col

In [38]:
cleansedData=cleansedData.select(col('selling_price').cast('int'),
                                 col('year').cast('float'),
                                 col('km_driven').cast('int'),
                                 col('mileage').cast('float'),
                                 col('engine').cast('float'),
                                 col('max_power').cast('float'),
                                 col('seats').cast('int'),
                                 col('ownerLabel'),
                                 col('regionLabel'),
                                 col('brandLabel'),
                                 col('fuelLabel'),
                                 col('soldLabel'),
                                 col('transmissionLabel'),
                                 col('sellerLabel'),
                                 col('stateLabel'))

In [39]:
cleansedData.dtypes

[('selling_price', 'int'),
 ('year', 'float'),
 ('km_driven', 'int'),
 ('mileage', 'float'),
 ('engine', 'float'),
 ('max_power', 'float'),
 ('seats', 'int'),
 ('ownerLabel', 'double'),
 ('regionLabel', 'double'),
 ('brandLabel', 'double'),
 ('fuelLabel', 'double'),
 ('soldLabel', 'double'),
 ('transmissionLabel', 'double'),
 ('sellerLabel', 'double'),
 ('stateLabel', 'double')]

In [40]:
from pyspark.sql.functions import to_timestamp,date_format


In [41]:
import findspark
from pyspark.sql import Row
from pyspark import SparkContext, SparkConf
import datetime
# Get the current date and time, then extract the year as a string
now = datetime.datetime.now()
current_year=now.strftime("%Y")
print(current_year)

2021


In [42]:
type(current_year)

str

In [43]:
current_year=int(current_year)

In [44]:
type(current_year)

int

In [45]:
# Named row_count rather than len, to avoid shadowing Python's built-in len()
row_count=cleansedData.count()

In [46]:
cleansedData=cleansedData.withColumn("yearDiff",current_year-col('year'))
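
The `yearDiff` column is the reference year minus `year`. The plain-Python sketch below shows the same arithmetic; `car_age` and its `reference_year` parameter are hypothetical names. It also illustrates that `datetime.date.today().year` is already an integer, and that passing a fixed reference year keeps the result independent of the date on which the code runs.

```python
import datetime

def car_age(year, reference_year=None):
    """Years elapsed between year and reference_year (defaults to the current year)."""
    if reference_year is None:
        reference_year = datetime.date.today().year  # already an int
    return reference_year - year

print(car_age(2014, reference_year=2021))  # 7
print(car_age(2006, reference_year=2021))  # 15
```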


#### Task 1
Develop a PySpark application to load data into Spark DataFrames and save it into Hive tables on a Hadoop cluster in an optimized format.

In [47]:
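# Note: this only configures a DataFrameWriter and returns it; nothing is written here,
# and the setting is not carried over to later cleansedData.write calls.
# (The sold column was dropped above; its indexed form is soldLabel.)
cleansedData.write.partitionBy('sold')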

<pyspark.sql.readwriter.DataFrameWriter at 0x7faae43ca0b8>

In [48]:
cleansedData.createGlobalTempView('CarTempTable2')

In [49]:
cleansedData.write.saveAsTable('carTempTable2')

In [50]:
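# Note: this only configures a DataFrameWriter and returns it; nothing is written here,
# and the format is not carried over to later cleansedData.write calls.
cleansedData.write.format('ORC')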

<pyspark.sql.readwriter.DataFrameWriter at 0x7faae4426be0>

In [51]:
cleansedData.createGlobalTempView('carORCTable2')

In [52]:
# Chain format('orc') onto the same writer so the table is actually stored as ORC
cleansedData.write.format('orc').saveAsTable('carORCTable2')
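
Because `DataFrame.write` returns a fresh `DataFrameWriter` on every access, options such as `format` and `partitionBy` only apply when they are chained onto the same writer that performs the save. The sketch below wraps that chain in a hypothetical helper, `save_hive_table`; the table name in the usage comment and the choice of `soldLabel` as partition column are assumptions for illustration.

```python
def save_hive_table(df, table_name, fmt='orc', partition_col=None, mode='overwrite'):
    """Write df to a Hive table, chaining every writer option before the save."""
    writer = df.write.format(fmt).mode(mode)
    if partition_col is not None:
        writer = writer.partitionBy(partition_col)
    writer.saveAsTable(table_name)

# Usage (hypothetical table name; requires a Hive-enabled SparkSession):
# save_hive_table(cleansedData, 'carORCTablePartitioned', partition_col='soldLabel')
```

Note that `mode='overwrite'` replaces an existing table of the same name, so it should be changed if that is not wanted.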

In [53]:
ct1=spark.sql('select * from carORCTable2')

In [54]:
ct1.printSchema()

root
 |-- selling_price: integer (nullable = true)
 |-- year: float (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- mileage: float (nullable = true)
 |-- engine: float (nullable = true)
 |-- max_power: float (nullable = true)
 |-- seats: integer (nullable = true)
 |-- ownerLabel: double (nullable = true)
 |-- regionLabel: double (nullable = true)
 |-- brandLabel: double (nullable = true)
 |-- fuelLabel: double (nullable = true)
 |-- soldLabel: double (nullable = true)
 |-- transmissionLabel: double (nullable = true)
 |-- sellerLabel: double (nullable = true)
 |-- stateLabel: double (nullable = true)
 |-- yearDiff: float (nullable = true)



#### Task 2
Perform profiling of the data through PySpark and ensure that it is migrated correctly wherever the source is an RDBMS.

In [55]:
ct1.describe()

DataFrame[summary: string, selling_price: string, year: string, km_driven: string, mileage: string, engine: string, max_power: string, seats: string, ownerLabel: string, regionLabel: string, brandLabel: string, fuelLabel: string, soldLabel: string, transmissionLabel: string, sellerLabel: string, stateLabel: string, yearDiff: string]

In [56]:
ct1.show()

+-------------+------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+--------+
|selling_price|  year|km_driven|mileage|engine|max_power|seats|ownerLabel|regionLabel|brandLabel|fuelLabel|soldLabel|transmissionLabel|sellerLabel|stateLabel|yearDiff|
+-------------+------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+--------+
|       450000|2014.0|   145500|   23.4|1248.0|     74.0|    5|       0.0|        2.0|       0.0|      0.0|      1.0|              0.0|        0.0|      33.0|     7.0|
|       370000|2014.0|   120000|  21.14|1498.0|   103.52|    5|       1.0|        2.0|      11.0|      0.0|      1.0|              0.0|        0.0|       2.0|     7.0|
|       158000|2006.0|   140000|   17.7|1497.0|     78.0|    5|       2.0|        0.0|       4.0|      1.0|      1.0|              0.0|        0.0|       3.0|  

In [57]:
ct2=spark.sql('select * from carTempTable2')

In [58]:
ct2.printSchema()

root
 |-- selling_price: integer (nullable = true)
 |-- year: float (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- mileage: float (nullable = true)
 |-- engine: float (nullable = true)
 |-- max_power: float (nullable = true)
 |-- seats: integer (nullable = true)
 |-- ownerLabel: double (nullable = true)
 |-- regionLabel: double (nullable = true)
 |-- brandLabel: double (nullable = true)
 |-- fuelLabel: double (nullable = true)
 |-- soldLabel: double (nullable = true)
 |-- transmissionLabel: double (nullable = true)
 |-- sellerLabel: double (nullable = true)
 |-- stateLabel: double (nullable = true)
 |-- yearDiff: float (nullable = true)
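
One simple way to check that a table was migrated as expected is to compare its schema and row count with those of the DataFrame it was written from. The helper below, `compare_frames`, is a hypothetical sketch that relies only on the `dtypes` attribute and `count()` method of Spark DataFrames. It does not compare the row contents themselves.

```python
def compare_frames(source_df, target_df):
    """Return a list of human-readable differences between two DataFrames."""
    problems = []
    if source_df.dtypes != target_df.dtypes:
        problems.append('schema mismatch: %s vs %s' % (source_df.dtypes, target_df.dtypes))
    source_rows, target_rows = source_df.count(), target_df.count()
    if source_rows != target_rows:
        problems.append('row count mismatch: %d vs %d' % (source_rows, target_rows))
    return problems

# Usage: an empty list means that schema and row count agree.
# compare_frames(cleansedData, ct2)
```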



In [59]:
ct2.describe()

DataFrame[summary: string, selling_price: string, year: string, km_driven: string, mileage: string, engine: string, max_power: string, seats: string, ownerLabel: string, regionLabel: string, brandLabel: string, fuelLabel: string, soldLabel: string, transmissionLabel: string, sellerLabel: string, stateLabel: string, yearDiff: string]

In [60]:
ct2.show()

+-------------+------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+--------+
|selling_price|  year|km_driven|mileage|engine|max_power|seats|ownerLabel|regionLabel|brandLabel|fuelLabel|soldLabel|transmissionLabel|sellerLabel|stateLabel|yearDiff|
+-------------+------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+--------+
|       450000|2014.0|   145500|   23.4|1248.0|     74.0|    5|       0.0|        2.0|       0.0|      0.0|      1.0|              0.0|        0.0|      33.0|     7.0|
|       370000|2014.0|   120000|  21.14|1498.0|   103.52|    5|       1.0|        2.0|      11.0|      0.0|      1.0|              0.0|        0.0|       2.0|     7.0|
|       158000|2006.0|   140000|   17.7|1497.0|     78.0|    5|       2.0|        0.0|       4.0|      1.0|      1.0|              0.0|        0.0|       3.0|  

In [61]:
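import os
# PYSPARK_SUBMIT_ARGS is read when the Spark JVM is launched, so it must be set
# before the first SparkSession is created in order to take effect.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /usr/local/sqoop/lib/mysql-connector-java-5.1.47-bin.jar \
--jars /usr/local/sqoop/lib/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47-bin.jar pyspark-shell'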

In [62]:
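from pyspark.sql import SparkSession
# getOrCreate() returns the SparkSession that is already running in this notebook,
# so the Hive tables saved earlier remain visible.
spark = SparkSession.builder.appName("Capstone PySpark MySQL JDBC").getOrCreate()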

In [63]:
spark.sql('show databases').show()

+------------+
|databaseName|
+------------+
|  customerdb|
|     default|
| nyse_hduser|
+------------+



In [64]:
spark.sql('show tables from nyse_hduser').show()

+-----------+------------+-----------+
|   database|   tableName|isTemporary|
+-----------+------------+-----------+
|nyse_hduser|       json1|      false|
|nyse_hduser|       json2|      false|
|nyse_hduser|       json3|      false|
|nyse_hduser|     jsonstr|      false|
|nyse_hduser|     nydview|      false|
|nyse_hduser|   nysedaily|      false|
|nyse_hduser|nysedailyrc1|      false|
|nyse_hduser|        ssg1|      false|
|nyse_hduser|        ssg2|      false|
|nyse_hduser|   stugrades|      false|
+-----------+------------+-----------+



In [65]:
spark.sql('show tables from default').show()

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| default|      carorctable2|      false|
| default|     cartemptable2|      false|
| default|   titanicorctable|      false|
| default|  titanicorctable1|      false|
| default|  titanictemptable|      false|
| default| titanictemptable1|      false|
| default|titanictemptable12|      false|
+--------+------------------+-----------+



In [66]:
spark.sql('select * from cartemptable2')

DataFrame[selling_price: int, year: float, km_driven: int, mileage: float, engine: float, max_power: float, seats: int, ownerLabel: double, regionLabel: double, brandLabel: double, fuelLabel: double, soldLabel: double, transmissionLabel: double, sellerLabel: double, stateLabel: double, yearDiff: float]

In [67]:
spark.sql('select * from cartemptable2').show()

+-------------+------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+--------+
|selling_price|  year|km_driven|mileage|engine|max_power|seats|ownerLabel|regionLabel|brandLabel|fuelLabel|soldLabel|transmissionLabel|sellerLabel|stateLabel|yearDiff|
+-------------+------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+--------+
|       450000|2014.0|   145500|   23.4|1248.0|     74.0|    5|       0.0|        2.0|       0.0|      0.0|      1.0|              0.0|        0.0|      33.0|     7.0|
|       370000|2014.0|   120000|  21.14|1498.0|   103.52|    5|       1.0|        2.0|      11.0|      0.0|      1.0|              0.0|        0.0|       2.0|     7.0|
|       158000|2006.0|   140000|   17.7|1497.0|     78.0|    5|       2.0|        0.0|       4.0|      1.0|      1.0|              0.0|        0.0|       3.0|  

In [68]:
spark.sql('use default')

DataFrame[]

In [69]:
spark.sql('show tables').show()

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| default|      carorctable2|      false|
| default|     cartemptable2|      false|
| default|   titanicorctable|      false|
| default|  titanicorctable1|      false|
| default|  titanictemptable|      false|
| default| titanictemptable1|      false|
| default|titanictemptable12|      false|
+--------+------------------+-----------+



In [70]:
spark.sql('select * from cartemptable2').show()

+-------------+------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+--------+
|selling_price|  year|km_driven|mileage|engine|max_power|seats|ownerLabel|regionLabel|brandLabel|fuelLabel|soldLabel|transmissionLabel|sellerLabel|stateLabel|yearDiff|
+-------------+------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+--------+
|       450000|2014.0|   145500|   23.4|1248.0|     74.0|    5|       0.0|        2.0|       0.0|      0.0|      1.0|              0.0|        0.0|      33.0|     7.0|
|       370000|2014.0|   120000|  21.14|1498.0|   103.52|    5|       1.0|        2.0|      11.0|      0.0|      1.0|              0.0|        0.0|       2.0|     7.0|
|       158000|2006.0|   140000|   17.7|1497.0|     78.0|    5|       2.0|        0.0|       4.0|      1.0|      1.0|              0.0|        0.0|       3.0|  

In [71]:
spark.sql('select distinct(ownerLabel) from cartemptable2').show()

+----------+
|ownerLabel|
+----------+
|       0.0|
|       1.0|
|       4.0|
|       3.0|
|       2.0|
+----------+



In [72]:
spark.sql('select avg(selling_price) from cartemptable2').show()

+------------------+
|avg(selling_price)|
+------------------+
| 649675.2791228292|
+------------------+



In [73]:
spark.sql('select max(selling_price) from cartemptable2').show()

+------------------+
|max(selling_price)|
+------------------+
|          10000000|
+------------------+



In [74]:
spark.sql('select min(selling_price) from cartemptable2').show()

+------------------+
|min(selling_price)|
+------------------+
|             29999|
+------------------+



In [75]:
spark.sql('select ownerLabel,count(ownerLabel),avg(selling_price) from cartemptable2 group by ownerLabel').show()

+----------+-----------------+------------------+
|ownerLabel|count(ownerLabel)|avg(selling_price)|
+----------+-----------------+------------------+
|       0.0|             5205| 789938.7219980787|
|       1.0|             2012| 401765.0218687873|
|       4.0|                5|         4403800.0|
|       3.0|              159| 233720.0754716981|
|       2.0|              508|287648.64763779525|
+----------+-----------------+------------------+
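
The grouped results are keyed by numeric indices, which are hard to read without the original strings. A fitted `StringIndexerModel` exposes its label list through the `labels` attribute, where position `i` holds the string encoded as index `i`. The sketch below shows the lookup in plain Python; `decode_label` is a hypothetical helper, and the list holds only the three owner labels visible in the earlier output, so it is illustrative rather than complete.

```python
def decode_label(index, labels):
    """Translate a StringIndexer output index back to its original string."""
    # Indices outside the list are reported as unknown instead of raising an error.
    return dict(enumerate(labels)).get(int(index), '<unknown>')

# Illustrative, partial label list; in Spark it would come from StringIndexerModel.labels.
owner_labels = ['First Owner', 'Second Owner', 'Third Owner']
print(decode_label(0.0, owner_labels))  # First Owner
print(decode_label(1.0, owner_labels))  # Second Owner
```

This requires keeping a reference to the fitted model, for example by assigning the result of `.fit(...)` to a variable before calling `.transform(...)`.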



In [76]:
spark.sql('select fuelLabel,count(fuelLabel), avg(selling_price) from cartemptable2 group by fuelLabel').show()

+---------+----------------+------------------+
|fuelLabel|count(fuelLabel)|avg(selling_price)|
+---------+----------------+------------------+
|      0.0|            4293| 801300.4344281388|
|      1.0|            3509|473466.37646053004|
|      3.0|              35| 210885.7142857143|
|      2.0|              52|317903.78846153844|
+---------+----------------+------------------+



In [77]:
spark.sql('select transmissionLabel,count(transmissionLabel), avg(selling_price) from cartemptable2 group by transmissionLabel').show()

+-----------------+------------------------+------------------+
|transmissionLabel|count(transmissionLabel)|avg(selling_price)|
+-----------------+------------------------+------------------+
|              0.0|                    6854|463176.72789611906|
|              1.0|                    1035| 1884710.129468599|
+-----------------+------------------------+------------------+



In [78]:
spark.sql('select brandLabel,count(brandLabel) from cartemptable2 group by brandLabel').show()

+----------+-----------------+
|brandLabel|count(brandLabel)|
+----------+-----------------+
|       8.0|              228|
|       0.0|             2367|
|       7.0|              230|
|      29.0|                1|
|      18.0|               40|
|       1.0|             1352|
|      25.0|                4|
|       4.0|              466|
|      23.0|                5|
|      11.0|              104|
|      21.0|               14|
|      14.0|               67|
|      22.0|                6|
|       3.0|              718|
|      19.0|               34|
|      28.0|                3|
|       2.0|              756|
|      17.0|               41|
|      27.0|                3|
|      10.0|              118|
+----------+-----------------+
only showing top 20 rows



In [79]:
spark.sql('select brandLabel,count(brandLabel) from cartemptable2 where soldLabel==1 group by brandLabel ').show()

+----------+-----------------+
|brandLabel|count(brandLabel)|
+----------+-----------------+
|       8.0|               56|
|       0.0|              597|
|       7.0|               56|
|      18.0|                9|
|       1.0|              330|
|      25.0|                2|
|       4.0|              124|
|      23.0|                1|
|      11.0|               20|
|      21.0|                3|
|      14.0|               19|
|      22.0|                1|
|       3.0|              177|
|      19.0|               11|
|      28.0|                2|
|       2.0|              201|
|      17.0|                6|
|      27.0|                1|
|      10.0|               29|
|      13.0|               21|
+----------+-----------------+
only showing top 20 rows



In [80]:
cleansedData.columns

['selling_price',
 'year',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats',
 'ownerLabel',
 'regionLabel',
 'brandLabel',
 'fuelLabel',
 'soldLabel',
 'transmissionLabel',
 'sellerLabel',
 'stateLabel',
 'yearDiff']

#### Task 4
If the predictive model identified in task 2.4 is available in Spark MLlib, develop a PySpark application to implement the model and evaluate it with appropriate metrics.

* Models built:
   1. Linear Regression
   2. Decision Tree
   3. Random Forest

In [81]:
required_features=[
 'yearDiff',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats',
 'ownerLabel',
 'regionLabel',
 'brandLabel',
 'fuelLabel',
 'soldLabel',
 'transmissionLabel',
 'sellerLabel',
 'stateLabel']

In [82]:
from pyspark.ml.feature import VectorAssembler

In [83]:
assembler=VectorAssembler(inputCols=required_features,outputCol='features')

In [84]:
cleansedData.columns

['selling_price',
 'year',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats',
 'ownerLabel',
 'regionLabel',
 'brandLabel',
 'fuelLabel',
 'soldLabel',
 'transmissionLabel',
 'sellerLabel',
 'stateLabel',
 'yearDiff']

In [85]:
cleansedDF=cleansedData.drop('year')

In [86]:
cleansedDF.columns

['selling_price',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats',
 'ownerLabel',
 'regionLabel',
 'brandLabel',
 'fuelLabel',
 'soldLabel',
 'transmissionLabel',
 'sellerLabel',
 'stateLabel',
 'yearDiff']

In [87]:
cleansedData.columns

['selling_price',
 'year',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats',
 'ownerLabel',
 'regionLabel',
 'brandLabel',
 'fuelLabel',
 'soldLabel',
 'transmissionLabel',
 'sellerLabel',
 'stateLabel',
 'yearDiff']

In [88]:
transformed_data=assembler.transform(cleansedDF)

In [89]:
(training_data, test_data)=transformed_data.randomSplit([0.8,0.2])
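
`randomSplit` assigns rows to the splits at random, so the resulting sizes are only approximately 80% and 20%, and without a seed the split changes from run to run. The plain-Python sketch below imitates that behaviour in a simplified form to show the effect of a seed; `random_split` is a hypothetical helper and is not how Spark implements the split internally.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability proportional to its weight."""
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        # choices() picks one of the split lists according to the weights.
        rng.choices(splits, weights=weights)[0].append(row)
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=42)
again_train, again_test = random_split(range(1000), [0.8, 0.2], seed=42)
print(train == again_train and test == again_test)  # True
```

In PySpark, a reproducible split can be requested in the same spirit with `transformed_data.randomSplit([0.8,0.2], seed=42)`, where the seed value 42 is an arbitrary choice.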

##### 1. Linear Regression

In [90]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col  # needed by select(col(...)) below; harmless if already imported

In [91]:
vectorAssembler = VectorAssembler(inputCols=required_features, outputCol="unscaled_features")

In [92]:
lr_data = cleansedData.select(col("selling_price").alias("label"), *required_features)
lr_data.printSchema()

root
 |-- label: integer (nullable = true)
 |-- yearDiff: float (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- mileage: float (nullable = true)
 |-- engine: float (nullable = true)
 |-- max_power: float (nullable = true)
 |-- seats: integer (nullable = true)
 |-- ownerLabel: double (nullable = true)
 |-- regionLabel: double (nullable = true)
 |-- brandLabel: double (nullable = true)
 |-- fuelLabel: double (nullable = true)
 |-- soldLabel: double (nullable = true)
 |-- transmissionLabel: double (nullable = true)
 |-- sellerLabel: double (nullable = true)
 |-- stateLabel: double (nullable = true)



In [93]:
va_data = vectorAssembler.transform(lr_data)
va_data.show(truncate=False)

+------+--------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+-----------------------------------------------------------------------------------------------+
|label |yearDiff|km_driven|mileage|engine|max_power|seats|ownerLabel|regionLabel|brandLabel|fuelLabel|soldLabel|transmissionLabel|sellerLabel|stateLabel|unscaled_features                                                                              |
+------+--------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+-----------------------------------------------------------------------------------------------+
|450000|7.0     |145500   |23.4   |1248.0|74.0     |5    |0.0       |2.0        |0.0       |0.0      |1.0      |0.0              |0.0        |33.0      |[7.0,145500.0,23.399999618530273,1248.0,74.0,5.0,0.0,2.0,0.0,0.0,1.0,0.0,0.0,33.0]             |
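
As a rough illustration of what `VectorAssembler` does with the row shown above, the sketch below rebuilds `unscaled_features` in plain Python: values are taken in `required_features` order and stored as doubles. It also suggests why `mileage` prints as 23.4 in its own column but as 23.399999618530273 inside the vector: the column is a 32-bit float, while the vector holds 64-bit doubles. The `row` dict and the `to_float32` helper are illustrative names, not part of the notebook.

```python
import struct

# Feature order used by the notebook's VectorAssembler.
required_features = [
    'yearDiff', 'km_driven', 'mileage', 'engine', 'max_power', 'seats',
    'ownerLabel', 'regionLabel', 'brandLabel', 'fuelLabel', 'soldLabel',
    'transmissionLabel', 'sellerLabel', 'stateLabel']

def to_float32(x):
    """Round a Python float to the nearest 32-bit float, returned as a double."""
    return struct.unpack('f', struct.pack('f', x))[0]

# First row of va_data as displayed above (hypothetical dict representation).
row = {
    'yearDiff': to_float32(7.0), 'km_driven': 145500,
    'mileage': to_float32(23.4), 'engine': to_float32(1248.0),
    'max_power': to_float32(74.0), 'seats': 5,
    'ownerLabel': 0.0, 'regionLabel': 2.0, 'brandLabel': 0.0,
    'fuelLabel': 0.0, 'soldLabel': 1.0, 'transmissionLabel': 0.0,
    'sellerLabel': 0.0, 'stateLabel': 33.0}

# Assemble: one double per feature, in the declared order.
unscaled_features = [float(row[name]) for name in required_features]
print(unscaled_features)
```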


In [94]:
standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

In [95]:
ss_model = standardScaler.fit(va_data)

In [96]:
ss_data = ss_model.transform(va_data)
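
With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation and does not subtract the mean. A minimal plain-Python sketch of that arithmetic for a single feature, using made-up values rather than rows from the dataset:

```python
import statistics

# Made-up km_driven values, for illustration only.
km_driven = [20000.0, 50000.0, 80000.0, 110000.0]

# Corrected sample standard deviation (n - 1 in the denominator).
std = statistics.stdev(km_driven)

# withMean=False: values are divided by std but not centered.
scaled = [x / std for x in km_driven]

print(std)
print(scaled)
```

In the notebook the scaler is fitted on `va_data` before the train/test split, so the standard deviations are computed over all rows. Fitting on the training split only is a common alternative that keeps test-set statistics out of the preprocessing.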

In [97]:
ss_data.show(truncate=False)

+------+--------+---------+-------+------+---------+-----+----------+-----------+----------+---------+---------+-----------------+-----------+----------+-----------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label |yearDiff|km_driven|mileage|engine|max_power|seats|ownerLabel|regionLabel|brandLabel|fuelLabel|soldLabel|transmissionLabel|sellerLabel|stateLabel|unscaled_features                                                                              |features                                                                                                                                                                                                                     |
+------+--------+---------+-------+------+---------+----

In [98]:
(training, test) = ss_data.randomSplit([.7, .3])

In [99]:
training.describe().show()

+-------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+------------------+
|summary|            label|         yearDiff|        km_driven|           mileage|            engine|        max_power|             seats|        ownerLabel|       regionLabel|        brandLabel|         fuelLabel|          soldLabel|  transmissionLabel|        sellerLabel|        stateLabel|
+-------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+------------------+
|  count|             5559|             5559|             5559|              5559|              5559|             5559

In [100]:
test.describe().show()

+-------+-----------------+------------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+------------------+
|summary|            label|          yearDiff|        km_driven|           mileage|            engine|        max_power|             seats|         ownerLabel|       regionLabel|        brandLabel|         fuelLabel|          soldLabel| transmissionLabel|        sellerLabel|        stateLabel|
+-------+-----------------+------------------+-----------------+------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+-------------------+------------------+
|  count|             2330|              2330|             2330|              2330|              2330|             
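
The counts above (5559 training rows and 2330 test rows, about 70.5% and 29.5% of 7889) are close to, but not exactly, 70/30. That is expected: `randomSplit` assigns rows at random according to the weights, so split sizes vary from run to run unless a `seed` is passed. The plain-Python sketch below imitates that behaviour with a hypothetical `random_split` helper; it is an analogy, not Spark's implementation.

```python
import random

def random_split(rows, train_weight, seed=None):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(7889))  # same row count as the notebook's data

# Same seed gives an identical split; sizes are only approximately 70/30.
train_a, test_a = random_split(rows, 0.7, seed=42)
train_b, test_b = random_split(rows, 0.7, seed=42)
print(len(train_a), len(test_a))
print(train_a == train_b)
```

In PySpark the corresponding call would be `ss_data.randomSplit([.7, .3], seed=42)`; the seed value 42 is arbitrary.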

In [101]:
lr = LinearRegression(maxIter=10, regParam=.01)

In [102]:
lr_model = lr.fit(training)
print(lr_model.coefficients)

[-94360.62736,-102422.797179,101703.354952,49506.1951596,460292.301877,-1232.14286768,-1562.7442609,5647.68026748,97176.8700042,1995.30141502,11545.5976398,136852.428435,34865.4164522,1086.30027345]
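
Because the model was fitted on the `StandardScaler` output, each coefficient is the change in predicted price per one standard deviation of its feature, which makes the magnitudes roughly comparable. The sketch below pairs the printed coefficients with `required_features` (assuming the vector order matches that list, as passed to `VectorAssembler`) and ranks them by absolute size. The label-encoded columns are treated as plain numbers here, so their ranking should be read with caution.

```python
# Feature order passed to VectorAssembler in the notebook.
required_features = [
    'yearDiff', 'km_driven', 'mileage', 'engine', 'max_power', 'seats',
    'ownerLabel', 'regionLabel', 'brandLabel', 'fuelLabel', 'soldLabel',
    'transmissionLabel', 'sellerLabel', 'stateLabel']

# Coefficients copied from the output of print(lr_model.coefficients).
coefficients = [
    -94360.62736, -102422.797179, 101703.354952, 49506.1951596,
    460292.301877, -1232.14286768, -1562.7442609, 5647.68026748,
    97176.8700042, 1995.30141502, 11545.5976398, 136852.428435,
    34865.4164522, 1086.30027345]

# Rank features by the absolute size of their coefficient.
ranked = sorted(zip(required_features, coefficients),
                key=lambda pair: abs(pair[1]), reverse=True)

for name, coef in ranked:
    print('%-18s %12.2f' % (name, coef))
```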


In [103]:
print(lr_model.intercept)
trainingSummary = lr_model.summary


-1034614.1704750326


In [104]:
trainingSummary.rootMeanSquaredError

455461.3311881192

In [105]:
trainingSummary.meanAbsoluteError

278488.5311851478

In [106]:
trainingSummary.meanSquaredError

207445024207.6536

In [107]:
trainingSummary.r2

0.690278357804297

In [108]:
prediction_df = lr_model.transform(test)

In [109]:
prediction_df.select("label","prediction").show(truncate=False)

+-----+-------------------+
|label|prediction         |
+-----+-------------------+
|29999|-799578.6037356788 |
|31504|-641082.6329321289 |
|39000|-629645.0286255758 |
|40000|-683143.4279925646 |
|45000|-595188.5737205544 |
|45000|-633147.8877338273 |
|50000|-565613.5392540431 |
|50000|-625573.948460699  |
|50000|-550212.8251220869 |
|52000|-754087.8636973328 |
|55000|-159575.66232210095|
|55000|-309884.5986604295 |
|55000|57697.750287044095 |
|55000|-149367.51072047127|
|55000|-329001.59182935045|
|55000|-828.1804531461094 |
|55599|-568395.9972450738 |
|58000|-465351.654005153  |
|59259|-230995.92948406178|
|60000|-52392.19556545734 |
+-----+-------------------+
only showing top 20 rows
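
Nineteen of the twenty predictions shown are negative. A linear model has no lower bound on its output, so for cars priced far below the typical value it can extrapolate past zero. The sketch below simply recounts this from the displayed rows (predictions rounded to two decimals); it does not touch the Spark DataFrame.

```python
# (label, prediction) pairs copied from the 20 rows displayed above.
shown = [
    (29999, -799578.60), (31504, -641082.63), (39000, -629645.03),
    (40000, -683143.43), (45000, -595188.57), (45000, -633147.89),
    (50000, -565613.54), (50000, -625573.95), (50000, -550212.83),
    (52000, -754087.86), (55000, -159575.66), (55000, -309884.60),
    (55000, 57697.75), (55000, -149367.51), (55000, -329001.59),
    (55000, -828.18), (55599, -568395.99), (58000, -465351.65),
    (59259, -230995.93), (60000, -52392.20)]

# Count predictions below zero, which are impossible as selling prices.
negatives = sum(1 for _, prediction in shown if prediction < 0)
print('%d of %d shown predictions are negative' % (negatives, len(shown)))
```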



In [110]:
# Named lr_evaluator rather than eval to avoid shadowing Python's built-in eval().
lr_evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = lr_evaluator.evaluate(prediction_df)
print("RMSE: %.3f" % rmse)
mse = lr_evaluator.evaluate(prediction_df, {lr_evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)
mae = lr_evaluator.evaluate(prediction_df, {lr_evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)
r2 = lr_evaluator.evaluate(prediction_df, {lr_evaluator.metricName: "r2"})
print("r2: %.3f" % r2)

RMSE: 477892.647
MSE: 228381381595.950
MAE: 282419.310
r2: 0.644
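
For reference, the four metrics reported by `RegressionEvaluator` can be written out directly. The sketch below computes them in plain Python on four made-up label/prediction pairs (not rows from the dataset), using the usual definitions: MSE is the mean squared error, RMSE its square root, MAE the mean absolute error, and R2 is one minus the ratio of the residual to the total sum of squares.

```python
import math

# Made-up values, for illustration only.
labels = [100.0, 200.0, 300.0, 400.0]
predictions = [110.0, 190.0, 320.0, 380.0]

n = len(labels)
errors = [y - p for y, p in zip(labels, predictions)]

mse = sum(e * e for e in errors) / n    # mean squared error
rmse = math.sqrt(mse)                   # root mean squared error
mae = sum(abs(e) for e in errors) / n   # mean absolute error

mean_label = sum(labels) / n
ss_res = sum(e * e for e in errors)                   # residual sum of squares
ss_tot = sum((y - mean_label) ** 2 for y in labels)   # total sum of squares
r2 = 1.0 - ss_res / ss_tot

print("RMSE: %.3f" % rmse)
print("MSE: %.3f" % mse)
print("MAE: %.3f" % mae)
print("r2: %.3f" % r2)
```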


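Performance of Linear Regression:
* R2 is 0.69 on the training set and 0.64 on the test set.
* RMSE is around 455461 on the training set and 477893 on the test set.


Based on these metrics, the linear regression model performs only moderately on the car dataset. The sample predictions above are also mostly negative for low-priced cars, which is not a valid outcome for a selling price.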

##### 2. Decision Tree Regression

In [111]:
cleansedDF.columns

['selling_price',
 'km_driven',
 'mileage',
 'engine',
 'max_power',
 'seats',
 'ownerLabel',
 'regionLabel',
 'brandLabel',
 'fuelLabel',
 'soldLabel',
 'transmissionLabel',
 'sellerLabel',
 'stateLabel',
 'yearDiff']

In [112]:
dt_data = cleansedDF.select(col("selling_price").alias("label"), *required_features)
dt_data.printSchema()

root
 |-- label: integer (nullable = true)
 |-- yearDiff: float (nullable = true)
 |-- km_driven: integer (nullable = true)
 |-- mileage: float (nullable = true)
 |-- engine: float (nullable = true)
 |-- max_power: float (nullable = true)
 |-- seats: integer (nullable = true)
 |-- ownerLabel: double (nullable = true)
 |-- regionLabel: double (nullable = true)
 |-- brandLabel: double (nullable = true)
 |-- fuelLabel: double (nullable = true)
 |-- soldLabel: double (nullable = true)
 |-- transmissionLabel: double (nullable = true)
 |-- sellerLabel: double (nullable = true)
 |-- stateLabel: double (nullable = true)



In [113]:
vectorAssembler = VectorAssembler(inputCols=required_features, outputCol="unscaled_features")

In [114]:
dt_transformed=vectorAssembler.transform(cleansedDF)

In [115]:
(trainingData, testData) = dt_transformed.randomSplit([0.7, 0.3])

In [116]:
dt = DecisionTreeRegressor(featuresCol="unscaled_features",labelCol='selling_price',maxBins=100)

In [117]:
model = dt.fit(trainingData)

In [118]:
preds=model.transform(testData)

In [119]:
evaluator = RegressionEvaluator(
    labelCol="selling_price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(preds)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 205428


In [120]:
evaluator=RegressionEvaluator(
labelCol='selling_price',
predictionCol='prediction',
metricName='mae')
mae=evaluator.evaluate(preds)
print('MAE of test data is', mae)

MAE of test data is 129374.15253085553


In [121]:
evaluator=RegressionEvaluator(
labelCol='selling_price',
predictionCol='prediction',
metricName='r2')
r2=evaluator.evaluate(preds)
print('R2 of test data is',r2)

R2 of test data is 0.9317583787240841


Performance of Decision Tree regression:
* R2 on the test set is 0.93.
* RMSE is around 205428 on the test set, with an MAE of about 129374.


Based on these metrics, the decision tree regression model performs well on the car dataset, with a test RMSE less than half that reported for linear regression.

##### 3. Random Forest Regression

In [122]:
from pyspark.ml.regression import RandomForestRegressor

In [123]:
assembler=VectorAssembler(inputCols=required_features,outputCol='features')

In [124]:

randFrst=RandomForestRegressor(labelCol='selling_price', featuresCol='features',maxBins=200)

In [125]:
model=randFrst.fit(training_data)

In [126]:
preds=model.transform(test_data)

In [127]:
evaluator=RegressionEvaluator(
labelCol='selling_price',
predictionCol='prediction',
metricName='rmse')

In [128]:
rmse=evaluator.evaluate(preds)

In [129]:
print('Rmse of test data is', rmse)

Rmse of test data is 194118.31766748952


In [130]:
evaluator=RegressionEvaluator(
labelCol='selling_price',
predictionCol='prediction',
metricName='mae')

In [131]:
mae=evaluator.evaluate(preds)

In [132]:
print('MAE of test data is', mae)

MAE of test data is 125737.08459903157


In [133]:
evaluator=RegressionEvaluator(
labelCol='selling_price',
predictionCol='prediction',
metricName='r2')

In [134]:
r2=evaluator.evaluate(preds)

In [135]:
print('R2 of test data is',r2)

R2 of test data is 0.9416441337954653


Performance of Random Forest regression:
* R2 on the test set is 0.94.
* RMSE is around 194118 on the test set, with an MAE of about 125737.


Based on these metrics, the random forest regression model performs well on the car dataset and has the lowest reported test RMSE of the three models.
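
The test-set metrics reported in this section can be collected side by side. The sketch below only re-enters the printed numbers (rounded) and sorts them by RMSE. Note that the three models were evaluated on different random splits (70/30 for linear regression and the decision tree, 80/20 for the random forest), so the comparison is indicative rather than exact.

```python
# Test-set metrics copied (rounded) from the notebook outputs.
results = {
    'Linear Regression': {'rmse': 477892.647, 'mae': 282419.310, 'r2': 0.644},
    'Decision Tree':     {'rmse': 205428.0,   'mae': 129374.153, 'r2': 0.932},
    'Random Forest':     {'rmse': 194118.318, 'mae': 125737.085, 'r2': 0.942},
}

# Print the models from lowest to highest test RMSE.
for name, metrics in sorted(results.items(), key=lambda kv: kv[1]['rmse']):
    print('%-18s RMSE=%11.1f  MAE=%11.1f  R2=%.3f'
          % (name, metrics['rmse'], metrics['mae'], metrics['r2']))

best = min(results, key=lambda name: results[name]['rmse'])
print('Lowest test RMSE:', best)
```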