# Problem Definition
#### How to determine the *price* of a used car?
by Moses Kiboma

## Contents

[Installation Setup](#Installation-Setup) <br>
+   [Environment Config](#Environment-Configuration) <br>
+   [Python Packages](#Loading-Packages) <br>
+   [Apache Spark](#Creating-SparkSession) <br>

[Extract, Transform, Load](#Extract-Stage) <br>
This includes the various stages of the ETL Pipeline <br>
+   [Extract](#Extract-Stage) <br>
    +   [TrueCar Used Car Listed Data](#Truecar.com) <br>
    +   [Validating Data](#Validating-Data) <br>
    +   [Cleaning Data (Basic)](#Cleaning-Data-(Basic)) <br>
    +   [Caching Data on S3](#Caching-Extract-Data-on-S3) <br>
+   [Transform](#Transform-Stage) <br>
    +   [Cleaning Data](#Cleaning-Data) <br>
    +   [Feature Engineering](#Feature-Engineering) <br>
    +   [Sampling Data](#Sampling-Data) <br>
    +   [Exploratory Data Analysis using Pandas, Matplotlib and Seaborn](#Exploratory-Data-Analysis) <br>
    +   [Caching Data on S3](#Caching-Transform-Data-on-S3) <br>
+   [Load](#Load-Stage) <br>
    +   [Preprocessing Data for Learning Model](#Load-Data)
    +   [Migrate Data to Database](#Load-Data)
    
[Predicting Used Car Price](#Machine-Learning)
+   Implementing Linear Regression

# Installation Setup

## Tool Versions

```
Apache Spark - 2.4.3
Jupyter Notebook - 4.4.0
```
    
## Environment Configuration

#### Configuring ~/.bash_profile

```
export PATH="/usr/local/bin:$PATH"
PATH="/Library/Frameworks/Python.framework/Versions/3.7/bin:${PATH}"
export PATH=/usr/local/scala/bin:$PATH
export PATH=/usr/local/spark/bin:$PATH
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
export PYSPARK_PYTHON=python3.7
```

#### Configuring ~/.bashrc

```
export PYSPARK_PYTHON=/usr/local/bin/python3.7
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/python3.7
```



In [1]:
#!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


### Findspark

Use `findspark` to locate the Spark installation and make the **PySpark** module importable, setting the required environment variables along the way.

In [1]:
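import traceback
import findspark
try:
    # Point findspark at the Spark installation directory
    findspark.init('/opt/spark')
except Exception:
    # format_exc() reports the exception that was actually raised
    print("Error:", traceback.format_exc())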

Check the paths before starting the PySpark session:

In [2]:
import os
import sys
print("PATH: %s\n" % os.environ['PATH'])
print("SPARK_HOME: %s" % os.environ['SPARK_HOME'])
print("PYSPARK_PYTHON: %s" % os.environ['PYSPARK_PYTHON'])


PATH: /home/moses/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin

SPARK_HOME: /opt/spark
PYSPARK_PYTHON: /bin/python3
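
Indexing `os.environ` directly raises `KeyError` when a variable is unset, for example if `findspark.init` did not succeed. A minimal sketch of a more forgiving check follows; the helper name `report_env` is hypothetical and not part of the original notebook:

```python
import os

def report_env(names):
    """Return {name: value or None} for each environment variable name."""
    return {name: os.environ.get(name) for name in names}

for name, value in report_env(["PATH", "SPARK_HOME", "PYSPARK_PYTHON"]).items():
    # A missing variable is reported instead of raising KeyError.
    print("%s: %s" % (name, value if value is not None else "<not set>"))
```

Any variable shown as `<not set>` should be fixed before the session is created.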


## Loading Packages 

In [3]:
#import libraries
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.pylab as pylab
import seaborn as sns
import subprocess
from pyspark.sql.functions import *
from functools import reduce
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline

### Package Versions

In [4]:
print('pandas: {}'.format(pd.__version__))
print('numpy: {}'.format(np.__version__))
print('matplotlib: {}'.format(matplotlib.__version__))
print('seaborn: {}'.format(sns.__version__))
print('Python: {}'.format(sys.version))

pandas: 1.4.1
numpy: 1.22.2
matplotlib: 3.5.1
seaborn: 0.11.2
Python: 3.8.10 (default, Nov 26 2021, 20:14:08) 
[GCC 9.3.0]


## Creating SparkSession
Load the `hadoop-aws` package so that Spark can access S3:

In [5]:
%set_env PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell


env: PYSPARK_SUBMIT_ARGS=--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell
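
`%set_env` is an IPython magic, so the cell above only works inside Jupyter or IPython. A minimal sketch of the plain-Python equivalent follows. It assumes the code runs before the first `SparkSession` is created, since PySpark reads `PYSPARK_SUBMIT_ARGS` when it launches the JVM. The constant name `HADOOP_AWS_PACKAGE` is illustrative only:

```python
import os

# Same package coordinate as in the cell above.
HADOOP_AWS_PACKAGE = "org.apache.hadoop:hadoop-aws:2.7.3"

# Set this before any SparkSession/SparkContext exists; once the JVM
# has been started, changing the variable has no effect on that session.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages=%s pyspark-shell" % HADOOP_AWS_PACKAGE
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```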


Create the Spark session in **local mode** (`local[*]`), which runs Spark on this machine with one worker thread per available core rather than on a standalone cluster:

In [6]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("PySpark Craigslist") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = spark.sparkContext

22/03/23 17:58:26 WARN Utils: Your hostname, kiboma resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface wlo1)
22/03/23 17:58:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/moses/.ivy2/cache
The jars for the packages stored in: /home/moses/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ec1fa297-91fd-4097-946b-9ba461ceb0e2;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.3 in central
	found org.apache.hadoop#hadoop-common;2.7.3 in central
	found org.apache.hadoop#hadoop-annotations;2.7.3 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math3;3.1.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.4 in central
	found commons-io#commons-io;2.4 in central
	found commons-net#commons-net;3.1 in central
	found commons-collections#commons-collectio

22/03/23 17:58:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Configure Hadoop connection for S3:

In [7]:
hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
# Read the AWS credentials from the environment instead of hard-coding them
hadoopConf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hadoopConf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])

While the session is running, Spark jobs can be monitored through the web UI at `localhost:4040/`.

# Extract Stage

## Project Dataset

Available at [https://www.truecar.com/used-cars-for-sale/listings/](https://www.truecar.com/used-cars-for-sale/listings/)

In [15]:
vehicle_listings = spark.read.format("csv").option("header", "true").load("clean_car_dataset.csv")
type(vehicle_listings)

pyspark.sql.dataframe.DataFrame

## Validating Data

Now that the data is loaded as a Spark *DataFrame*, let's validate it by looking at the schema, size, samples and statistics of our working data:

In [16]:
vehicle_listings.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- PRICE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- MILEAGE: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- MAKE: string (nullable = true)
 |-- MODEL: string (nullable = true)



Dimensions of Raw Dataset:

In [17]:
print(vehicle_listings.count(),len(vehicle_listings.columns))

539107 8


Collecting random sample to see what kind of data populates each column: