In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.0'
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Fetched 252 kB in 5s (47

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-08-11 00:14:34--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2022-08-11 00:14:34 (11.4 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Final-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://cars-project.s3.amazonaws.com/data.csv"
spark.sparkContext.addFile(url)
user_data_df = spark.read.csv(SparkFiles.get("data.csv"), sep=",", header=True, inferSchema=True)


In [5]:
# Show DataFrame
user_data_df.show()

+---+----------+----+-------+--------------------+--------+---------+------+------------+---+
|_c0|VEHICLE_ID|YEAR|   MAKE|               MODEL|HasMatch|DRIVER_ID|GENDER|CREDIT_SCORE|AGE|
+---+----------+----+-------+--------------------+--------+---------+------+------------+---+
|  0|   2005295|2012|PORSCHE|911 CARRERA CARRE...|    true|   731628|     1|         847| 55|
|  1|   2005295|2012|PORSCHE|911 CARRERA CARRE...|    true|   731634|     2|         847| 55|
|  2|   2005295|2012|PORSCHE|911 CARRERA CARRE...|    true|   731640|     2|         847| 21|
|  3|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|    true|    60076|     1|         866| 77|
|  4|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|    true|    60078|     2|         866| 76|
|  5|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|    true|   360037|     1|         768| 55|
|  6|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|    true|   360066|     2|         768| 52|
|  7|   4000092|2018| SUBARU|    IMPREZA 2.0I VDC|    true| 

In [6]:
# Drop NA values
dropna_df = user_data_df.dropna()

In [7]:
# Import library to access columns
from pyspark.sql.functions import col

# Drop columns that dont have car->driver matches
cleaned_df = dropna_df.filter(col("HasMatch") == True)
cleaned_df = cleaned_df.select(["VEHICLE_ID","YEAR","MAKE","MODEL","GENDER","CREDIT_SCORE","AGE"])
cleaned_df.drop_duplicates()
cleaned_df.show()

+----------+----+-------+--------------------+------+------------+---+
|VEHICLE_ID|YEAR|   MAKE|               MODEL|GENDER|CREDIT_SCORE|AGE|
+----------+----+-------+--------------------+------+------------+---+
|   2005295|2012|PORSCHE|911 CARRERA CARRE...|     1|         847| 55|
|   2005295|2012|PORSCHE|911 CARRERA CARRE...|     2|         847| 55|
|   2005295|2012|PORSCHE|911 CARRERA CARRE...|     2|         847| 21|
|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|     1|         866| 77|
|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|     2|         866| 76|
|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|     1|         768| 55|
|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|     2|         768| 52|
|   4000092|2018| SUBARU|    IMPREZA 2.0I VDC|     2|         710| 35|
|   4000092|2018| SUBARU|    IMPREZA 2.0I VDC|     1|         710| 38|
|   4000094|2017| SUBARU|    IMPREZA 2.0I VDC|     1|         666| 29|
|   4000101|2017| SUBARU|IMPREZA W/ EYESIG...|     2|         824| 22|
|   40

In [8]:

# Create the cars DataFrame. 
# Used to populate js.dropdown selection lists
# ["VEHICLE_ID","YEAR","MAKE", "MODEL""HasMatch","DRIVER_ID","GENDER","CREDIT_SCORE","AGE"]
cars_df = cleaned_df.select(["VEHICLE_ID","YEAR","MAKE","MODEL"])
cars_df.show()

+----------+----+-------+--------------------+
|VEHICLE_ID|YEAR|   MAKE|               MODEL|
+----------+----+-------+--------------------+
|   2005295|2012|PORSCHE|911 CARRERA CARRE...|
|   2005295|2012|PORSCHE|911 CARRERA CARRE...|
|   2005295|2012|PORSCHE|911 CARRERA CARRE...|
|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|
|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|
|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|
|   4000020|2011|  DODGE|DAKOTA CC BIGHORN...|
|   4000092|2018| SUBARU|    IMPREZA 2.0I VDC|
|   4000092|2018| SUBARU|    IMPREZA 2.0I VDC|
|   4000094|2017| SUBARU|    IMPREZA 2.0I VDC|
|   4000101|2017| SUBARU|IMPREZA W/ EYESIG...|
|   4000103|2017| SUBARU|IMPREZA 2.0I SPT VDC|
|   4000103|2017| SUBARU|IMPREZA 2.0I SPT VDC|
|   4000103|2017| SUBARU|IMPREZA 2.0I SPT VDC|
|   4000103|2017| SUBARU|IMPREZA 2.0I SPT VDC|
|   4000111|2017| SUBARU|IMPREZA W/ EYESIG...|
|   4000111|2017| SUBARU|IMPREZA W/ EYESIG...|
|   4000111|2017| SUBARU|IMPREZA W/ EYESIG...|
|   4000111|2

In [10]:
# Create the owner_demographic DataFrame. Used to 
# populate js.model_owner_demographic visual panel
# ["VEHICLE_ID","YEAR","MAKE", "MODEL""HasMatch","DRIVER_ID","GENDER","CREDIT_SCORE","AGE"]

owner_demographic = cleaned_df.groupby(["VEHICLE_ID"])

from functools import reduce
from pyspark.sql import DataFrame

MALE_df = cleaned_df.filter(col("GENDER") < 2)
MALE_df = MALE_df.groupby("VEHICLE_ID").agg({"GENDER":"count"}).withColumnRenamed("count(GENDER)", "MALE")


FEMALE_df = cleaned_df.filter(col("GENDER") > 1)
FEMALE_df = FEMALE_df.groupby("VEHICLE_ID").agg({"GENDER":"count"}).withColumnRenamed("count(GENDER)", "FEMALE")

age_df = cleaned_df.groupby("VEHICLE_ID").agg({"AGE":"avg"}).withColumnRenamed("avg(AGE)", "AGE")

credit_df = cleaned_df.groupby("VEHICLE_ID").agg({"CREDIT_SCORE":"avg"}).withColumnRenamed("avg(CREDIT_SCORE)", "CREDIT_SCORE")

MALE_df.drop_duplicates()
FEMALE_df.drop_duplicates()
age_df.drop_duplicates()
credit_df.drop_duplicates()

# Using Join expression create merged dataframe
owner_demographic_df = credit_df.join(MALE_df,["VEHICLE_ID"])\
                                .join(FEMALE_df,["VEHICLE_ID"])\
                                .join(age_df,["VEHICLE_ID"])

owner_demographic_df.drop_duplicates()
owner_demographic_df.show()

+----------+-----------------+----+------+------------------+
|VEHICLE_ID|     CREDIT_SCORE|MALE|FEMALE|               AGE|
+----------+-----------------+----+------+------------------+
|   4004650|793.6101694915254|  26|    33| 45.05084745762712|
|   4010313|            742.5|   2|     8|              47.7|
|   4031576| 778.952380952381|  21|    42| 53.38095238095238|
|   4049807|            750.0|   5|    10|              35.6|
|   6001065|785.2021857923497|  85|    98|35.759562841530055|
|   6012112|            694.0|   1|     2|48.333333333333336|
|   6062885|739.8181818181819|   8|     3|              33.0|
|   6065135|            692.4|   1|     4|              33.8|
|   6088419|            833.2|   4|     1|              32.4|
|   6090817|            666.0|   1|     1|              40.0|
|   6119363|            792.0|   5|     8| 48.61538461538461|
|   6142458|753.4516129032259|  17|    14|48.935483870967744|
|   6170003|789.2222222222222|   6|     3|36.888888888888886|
|   6212

In [11]:
# Create dataset for the bubble chart. Uses V_ID to associate 
# owners given from the user via dropdown selector to a model. 

# Create the cars DataFrame. ["VEHICLE_ID","YEAR","MAKE", "MODEL""HasMatch","DRIVER_ID","GENDER","CREDIT_SCORE","AGE"]
bubble_df = cleaned_df.select(["VEHICLE_ID","GENDER","CREDIT_SCORE","AGE"])
bubble_df.show()

+----------+------+------------+---+
|VEHICLE_ID|GENDER|CREDIT_SCORE|AGE|
+----------+------+------------+---+
|   2005295|     1|         847| 55|
|   2005295|     2|         847| 55|
|   2005295|     2|         847| 21|
|   4000020|     1|         866| 77|
|   4000020|     2|         866| 76|
|   4000020|     1|         768| 55|
|   4000020|     2|         768| 52|
|   4000092|     2|         710| 35|
|   4000092|     1|         710| 38|
|   4000094|     1|         666| 29|
|   4000101|     2|         824| 22|
|   4000103|     1|         710| 25|
|   4000103|     1|         847| 25|
|   4000103|     2|         847| 23|
|   4000103|     2|         666| 29|
|   4000111|     2|         710| 32|
|   4000111|     1|         710| 37|
|   4000111|     1|         824| 56|
|   4000111|     1|         824| 53|
|   4000131|     1|         824| 26|
+----------+------+------------+---+
only showing top 20 rows



In [12]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://database-1.ccwu5aoavni4.us-east-1.rds.amazonaws.com:5432/postgres"
config = {"user":"postgres", 
          "password": "Dcf3437848", 
          "driver":"org.postgresql.Driver"}

In [13]:
# Write review_id_df to table in RDS
cleaned_df.write.jdbc(url=jdbc_url, table='clean_data', mode=mode, properties=config)

In [14]:
# Write review_id_df to table in RDS
cars_df.write.jdbc(url=jdbc_url, table='vehicle_data', mode=mode, properties=config)

In [15]:
# Write review_id_df to table in RDS
owner_demographic_df.write.jdbc(url=jdbc_url, table='owner_demographic', mode=mode, properties=config)

In [16]:
# Write review_id_df to table in RDS
bubble_df.write.jdbc(url=jdbc_url, table='bubble_data', mode=mode, properties=config)

In [None]:
# cleaned_df.toPandas().to_csv('clean_data.csv')
# files.download('clean_data.csv')

In [36]:
from google.colab import files

import pandas 
#cleaned_df.toPandas().to_csv('clean_data.csv') 
#cars_df.toPandas().to_csv('vehicle_data.csv')
owner_demographic_df.toPandas().to_csv('owner_demographic_data.csv')
bubble_df.toPandas().to_csv('bubble_data.csv')
files.download('vehicle_data.csv')
files.download('owner_demographic_data.csv')
files.download('bubble_data.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>