# Car price prediction using Apache spark and python wit AWS S3
by moses kiboma

# Problem Definition
#### How to determine the *price* of a used car?

### Findspark

Use `findspark` to be able to find and import **Pyspark** module, while correctly setting environmental variables and dependencies.

In [2]:
import traceback
import findspark
try:
    findspark.init('/opt/spark')
except:
    print ("Error:", ''.join(traceback.format_stack()))

Check paths before Executing PySpark Session:

In [3]:
import os
import sys
print("PATH: %s\n" % os.environ['PATH'])
print("SPARK_HOME: %s" % os.environ['SPARK_HOME'])
print("PYSPARK_PYTHON: %s" % os.environ['PYSPARK_PYTHON'])

PATH: /home/moses/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin

SPARK_HOME: /opt/spark
PYSPARK_PYTHON: /bin/python3


## Loading Packages 

In [4]:
#import libraries
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.pylab as pylab
import seaborn as sns
import subprocess
from pyspark.sql.functions import *
from functools import reduce
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline

### Package Versions

In [5]:
print('pandas: {}'.format(pd.__version__))
print('numpy: {}'.format(np.__version__))
print('matplotlib: {}'.format(matplotlib.__version__))
print('seaborn: {}'.format(sns.__version__))
print('Python: {}'.format(sys.version))

pandas: 1.4.1
numpy: 1.22.2
matplotlib: 3.5.1
seaborn: 0.11.2
Python: 3.8.10 (default, Mar 15 2022, 12:22:08) 
[GCC 9.4.0]


## Creating SparkSession
Get package to handle AWS to access S3:

In [6]:
%set_env PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell


env: PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell


Creating Spark Session, hosted across all local nodes on a **Standalone Cluster**:

In [7]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("PySpark Craigslist") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

22/04/06 21:28:06 WARN Utils: Your hostname, kiboma resolves to a loopback address: 127.0.1.1; using 192.168.0.103 instead (on interface wlo1)
22/04/06 21:28:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/moses/.ivy2/cache
The jars for the packages stored in: /home/moses/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-36b5e920-74c0-4982-a4a1-42659c5aed7d;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.3 in central
	found org.apache.hadoop#hadoop-common;2.7.3 in central
	found org.apache.hadoop#hadoop-annotations;2.7.3 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math3;3.1.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.4 in central
	found commons-io#commons-io;2.4 in central
	found commons-net#commons-net;3.1 in central
	found commons-collections#commons-collectio

RuntimeError: Java gateway process exited before sending its port number

Configure Hadoop connection for S3:

In [8]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.access.key", "AKIAQQADETG4J56SFQIV")
hadoopConf.set("fs.s3a.secret.key", "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")

Monitoring Spark instrumentation through the WebUI available through `localhost:4040/`

# Extract Stage

## Kaggle Dataset

https://www.truecar.com/used-cars-for-sale/listings/

In [9]:
carsdata = spark.read.format("csv").option("header", "true").load("cars.csv")
type(carsdata)

                                                                                

pyspark.sql.dataframe.DataFrame

## Validating Data

Now that the data is available as a local *dataframe* on the Spark cluster, let's validate the dataframe by look at the schema, size, samples and statistics of our working data - 

In [20]:
carsdata.printSchema()

root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- posting_date: string (nu

Dimensions of Raw Dataset:

In [21]:
print(carsdata.count(),len(carsdata.columns))

                                                                                

441802 26


Collecting random sample to see what kind of data populates each column:

In [22]:
carsdata.sample(False,0.0001,10).toPandas().sample(10)

22/04/05 15:20:53 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,id,url,region,region_url,price,year,manufacturer,model,condition,cylinders,...,size,type,paint_color,image_url,description,county,state,lat,long,posting_date
13,7309153014,https://omaha.craigslist.org/ctd/d/council-blu...,omaha / council bluffs,https://omaha.craigslist.org,7495,2010,honda,civic sedan,,4 cylinders,...,,sedan,black,https://images.craigslist.org/00o0o_fspI4HDewO...,2010 *Honda* *Civic Sedan* 4dr Automatic LX-S ...,,ia,41.229172,-95.852118,2021-04-19T07:21:13-0500
35,7314502745,https://nashville.craigslist.org/ctd/d/white-h...,nashville,https://nashville.craigslist.org,32900,2019,ford,transit,like new,6 cylinders,...,full-size,van,white,https://images.craigslist.org/00x0x_lKicuXbjwb...,-See more vans at valuecargovans.com Price: $3...,,tn,36.4641,-86.65828,2021-04-29T15:25:16-0500
37,7316143261,https://ogden.craigslist.org/ctd/d/atlanta-201...,ogden-clearfield,https://ogden.craigslist.org,38990,2013,chevrolet,corvette grand sport,good,8 cylinders,...,,other,red,https://images.craigslist.org/00P0P_fxBYwOMmMp...,Carvana is the safer way to buy a car During t...,,ut,33.779214,-84.411811,2021-05-03T07:14:49-0600
47,7307668314,https://wyoming.craigslist.org/ctd/d/evans-201...,wyoming,https://wyoming.craigslist.org,39999,2015,ram,3500,,,...,,pickup,,https://images.craigslist.org/00r0r_BysOaosqN6...,"""2015 RAM 3500 4WD Crew Cab 169"""" Laramie ...",Inc — (970) 456-4813 or direct +19703306261 —...,999 2015 Gray 3500 Ram. 6.7 Litter CUMMINS...,Inc Year: 2015 Make: RAM Model: 3500 Serie...,371 Exterior: Gray Interior: Black Body: Truc...,CO 80620 Phone: (970) 456-4813 Direct: ...
16,7316473997,https://baltimore.craigslist.org/ctd/d/baltimo...,baltimore,https://baltimore.craigslist.org,21590,2012,jaguar,xf portfolio sedan 4d,good,8 cylinders,...,,sedan,blue,https://images.craigslist.org/00R0R_lwWjXSEWNa...,Carvana is the safer way to buy a car During t...,,md,39.3,-76.61,2021-05-03T18:30:53-0400
18,7313883787,https://detroit.craigslist.org/mcb/ctd/d/harri...,detroit metro,https://detroit.craigslist.org,28590,2019,fiat,124 spider abarth,good,,...,,other,black,https://images.craigslist.org/00U0U_iSQWhP2Gt0...,Carvana is the safer way to buy a car During t...,,mi,42.58,-82.81,2021-04-28T12:41:17-0400
40,7304801711,https://bellingham.craigslist.org/ctd/d/bellin...,bellingham,https://bellingham.craigslist.org,37499,2016,ram,1500,,,...,,pickup,black,https://images.craigslist.org/00B0B_kD6FJIqRiF...,2016 Ram 1500 Laramie Longhorn Offered by:...,,wa,48.756909,-122.452132,2021-04-10T11:29:04-0700
32,7305260762,https://columbia.craigslist.org/ctd/d/mercedes...,columbia,https://columbia.craigslist.org,26998,2016,mercedes-benz,metris,,4 cylinders,...,,van,brown,https://images.craigslist.org/00A0A_hkj7yNP1AD...,"""2016 Mercedes Benz Metris Conversion Van High...",Dual Sliding Doors in the back. Rare Mini Va...,540[C03] Active Safety Plus PackageBlind Spot ...,680[963] Indium Grey Metallic\t$990[HH4] Therm...,Right\t$740[T56] Electrical Operation Of Slid...,Left\t$740[W65] Tailgate\t$450Original Shippi...
33,7303344920,https://greenville.craigslist.org/ctd/d/greenv...,greenville / upstate,https://greenville.craigslist.org,49990,2020,gmc,yukon slt sport utility 4d,good,8 cylinders,...,,SUV,black,https://images.craigslist.org/00X0X_3bMHmcFfx4...,Carvana is the safer way to buy a car During t...,,sc,34.83,-82.37,2021-04-07T17:10:46-0400
10,7306385685,https://pullman.craigslist.org/ctd/d/pullman-2...,pullman / moscow,https://pullman.craigslist.org,14988,2014,nissan,juke cvt nismo rs,like new,4 cylinders,...,compact,SUV,silver,https://images.craigslist.org/01717_gIUfTwAqG0...,2014 Nissan Juke CVT NISMO RS *ENGINE: 1.6L I...,,id,46.715081,-117.179896,2021-04-13T14:22:19-0700


Basic statistics on raw dataset columns:

In [23]:
carsdata.select("region","state","year","lat","long").describe().show()
carsdata.select("manufacturer","model","price","condition","odometer").describe().show()
carsdata.select("cylinders","fuel","transmission","drive","size","type").describe().show()
carsdata.select("url","title_status","vin","paint_color","image_url").describe().show()

                                                                                

+-------+--------------------+--------------------+--------------------+--------------------+---------+
|summary|              region|               state|                year|                 lat|     long|
+-------+--------------------+--------------------+--------------------+--------------------+---------+
|  count|              434901|              418725|              433912|              416405|   417087|
|   mean|   1427.686274509804|  1697.5362095531586|  2005.5686682071973|   41.77550324222822| Infinity|
| stddev|   808.3057161506485|   737.2614799468287|  112.05149237025323|   84.63443683901487|      NaN|
|    min|               1500 |                 ...| $489 Doc charge ...|                    |         |
|    max|zanesville / camb...|           blindspot|                  wa|🌟  Color: Sandal...|” however|
+-------+--------------------+--------------------+--------------------+--------------------+---------+



                                                                                

+-------+-----------------+--------------------+--------------------+-----------------+-----------------+
|summary|     manufacturer|               model|               price|        condition|         odometer|
+-------+-----------------+--------------------+--------------------+-----------------+-----------------+
|  count|           412865|              424296|              435356|           254659|           424001|
|   mean|744.1736842578244|  1914.5716864965727|    74186.5376440274|549.0233284319527|98034.05423732463|
| stddev|1029.712929811658|   5327.649155487644|1.2099967417236006E7|959.2700455051272|213873.5016305289|
|    min|             2010|                2007|                    |             2006|     BMW 3 Series|
|    max|            volvo|🔥GMC Sierra 1500...|                  wa|          salvage|               wa|
+-------+-----------------+--------------------+--------------------+-----------------+-----------------+



                                                                                

+-------+---------+------------------+------------------+------+------------------+------------------+
|summary|cylinders|              fuel|      transmission| drive|              size|              type|
+-------+---------+------------------+------------------+------+------------------+------------------+
|  count|   251004|            425458|            425870|297624|            121805|            334910|
|   mean|   2005.0|1797.1560693641618| 368.7826138090185| 545.0| 424.0317873538461|479.85442769230747|
| stddev|      0.0|248.64087282720493|173.93991068794728|   0.0|203.03621112166982|316.42065528497534|
|    min|     2005|              1500|              250 |   545|               530|               645|
|    max|    other|                wa|             other|   rwd|       sub-compact|             wagon|
+-------+---------+------------------+------------------+------+------------------+------------------+



                                                                                

+-------+--------------------+-----------------+-----------------+------------------+--------------------+
|summary|                 url|     title_status|              vin|       paint_color|           image_url|
+-------+--------------------+-----------------+-----------------+------------------+--------------------+
|  count|              431918|           420184|           267316|            297963|              428070|
|   mean|  1406.3489361702127| 322.347487744898|         Infinity|1054.8554913294797|   466.3659410647941|
| stddev|   919.4004721287523|214.1420067328583|              NaN| 372.4679741756739|   815.1995906348874|
|    min|      2005 Subaru...|             150 |             350 |             1500 |               2500 |
|    max|https://zanesvill...|          salvage|ZPBUA1ZL1KLA02237|            yellow|https://images.cr...|
+-------+--------------------+-----------------+-----------------+------------------+--------------------+



Generally, the `.describe()` or `.explain()` method is a good way to start exploring a dataset. For this dataset, there are too many columns, some very unclean, and it is hard to decipher much from the above results. <br>

For this project, we will perform cleaning twice. The Extract stage has a basic cleaning section to remove duplicates and to obtain numerical features (notice in the schema above, all columns are *String* by default). We can cache the Extracted Data in a compressed format such as parquet, so it can be used for different pipelines. <br>

The main cleaning section is performed during the [Transform Stage](#Transform-Data).<br>



## Cleaning Data (Basic)

The goal of **Extract Stage** cleaning is to find a balance between cleaning the data and maintaining the maximum amount of raw data. This optimizes usability for other pipelines. <br>

### Removing Duplicates

The scraper can't find duplicate recors because of various reasons. Each duplicate is more likely the result of people posting multiple times or other errors. Therefore no record from the raw dataset can be found to be a complete duplicate.

However, duplicates can be removed by excluding `url` when calling .drop_duplicates() on **`vehicle_listings`**.

In [24]:
print(carsdata.columns)

['id', 'url', 'region', 'region_url', 'price', 'year', 'manufacturer', 'model', 'condition', 'cylinders', 'fuel', 'odometer', 'title_status', 'transmission', 'VIN', 'drive', 'size', 'type', 'paint_color', 'image_url', 'description', 'county', 'state', 'lat', 'long', 'posting_date']


In [25]:
clean_carsdata=carsdata.drop_duplicates(['region', 'price', 'year', 'manufacturer', 'model', 'condition', 'cylinders', 'fuel', 'odometer', 'title_status', 'transmission', 'vin', 'drive', 'size', 'type', 'paint_color', 'image_url', 'lat', 'long'])
                                                   

### Obtain Numerical Columns
Convert *numerical feature types* from string to float to obtain **Numerical** data columns.

In [28]:


cols=["price","year","odometer"]

clean_carsdara = (reduce(
            lambda memo_df, col_name: memo_df.withColumn(col_name, carsdata[col_name].cast("float")),
            cols,
            clean_carsdata))

clean_carsdata.printSchema()

root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- year: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- condition: string (nullable = true)
 |-- cylinders: string (nullable = true)
 |-- fuel: string (nullable = true)
 |-- odometer: string (nullable = true)
 |-- title_status: string (nullable = true)
 |-- transmission: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- drive: string (nullable = true)
 |-- size: string (nullable = true)
 |-- type: string (nullable = true)
 |-- paint_color: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- posting_date: string (nu

## Caching Extract Data on S3

After cleaning, the final task in the **Extract** stage is to cache the data. I will store the cleaned dataset as a parquet file on Amazon S3. We can return to this file without having to run the Extract pipeline again, and attach other pipelines from this point as well. <br>

<div class="alert alert-warning">
<b>NOTE</b> This implementation uses .coalesce(4) before .write() to optimize read/write performance on S3.
</div>

In [29]:
#let us save data as parquet 
clean_carsdata.coalesce(4).write.parquet("cardata_extraction.parquet",mode='overwrite')

                                                                                

Unable to connect to S3 directly using SparkSession due to Spark generating its credentials, possibly due to versioning... <br>
Instead, we can connect to S3 using a bash script.<br>
The parquet compression brings the file size down to *<30MB*, so this doesn't cause performance issues here.<br>

In [31]:
print ("Connecting to AWS S3...")
subprocess.Popen(['./kiboma.command','write_extract'])
print ("Object Written to S3.")

Connecting to AWS S3...
Object Written to S3.
S3 Bucket Contents
---------------------------------
---------------------------------
---------------------------------
Writing files to S3 Bucket...
upload: cardata_extraction.parquet/._SUCCESS.crc to s3://kiboma/cardata/extract/._SUCCESS.crc
upload: cardata_extraction.parquet/_SUCCESS to s3://kiboma/cardata/extract/_SUCCESS
upload: cardata_extraction.parquet/.part-00003-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc to s3://kiboma/cardata/extract/.part-00003-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc
upload: cardata_extraction.parquet/.part-00002-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc to s3://kiboma/cardata/extract/.part-00002-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc
upload: cardata_extraction.parquet/.part-00001-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc to s3://kiboma/cardata/extract/.part-00001-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.p

This completes the **Extract Stage**. <br>

We clean, sample and explore the data in the **Transform Stage**. The data available on S3 is clean but as close to the raw data as possible. <br>

# Transform Stage

*First we obtain the working dataset from the cached source on S3.* <br>

In [32]:
print ("Connecting to AWS S3...")
subprocess.Popen(['./kiboma.command','read_extract'])
print ("Object Read from S3 to ..Home Dir.")

Connecting to AWS S3...
Object Read from S3 to ..Home Dir.
S3 Bucket Contents
---------------------------------
                           PRE cardata/
---------------------------------
---------------------------------
Reading files from S3 Bucket...
download: s3://kiboma/cardata/extract/._SUCCESS.crc to cardata_extrct_dowload.parquet/._SUCCESS.crc
download: s3://kiboma/cardata/extract/_SUCCESS to cardata_extrct_dowload.parquet/_SUCCESS
download: s3://kiboma/cardata/extract/.part-00003-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc to cardata_extrct_dowload.parquet/.part-00003-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc
download: s3://kiboma/cardata/extract/.part-00001-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc to cardata_extrct_dowload.parquet/.part-00001-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc
download: s3://kiboma/cardata/extract/.part-00000-61966c59-cdbf-426c-87ec-e1a037471778-c000.snappy.parquet.crc to cardata_

In [34]:
#vehicles_listings_read=spark.read.parquet("s3a://s3://kiboma/usedcars/extract")
#vehicle_listings_read=spark.read.parquet("S3_cardata.parquet")

clean_carsdata = spark.read.format("parquet").option("inferschema","true").load("cardata_extrct_dowload.parquet")
type(clean_carsdata)


                                                                                

pyspark.sql.dataframe.DataFrame

In [35]:
print(clean_carsdata.count(),len(clean_carsdata.columns))

[Stage 22:>                                                         (0 + 4) / 5]

382307 26


                                                                                

There are over 1.5 million records, and 26 columns. The dataset is too large to perform EDA (Exploratory Data Analysis) comfortably. <br>
The final data after *Cleaning* should be as clean as possible without redundant data.<br>

## Cleaning Data

Cleaning is essential during the **Transform Stage** of the ETL pipeline. The main goal here is to optimize the dataset for the endgoal of the project - Creating a machine learning pipeline to predict the target varibale **Price**. <br>

We can also cache the cleaned data set before sampling and exploration for other machine learning project using the same dataset.<br>

### Cleaning Checkpoints

+  General
    +  Drop Columns
    +  Drop Rows (Null Price)
+  Numerical 
    +  Apply Reasonable Ranges
        +  Kept Odometer Null Values
+  Ordinal
    +  Narrow down relevant options
    +  Remove errors in columns
    +  Keep countable ordinal values
+  Categorical
    +  State_code + State_name (Extract Stage)
        +  Removed State_name

#### Drop Columns

Some Columns are redundant, we can remove them before moving forward.<br>

In [36]:
clean_carsdata=clean_carsdata.drop('url','region_url','vin', 'paint_color', 'image_url', \
                                       'lat', 'long', 'id', 'posting_date','description')
print(clean_carsdata.columns)

['region', 'price', 'year', 'manufacturer', 'model', 'condition', 'cylinders', 'fuel', 'odometer', 'title_status', 'transmission', 'drive', 'size', 'type', 'county', 'state']


'lat' and 'long' provide no more useful information than ('city','county_name', 'state_code', 'state_name'), and are therefore not needed.<br>
The other dropped columns are not helpful to the analysis.

The remaining attribute characteristics are varied, we have three types of features available in our data -
**Numerical**, **Ordinal**, **Categorical**<br>

**Potential Column Types -** <br>
Numerical - price, year, odometer <br>
Ordinal - condition, cylinders, fuel, title_status, transmission, drive, size, type <br>
Categorical - city, manufacturer, make, county_name, state_code, state_name <br>

The target variable of this project is **Price**. Therefore, any records where price is null would be irrelevant. Check to see if the dataset contains null prices.

In [37]:
clean_carsdata.filter(clean_carsdata.price.isNull()).count()

[Stage 25:>                                                         (0 + 4) / 5]                                                                                

51

### Numerical Data Columns

In [38]:
#This is numerial data in the database
Num=["price","manufacturer","model","region","odometer","year"]


#### Applying a Reasonable Range on Continuous Types

Website postings that advertise items for sale often manipulate search algorithms to get more hits. For example, thousands of posts are spammed with a price of $1 (minimum allowed), to get more visibility through the 'search by price' filter. To ensure that our working dataset is within reasonable boundaries, we apply restrictions on the *Continuous* data columns.

In [39]:
print("New data: %d" %clean_carsdata.where("year >= 2019").count())
print("Old data: %d" %clean_carsdata.where("year < 1930").count())
print("Expensive cars: %d" %clean_carsdata.where("price > 300000").count())
print("Cheap cars: %d" %clean_carsdata.where("price < 50").count())
print("Many miles: %d" %clean_carsdata.where("odometer > 250000").count())
print("few miles: %d" %clean_carsdata.where("odometer < 50").count())


New data: 43169
Old data: 1404


                                                                                

Expensive cars: 56
Cheap cars: 34063




Many miles: 6465
few miles: 5484


Before removing 20k+ records due to price, let's confirm if the spame hypothesis is true:

In [40]:
clean_carsdata.sample(False,0.001,100).where("price<50").select(Num).show()

[Stage 46:>                                                         (0 + 1) / 1]                                                                                

+----------+-------------+--------------------+--------------------+--------+--------------------+
|     price| manufacturer|               model|              region|odometer|                year|
+----------+-------------+--------------------+--------------------+--------+--------------------+
| -77.49321|         null|                null|                  va|    null|2021-04-23T06:02:...|
|         0|         null|                 all|         san antonio|   12345|                2020|
|         0|      hyundai|              tucson|         long island|   69495|                2017|
|         0|    chevrolet|    silverado 2500hd|          charleston|   86534|                2016|
|         0|mercedes-benz|             m-class|raleigh / durham ...|   94809|                2013|
|         0|       toyota|         prius+prime|         long island|   70179|                2017|
|         0|        honda|              accord|       winston-salem|  133837|                2003|
|         

As expected, these randomly obtained results show us that "price<50" is not very useful to the study. <br>


In [42]:
print("Mising values of odometer :",clean_carsdata.filter(clean_carsdata.odometer.isNull()).count())
print("Missing values price :",clean_carsdata.filter(clean_carsdata.price.isNull()).count())
print("Missing values of  year :",clean_carsdata.filter(clean_carsdata.year.isNull()).count())

Mising values of odometer : 9813
Missing values price : 51
Missing values of  year : 1239


The Null counts shows us that a lot of listings have not reported Odometer values, perhaps to increase chances of selling a car or maybe due to lack of info. The question is, do we remove all these records altogether?<br>
I would rather keep these records because the 50k+ rows will still provide useful information about other attributes. Removing these values would reduce the dataset by 35%, which is too high.<br>
<br>Apply the restrictions to vehicle_listings_clean.<br>
Note- The Spark environment allows me to use a direct SQL query to restrict odometer values. This is better suited because the **`pyspark.sql.dataframe`** library treats "null" values as false when evaluating the `.where()` function.

In [45]:
clean_carsdata = clean_carsdata.where("price>=50 and price<=300000").where("year>=1930 and year<2019") 

clean_carsdata.registerTempTable("df_temp")

clean_carsdata=spark.sql("SELECT * FROM df_temp WHERE (odometer>=50 AND odometer<=250000) OR (odometer IS NULL)")

main=carsdata.count()
recent=clean_carsdata.count()

print("Number of original dataset ",main)
print("range ",recent)
print(" %d percent number of data stored" % (100 * recent / main))

clean_carsdata.select("price","year","odometer").describe().show()


                                                                                

Number of original dataset  441802
range  298061
 67 percent number of data stored




+-------+------------------+------------------+------------------+
|summary|             price|              year|          odometer|
+-------+------------------+------------------+------------------+
|  count|            298061|            298061|            296406|
|   mean|  17268.8696911035|2010.1089609173962| 98653.14427508214|
| stddev|13711.733852783767| 9.415662197252933|56955.964519133726|
|    min|               100|              1930|               100|
|    max|             99999|              2018|             99999|
+-------+------------------+------------------+------------------+



                                                                                

Although only 3 attributes, **price, year, odometer**, can be directly converted to numerical value, other attributes can be be transformed to fit the Categorical and Ordinal labels. The difference between categorical and ordinal values is that ordinal values have a clear and restricted ordering of types. For example, *'condition'* would be ordinal, ranging from excellent to poor.

### Ordinal Data Columns

Ordinal data will play an important role in learning about price. An ordinal data type is a variable that has a limited number of options, which can be ordered. For example, `"condition"` should range from excellent to bad (or in this case, "salvage".

The following columns are of the **Ordinal** types -  <br>
(condition, cylinders, fuel, title_status, transmission, drive, size, type) <br>

**`ordinals_options_all`** is a map linking each ordinal to each distinct value provided for it. This will help cleaning the dataset faster.

In [14]:
ordcolumns=clean_carsdata['condition', 'cylinders', 'fuel', 'title_status', 'transmission', 'drive', 'size', 'type']
print(ordcolumns)

DataFrame[condition: string, cylinders: string, fuel: string, title_status: string, transmission: string, drive: string, size: string, type: string]


As expected, all 8 Ordinal columns have values that seem to be errors, or reported incorrectly. For example, let's look into the "Condition" reported as 'like new"'. 

In [54]:
clean_carsdata.filter("condition = 'like new\"'" ).select(Num+["condition"]).show()

+-----+------------+-----+------+--------+----+---------+
|price|manufacturer|model|region|odometer|year|condition|
+-----+------------+-----+------+--------+----+---------+
+-----+------------+-----+------+--------+----+---------+



Only 1 record fetched. 

The next task is to clean these misleading values for each of the ordinal types. Automating this process, at the very least requires two `pyspark.sql.dataframe` library calls for each distinct ordinal option - `count()` and `replace()`. We can count the number of occurences of a value in an ordinal column, if it's less than 10 then it is  irrelevant and will be grouped under "other".

**`remove_fakes()`** is a function that takes in a column and all distinct values within the column, and evaluates which options are useful. It bunches up other options under "Other".

Time Cost - Each call to **`remove_fakes()`** will take **O(n)** *(O(2n+1))* where *n* is the length of the column. Because the algorithm needs to call both `count()` and `replace()` for any ordinal/value combination, this is the fastest way to clean the ordinal columns.

The above function does not count *null* values. The difference between values printed as a list after the clean versus the final `count()` value are *null* and, in some cases, *other*.

#### Cleaning column = 'condition'

In [59]:
print(clean_carsdata.select("condition").distinct().count())

7


#### Cleaning column = 'cylinders'

In [60]:
print("Number for cylinders",clean_carsdata.select("cylinders").distinct().count())

Number for cylinders 9


#### Cleaning column = 'fuel'

In [61]:
print("Number for fuel",clean_carsdata.select("fuel").distinct().count())

Number for fuel 6


#### Cleaning column = 'title_status'

In [62]:
print("Number for 'title_status'-",clean_carsdata.select("title_status").distinct().count())



Number for 'title_status'- 7


                                                                                

#### Cleaning column = 'transmission'

In [None]:
print("Number for 'transmission'-",clean_carsdata.select("transmission").distinct().count())

#### Cleaning column = 'drive'

In [63]:
print("Number for 'drive'-",clean_carsdata.select("drive").distinct().count())



Number for 'drive'- 4


                                                                                

#### Cleaning column = 'size'

In [64]:
print("Number for 'size'-",clean_carsdata.select("size").distinct().count())



Number for 'size'- 5


                                                                                

#### Cleaning column = 'type'

In [65]:

print("Number for 'type'-",clean_carsdata.select("type").distinct().count())

[Stage 138:>                                                        (0 + 4) / 5]

Number for 'type'- 14


                                                                                

### Categorical Data Columns

Categorical - city, manufacturer, make, county_name, state_code, state_name <br>

During the **Transform** stage, categorical columns will help with *Stratified Sampling* before beginning with Exploratory Data Analysis... 

There isn't much cleaning to be done with categorical data. An easy observation is that this set of columns give us insight about location unlike other column types. There might be a way to combine state_code and state_name.

In [66]:
clean_carsdata.select("region","county","state").sample(False,0.0001,100).show()



+--------------------+--------------------+--------------------+
|              region|              county|               state|
+--------------------+--------------------+--------------------+
|saginaw-midland-b...|                null|                  mi|
|            portland|                null|                  or|
|        jacksonville|                null|                  nc|
|dayton / springfield|                null|                  oh|
|             buffalo|                null|                  ny|
|          las cruces|                null|                  nm|
|         los angeles| CA today! We wan...| CA! We stock man...|
|          sacramento|                null|                  ca|
|             boulder|                null|                  co|
|           catskills|                null|                  ny|
|             phoenix|                null|                  az|
|    champaign urbana|                null|                  il|
|         springfield|   

                                                                                

We can see that when state_code is *null*, the matching state_name is *Failed*. Since the state_name appears to be generated by state_code, there isn't any point in keeping both. Let's remove state_name. Our total columns are now 16.

In [67]:
print("Original Dimensions - ",carsdata.count(),"*",len(carsdata.columns))
print("New Dimensions - ",clean_carsdata.count(),"*",len(clean_carsdata.columns))

                                                                                

Original Dimensions -  441802 * 26


                                                                                

New Dimensions -  298061 * 16


#### Final dataset size:

In [68]:
old_col=carsdata.count()
old_row=len(carsdata.columns)
new_col=clean_carsdata.count()
new_row=len(clean_carsdata.columns)

print("Original Dimensions - ",old_col,"*",old_row)
print("New Dimensions - ",new_col,"*",new_row)


print("Data kept for analysis: %d percent of records" % (100 * new_col / old_col))





Original Dimensions -  441802 * 26
New Dimensions -  298061 * 16
Data kept for analysis: 67 percent of records




The used cars dataset is now cleaned and ready for analysis. We can cache this version on S3.

In [70]:
clean_carsdata.coalesce(4).write.parquet("clean_carsdata_transform.parquet",mode='overwrite')

                                                                                

In [72]:
print ("Connecting to AWS S3...")
subprocess.Popen(['./kiboma.command','write_transform'])
print ("Object Written to S3.")

Connecting to AWS S3...
Object Written to S3.
S3 Bucket Contents
---------------------------------
                           PRE cardata/
---------------------------------
---------------------------------
Writing files to S3 Bucket...
upload: clean_carsdata_transform.parquet/_SUCCESS to s3://kiboma/cardata/transform/_SUCCESS
upload: clean_carsdata_transform.parquet/._SUCCESS.crc to s3://kiboma/cardata/transform/._SUCCESS.crc
upload: clean_carsdata_transform.parquet/.part-00003-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc to s3://kiboma/cardata/transform/.part-00003-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc
upload: clean_carsdata_transform.parquet/.part-00000-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc to s3://kiboma/cardata/transform/.part-00000-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc
upload: clean_carsdata_transform.parquet/.part-00001-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc to s3://kiboma/

## Feature Engineering

*First we obtain the working dataset from the cached source on S3.* <br>

In [73]:
print ("Connecting to AWS S3...")
subprocess.Popen(['./kiboma.command','read_transform'])
print ("Object Read from S3 to ..homedir.")

Connecting to AWS S3...
Object Read from S3 to ..homedir.
S3 Bucket Contents
---------------------------------
                           PRE cardata/
---------------------------------
---------------------------------
Reading files from S3 Bucket...
download: s3://kiboma/cardata/transform/.part-00003-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc to cardata_transform.parquet/.part-00003-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc
download: s3://kiboma/cardata/transform/._SUCCESS.crc to cardata_transform.parquet/._SUCCESS.crc
download: s3://kiboma/cardata/transform/_SUCCESS to cardata_transform.parquet/_SUCCESS
download: s3://kiboma/cardata/transform/.part-00002-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc to cardata_transform.parquet/.part-00002-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc
download: s3://kiboma/cardata/transform/.part-00000-14381258-cadc-41f3-8829-78b027bd05dd-c000.snappy.parquet.crc to cardata_transform.p

In [10]:
clean_carsdata = spark.read.format("parquet").option("inferschema","true").load("cardata_transform.parquet")
type(clean_carsdata)


                                                                                

pyspark.sql.dataframe.DataFrame

In [76]:
print(clean_carsdata.count(),len(clean_carsdata.columns))

298061 16


In the section, we will obtain features from the ordinal columns, in order to use the information for machine learning.

In [78]:
print(ordcolumns)

DataFrame[condition: string, cylinders: string, fuel: string, title_status: string, transmission: string, drive: string, size: string, type: string]


In [80]:
clean_carsdata = clean_carsdata.fillna({'condition':'other'})
clean_carsdata= clean_carsdata.fillna({'cylinders':'other'})
clean_carsdata = clean_carsdata.fillna({'fuel':'other'})
clean_carsdata = clean_carsdata.fillna({'title_status':'other'})
clean_carsdata = clean_carsdata.fillna({'transmission':'other'})
clean_carsdata = clean_carsdata.fillna({'drive':'other'})
clean_carsdata = clean_carsdata.fillna({'size':'other'})
clean_carsdata = clean_carsdata.fillna({'type':'other'})

### String Indexer

Using `StringIndexer`, convert ordinal column names to numerical values. It is import to retain the original columns track which index is assigned to which unique column option.

In [11]:
pipeline = Pipeline(stages=[
    StringIndexer(inputCol=c, outputCol='{}_index'.format(c))
    for c in clean_carsdata.columns
])

clean_carsdata_features = pipeline.fit(clean_carsdata)

                                                                                

In [None]:
print(clean_carsdata_features.count(),len(clean_carsdata_features.columns))

### One Hot Encoding

Using `OneHotEncoder`, we can encode the ordinal column indexes.

One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.

In [55]:
encorder = Pipeline(stages=[
   OneHotEncoder(inputCol=c, outputCol='{}_index'.format(c))
    for c in clean_carsdata.columns
])

The new 8 columms of `vehicle_listings_encoded` are of type `vector`. This type can be recognized by Machine Learning models, so there is not need to extract each vector into separate columns.

In [None]:
print(clean_carsdata_encoded.count(),len(clean_carsdata_encoded.columns))

## Sampling Data

<br>To explore the dataset for analysis, create a **sample pandas dataframe** from the engineered data:

In [57]:
ex_df=clean_carsdata.sample(False,0.001,63).toPandas()



With a small random sample of the Feature Engineered Data, we can take a look at what we're working with.

In [58]:
ex_df.sample(n=10,replace=False,random_state=63)

Unnamed: 0,region,price,year,manufacturer,model,condition,cylinders,fuel,odometer,title_status,transmission,drive,size,type,county,state
243,long island,13695,2007,cadillac,escalade,,,gas,144492,clean,automatic,4wd,,SUV,,ny
10,lakeland,40990,2017,chevrolet,silverado 1500,excellent,8 cylinders,gas,47021,clean,automatic,4wd,full-size,pickup,,fl
88,tulsa,28999,2018,ford,f-150,like new,,gas,62161,clean,automatic,,,,,ok
279,baltimore,8345,2011,subaru,forester,,4 cylinders,gas,127848,clean,automatic,4wd,mid-size,wagon,,md
8,kalispell,26999,2012,chevrolet,tahoe 1500 ltz 4x4 gas,,,gas,109227,clean,automatic,4wd,,SUV,call 844-206-2069 and mention stock # 82777 ...,specializing in our signature lifted
223,south coast,14988,2017,ford,fusion,excellent,4 cylinders,gas,60759,clean,automatic,fwd,,sedan,,ma
252,santa barbara,5975,2009,chevrolet,traverse,excellent,6 cylinders,gas,197000,clean,automatic,fwd,mid-size,SUV,,ca
103,chicago,4990,2011,chevrolet,malibu ltz,good,4 cylinders,gas,171000,clean,automatic,fwd,mid-size,sedan,,il
91,tucson,7499,2010,mazda,mazda6,,,gas,149219,clean,automatic,,,sedan,,az
281,eastern panhandle,5990,2000,ford,f150,excellent,6 cylinders,gas,166508,clean,manual,4wd,full-size,truck,,wv


In [59]:
ex_df.dtypes

region          object
price           object
year            object
manufacturer    object
model           object
condition       object
cylinders       object
fuel            object
odometer        object
title_status    object
transmission    object
drive           object
size            object
type            object
county          object
state           object
dtype: object

22/04/05 22:14:09 WARN TransportChannelHandler: Exception in connection from /192.168.43.222:44227
java.io.IOException: Connection timed out
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysO