# CS651 Project - Load and down-sampling

Authors: 
- Jacky Chen (j57chen@uwaterloo.ca)
- Eric Wang (e246wang@uwaterloo.ca)

Description:
- In this notebook, we performed data transformation tasks and exported a subsample of the original 10 GB dataset so that downstream data science tasks run faster.
- Data transformation tasks include setting the correct data types, dropping unneeded columns, and cleaning the data.

Data Source:
- US Used cars dataset from Kaggle (https://www.kaggle.com/datasets/ananaymital/us-used-cars-dataset)
- US cities (https://simplemaps.com/data/us-cities)
- US states boundaries (https://github.com/sunny2309/datasets)



## Setup

Spark is not installed in Colab, so we have to install it ourselves. This takes about a minute.

In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz
!tar xf spark-3.3.2-bin-hadoop2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop2"

import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
# from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer

In [None]:
# sc = SparkContext(appName="651ProjCleanData", master="local[*]")
spark = SparkSession.builder.appName("cs651-preprocessing").getOrCreate()

Mount Google Drive

In [None]:
import os
from google.colab import drive
drive.mount('/content/drive')

proj_root = 'drive/MyDrive/CS651Proj'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Load data from CSV

The source CSV file escapes a double quote (") inside an already-quoted value by doubling it ("").

For example, there's a row like this:

JTHSZ5BC5J5008777, ..., "CERTIFIED ... ""Pay the Least in the East"" ... wipers", V6, ...

The raw value of the description column is "CERTIFIED ... ""Pay the Least in the East"" ... wipers", which should be parsed as CERTIFIED ... "Pay the Least in the East" ... wipers.
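Python's standard `csv` module follows the same doubled-quote convention (its default `doublequote=True`), so a minimal sketch shows how such a field unescapes. The row below is a hypothetical, shortened imitation of the real one, not actual data.

```python
import csv
import io

# Hypothetical, shortened row in the same style as the real file:
# the description field is quoted, and inner quotes are doubled ("").
raw = 'JTHSZ5BC5J5008777,"CERTIFIED ""Pay the Least in the East"" wipers",V6\n'

# csv.reader uses quotechar='"' and doublequote=True by default,
# so "" inside a quoted field is read as one literal quote character.
row = next(csv.reader(io.StringIO(raw)))
print(row[1])
```

In Spark, `.option("escape", '"')` in the load cell further down asks the CSV reader to apply this same rule; without it, Spark's default escape character is the backslash.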

In [None]:
!grep "JTHSZ5BC5J5008777" drive/MyDrive/CS651Proj/data/used_cars_data.csv  



Some columns contain newline characters that split a single cell value across multiple lines.

The commands below show an example where the description column spans multiple lines.

In [None]:
!grep -m 1 "JM1NA3532V0720995" drive/MyDrive/CS651Proj/data/used_cars_data.csv  
!grep -m 1 "No mod original " drive/MyDrive/CS651Proj/data/used_cars_data.csv  
!grep -m 1 "The parking brake" drive/MyDrive/CS651Proj/data/used_cars_data.csv  
!grep -m 1 "Small rust. And b" drive/MyDrive/CS651Proj/data/used_cars_data.csv  
!grep -m 1 "No accident. VIN:" drive/MyDrive/CS651Proj/data/used_cars_data.csv  
!grep -m 1 "Available for a t" drive/MyDrive/CS651Proj/data/used_cars_data.csv  
!grep -m 1 "1GCRYDED1LZ200145" drive/MyDrive/CS651Proj/data/used_cars_data.csv 
!grep -m 1 "5N1AT2MV8LC802999" drive/MyDrive/CS651Proj/data/used_cars_data.csv  

JM1NA3532V0720995,--,,,,Convertible,,West Henrietta,20.0,,9,14586,"Price HIGHLY negotiable.
No mod original MX5 Miata NA 1997 other than headlight bulbs.
The parking brake cable needs to be replaced.
Small rust. And bad paint on the tail bumper and trunk lid.
No accident. VIN:JM1NA3532V
Available for a test drive anytime!",I4,1800.0,I4,Blue,True,False,False,,42.7 in,12 gal,Gasoline,False,48.2 in,26.0,133.0,Black,False,,,False,,43.0464,155.4 in,2020-09-02,BLUE,280980001,-77.6893,https://static.cargurus.com/images/forsale/2020/09/01/18/12/1997_mazda_mx-5_miata-pic-2139338383069711728-152x114.jpeg,,Mazda,2 seats,126364.0,MX-5 Miata,4.0,"133 hp @ 6,500 RPM",3600.0,False,0,,,private seller,False,"114 lb-ft @ 5,500 RPM",M,5-Speed Manual,t24119,Base,,RWD,Rear-Wheel Drive,89.2 in,65.9 in,1997
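The output above shows that one record can span several physical lines, so splitting the file on newlines would break records apart. A minimal sketch with the standard `csv` module (a hypothetical, shortened record, not the real row) shows a quote-aware reader keeping the embedded newlines inside one field, which is the behaviour Spark's `multiline` option enables.

```python
import csv
import io

# Hypothetical, shortened file: a header plus one record whose quoted
# description field contains embedded newlines.
raw = (
    "vin,description,engine_type\n"
    'JM1NA3532V0720995,"Price HIGHLY negotiable.\n'
    "No mod original MX5 Miata NA 1997.\n"
    'Available for a test drive anytime!",I4\n'
)

physical_lines = raw.splitlines()
records = list(csv.reader(io.StringIO(raw)))

print(len(physical_lines))         # physical lines in the text
print(len(records))                # parsed records: header + one listing
print(records[1][1].count("\n"))   # newlines kept inside the description
```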


In [None]:
df = spark\
  .read\
  .option("header", "true")\
  .option("escape", '"')\
  .option("multiline", "true")\
  .csv(f"{proj_root}/data/used_cars_data.csv")

In [None]:
# create a surrogate key (unique and increasing, but not consecutive)
df = df.withColumn("rowno", F.monotonically_increasing_id()+1)
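Spark's documentation for `monotonically_increasing_id` states that the IDs are unique and increasing but not consecutive: the partition ID sits in the upper 31 bits and the record number within the partition in the lower 33 bits. A plain-Python simulation of that layout (the partition sizes are made up) shows why `rowno` has large gaps between partitions, so it works as a surrogate key but not as a row counter.

```python
def simulate_monotonic_ids(partition_sizes):
    """Mimic the documented bit layout of monotonically_increasing_id:
    partition index in the upper 31 bits, record index in the lower 33."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        for record_index in range(size):
            ids.append((partition_index << 33) + record_index)
    return ids

# Hypothetical layout: 3 partitions holding 3, 2 and 1 rows.
ids = simulate_monotonic_ids([3, 2, 1])
rowno = [i + 1 for i in ids]  # same "+1" as in the notebook cell
print(rowno)
```

If strictly consecutive numbers were needed, a `row_number()` window or `zipWithIndex` on the underlying RDD are the usual alternatives, at the cost of extra shuffling or an extra pass.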

## Explore data

In [None]:
import pandas as pd

In [None]:
# df.printSchema()

In [None]:
n_rows = df.count()
n_cols = len(df.columns)
print(f"dataset shape is {n_rows} by {n_cols}")

In [None]:
df.columns

In [None]:
df.show(5)

+-----------------+------------+----+----------+----------+---------------+-----+--------+-----------------+--------------------+------------+----------+--------------------+----------------+-------------------+-----------+--------------+-----+-------------+----------------+--------------+-------------+----------------+---------+-------------+-------+--------------------+----------+--------------------+-----+------------+------+------+---------+--------+--------+-----------+-------------+----------+---------+--------------------+--------------------+----------+---------------+-------+---------------+-----------+------------------+-------+-------+--------------+-------------+------+-------------------+-----------+--------------------+------------+--------------------+------+------------+-----------------------+------------+--------------------+---------+-------+----+-----+
|              vin|back_legroom| bed|bed_height|bed_length|      body_type|cabin|    city|city_fuel_economy|combine

In [None]:
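# count null values per column in a single pass (F.isnull(c) and F.col(c).isNull() are equivalent, so one check suffices)
null_cnt_df = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()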

In [None]:
null_cnt_df = null_cnt_df.T

In [None]:
null_cnt_df.index.name = 'col_name'
null_cnt_df.columns = ['count']
null_cnt_df['ratio'] = null_cnt_df['count'] / n_rows

In [None]:
with pd.option_context('display.max_rows', None, 'display.max_columns', None): 
  print(null_cnt_df.sort_values(by=['ratio'], ascending=False))
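The null-count cells above count, for each column, the rows where the value is null and divide by the total row count. A plain-Python sketch of the same computation over a few hypothetical records (only the column names come from the dataset) makes the arithmetic explicit:

```python
def null_ratios(rows, columns):
    """Return {column: (null_count, null_count / n_rows)} for a list of dicts."""
    n_rows = len(rows)
    result = {}
    for col in columns:
        # A missing key and an explicit None are both treated as null.
        count = sum(1 for row in rows if row.get(col) is None)
        result[col] = (count, count / n_rows)
    return result

# Hypothetical records, not real data.
rows = [
    {"bed": None, "fleet": "False", "price": "9500.0"},
    {"bed": None, "fleet": None, "price": "12000.0"},
    {"bed": None, "fleet": "True", "price": "7300.0"},
    {"bed": "Short", "fleet": None, "price": "21000.0"},
]
ratios = null_ratios(rows, ["bed", "fleet", "price"])

# Sort columns by null ratio, highest first, as in the printed table.
for col, (count, ratio) in sorted(ratios.items(), key=lambda kv: kv[1][1], reverse=True):
    print(f"{col:<6} {count} {ratio:.2f}")
```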

                           count   percent
col_name                                  
vehicle_damage_category  3000040  1.000000
combine_fuel_economy     3000040  1.000000
is_certified             3000040  1.000000
bed                      2980472  0.993477
cabin                    2936507  0.978823
is_oemcpo                2864678  0.954880
is_cpo                   2817142  0.939035
bed_height               2570942  0.856969
bed_length               2570942  0.856969
owner_count              1517013  0.505664
fleet                    1426595  0.475525
theft_title              1426595  0.475525
isCab                    1426595  0.475525
has_accidents            1426595  0.475525
frame_damaged            1426595  0.475525
salvage                  1426595  0.475525
franchise_make            572635  0.190876
torque                    517793  0.172595
highway_fuel_economy      491285  0.163759
city_fuel_economy         491285  0.163759
power                     481426  0.160473
main_pictur

## Transform data

In [None]:
# Drop columns that are mostly null or not useful for our analysis
df_dropped = df.drop("bed","bed_height","bed_length", "cabin", "combine_fuel_economy", \
                     "is_certified", "main_picture_url", "description", "us_city", \
                     "is_oemcpo", "is_cpo", "vehicle_damage_category", \
                     "vin", "listing_id", "sp_id", "trimId")

Ref:
- how to use when and regexp_replace: https://sparkbyexamples.com/pyspark/pyspark-replace-column-values/

In [None]:
# general transformation
def to_lower_str(df, col_name):
  return df.withColumn(col_name, F.lower(F.col(col_name)))
def cast_to_double(df, col_name):
  return df.withColumn(col_name, F.col(col_name).cast('double'))
def cast_to_int(df, col_name):
  return df.withColumn(col_name, F.col(col_name).cast('int'))
def cast_to_boolean(df, col_name):
  # encode 'True'/'False' strings (case-insensitive) as 1/0 integers; anything else becomes null
  return df.withColumn(col_name, 
                       F.when(F.upper(F.col(col_name)) == 'TRUE', 1) \
                       .when(F.upper(F.col(col_name)) == 'FALSE', 0) \
                       .otherwise(None))
def cast_to_date(df, col_name):
  return df.withColumn(col_name, F.to_date(F.col(col_name), "yyyy-MM-dd"))


def parse_double_before_space(df, col_name):
  # keep the number before the first space, e.g. "42.7 in" -> 42.7; non-numbers become null
  return df.withColumn(col_name, F.split(F.col(col_name), ' ').getItem(0).cast('double'))

# column specific transformation
def clean_engine_cylinders(df):
  # strip letters, e.g. "V6" -> 6
  return df.withColumn("engine_cylinders", F.regexp_replace(df.engine_cylinders, '[a-zA-Z]', '').cast('int'))
def clean_engine_type(df):
  # strip digits and lowercase, e.g. "V6" -> "v"
  return df.withColumn("engine_type", F.lower(F.regexp_replace(df.engine_type, '[0-9]', '')))
def clean_city(df, col_name):
  # lowercase, abbreviate a leading "fort "/"saint " to "ft "/"st ", then keep letters only
  city = F.lower(F.col(col_name))
  city = F.regexp_replace(city, '^fort ', 'ft ')
  city = F.regexp_replace(city, '^saint ', 'st ')
  city = F.regexp_replace(city, '[^a-zA-Z]', '')
  return df.withColumn(col_name, city)
def clean_model_name(df):
  # lowercase and keep only letters and digits
  return df.withColumn("model_name", F.regexp_replace(F.lower(df.model_name), '[^a-z0-9]', ''))
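Because `clean_city` is later applied to both sides of the city join, different spellings of one city collapse to a single key. A plain-Python mirror of its three regular expressions (using `re` rather than Spark's Java regex engine; these simple patterns behave the same in both) shows the effect on a few example names:

```python
import re

def clean_city(name):
    """Plain-Python mirror of the clean_city Spark transform."""
    s = name.lower()
    s = re.sub(r"^fort ", "ft ", s)     # "fort worth" -> "ft worth"
    s = re.sub(r"^saint ", "st ", s)    # "saint louis" -> "st louis"
    return re.sub(r"[^a-zA-Z]", "", s)  # drop spaces, dots, digits, ...

for raw in ["Fort Worth", "Ft. Worth", "Saint Louis", "St. Louis", "West Henrietta"]:
    print(f"{raw!r:>18} -> {clean_city(raw)}")
```

The patterns require a trailing space, so a name such as "Fortuna" is left alone apart from lowercasing.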

In [None]:
df_dropped_clean = df_dropped \
.transform(parse_double_before_space, "back_legroom") \
.transform(to_lower_str, "body_type") \
.transform(clean_city, "city") \
.transform(cast_to_double, "city_fuel_economy") \
.transform(cast_to_int, "daysonmarket") \
.transform(cast_to_int, "dealer_zip") \
.transform(clean_engine_cylinders) \
.transform(cast_to_double, "engine_displacement") \
.transform(clean_engine_type) \
.transform(to_lower_str, "exterior_color") \
.transform(cast_to_boolean, "fleet") \
.transform(cast_to_boolean, "frame_damaged") \
.transform(cast_to_boolean, "franchise_dealer") \
.transform(to_lower_str, "franchise_make") \
.transform(parse_double_before_space, "front_legroom") \
.transform(parse_double_before_space, "fuel_tank_volume") \
.transform(to_lower_str, "fuel_type") \
.transform(cast_to_boolean, "has_accidents") \
.transform(parse_double_before_space, "height") \
.transform(cast_to_double, "highway_fuel_economy") \
.transform(cast_to_double, "horsepower") \
.transform(to_lower_str, "interior_color") \
.transform(cast_to_boolean, "isCab") \
.transform(cast_to_boolean, "is_new") \
.transform(cast_to_double, "latitude") \
.transform(parse_double_before_space, "length") \
.transform(cast_to_date, "listed_date") \
.transform(to_lower_str, "listing_color") \
.transform(cast_to_double, "longitude") \
.transform(to_lower_str, "major_options") \
.transform(to_lower_str, "make_name") \
.transform(parse_double_before_space, "maximum_seating") \
.transform(cast_to_double, "mileage") \
.transform(clean_model_name) \
.transform(cast_to_int, "owner_count") \
.transform(parse_double_before_space, "power") \
.transform(cast_to_double, "price") \
.transform(cast_to_boolean, "salvage") \
.transform(cast_to_double, "savings_amount") \
.transform(cast_to_double, "seller_rating") \
.transform(to_lower_str, "sp_name") \
.transform(cast_to_boolean, "theft_title") \
.transform(parse_double_before_space, "torque") \
.transform(to_lower_str, "transmission") \
.transform(to_lower_str, "transmission_display") \
.transform(to_lower_str, "trim_name") \
.transform(to_lower_str, "wheel_system") \
.transform(to_lower_str, "wheel_system_display") \
.transform(parse_double_before_space, "wheelbase") \
.transform(parse_double_before_space, "width") \
.transform(cast_to_int, "year")
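Most of these helpers rely on Spark's lenient casting: with ANSI mode off (the default in Spark 3.3), a string that cannot be parsed becomes null instead of raising an error. The plain-Python sketch below approximates `parse_double_before_space` and `cast_to_boolean` on strings in the formats seen in the sample row printed earlier; it is an approximation of Spark's cast rules, not a reimplementation of them.

```python
def parse_double_before_space(value):
    """Approximate F.split(col, ' ').getItem(0).cast('double'):
    keep the text before the first space, None if it is not a number."""
    if value is None:
        return None
    head = value.split(" ")[0]
    try:
        return float(head)
    except ValueError:
        return None  # Spark's non-ANSI cast yields null here

def cast_to_boolean(value):
    """Mirror the when/otherwise chain: 'TRUE' -> 1, 'FALSE' -> 0, else None."""
    if value is None:
        return None
    upper = value.upper()
    if upper == "TRUE":
        return 1
    if upper == "FALSE":
        return 0
    return None

print(parse_double_before_space("42.7 in"))             # legroom-style value
print(parse_double_before_space("133 hp @ 6,500 RPM"))  # power
print(parse_double_before_space("2 seats"))             # maximum_seating
print(parse_double_before_space("--"))                  # placeholder value
print(cast_to_boolean("True"), cast_to_boolean("False"), cast_to_boolean(""))
```

Note that `cast_to_boolean` produces integers rather than booleans, which matches the `integer` type reported by `printSchema()` for columns such as `fleet`.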

In [None]:
df_dropped_clean.show()

+------------+---------------+--------+-----------------+------------+----------+----------------+-------------------+-----------+--------------------+-----+-------------+----------------+--------------+-------------+----------------+---------+-------------+------+--------------------+----------+--------------------+-----+------+--------+------+-----------+-------------+---------+--------------------+----------+---------------+-------+------------------+-----------+-----+-------+-------+--------------+-------------+-------------------+-----------+------+------------+--------------------+--------------------+------------+--------------------+---------+-----+----+-----+
|back_legroom|      body_type|    city|city_fuel_economy|daysonmarket|dealer_zip|engine_cylinders|engine_displacement|engine_type|      exterior_color|fleet|frame_damaged|franchise_dealer|franchise_make|front_legroom|fuel_tank_volume|fuel_type|has_accidents|height|highway_fuel_economy|horsepower|      interior_color|isCab

In [None]:
df_dropped_clean.printSchema()

root
 |-- back_legroom: double (nullable = true)
 |-- body_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- city_fuel_economy: double (nullable = true)
 |-- daysonmarket: integer (nullable = true)
 |-- dealer_zip: integer (nullable = true)
 |-- engine_cylinders: integer (nullable = true)
 |-- engine_displacement: double (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- exterior_color: string (nullable = true)
 |-- fleet: integer (nullable = true)
 |-- frame_damaged: integer (nullable = true)
 |-- franchise_dealer: integer (nullable = true)
 |-- franchise_make: string (nullable = true)
 |-- front_legroom: double (nullable = true)
 |-- fuel_tank_volume: double (nullable = true)
 |-- fuel_type: string (nullable = true)
 |-- has_accidents: integer (nullable = true)
 |-- height: double (nullable = true)
 |-- highway_fuel_economy: double (nullable = true)
 |-- horsepower: double (nullable = true)
 |-- interior_color: string (nullable = true)
 |-- isCab:

In [None]:
df_dropped_clean.sort(F.col("price").desc()).show()

+------------+---------------+------------+-----------------+------------+----------+----------------+-------------------+-------------------+--------------------+-----+-------------+----------------+--------------+-------------+----------------+-----------------+-------------+------+--------------------+----------+--------------------+-----+------+--------+------+-----------+-------------+---------+--------------------+-------------+---------------+--------+------------+-----------+-----+---------+-------+--------------+------------------+--------------------+-----------+------+------------+--------------------+--------------------+------------+--------------------+---------+-----+----+-------+
|back_legroom|      body_type|        city|city_fuel_economy|daysonmarket|dealer_zip|engine_cylinders|engine_displacement|        engine_type|      exterior_color|fleet|frame_damaged|franchise_dealer|franchise_make|front_legroom|fuel_tank_volume|        fuel_type|has_accidents|height|highway_fu

## Join with US cities dataset to find state

Cleaning city names substantially improved the join match rate.

If city names are not cleaned, 166,713 rows are unmatched.

After transforming the city column and manually adding high-frequency US cities that are missing from the reference file, 69,233 rows remain unmatched.

In [None]:
# read US cities from CSV file
cities_df = spark\
  .read\
  .option("header", "true")\
  .option("escape", '"')\
  .option("multiline", "true")\
  .csv(f"{proj_root}/data/uscities.csv")

# some cities occur many times in our dataset but are missing in CSV file
# high frequency cities are manually added here
more_cities = [
  ("Van Nuys","Van Nuys","CA","California"),
  ("Wexford","Wexford","PA","Pennsylvania"),
  ("Clinton Township","Clinton Township","MI","Michigan"),
  ("North Hollywood","North Hollywood","CA","California"),
  ("Freehold","Freehold","NJ","New Jersey"),
  ("Toms River","Toms River","NJ","New Jersey"),
  ("Westborough","Westborough","MA","Massachusetts"),
  ("Egg Harbor Township","Egg Harbor Township","NJ","New Jersey"),
  ("Braintree","Braintree","MA","Massachusetts"),
  ("Long Island City","Long Island City","NY","New York"),
  ("Maple Shade","Maple Shade","NJ","New Jersey"),
  ("East Hartford","East Hartford","CT","Connecticut"),
  ("Orchard Park","Orchard Park","NY","New York"),
  ("City of Industry","City of Industry","CA","California"),
  ("Lynnfield","Lynnfield","MA","Massachusetts"),
  ("Riverhead","Riverhead","NY","New York"),
  ("Sheffield Village","Sheffield Village","OH","Ohio"),
  ("Ft Myers","Ft Myers","FL","Florida"),
  ("New Hudson","New Hudson","NY","New York"),
  ("Mt Pleasant","Mt Pleasant","MI","Michigan"),
  ("Old Bridge","Old Bridge","NJ","New Jersey"),
]

more_cities_cols = ["city","city_ascii","state_id","state_name"]
more_cities_df = spark.createDataFrame(data = more_cities, schema = more_cities_cols)

In [None]:
cities_df_small = cities_df\
  .transform(clean_city, "city")\
  .transform(clean_city, "city_ascii")\
  .withColumnRenamed("city_ascii","us_city").select(F.col("us_city"), F.col("state_id"), F.col("state_name"))

In [None]:
more_cities_df = more_cities_df\
  .transform(clean_city, "city")\
  .transform(clean_city, "city_ascii")\
  .withColumnRenamed("city_ascii","us_city").select(F.col("us_city"), F.col("state_id"), F.col("state_name"))

In [None]:
cities_df_small = cities_df_small.union(more_cities_df)

In [None]:
df_dropped_clean = df_dropped_clean.join(cities_df_small, df_dropped_clean.city == cities_df_small.us_city, "left")

In [None]:
# show cities that can't be mapped to state and their frequency
df_dropped_clean.filter(F.col("us_city").isNull()).groupBy("city").count().sort(F.col("count").desc()).show(10000)

In [None]:
# count rows whose city can't be mapped to a state
df_dropped_clean.filter(F.col("us_city").isNull()).count()
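The unmatched count above is the number of rows where the left join found no city, leaving `us_city` null. One caveat worth checking: the join key is the cleaned city name alone, so a name that exists in several states matches every one of them and the listing is repeated once per match. A plain-Python sketch of left-join semantics with hypothetical keys illustrates both effects:

```python
from collections import defaultdict

def left_join_on_city(listings, cities):
    """Left-join listing city keys to (city, state) pairs:
    every match produces a row; no match produces one row with state None."""
    index = defaultdict(list)
    for city, state in cities:
        index[city].append(state)
    joined = []
    for city in listings:
        states = index.get(city)
        if states:
            joined.extend((city, state) for state in states)
        else:
            joined.append((city, None))
    return joined

# Hypothetical, already-cleaned keys.
listings = ["springfield", "vannuys", "nowhereville"]
cities = [("springfield", "IL"), ("springfield", "MO"), ("vannuys", "CA")]

joined = left_join_on_city(listings, cities)
unmatched = sum(1 for _, state in joined if state is None)
print(len(listings), len(joined), unmatched)  # 3 listings become 4 rows, 1 unmatched
```

Comparing `df_dropped_clean.count()` before and after the join is a quick way to check whether this fan-out happens in the real data.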

## Calculate population mean

In [None]:
categorical_cols = [
    'body_type'
    ,"model_name"
    ,'transmission_display'
    ,'listing_color'
    ,'state_id'
]

filled_df = df_dropped_clean.fillna(value="N/A",subset=categorical_cols)

### Mileage

In [None]:
# find mean mileage by state
mean_mileage_df = filled_df.fillna(value="N/A",subset=["state_id"])\
  .groupBy("state_id")\
  .agg(F.mean("mileage").alias("mean_mileage"))

In [None]:
mean_mileage_df = mean_mileage_df.withColumnRenamed("state_id", "state_id2")

In [None]:
mean_mileage_df.show()

+---------+------------------+
|state_id2|      mean_mileage|
+---------+------------------+
|       SC|32459.246120266624|
|       AZ| 28975.27989411671|
|       LA| 29060.23278745129|
|       MN|32471.746808158947|
|       NJ|31098.997420856464|
|       OR| 33203.41029937883|
|       VA| 33306.28334783215|
|       RI| 31150.40917582007|
|       WY| 31504.22939612609|
|       KY| 32896.95094862053|
|       NH|28934.038360811115|
|       MI|31939.325683225525|
|       NV|28500.475052274858|
|       WI| 33191.92937776032|
|       ID| 33629.86485625185|
|       CA| 28474.55071592587|
|       NE| 32452.06294167798|
|       CT|28802.197696177307|
|       MT|  33971.3351571391|
|       NC| 32134.62107143381|
+---------+------------------+
only showing top 20 rows



In [None]:
mean_mileage_df.write.mode('overwrite').parquet(f"{proj_root}/data/mean_mileage_by_state.parquet")

### Horsepower

In [None]:
# find mean horsepower and other vehicle specs by model and make
mean_hp_df = filled_df.groupBy(*["model_name", "make_name"])\
  .agg(F.mean("horsepower").alias("mean_horsepower"),
       F.mean("engine_cylinders").alias("mean_engine_cylinders"),
       F.mean("fuel_tank_volume").alias("mean_fuel_tank_volume"),
       F.mean("back_legroom").alias("mean_back_legroom"),
       F.mean("height").alias("mean_height"),
       F.mean("width").alias("mean_width"),
       F.mean("length").alias("mean_length"),
       F.mean("city_fuel_economy").alias("mean_city_fuel_economy"),
       F.mean("highway_fuel_economy").alias("mean_highway_fuel_economy"))\
  .withColumnRenamed("model_name", "model_name2")\
  .withColumnRenamed("make_name", "make_name2")

In [None]:
mean_hp_df.show()

+---------------+-----------+------------------+---------------------+---------------------+------------------+------------------+-----------------+------------------+----------------------+-------------------------+
|    model_name2| make_name2|   mean_horsepower|mean_engine_cylinders|mean_fuel_tank_volume| mean_back_legroom|       mean_height|       mean_width|       mean_length|mean_city_fuel_economy|mean_highway_fuel_economy|
+---------------+-----------+------------------+---------------------+---------------------+------------------+------------------+-----------------+------------------+----------------------+-------------------------+
|          rogue|     nissan|             170.0|                  4.0|   14.566795183826178|37.775951801398385| 67.59926754417475|72.32843373175253| 184.2828075325594|     25.29695650961029|        32.19047154108666|
|             is|      lexus|  250.555221487272|   4.8865525182737555|   17.381741657480056|31.958352540334996|56.264665798171386|79

In [None]:
mean_hp_df.write.mode('overwrite').parquet(f"{proj_root}/data/mean_hp_by_model.parquet")

In [None]:
# find mean horsepower, engine cylinders, and fuel tank volume by make and body type
mean_hp_df = filled_df.groupBy(*["make_name", "body_type"])\
  .agg(F.mean("horsepower").alias("mean_horsepower"),
       F.mean("engine_cylinders").alias("mean_engine_cylinders"),
       F.mean("fuel_tank_volume").alias("mean_fuel_tank_volume"))\
  .withColumnRenamed("make_name", "make_name2")\
  .withColumnRenamed("body_type", "body_type2")

In [None]:
mean_hp_df.show()

+------------+---------------+------------------+---------------------+---------------------+
|  make_name2|     body_type2|   mean_horsepower|mean_engine_cylinders|mean_fuel_tank_volume|
+------------+---------------+------------------+---------------------+---------------------+
|      jaguar|          wagon| 348.3369565217391|    5.760869565217392|   19.155434782608687|
|      nissan|          wagon|121.91451068616423|    4.002183406113537|   13.199999999999894|
|      subaru|          sedan|198.13172538325472|     4.04059712361187|    16.63680903597046|
|     bentley|    convertible| 557.9594484995945|     10.5181598062954|    23.98716707021787|
|      suzuki|          sedan|153.55954088952655|     4.13225371120108|   14.767718794834979|
|       scion|      hatchback|126.57013379369874|                  4.0|   11.828182995252575|
|         vpg|            van|              null|                 null|                 null|
|     hyundai|      hatchback|  150.706842188239|           

In [None]:
mean_hp_df.write.mode('overwrite').parquet(f"{proj_root}/data/mean_hp_by_bodytype.parquet")
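`F.mean` (an alias of `avg`) skips nulls within each group and returns null when a group has no non-null values, which is why the `vpg`/`van` row above shows null for all three means. A plain-Python sketch of that grouping rule, on hypothetical values (only the column names come from the notebook):

```python
from collections import defaultdict

def grouped_mean(rows, keys, value_col):
    """Group dicts by `keys` and average `value_col`, skipping None values;
    a group with no non-None values gets None, like Spark's avg."""
    groups = defaultdict(list)
    for row in rows:
        group_key = tuple(row[k] for k in keys)
        groups[group_key].append(row[value_col])
    result = {}
    for group_key, values in groups.items():
        present = [v for v in values if v is not None]
        result[group_key] = sum(present) / len(present) if present else None
    return result

# Hypothetical rows, not real data.
rows = [
    {"make_name": "nissan", "body_type": "wagon", "horsepower": 120.0},
    {"make_name": "nissan", "body_type": "wagon", "horsepower": None},
    {"make_name": "nissan", "body_type": "wagon", "horsepower": 124.0},
    {"make_name": "vpg", "body_type": "van", "horsepower": None},
]
means = grouped_mean(rows, ["make_name", "body_type"], "horsepower")
print(means)
```

The null horsepower in the second row does not pull the nissan/wagon mean toward zero; it is simply excluded from both the sum and the count.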

## Export samples

In [None]:
# randomly down sample data for data analysis
splits = df_dropped_clean.randomSplit([0.9, 0.1], 651)
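`randomSplit([0.9, 0.1], 651)` normalizes the weights and assigns each row to one split using a random number generator seeded with 651, so the second split holds roughly, not exactly, 10% of the rows, and the two splits do not overlap. A plain-Python sketch of that idea, with one uniform draw per row compared against cumulative weights; this illustrates the principle and does not reproduce Spark's exact per-partition sampler:

```python
import random
from itertools import accumulate

def random_split(rows, weights, seed):
    """Assign each row to exactly one split using cumulative normalized weights."""
    total = sum(weights)
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.9, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            # the last split also catches any floating-point shortfall
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(100_000))
rest, sample = random_split(rows, [0.9, 0.1], seed=651)
print(len(sample), len(sample) / len(rows))
```

The fixed seed makes this sketch reproducible; in Spark, reproducibility of the split additionally depends on the input's partitioning and row order.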

In [None]:
# check the number of rows in the sample
splits[1].count()

In [None]:
splits[1].printSchema()

In [None]:
splits[1].write.mode('overwrite').parquet(f"{proj_root}/data/sample0.1.parquet")
# splits[1].write.mode('overwrite').parquet(f"{proj_root}/data/sample0.002.parquet")
# splits[1].write.mode('overwrite').parquet(f"{proj_root}/data/sample0.01.parquet")

In [None]:
!sudo du -sh drive/MyDrive/CS651Proj/data/sample0.1.parquet

Check the output Parquet file

In [None]:
parquet_df = spark.read.parquet(f"{proj_root}/data/sample0.1.parquet")

In [None]:
parquet_df.show()

+-----------------+------------+-----------+-----------+-----------------+------------+----------+----------------+-------------------+-----------+--------------+-----+-------------+----------------+--------------+-------------+----------------+---------+-------------+------+--------------------+----------+--------------+-----+------+------+---------+--------+------+-----------+-------------+----------+---------+-----------------+---------+---------------+-------+----------------+-----------+-----+--------+-------+--------------+-----------------+------+--------------------+-----------+------+------------+--------------------+------+--------------------+------------+--------------------+---------+-----+----+-------+-----------+--------+--------------+
|              vin|back_legroom|  body_type|       city|city_fuel_economy|daysonmarket|dealer_zip|engine_cylinders|engine_displacement|engine_type|exterior_color|fleet|frame_damaged|franchise_dealer|franchise_make|front_legroom|fuel_tank_