# Problem Definition
#### How to determine the *price* of a used car?

## Contents

[Installation Setup](#Installation-Setup) <br>
+   [Environment Config](#Environment-Configuration) <br>
+   [Python Packages](#Loading-Packages) <br>
+   [Apache Spark](#Creating-SparkSession) <br>

[Extract, Transform, Load](#Extract-Stage) <br>
This includes the various stages of the ETL Pipeline <br>
+   [Extract](#Extract-Stage) <br>
    +   [Kaggle Dataset](#Kaggle-Dataset) <br>
    +   [Validating Data](#Validating-Data) <br>
    +   [Cleaning Data (Basic)](#Cleaning-Data-(Basic)) <br>
    +   [Caching Data on S3](#Caching-Extract-Data-on-S3) <br>
+   [Transform](#Transform-Stage) <br>
    +   [Cleaning Data](#Cleaning-Data) <br>
    +   [Feature Engineering](#Feature-Engineering) <br>
    +   [Sampling Data](#Sampling-Data) <br>
    +   [Exploratory Data Analysis using Pandas, Matplotlib and Seaborn](#Exploratory-Data-Analysis) <br>
    +   [Caching Data on S3](#Caching-Transform-Data-on-S3) <br>
+   [Load](#Load-Stage) <br>
    +   [Preprocessing Data for Learning Model](#Load-Data)
    +   [Migrate Data to Database](#Load-Data)
    
[Predicting Used Car Price](#Machine-Learning)
+   Implementing Linear Regression

# Installation Setup

## Tool Versions

```
Apache Spark - 2.4.3
Jupyter Notebook - 4.4.0
```
    
## Environment Configuration

#### Configuring ~/.bash_profile

```
export PATH="/usr/local/bin:$PATH"
PATH="/Library/Frameworks/Python.framework/Versions/3.7/bin:${PATH}"
export PATH=/usr/local/scala/bin:$PATH
export PATH=/usr/local/spark/bin:$PATH
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
export PYSPARK_PYTHON=python3.7
```

#### Configuring ~/.bashrc

```
export PYSPARK_PYTHON=/usr/local/bin/python3.7
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/python3.7
```



### Findspark

Use `findspark` to be able to find and import **Pyspark** module, while correctly setting environmental variables and dependencies.

In [2]:
import traceback
import findspark
try:
    findspark.init('C:\Spark\spark-3.2.1-bin-hadoop2.7\spark-3.2.1-bin-hadoop2.7')
except:
    print ("Error:", ''.join(traceback.format_stack()))

Check paths before Executing PySpark Session:

In [3]:
import os
import sys
print("PATH: %s\n" % os.environ['PATH'])
print("SPARK_HOME: %s" % os.environ['SPARK_HOME'])
print("PYSPARK_PYTHON: %s" % os.environ['PYSPARK_PYTHON'])
#print("PYSPARK_DRIVER_PYTHON: %s" % os.environ['PYSPARK_DRIVER_PYTHON'])

PATH: C:\Users\Teja Tetali\Anaconda;C:\Users\Teja Tetali\Anaconda\Library\mingw-w64\bin;C:\Users\Teja Tetali\Anaconda\Library\usr\bin;C:\Users\Teja Tetali\Anaconda\Library\bin;C:\Users\Teja Tetali\Anaconda\Scripts;C:\Users\Teja Tetali\Anaconda\bin;C:\Users\Teja Tetali\Anaconda\condabin;C:\Users\Teja Tetali\Anaconda;C:\Users\Teja Tetali\Anaconda\Library\mingw-w64\bin;C:\Users\Teja Tetali\Anaconda\Library\usr\bin;C:\Users\Teja Tetali\Anaconda\Library\bin;C:\Users\Teja Tetali\Anaconda\Scripts;C:\Program Files (x86)\Common Files\Oracle\Java\javapath;C:\Users\Teja Tetali\AppData\Local\Programs\Python\Python310\Scripts;C:\Users\Teja Tetali\AppData\Local\Programs\Python\Python310;C:\Program Files\Python310\Scripts;C:\Program Files\Python310;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0;C:\Windows\System32\OpenSSH;C:\Program Files (x86)\NVIDIA Corporation\PhysX\Common;C:\Program Files\NVIDIA Corporation\NVIDIA NvDLISR;C:\Program Files\Cloudf

## Loading Packages 

In [4]:
#import libraries
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.pylab as pylab
import seaborn as sns
import subprocess
from pyspark.sql.functions import *
from functools import reduce
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline

### Package Versions

In [5]:
print('pandas: {}'.format(pd.__version__))
print('numpy: {}'.format(np.__version__))
print('matplotlib: {}'.format(matplotlib.__version__))
print('seaborn: {}'.format(sns.__version__))
print('Python: {}'.format(sys.version))

pandas: 1.3.4
numpy: 1.20.3
matplotlib: 3.4.3
seaborn: 0.11.2
Python: 3.9.7 (default, Sep 16 2021, 16:59:28) [MSC v.1916 64 bit (AMD64)]


## Creating SparkSession
Get package to handle AWS to access S3:

In [6]:
%set_env PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell


env: PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell


Creating Spark Session, hosted across all local nodes on a **Standalone Cluster**:

In [7]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("PySpark Craigslist") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

Configure Hadoop connection for S3:

In [8]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.access.key", "AKIAQQADETG4J56SFQIV")
hadoopConf.set("fs.s3a.secret.key", "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")

Monitoring Spark instrumentation through the WebUI available through `localhost:4040/`

# Extract Stage

## Project Dataset

Available [https://www.truecar.com/used-cars-for-sale/listings/](https://www.truecar.com/used-cars-for-sale/listings/)

In [9]:
vehicle_listings = spark.read.format("csv").option("header", "true").load("true_car_listings.csv")
type(vehicle_listings)

pyspark.sql.dataframe.DataFrame

## Validating Data

Now that the data is available as a local *dataframe* on the Spark cluster, let's validate the dataframe by look at the schema, size, samples and statistics of our working data - 

In [10]:
vehicle_listings.printSchema()

root
 |-- Price: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Mileage: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Vin: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)



Dimensions of Raw Dataset:

In [11]:
print(vehicle_listings.count(),len(vehicle_listings.columns))

852122 8


Collecting random sample to see what kind of data populates each column:

In [12]:
vehicle_listings.sample(False,0.0001,10).toPandas().sample(10)

Unnamed: 0,Price,Year,Mileage,City,State,Vin,Make,Model
90,10495,2012,95187,Franklin,WI,1VWCH7A31CC061218,Volkswagen,Passat4dr
0,6900,2005,82100,OSSEO,MN,JH4DC54845S011549,Acura,RSXAutomatic
71,16678,2015,26578,Kendall,FL,KNMAT2MT9FP589013,Nissan,RogueS
8,38899,2017,9985,Beavercreek,OH,5GAKVCKD1HJ159449,Buick,EnclavePremium
30,10950,2012,77611,Houston,TX,1FAHP3N22CL464474,Ford,Focus5dr
68,12668,2014,80216,Newburgh,NY,JN8AS5MV6EW700390,Nissan,Rogue
33,11565,2006,148144,Brownsville,TX,1FTPW14V66KC91335,Ford,F-1504WD
4,46900,2017,4699,Concord,CA,5UXWX9C35H0W68516,BMW,X3xDrive28i
88,40990,2016,9713,Katy,TX,5TFDW5F15GX556304,Toyota,Tundra
35,46775,2015,24895,Puyallup,WA,1FTEW1EG6FFB70501,Ford,F-1504WD


Basic statistics on raw dataset columns:

In [13]:
#Example:
vehicle_listings.select("Price","Year","City","State","Vin").describe().show()

+-------+------------------+------------------+-----------+------+-----------------+
|summary|             Price|              Year|       City| State|              Vin|
+-------+------------------+------------------+-----------+------+-----------------+
|  count|            852122|            852122|     852122|852122|           852122|
|   mean| 21464.10020982911|2013.2891452162953|       null|  null|             69.0|
| stddev|13596.202240557528| 3.414987035747542|       null|  null|             null|
|    min|             10000|              1997|      AKRON|    AK|04WT3N56GG0646582|
|    max|             99999|              2018|victorville|    ga|ZN661YUS9HX226976|
+-------+------------------+------------------+-----------+------+-----------------+



Generally, the `.describe()` or `.explain()` method is a good way to start exploring a dataset. For this dataset, there are too many columns, some very unclean, and it is hard to decipher much from the above results. <br>

For this project, we will perform cleaning twice. The Extract stage has a basic cleaning section to remove duplicates and to obtain numerical features (notice in the schema above, all columns are *String* by default). We can cache the Extracted Data in a compressed format such as parquet, so it can be used for different pipelines. <br>

The main cleaning section is performed during the [Transform Stage](#Transform-Data).<br>



## Cleaning Data (Basic)

The goal of **Extract Stage** cleaning is to find a balance between cleaning the data and maintaining the maximum amount of raw data. This optimizes usability for other pipelines. <br>

### Removing Duplicates

The raw dataset was obtained using a scraper from truecar. This means that each post will have a unique url. Print the URL count and duplicate url.

The scraper can't find duplicate recors because of various reasons. Each duplicate is more likely the result of people posting multiple times or other errors. Therefore no record from the raw dataset can be found to be a complete duplicate.

However, duplicates can be removed by excluding `url` when calling .drop_duplicates() on **`vehicle_listings`**.

In [14]:
print(vehicle_listings.columns)

['Price', 'Year', 'Mileage', 'City', 'State', 'Vin', 'Make', 'Model']


In [15]:
#Drop Duplicates here using the above drop_duplicated() Function
vehicle_listings.drop_duplicates(['Price', 'Year', 'Mileage', 'City', 'State', 'Vin', 'Make', 'Model'])

DataFrame[Price: string, Year: string, Mileage: string, City: string, State: string, Vin: string, Make: string, Model: string]

### Obtain Numerical Columns
Convert *numerical feature types* from string to float to obtain **Numerical** data columns.

In [16]:
#Write Your Code here!
cols=["Price","Year","Mileage"]

vehicle_listings_clean = (reduce(
            lambda memo_df, col_name: memo_df.withColumn(col_name, vehicle_listings[col_name].cast("float")),
            cols,
            vehicle_listings))

vehicle_listings_clean.printSchema()

root
 |-- Price: float (nullable = true)
 |-- Year: float (nullable = true)
 |-- Mileage: float (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Vin: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)



## Caching Extract Data on S3

After cleaning, the final task in the **Extract** stage is to cache the data. I will store the cleaned dataset as a parquet file on Amazon S3. We can return to this file without having to run the Extract pipeline again, and attach other pipelines from this point as well. <br>

<div class="alert alert-warning">
<b>NOTE</b> This implementation uses .coalesce(4) before .write() to optimize read/write performance on S3.
</div>

In [19]:
#vehicle_listings_clean.write.parquet("s3a://yasin/usedcars/extract",mode="overwrite")
vehicle_listings_clean.coalesce(4).write.parquet("vehicle_listings_extract.parquet",mode='overwrite')





Unable to connect to S3 directly using SparkSession due to Spark generating its credentials, possibly due to versioning... <br>
Instead, we can connect to S3 using a bash script.<br>
The parquet compression brings the file size down to *<30MB*, so this doesn't cause performance issues here.<br>

In [None]:
print ("Connecting to AWS S3...")
subprocess.Popen(['./S3_connect.command','write_extract'])
print ("Object Written to S3.")

Connecting to AWS S3...
Object Written to S3.


This completes the **Extract Stage**. <br>

We clean, sample and explore the data in the **Transform Stage**. The data available on S3 is clean but as close to the raw data as possible. <br>

# Transform Stage

*First we obtain the working dataset from the cached source on S3.* <br>

In [None]:
print ("Connecting to AWS S3...")
subprocess.Popen(['./S3_connect.command','read_extract'])
print ("Object Read from S3 to ../data.")

Connecting to AWS S3...
Object Read from S3 to ../data.


In [None]:
#vehicles_listings_read=spark.read.parquet("s3a://s3://yasin/usedcars/extract")
#vehicle_listings_read=spark.read.parquet("../data/vehicle_listings_read.parquet")

vehicle_listings_clean = spark.read.format("parquet").option("inferschema","true").load("vehicle_listings_read_extract.parquet")
type(vehicle_listings_clean)


pyspark.sql.dataframe.DataFrame

In [17]:
print(vehicle_listings_clean.count(),len(vehicle_listings_clean.columns))

852122 8


There are over 1.5 million records, and 26 columns. The dataset is too large to perform EDA (Exploratory Data Analysis) comfortably. <br>
The final data after *Cleaning* should be as clean as possible without redundant data.<br>

## Cleaning Data

Cleaning is essential during the **Transform Stage** of the ETL pipeline. The main goal here is to optimize the dataset for the endgoal of the project - Creating a machine learning pipeline to predict the target varibale **Price**. <br>

We can also cache the cleaned data set before sampling and exploration for other machine learning project using the same dataset.<br>

### Cleaning Checkpoints

+  General
    +  Drop Columns
    +  Drop Rows (Null Price)
+  Numerical 
    +  Apply Reasonable Ranges
        +  Kept Odometer Null Values
+  Ordinal
    +  Narrow down relevant options
    +  Remove errors in columns
    +  Keep countable ordinal values
+  Categorical
    +  State_code + State_name (Extract Stage)
        +  Removed State_name

#### Drop Columns

Some Columns are redundant, we can remove them before moving forward.<br>

In [20]:
#write your code here!
vehicle_listings.select("Price","Year","City","State","Vin","Make","Model").describe().show()

+-------+------------------+------------------+-----------+------+-----------------+------+-----------------+
|summary|             Price|              Year|       City| State|              Vin|  Make|            Model|
+-------+------------------+------------------+-----------+------+-----------------+------+-----------------+
|  count|            852122|            852122|     852122|852122|           852122|852122|           852122|
|   mean| 21464.10020982911|2013.2891452162953|       null|  null|             69.0|  null|6829.188492490424|
| stddev|13596.202240557528| 3.414987035747542|       null|  null|             null|  null|95299.16891308237|
|    min|             10000|              1997|      AKRON|    AK|04WT3N56GG0646582|    AM|                1|
|    max|             99999|              2018|victorville|    ga|ZN661YUS9HX226976| smart|         xDManual|
+-------+------------------+------------------+-----------+------+-----------------+------+-----------------+



'lat' and 'long' provide no more useful information than ('city','county_name', 'state_code', 'state_name'), and are therefore not needed.<br>
The other dropped columns are not helpful to the analysis.

The remaining attribute characteristics are varied, we have three types of features available in our data -
**Numerical**, **Ordinal**, **Categorical**<br>

**Potential Column Types -** <br>
Numerical - price, year, odometer <br>
Ordinal - condition, cylinders, fuel, title_status, transmission, drive, size, type <br>
Categorical - city, manufacturer, make, county_name, state_code, state_name <br>

The target variable of this project is **Price**. Therefore, any records where price is null would be irrelevant. Check to see if the dataset contains null prices.

In [22]:
#write your code here!
vehicle_listings_clean.filter(vehicle_listings_clean.Price.isNull()).count()

0

### Numerical Data Columns

In [23]:
# Make Smaller subset to take quick look at records
Num=["Price","Year","Mileage"]

#### Applying a Reasonable Range on Continuous Types

Website postings that advertise items for sale often manipulate search algorithms to get more hits. For example, thousands of posts are spammed with a price of $1 (minimum allowed), to get more visibility through the 'search by price' filter. To ensure that our working dataset is within reasonable boundaries, we apply restrictions on the *Continuous* data columns.

In [25]:
#write your code here!
print("New data: %d" %vehicle_listings_clean.where("Year >= 2019").count())
print("Old data: %d" %vehicle_listings_clean.where("Year < 1930").count())
print("Expensive cars: %d" %vehicle_listings_clean.where("Price > 300000").count())
print("Cheap cars: %d" %vehicle_listings_clean.where("Price < 50").count())
print("Many mileage: %d" %vehicle_listings_clean.where("Mileage > 25000").count())
print("few mileage: %d" %vehicle_listings_clean.where("Mileage < 50").count())

New data: 0
Old data: 0
Expensive cars: 91
Cheap cars: 0
Many mileage: 624342
few mileage: 3830


Before removing 20k+ records due to price, let's confirm if the spame hypothesis is true:

In [26]:
#write your code here!
vehicle_listings_clean.sample(False,0.001,100).where("Price<50").select(Num).show()

+-----+----+-------+
|Price|Year|Mileage|
+-----+----+-------+
+-----+----+-------+



As expected, these randomly obtained results show us that "price<50" is not very useful to the study. <br>


In [27]:
#Write your code here!
print("Mising values of Mileage:",vehicle_listings_clean.filter(vehicle_listings_clean.Mileage.isNull()).count())
print("Missing values Price :",vehicle_listings_clean.filter(vehicle_listings_clean.Price.isNull()).count())
print("Missing values of  Year :",vehicle_listings_clean.filter(vehicle_listings_clean.Year.isNull()).count())

Mising values of Mileage: 0
Missing values Price : 0
Missing values of  Year : 0


The Null counts shows us that a lot of listings have not reported Odometer values, perhaps to increase chances of selling a car or maybe due to lack of info. The question is, do we remove all these records altogether?<br>
I would rather keep these records because the 50k+ rows will still provide useful information about other attributes. Removing these values would reduce the dataset by 35%, which is too high.<br>
<br>Apply the restrictions to vehicle_listings_clean.<br>
Note- The Spark environment allows me to use a direct SQL query to restrict odometer values. This is better suited because the **`pyspark.sql.dataframe`** library treats "null" values as false when evaluating the `.where()` function.

In [28]:
#write your code here!
vehicle_listings_clean = vehicle_listings_clean.where("Price>=50 and Price<=300000").where("Year>=1930 and Year<2019") 

vehicle_listings_clean.registerTempTable("df_temp")

vehicle_listings_clean=spark.sql("SELECT * FROM df_temp WHERE (Mileage>=50 AND Mileage<=250000) OR (Mileage IS NULL)")

main=vehicle_listings.count()
recent=vehicle_listings_clean.count()

print("Number of original dataset ",main)
print("range ",recent)
print(" %d percent number of data stored" % (100 * recent / main))

vehicle_listings_clean.select("Price","Year","Mileage").describe().show()



Number of original dataset  852122
range  846912
 99 percent number of data stored
+-------+------------------+------------------+-----------------+
|summary|             Price|              Year|          Mileage|
+-------+------------------+------------------+-----------------+
|  count|            846912|            846912|           846912|
|   mean|  21396.6219040467|2013.2876473588756|52345.84431676491|
| stddev|13048.694725871834|3.3955512601171827|40034.19026960797|
|    min|            1500.0|            1997.0|             50.0|
|    max|          299999.0|            2018.0|         250000.0|
+-------+------------------+------------------+-----------------+



Although only 3 attributes, **price, year, odometer**, can be directly converted to numerical value, other attributes can be be transformed to fit the Categorical and Ordinal labels. The difference between categorical and ordinal values is that ordinal values have a clear and restricted ordering of types. For example, *'condition'* would be ordinal, ranging from excellent to poor.

### Ordinal Data Columns

Ordinal data will play an important role in learning about price. An ordinal data type is a variable that has a limited number of options, which can be ordered. For example, `"condition"` should range from excellent to bad (or in this case, "salvage".

The following columns are of the **Ordinal** types -  <br>
(condition, cylinders, fuel, title_status, transmission, drive, size, type) <br>

**`ordinals_options_all`** is a map linking each ordinal to each distinct value provided for it. This will help cleaning the dataset faster.

In [29]:
#writw your code here
ordcolumns=vehicle_listings_clean['Price', 'Year', 'Mileage', 'City', 'State', 'Vin', 'Make', 'Model']
print(ordcolumns)

DataFrame[Price: float, Year: float, Mileage: float, City: string, State: string, Vin: string, Make: string, Model: string]


As expected, all 8 Ordinal columns have values that seem to be errors, or reported incorrectly. For example, let's look into the "Condition" reported as 'audio controls"'. 

In [31]:
#write your code here!
vehicle_listings_clean.filter("Model = 'Tundra\"'" ).select(Num+["Model"]).show()

+-----+----+-------+-----+
|Price|Year|Mileage|Model|
+-----+----+-------+-----+
+-----+----+-------+-----+



Only 1 record fetched. 

The next task is to clean these misleading values for each of the ordinal types. Automating this process, at the very least requires two `pyspark.sql.dataframe` library calls for each distinct ordinal option - `count()` and `replace()`. We can count the number of occurences of a value in an ordinal column, if it's less than 10 then it is  irrelevant and will be grouped under "other".

**`remove_fakes()`** is a function that takes in a column and all distinct values within the column, and evaluates which options are useful. It bunches up other options under "Other".

Time Cost - Each call to **`remove_fakes()`** will take **O(n)** *(O(2n+1))* where *n* is the length of the column. Because the algorithm needs to call both `count()` and `replace()` for any ordinal/value combination, this is the fastest way to clean the ordinal columns.

In [32]:
#write your code here
print(vehicle_listings_clean.select("Model").distinct().count())

2717


The above function does not count *null* values. The difference between values printed as a list after the clean versus the final `count()` value are *null* and, in some cases, *other*.

#### Cleaning column = 'condition'

In [33]:
#write your code here for Price
print("Number for price",vehicle_listings_clean.select("Price").distinct().count())

Number for price 46715


#### Cleaning column = 'cylinders'

In [34]:
#write your code here for Year
print("Number for year",vehicle_listings_clean.select("Year").distinct().count())

Number for year 22


#### Cleaning column = 'fuel'

In [35]:
#write your code here for Mileage
print("Number for Mileage",vehicle_listings_clean.select("Mileage").distinct().count())

Number for Mileage 157547


#### Cleaning column = 'title_status'

In [36]:
#write your code here for City
print("Number for City",vehicle_listings_clean.select("City").distinct().count())

Number for City 2553


#### Cleaning column = 'transmission'

In [37]:
#write your code here State
print("Number for State",vehicle_listings_clean.select("State").distinct().count())

Number for State 59


#### Cleaning column = 'drive'

In [38]:
#write your code here Vin
print("Number for Vin",vehicle_listings_clean.select("Vin").distinct().count())

Number for Vin 846865


#### Cleaning column = 'size'

In [39]:
#write your code here Make
print("Number for Make",vehicle_listings_clean.select("Make").distinct().count())

Number for Make 58


#### Cleaning column = 'type'

In [40]:
#write your code here Model
print("Number for Model",vehicle_listings_clean.select("Model").distinct().count())

Number for Model 2717


### Categorical Data Columns

Categorical - city, manufacturer, make, county_name, state_code, state_name <br>

During the **Transform** stage, categorical columns will help with *Stratified Sampling* before beginning with Exploratory Data Analysis... 

There isn't much cleaning to be done with categorical data. An easy observation is that this set of columns give us insight about location unlike other column types. There might be a way to combine state_code and state_name.

In [41]:
#write your code here
vehicle_listings_clean.select("City","State").sample(False,0.0001,100).show()

+---------------+-----+
|           City|State|
+---------------+-----+
|       Kirkland|   WA|
|   Morton Grove|   IL|
|         Tacoma|   WA|
|     Carrollton|   TX|
|     Louisville|   KY|
|      Westfield|   IN|
|       Columbia|   MO|
|Fort Lauderdale|   FL|
|        Concord|   NC|
|          Havre|   MT|
|        Fairfax|   VA|
|          Davie|   FL|
|        Houston|   TX|
|      Bethlehem|   PA|
| East Greenwich|   RI|
|       Wartburg|   TN|
|      Las Vegas|   NV|
|         Athens|   OH|
|        Houston|   TX|
|     Hackensack|   NJ|
+---------------+-----+
only showing top 20 rows



We can see that when state_code is *null*, the matching state_name is *Failed*. Since the state_name appears to be generated by state_code, there isn't any point in keeping both. Let's remove state_name. Our total columns are now 16.

In [42]:
print("Original = ",vehicle_listings.count(),"*",len(vehicle_listings.columns))
print("New = ",vehicle_listings_clean.count(),"*",len(vehicle_listings_clean.columns))

Original =  852122 * 8
New =  846912 * 8


#### Final dataset size:

In [43]:
#write your code here
old_col=vehicle_listings.count()
old_row=len(vehicle_listings.columns)
new_col=vehicle_listings_clean.count()
new_row=len(vehicle_listings_clean.columns)

print("Original Dimensions = ",old_col,"*",old_row)
print("New Dimensions = ",new_col,"*",new_row)


print("Data kept for analysis: %d percent of records" % (100 * new_col / old_col))

Original Dimensions =  852122 * 8
New Dimensions =  846912 * 8
Data kept for analysis: 99 percent of records


The used cars dataset is now cleaned and ready for analysis. We can cache this version on S3.

In [None]:
#vehicle_listings_clean.write.parquet("s3a://deveshetl/usedcars/transform",mode="overwrite")
vehicle_listings_clean.coalesce(4).write.parquet("vehicle_listings_transform.parquet",mode='overwrite')

In [None]:
print ("Connecting to AWS S3...")
subprocess.Popen(['./S3_connect.command','write_transform'])
print ("Object Written to S3.")

Connecting to AWS S3...
Object Written to S3.


## Feature Engineering

*First we obtain the working dataset from the cached source on S3.* <br>

In [None]:
print ("Connecting to AWS S3...")
subprocess.Popen(['./S3_connect.command','read_transform'])
print ("Object Read from S3 to ../data.")

Connecting to AWS S3...
Object Read from S3 to ../data.


In [None]:
#vehicles_listings_read=spark.read.parquet("s3a://s3://yasin/usedcars/extract")
#vehicle_listings_read=spark.read.parquet("../data/vehicle_listings_read.parquet")

vehicle_listings_clean = spark.read.format("parquet").option("inferschema","true").load("vehicle_listings_read_transform.parquet")
type(vehicle_listings_clean)


pyspark.sql.dataframe.DataFrame

In [44]:
print(vehicle_listings_clean.count(),len(vehicle_listings_clean.columns))

846912 8


In the section, we will obtain features from the ordinal columns, in order to use the information for machine learning.

In [46]:
print(ordcolumns)

DataFrame[Price: float, Year: float, Mileage: float, City: string, State: string, Vin: string, Make: string, Model: string]


In [47]:
vehicle_listings_clean = vehicle_listings_clean.fillna({'Price':'other'})
vehicle_listings_clean = vehicle_listings_clean.fillna({'Year':'other'})
vehicle_listings_clean = vehicle_listings_clean.fillna({'Mileage':'other'})
vehicle_listings_clean = vehicle_listings_clean.fillna({'City':'other'})
vehicle_listings_clean = vehicle_listings_clean.fillna({'State':'other'})
vehicle_listings_clean = vehicle_listings_clean.fillna({'Vin':'other'})
vehicle_listings_clean = vehicle_listings_clean.fillna({'Make':'other'})
vehicle_listings_clean = vehicle_listings_clean.fillna({'Model':'other'})

### String Indexer

Using `StringIndexer`, convert ordinal column names to numerical values. It is import to retain the original columns track which index is assigned to which unique column option.

In [51]:
"""
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index")\
            .setHandleInvalid("skip")\
            .fit(vehicle_listings_clean) for column in ordcolumns]

pipeline = Pipeline(stages=indexers)
vehicle_listings_features = pipeline.fit(vehicle_listings_clean).transform(vehicle_listings_clean)
"""

pipeline = Pipeline(stages=[
    StringIndexer(inputCol=c, outputCol='{}_index'.format(c))
    for c in vehicle_listings_clean.columns
])

vehicle_listings_features = pipeline.fit(vehicle_listings_clean)



In [54]:
print(vehicle_listings.printSchema())
vehicle_listings.head(3)

root
 |-- Price: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Mileage: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Vin: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)

None


[Row(Price='8995', Year='2014', Mileage='35725', City='El Paso', State=' TX', Vin='19VDE2E53EE000083', Make='Acura', Model='ILX6-Speed'),
 Row(Price='10888', Year='2013', Mileage='19606', City='Long Island City', State=' NY', Vin='19VDE1F52DE012636', Make='Acura', Model='ILX5-Speed'),
 Row(Price='8995', Year='2013', Mileage='48851', City='El Paso', State=' TX', Vin='19VDE2E52DE000025', Make='Acura', Model='ILX6-Speed')]

In [57]:
print(vehicle_listings.count(),len(vehicle_listings.columns))

852122 8


### One Hot Encoding

Using `OneHotEncoderEstimator`, we can encode the ordinal column indexes.

One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.

In [58]:
#write your code here
vehicle_listings_encoded = Pipeline(stages=[
   OneHotEncoder(inputCol=c, outputCol='{}_index'.format(c))
    for c in vehicle_listings_clean.columns
])

In [60]:
vehicle_listings.printSchema()

root
 |-- Price: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Mileage: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Vin: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)



The new 8 columms of `vehicle_listings_encoded` are of type `vector`. This type can be recognized by Machine Learning models, so there is not need to extract each vector into separate columns.

In [None]:
print(vehicle_listings_encoded.count(),len(vehicle_listings_encoded.columns))

1470500 32


## Sampling Data

<br>To explore the dataset for analysis, create a **sample pandas dataframe** from the engineered data:

In [61]:
#write your code here
eda_df=vehicle_listings_clean.sample(False,0.001,63).toPandas()

With a small random sample of the Feature Engineered Data, we can take a look at what we're working with.

In [62]:
#write your code here
eda_df.sample(n=10,replace=False,random_state=63)

Unnamed: 0,Price,Year,Mileage,City,State,Vin,Make,Model
446,17799.0,2016.0,9895.0,Austin,TX,KMHTC6AE1GU273805,Hyundai,VelosterAutomatic
236,17320.0,2017.0,14910.0,Port Richey,FL,3FA6P0H78HR117544,Ford,FusionSE
536,33797.0,2015.0,27037.0,Mission Viejo,CA,2T2ZK1BA9FC167478,Lexus,RX
443,6995.0,2011.0,97038.0,Chantilly,VA,5NPEB4ACXBH143074,Hyundai,Sonata4dr
699,8000.0,2007.0,101866.0,Pasadena,CA,JF1GD61657G521361,Subaru,Impreza
589,17500.0,2015.0,6713.0,Louisville,KY,4A4AP4AW7FE041739,Mitsubishi,Outlander
332,77880.0,2017.0,15804.0,Austin,TX,1FT7W2BT7HEC04096,Ford,Super
321,18997.0,2012.0,69395.0,Ramsey,NJ,1FTNE2EW9CDB13412,Ford,Econoline
681,36995.0,2017.0,8875.0,Jacksonville,IL,1C6RR7TT2HS646384,Ram,1500SLT
322,27900.0,2014.0,64959.0,Pilot Point,TX,1FTFW1ET5EFB84597,Ford,F-1504WD


In [63]:
eda_df.dtypes

Price      float32
Year       float32
Mileage    float32
City        object
State       object
Vin         object
Make        object
Model       object
dtype: object