In [8]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 31 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 45.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=41c60228bbeb2af6e64b7aac618e0fb402e614b21da8a6ad56965b0b4457e8dc
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-m

In [12]:
import os
import pandas as pd

import pyspark

## Load data

In [3]:
# Attach Google Drive to the Colab filesystem; the project folder used by the
# following cells lives under this mount point.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Project folder on the mounted Google Drive. After the chdir, relative paths
# such as 'data/cleaned_data.csv' resolve against this folder.
cur_path = "/content/drive/MyDrive/Colab Notebooks/Big-data-scaling-team-project-group2/"
os.chdir(cur_path)
# Echo the working directory to confirm the chdir took effect.
!pwd

/content/drive/MyDrive/Colab Notebooks/Big-data-scaling-team-project-group2


In [14]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise start a new
# session with the application name 'final'.
spark = (
    SparkSession.builder
    .appName('final')
    .getOrCreate()
)

In [15]:
# Load the cleaned dataset. The first row supplies the column names and Spark
# infers each column's type from the values.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('data/cleaned_data.csv')
)

In [16]:
# Preview the first 20 rows, truncating long cell values (the defaults of show()).
data.show(n=20, truncate=True)

+---+----+---------+-------------------+--------------------+-----------+------------+-----+------------------+--------+------+--------+--------------------+-----+------------+--------+----------+
|_c0|year|     make|              model|                trim|       body|transmission|state|         condition|odometer| color|interior|              seller|  mmr|sellingprice|saledate|madeRegion|
+---+----+---------+-------------------+--------------------+-----------+------------+-----+------------------+--------+------+--------+--------------------+-----+------------+--------+----------+
|  0|2015|      Kia|            Sorento|                  LX|        suv|   automatic|   ca|               5.0| 16639.0| white|   black|kia motors americ...|20500|       21500|    2014|        5x|
|  1|2015|      Kia|            Sorento|                  LX|        suv|   automatic|   ca|               5.0|  9393.0| white|   beige|kia motors americ...|20800|       21500|    2014|        5x|
|  2|2014|     

In [17]:
# Mark the DataFrame for caching (it is reused by the pipeline fit and
# transform further down), then print its column names and types.
# cache() returns the same DataFrame, so the calls can be chained.
data.cache().printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- trim: string (nullable = true)
 |-- body: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- state: string (nullable = true)
 |-- condition: double (nullable = true)
 |-- odometer: double (nullable = true)
 |-- color: string (nullable = true)
 |-- interior: string (nullable = true)
 |-- seller: string (nullable = true)
 |-- mmr: integer (nullable = true)
 |-- sellingprice: integer (nullable = true)
 |-- saledate: integer (nullable = true)
 |-- madeRegion: string (nullable = true)



- Create a model to predict car selling prices. We will split the data, build several models
with MLlib, and then compare their accuracy. For the modeling steps, we will use a Spark ML
data pipeline.
 


In [35]:
from pyspark.ml import Pipeline

from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer

In [32]:

# Columns from the schema that are NOT fed into the feature vector below
# (sellingprice is the prediction target):
# |-- state: string (nullable = true)
# |-- color: string (nullable = true)
# |-- interior: string (nullable = true)
# |-- seller: string (nullable = true)
# |-- sellingprice: integer (nullable = true)
# |-- madeRegion: string (nullable = true)

In [37]:
# Feature-engineering pipeline: index and one-hot encode the categorical
# columns, assemble them together with the numeric columns into one vector,
# normalize that vector, and keep only the features and the label.
categorical_cols = ["make", "model", "trim", "body", "transmission"]

# One StringIndexer per categorical column: <col> -> <col>Index.
string_indexers = [
    StringIndexer(inputCol=name, outputCol=name + "Index")
    for name in categorical_cols
]

# One OneHotEncoder per indexed column: <col>Index -> <col>Vector.
onehot_encoders = [
    OneHotEncoder(inputCol=name + "Index", outputCol=name + "Vector")
    for name in categorical_cols
]

# The order of inputCols fixes the layout of the assembled feature vector.
assembler = VectorAssembler(
    inputCols=[
        "year",
        "makeVector", "modelVector", "trimVector", "bodyVector", "transmissionVector",
        "condition", "odometer", "mmr", "saledate",
    ],
    outputCol='unNorm_features',
)

# NOTE(review): Normalizer rescales each ROW vector to unit L1 norm (p=1.0);
# it does not scale features column-wise. If per-feature scaling was the
# intent, StandardScaler or MinMaxScaler may be the better fit -- confirm.
normalizer = Normalizer(inputCol="unNorm_features", outputCol="features", p=1.0)

# Stage order: all indexers, then all encoders, then assembler, then normalizer.
pipeline = Pipeline(stages=[*string_indexers, *onehot_encoders, assembler, normalizer])

# NOTE(review): the pipeline is fitted on the full dataset, i.e. before the
# train/test split, so the string indexers see the test rows' categories.
model = pipeline.fit(data)
clean_data = model.transform(data)

# Keep only what the regressors need: the feature vector and the label.
prepare_data = clean_data.select(["features", "sellingprice"])

In [38]:
# Preview the first 20 (features, sellingprice) rows; these are show()'s defaults.
prepare_data.show(n=20, truncate=True)

+--------------------+------------+
|            features|sellingprice|
+--------------------+------------+
|(2369,[0,9,92,830...|       21500|
|(2369,[0,9,92,830...|       21500|
|(2369,[0,8,60,150...|       30000|
|(2369,[0,27,168,9...|       27750|
|(2369,[0,8,450,97...|       67000|
|(2369,[0,3,53,838...|       10900|
|(2369,[0,8,447,82...|       65000|
|(2369,[0,2,71,857...|        9800|
|(2369,[0,20,113,1...|       32250|
|(2369,[0,2,124,83...|       17500|
|(2369,[0,20,198,1...|       49750|
|(2369,[0,9,84,830...|       17700|
|(2369,[0,1,55,829...|       12000|
|(2369,[0,9,92,830...|       21500|
|(2369,[0,2,71,867...|       10600|
|(2369,[0,3,53,838...|       14100|
|(2369,[0,7,68,829...|        4200|
|(2369,[0,20,236,1...|       40000|
|(2369,[0,2,124,83...|       17000|
|(2369,[0,8,212,97...|       67200|
+--------------------+------------+
only showing top 20 rows



## Split into train and test sets

In [40]:
# Random 70/30 split into training and test sets, with a fixed seed.
train_fraction, test_fraction = 0.7, 0.3
train, test = prepare_data.randomSplit([train_fraction, test_fraction], seed=12345)

## Linear regression

In [39]:
from pyspark.ml.regression import LinearRegression

In [42]:
# LinearRegression with default hyperparameters, predicting 'sellingprice'
# from the 'features' vector; fitted on the training split only.
lr = LinearRegression(
    labelCol='sellingprice',
    featuresCol='features',
)
lrModel = lr.fit(train)

In [43]:
# Look at the model's predictions on the data it was trained on.
trainingSummary = lrModel.summary
training_predictions = trainingSummary.predictions
training_predictions.show()



+--------------------+------------+------------------+
|            features|sellingprice|        prediction|
+--------------------+------------+------------------+
|(2369,[0,1,54,828...|      1100.0| 6639.653438097168|
|(2369,[0,1,54,831...|     32900.0|22704.952660171584|
|(2369,[0,1,54,831...|     16500.0| 14756.04615785557|
|(2369,[0,1,54,831...|     35000.0|24835.128366274967|
|(2369,[0,1,54,831...|     34100.0|24297.780206756488|
|(2369,[0,1,54,831...|     38200.0|26986.304788328227|
|(2369,[0,1,54,831...|     38000.0|28273.611800343057|
|(2369,[0,1,54,831...|     39400.0|29187.467324948208|
|(2369,[0,1,54,831...|     37500.0|30361.811736945157|
|(2369,[0,1,54,831...|     37200.0|30893.695511301252|
|(2369,[0,1,54,831...|     37500.0|31311.375543799273|
|(2369,[0,1,54,831...|     35000.0| 31045.09378872145|
|(2369,[0,1,54,831...|     37500.0|  32087.3690650294|
|(2369,[0,1,54,831...|     38000.0| 32097.29030477384|
|(2369,[0,1,54,831...|     16400.0|32526.054398902426|
|(2369,[0,

## Evaluate on the test set

In [49]:
# Score the fitted model on the held-out test split.
test_results = lrModel.evaluate(dataset=test)

In [50]:
# Show the per-row residuals on the test set, then report the test RMSE.
test_residuals = test_results.residuals
test_residuals.show()

rmse = test_results.rootMeanSquaredError
print("RMSE: {}".format(rmse))



+-------------------+
|          residuals|
+-------------------+
|  8720.981913874264|
|  5574.711462419735|
|  4843.071746952126|
| 1384.3208506962474|
|  3254.248134382724|
| 3585.0672153859996|
| 1390.0147223799686|
|-6121.0688488848355|
| -3580.435778712119|
|-5507.6535243475155|
|-1870.0272529217764|
| -2423.447186637248|
| 177.23318536717488|
| -4258.203970480245|
| -5175.710256160506|
| -4634.240876838034|
| -4539.435085651439|
| -607.4443307321999|
| -2407.910816213531|
|-2728.3173092344405|
+-------------------+
only showing top 20 rows

RMSE: 3559.502608778501


- Find the most important factors for predicting the car price. After selecting the final model
(the one with the highest accuracy), we will use Spark to explore the importance of each attribute.

In [48]:
# Print the fitted linear model's parameters: the coefficient vector, its
# length, and the intercept.
# NOTE(review): the feature vector has thousands of entries, so the full
# coefficient dump is very long -- consider showing only a slice.
coefficients = lrModel.coefficients
intercept = lrModel.intercept

print("Coefficients: {}".format(str(coefficients))) # For each feature...
print(len(coefficients))
print('\n')
print("Intercept:{}".format(str(intercept)))

Coefficients: [-87845.45603156186,-30442640.112301424,-43684072.362098776,-78491526.37796564,-51004348.51116457,-53484597.32233624,-42061157.14240239,-77079696.3518302,84698784.05170798,-78999506.33664387,-75619517.54018945,32902450.58956069,-80843696.6035799,16445723.750922227,-47984632.49664132,70369866.33073246,17333733.06026358,-95145652.86228175,51552244.54429101,-71715294.69490318,24436530.07202342,17281227.639634598,13435362.876916237,-93450145.15965384,10592260.16297603,-2897167.276407977,-48662310.02665152,22418769.592540767,-70262974.50634494,-113258359.9341822,-51793901.072072804,284279909.4197354,-71154110.99631181,-174810819.19438878,576924378.8291109,-52411306.54883184,-101852343.64671823,180250342.59538186,-59547820.73526383,-51677968.68076685,-95913630.90165076,-73063630.02754484,195684368.78306895,2024047374.3899825,787662541.2295889,1080204734.2590709,-396379712.5609114,5677055901.966856,-507864875.6931954,8973638927.71915,214386384.82615927,2985329623.9075727,-124566