# Project Big Data Analytics

## Setup

### Install pip packages and the Java Development Kit.

In [None]:
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet 
!apt install -y openjdk-8-jdk-headless &> /dev/null



### Install ngrok, which lets us expose the local Spark UI on the web.

In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
get_ipython().system_raw('./ngrok http 4050 &')

### Set the `JAVA_HOME` environment variable.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

### Create a SparkSession, setting the Spark UI port to `4050`.

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set('spark.ui.port', '4050').setAppName("Projeto_Big_Data_Analytics").setMaster("local[6]")
sc = SparkSession.builder.config(conf=conf).getOrCreate()

## Project

### Get data
The dataset describes used cars in the US and was taken from [Kaggle](https://www.kaggle.com/datasets/ananaymital/us-used-cars-dataset).  
In this first part the data is downloaded through the Kaggle API using the `kaggle` library. __Note that you will be asked to upload a JSON file containing an API token.__

The following information was copied from the __About Dataset__ section of the dataset in Kaggle.  

__Context__  
The dataset contains 3 million real world used cars details.

__Content__  
This data was obtained by running a self made crawler on Cargurus inventory in September 2020.

__Acknowledgements__  
This data is for academic, research and individual experimentation only and is not intended for commercial purposes.

__Inspiration__  
Imagine a web app that can estimate the listing price of a vehicle. What features of the vehicle should be used to build a price prediction regression model?


In [None]:
# Libraries needed in the cells below
from pyspark.sql.functions import col, when, split
from pyspark.sql.functions import monotonically_increasing_id
from google.colab import files
import zipfile

In [None]:
!pip install kaggle

# Upload your Kaggle API token (kaggle.json).
# Go to your Kaggle account settings page, click "Create New API Token",
# and upload the downloaded kaggle.json file here.
# Anyone who runs this notebook needs to follow these steps and create their own API token on Kaggle.
files.upload()

# Protect the API token: mode 600 makes the file readable and writable only by its owner.
# In any case, after running the notebook you can generate a new API token on Kaggle,
# which makes the previous one stop working.
!chmod 600 /content/kaggle.json

# Set the Kaggle API token path
import os
os.environ['KAGGLE_CONFIG_DIR'] = '/content'

# Download the specific file from the dataset using the Kaggle API
!kaggle datasets download -d ananaymital/us-used-cars-dataset -f used_cars_data.csv

In [None]:
# Check that the API token is readable and writable only by the owner of the notebook.
# If it is, the permissions shown should be -rw------- (-rw-r--r-- would mean every user can read it).
!ls -l /content/kaggle.json

-rw------- 1 root root 67 Jun 12 12:22 /content/kaggle.json
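
The same check can be done from Python instead of reading the `ls -l` output. The sketch below is only an illustration: it works on a temporary file (a stand-in for `/content/kaggle.json`, which it does not touch), applies mode `600`, and prints the resulting permission string. It assumes a POSIX system such as the Colab VM, and `permission_string` is a helper name introduced here.

```python
import os
import stat
import tempfile

def permission_string(path):
    """Return the ls-style permission string (e.g. '-rw-------') for a file."""
    return stat.filemode(os.stat(path).st_mode)

# Temporary stand-in for the token file (hypothetical; not the real kaggle.json).
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    token_path = tmp.name

os.chmod(token_path, 0o600)            # same effect as `chmod 600 <file>`
mode = permission_string(token_path)
print(mode)

# True when no permission bit is set for group or others.
owner_only = (os.stat(token_path).st_mode & 0o077) == 0
print("owner-only access:", owner_only)

os.remove(token_path)
```

A check like `owner_only` could be turned into an assertion before calling the Kaggle CLI, so that the notebook stops early if the token file is readable by other users.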


In [None]:
# Extract the CSV file from the zip archive
with zipfile.ZipFile('used_cars_data.csv.zip', 'r') as zip_ref:
    zip_ref.extractall()

In [None]:
# Load the extracted CSV file into a DataFrame
df = sc.read.format("csv").option("inferSchema", "true").option("header", "true").load("used_cars_data.csv")
df.printSchema()

root
 |-- vin: string (nullable = true)
 |-- back_legroom: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- bed_height: string (nullable = true)
 |-- bed_length: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- city: string (nullable = true)
 |-- city_fuel_economy: string (nullable = true)
 |-- combine_fuel_economy: string (nullable = true)
 |-- daysonmarket: string (nullable = true)
 |-- dealer_zip: string (nullable = true)
 |-- description: string (nullable = true)
 |-- engine_cylinders: string (nullable = true)
 |-- engine_displacement: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- exterior_color: string (nullable = true)
 |-- fleet: string (nullable = true)
 |-- frame_damaged: string (nullable = true)
 |-- franchise_dealer: string (nullable = true)
 |-- franchise_make: string (nullable = true)
 |-- front_legroom: string (nullable = true)
 |-- fuel_tank_volume: string (nullable = tr

## Data transformation
In this section several transformations are applied to the dataset.  
The dataset started with 3 million rows and 66 columns.  
Some columns were dropped; others were transformed or split into multiple columns.  
Null values were checked and misplaced data was also dealt with.

In [None]:
# Drop the columns we do not need.
# We are dropping most columns here. Ideally some of them could be used, but they are stored
# in a 'number + string' format, and most of the others we do not need.
# Even the boolean columns have many null values, and given the nature of the exercise
# we want to maximize the number of rows in the dataset.
columns_to_drop = ['vin', 'back_legroom', 'bed', 'bed_height', 'bed_length', 'cabin', 'city',
                   'city_fuel_economy', 'combine_fuel_economy', 'dealer_zip', 'description',
                   'engine_cylinders', 'exterior_color', 'franchise_make', 'front_legroom', 
                   'fuel_tank_volume', 'fuel_type', 'height', 'highway_fuel_economy',
                   'interior_color', 'is_certified', 'is_cpo', 'is_oemcpo',
                   'latitude', 'length', 'listed_date', 'listing_color', 'listing_id',
                   'longitude', 'main_picture_url', 'major_options', 'make_name', 
                   'maximum_seating', 'model_name', 'owner_count', 'power',
                   'salvage', 'savings_amount', 'seller_rating', 'sp_id', 'sp_name',
                   'theft_title', 'torque', 'transmission', 'transmission_display',
                   'trimId', 'trim_name', 'vehicle_damage_category', 'wheel_system',
                   'fleet', 'frame_damaged', 'has_accidents', 'isCab', 'wheelbase', 'width'] 
                   
df = df.drop(*columns_to_drop)

#print the remaining columns
column_names = df.columns
for column in column_names:
    print(column)

# df.show()

body_type
daysonmarket
engine_displacement
engine_type
franchise_dealer
horsepower
is_new
mileage
price
wheel_system_display
year


In [None]:
# Drop rows with null values
df = df.dropna()

# Count the number of rows
row_count = df.count()
print(row_count)

2648287


In [None]:
# Convert boolean columns to 0s and 1s
boolean_columns = ['franchise_dealer', 'is_new']  # Specify the boolean columns

for column in boolean_columns:
    df = df.withColumn(column, when(col(column).cast("boolean"), 1).otherwise(0))

# Show the updated DataFrame
df.show()

+---------------+------------+-------------------+-----------+----------------+----------+------+-------+-------+--------------------+----+
|      body_type|daysonmarket|engine_displacement|engine_type|franchise_dealer|horsepower|is_new|mileage|  price|wheel_system_display|year|
+---------------+------------+-------------------+-----------+----------------+----------+------+-------+-------+--------------------+----+
|SUV / Crossover|         522|             1300.0|         I4|               1|     177.0|     1|    7.0|23141.0|   Front-Wheel Drive|2019|
|SUV / Crossover|         207|             2000.0|         I4|               1|     246.0|     1|    8.0|46500.0|     All-Wheel Drive|2020|
|SUV / Crossover|         196|             3000.0|         V6|               1|     340.0|     1|   11.0|67430.0|     All-Wheel Drive|2020|
|SUV / Crossover|         137|             2000.0|         I4|               1|     246.0|     1|    7.0|48880.0|     All-Wheel Drive|2020|
|SUV / Crossover|   

In [None]:
# Add a unique ID column
df = df.withColumn("row_id", monotonically_increasing_id())

# Order by the unique ID column in descending order
last_lines = df.orderBy(col("row_id").desc()).limit(20).collect()

# Print the last lines
for row in last_lines:
    print(row)

Row(body_type='SUV / Crossover', daysonmarket='17', engine_displacement='2500.0', engine_type='I4', franchise_dealer=1, horsepower='170.0', is_new=0, mileage='22600.0', price='19900.0', wheel_system_display='Front-Wheel Drive', year='2017', row_id=635655174168)
Row(body_type='Sedan', daysonmarket='11', engine_displacement='2000.0', engine_type='I4 Diesel', franchise_dealer=0, horsepower='180.0', is_new=0, mileage='27857.0', price='26998.0', wheel_system_display='All-Wheel Drive', year='2017', row_id=635655174167)
Row(body_type='SUV / Crossover', daysonmarket='171', engine_displacement='Premium Cloth Seat Trim', engine_type='Radio: Chevrolet Infotainment System AM/FM Stereo', franchise_dealer=0, horsepower='ABS brakes', is_new=0, mileage='Illuminated entry', price='Overhead airbag', wheel_system_display='Remote keyless entry', year='Speed-sensing steering', row_id=635655174166)
Row(body_type='SUV / Crossover', daysonmarket='16', engine_displacement='1500.0', engine_type='I4', franchise_
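
The `row_id` values above look very large (for example `635655174168`) because `monotonically_increasing_id` does not number rows consecutively. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The helper below (`decode_row_id`, a name introduced here for illustration) is a small plain-Python sketch that splits an ID back into those two parts.

```python
RECORD_BITS = 33  # lower 33 bits: record number within the partition

def decode_row_id(row_id):
    """Split an ID from monotonically_increasing_id into (partition, record)."""
    partition = row_id >> RECORD_BITS
    record = row_id & ((1 << RECORD_BITS) - 1)
    return partition, record

for rid in (0, 1, 635655174168):
    print(rid, decode_row_id(rid))
```

For the first ID in the output above this gives partition 74 and record 14360 within that partition. The IDs are unique and increasing, but they are not consecutive.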

In [None]:
# Define the columns that must contain numeric values.
# Some rows in the file have misplaced data (text in numeric columns), so we filter them out.

numeric_columns = ['daysonmarket', 'engine_displacement', 'franchise_dealer',
                   'horsepower', 'is_new', 'mileage', 'price', 'year']

# Filter out rows where the values in numeric columns are not numbers
for column in numeric_columns:
    df = df.where(col(column).cast('double').isNotNull())

# Show the resulting DataFrame
df.show()

# Count the number of rows
row_count = df.count()
print(row_count)

+---------------+------------+-------------------+-----------+----------------+----------+------+-------+-------+--------------------+----+------+
|      body_type|daysonmarket|engine_displacement|engine_type|franchise_dealer|horsepower|is_new|mileage|  price|wheel_system_display|year|row_id|
+---------------+------------+-------------------+-----------+----------------+----------+------+-------+-------+--------------------+----+------+
|SUV / Crossover|         522|             1300.0|         I4|               1|     177.0|     1|    7.0|23141.0|   Front-Wheel Drive|2019|     0|
|SUV / Crossover|         207|             2000.0|         I4|               1|     246.0|     1|    8.0|46500.0|     All-Wheel Drive|2020|     1|
|SUV / Crossover|         196|             3000.0|         V6|               1|     340.0|     1|   11.0|67430.0|     All-Wheel Drive|2020|     2|
|SUV / Crossover|         137|             2000.0|         I4|               1|     246.0|     1|    7.0|48880.0|     
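
In Spark, casting a string that is not a number to `double` yields null, which is what the `isNotNull()` filter above relies on. The sketch below mimics that idea in plain Python on two hand-written rows modelled on the earlier output. `to_double` and `is_clean` are illustrative names, and Python's `float()` does not accept exactly the same set of strings as Spark's cast, so this is an approximation rather than a reimplementation.

```python
def to_double(value):
    """Return value as a float, or None when it is not numeric (like a failed cast)."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

def is_clean(row, numeric_columns):
    """True when every listed column of the row holds a numeric value."""
    return all(to_double(row[c]) is not None for c in numeric_columns)

numeric_columns = ["horsepower", "mileage", "price", "year"]
rows = [
    {"horsepower": "170.0", "mileage": "22600.0", "price": "19900.0", "year": "2017"},
    {"horsepower": "ABS brakes", "mileage": "Illuminated entry",
     "price": "Overhead airbag", "year": "Speed-sensing steering"},
]

# Keep only rows whose numeric columns all parse as numbers.
clean_rows = [r for r in rows if is_clean(r, numeric_columns)]
print(len(clean_rows))
```

The second row, whose fields hold option descriptions instead of numbers, is the kind of misplaced record the filter removes.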

In [None]:
#print unique values for columns we want to transform to 0s and 1s
columns = ['body_type', 'engine_type', 'wheel_system_display']

for column in columns:
    unique_values = df.select(column).distinct().orderBy(column).rdd.flatMap(lambda x: x).collect()
    print(f"Unique values for column '{column}':")
    for value in unique_values:
        print(value)
    print()

Unique values for column 'body_type':
Convertible
Coupe
Hatchback
Minivan
Pickup Truck
SUV / Crossover
Sedan
Van
Wagon

Unique values for column 'engine_type':
H4
H4 Hybrid
H6
I2
I3
I3 Hybrid
I4
I4 Compressed Natural Gas
I4 Diesel
I4 Flex Fuel Vehicle
I4 Hybrid
I5
I5 Biodiesel
I5 Diesel
I6
I6 Diesel
I6 Hybrid
R2
V10
V10 Diesel
V12
V12 Hybrid
V6
V6 Biodiesel
V6 Compressed Natural Gas
V6 Diesel
V6 Flex Fuel Vehicle
V6 Hybrid
V8
V8 Biodiesel
V8 Compressed Natural Gas
V8 Diesel
V8 Flex Fuel Vehicle
V8 Hybrid
V8 Propane
W12
W12 Flex Fuel Vehicle
W16
W8

Unique values for column 'wheel_system_display':
4X2
All-Wheel Drive
Four-Wheel Drive
Front-Wheel Drive
Rear-Wheel Drive



In [None]:
# Keep only the first token of engine_type,
# because we only want to know the engine configuration (V6, V8, etc.).
# The rest of the information is not related to the engine type but to the fuel type.
column_name = 'engine_type'

# Apply the transformation to extract the desired part
df = df.withColumn(column_name, split(col(column_name), ' ')[0])

# Show the updated DataFrame
df.show()

+---------------+------------+-------------------+-----------+----------------+----------+------+-------+-------+--------------------+----+------+
|      body_type|daysonmarket|engine_displacement|engine_type|franchise_dealer|horsepower|is_new|mileage|  price|wheel_system_display|year|row_id|
+---------------+------------+-------------------+-----------+----------------+----------+------+-------+-------+--------------------+----+------+
|SUV / Crossover|         522|             1300.0|         I4|               1|     177.0|     1|    7.0|23141.0|   Front-Wheel Drive|2019|     0|
|SUV / Crossover|         207|             2000.0|         I4|               1|     246.0|     1|    8.0|46500.0|     All-Wheel Drive|2020|     1|
|SUV / Crossover|         196|             3000.0|         V6|               1|     340.0|     1|   11.0|67430.0|     All-Wheel Drive|2020|     2|
|SUV / Crossover|         137|             2000.0|         I4|               1|     246.0|     1|    7.0|48880.0|     

In [None]:
#print unique values for columns we want to transform to 0s and 1s
# check if it was done correctly

columns = ['body_type', 'engine_type', 'wheel_system_display']

for column in columns:
    unique_values = df.select(column).distinct().orderBy(column).rdd.flatMap(lambda x: x).collect()
    print(f"Unique values for column '{column}':")
    for value in unique_values:
        print(value)
    print()

Unique values for column 'body_type':
Convertible
Coupe
Hatchback
Minivan
Pickup Truck
SUV / Crossover
Sedan
Van
Wagon

Unique values for column 'engine_type':
H4
H6
I2
I3
I4
I5
I6
R2
V10
V12
V6
V8
W12
W16
W8

Unique values for column 'wheel_system_display':
4X2
All-Wheel Drive
Four-Wheel Drive
Front-Wheel Drive
Rear-Wheel Drive



In [None]:
# Split columns into binary variables
columns_to_split = ['body_type', 'engine_type', 'wheel_system_display']

for column in columns_to_split:
    # Get unique values in the column
    unique_values = df.select(column).distinct().rdd.flatMap(lambda x: x).collect()

    # Create binary variables for each unique value
    for value in unique_values:
        binary_column_name = f'{column}_{value}'
        df = df.withColumn(binary_column_name, (col(column) == value).cast('integer'))

    # Drop the original column
    df = df.drop(column)

# Show the resulting DataFrame
df.show()

+------------+-------------------+----------------+----------+------+-------+-------+----+------+-------------------+---------------------+---------------+----------------------+---------------+-------------+-------------------------+---------------+-----------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+--------------+---------------+--------------+--------------+---------------+--------------+--------------+---------------+-------------------------------------+------------------------+------------------------------------+--------------------------------------+-------------------------------------+
|daysonmarket|engine_displacement|franchise_dealer|horsepower|is_new|mileage|  price|year|row_id|body_type_Hatchback|body_type_Convertible|body_type_Sedan|body_type_Pickup Truck|body_type_Wagon|body_type_Van|body_type_SUV / Crossover|body_type_Coupe|body_type_Minivan|engine_type_H6|engine_type_I5|engine_type_I4|engine_type_
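
The loop above builds one 0/1 column per distinct value, named `<column>_<value>`. A minimal plain-Python sketch of the same idea on two hand-picked rows is shown here; `one_hot` is a hypothetical helper, not part of the notebook. Spark ML also provides `StringIndexer` and `OneHotEncoder` in `pyspark.ml.feature`, which could be an alternative to the manual loop.

```python
def one_hot(rows, column):
    """Replace `column` with one 0/1 field per distinct value, named column_value."""
    values = sorted({row[column] for row in rows})
    encoded = []
    for row in rows:
        # Copy every field except the one being encoded.
        new_row = {k: v for k, v in row.items() if k != column}
        for value in values:
            new_row[f"{column}_{value}"] = int(row[column] == value)
        encoded.append(new_row)
    return encoded

rows = [
    {"price": "23141.0", "engine_type": "I4"},
    {"price": "67430.0", "engine_type": "V6"},
]
encoded = one_hot(rows, "engine_type")
for row in encoded:
    print(row)
```

Because every row has exactly one 1 per group, the columns of a group always sum to 1. Dropping one category per group is a common way to avoid that redundancy in a linear model.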

## RDDs
Exploration of RDDs.

In [None]:
# Transform dataframe into RDD:
rdd = df.rdd

In [None]:
# Count the RDD
contagem_registos = rdd.count()
print("Number of records:", contagem_registos)

Number of records: 1626266


In [None]:
# Print the first 5
primeiros_registos = rdd.take(5)
print("First 5 records:")
for registo in primeiros_registos:
    print(registo)

First 5 records:
Row(daysonmarket='522', engine_displacement='1300.0', franchise_dealer=1, horsepower='177.0', is_new=1, mileage='7.0', price='23141.0', year='2019', row_id=0, body_type_Hatchback=0, body_type_Convertible=0, body_type_Sedan=0, body_type_Pickup Truck=0, body_type_Wagon=0, body_type_Van=0, body_type_SUV / Crossover=1, body_type_Coupe=0, body_type_Minivan=0, engine_type_H6=0, engine_type_I5=0, engine_type_I4=1, engine_type_I2=0, engine_type_R2=0, engine_type_I3=0, engine_type_W12=0, engine_type_V6=0, engine_type_V12=0, engine_type_V8=0, engine_type_H4=0, engine_type_V10=0, engine_type_I6=0, engine_type_W8=0, engine_type_W16=0, wheel_system_display_Four-Wheel Drive=0, wheel_system_display_4X2=0, wheel_system_display_All-Wheel Drive=0, wheel_system_display_Front-Wheel Drive=1, wheel_system_display_Rear-Wheel Drive=0)
Row(daysonmarket='207', engine_displacement='2000.0', franchise_dealer=1, horsepower='246.0', is_new=1, mileage='8.0', price='46500.0', year='2020', row_id

In [None]:
# Create an RDD with only the engine_displacement column
make_name_rdd = df.select("engine_displacement").rdd

# Count the distinct values
distinct_count = make_name_rdd.distinct().count()

# Print the number of distinct values
print("Number of distinct values: ", distinct_count)

Number of distinct values:  67


### Transformations

In [None]:
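# Filter: keep the rows whose engine displacement is 1300.
# Each RDD element is a Row (not a bare value) and the column is still a string such as '1300.0',
# so the value is read from the Row and compared with that string.
filtered_rdd = make_name_rdd.filter(lambda row: row["engine_displacement"] == "1300.0")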

In [None]:
# Sample
rdd.sample(fraction = .1, withReplacement = True)

PythonRDD[197] at RDD at PythonRDD.scala:53

In [None]:
# Distinct
rdd.distinct()

PythonRDD[202] at RDD at PythonRDD.scala:53
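
The two cells above print an RDD description rather than data because `sample` and `distinct` are transformations: they only describe a new RDD, and nothing is computed until an action such as `count()` or `take()` is called. The following plain-Python analogy uses a generator to show the same deferred behaviour. It is not Spark code, and `numbers`, `seen` and `lazy_filter` are made-up names.

```python
seen = []  # records which elements were actually processed

def lazy_filter(items, predicate):
    """Yield matching items one by one; the body runs only when iterated."""
    for item in items:
        seen.append(item)
        if predicate(item):
            yield item

numbers = [1300, 2000, 3000, 1300]

# "Transformation": builds the pipeline, but nothing runs yet.
pipeline = lazy_filter(numbers, lambda n: n == 1300)
processed_before = len(seen)

# "Action": consuming the pipeline forces the computation.
result = list(pipeline)
processed_after = len(seen)

print(processed_before, processed_after, result)
```

In the notebook, appending an action such as `.count()` to `rdd.sample(...)` or `rdd.distinct()` would play the role of `list(pipeline)` here.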

## SparkSQL

In [None]:
# Register the dataframe as a temporary table
df.createOrReplaceTempView("carros")

In [None]:
# Count
contagem_registos = sc.sql("SELECT COUNT(*) FROM carros").first()[0]
print("Number of records:", contagem_registos)

Number of records: 1626266


In [None]:
# print the first 5 lines
primeiros_registos = sc.sql("SELECT * FROM carros LIMIT 5")
primeiros_registos.show()

+------------+-------------------+----------------+----------+------+-------+-------+----+------+-------------------+---------------------+---------------+----------------------+---------------+-------------+-------------------------+---------------+-----------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+--------------+---------------+--------------+--------------+---------------+--------------+--------------+---------------+-------------------------------------+------------------------+------------------------------------+--------------------------------------+-------------------------------------+
|daysonmarket|engine_displacement|franchise_dealer|horsepower|is_new|mileage|  price|year|row_id|body_type_Hatchback|body_type_Convertible|body_type_Sedan|body_type_Pickup Truck|body_type_Wagon|body_type_Van|body_type_SUV / Crossover|body_type_Coupe|body_type_Minivan|engine_type_H6|engine_type_I5|engine_type_I4|engine_type_

In [None]:
# calculate mean price
media_preco = sc.sql("SELECT AVG(price) FROM carros").first()[0]
print("Mean price:", media_preco)

Mean price: 27556.02329576281


In [None]:
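# Find the maximum price.
# Note: price is still a string column at this point, so MAX compares the values as text;
# MAX(CAST(price AS DOUBLE)) would give the numeric maximum.
max_preco = sc.sql("SELECT MAX(price) FROM carros").first()[0]
print("Maximum price:", max_preco)

Maximum price: 999995.0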
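
The `price` column is still stored as a string when the `carros` view is registered (the `Row` output earlier shows `price='23141.0'`), so a `MAX` over it compares text rather than numbers. The plain-Python sketch below shows how the two orderings can differ. The prices in it are hypothetical values chosen for illustration, and whether the result printed above is affected depends on the data.

```python
# Hypothetical prices stored as text, as in the `carros` view.
prices_as_text = ["23141.0", "999995.0", "3299995.0"]

# Text comparison goes character by character, so '9...' beats '3...'.
max_as_text = max(prices_as_text)

# Numeric comparison converts each value first.
max_as_number = max(float(p) for p in prices_as_text)

print(max_as_text)
print(max_as_number)
```

In Spark SQL the numeric variant corresponds to `SELECT MAX(CAST(price AS DOUBLE)) FROM carros`.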


## Machine Learning model (Regression)
In this section a linear regression model is fitted with __price__ as the target label.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [None]:
# Prepare the data for training
df = df.withColumn('daysonmarket', col('daysonmarket').cast('double'))
df = df.withColumn('engine_displacement', col('engine_displacement').cast('double'))
df = df.withColumn('horsepower', col('horsepower').cast('double'))
df = df.withColumn('mileage', col('mileage').cast('double'))
df = df.withColumn('year', col('year').cast('double'))
df = df.withColumn('price', col('price').cast('double'))

assembler = VectorAssembler(
    inputCols=['daysonmarket', 'engine_displacement', 'franchise_dealer',
               'horsepower', 'is_new', 'mileage', 'year', 'body_type_Hatchback',
               'body_type_Convertible', 'body_type_Sedan', 'body_type_Pickup Truck',
               'body_type_Wagon', 'body_type_Van', 'body_type_SUV / Crossover',
               'body_type_Coupe', 'body_type_Minivan', 'engine_type_H6', 'engine_type_I5',
               'engine_type_I4', 'engine_type_I2', 'engine_type_R2', 'engine_type_I3',
               'engine_type_W12', 'engine_type_V6', 'engine_type_V12', 'engine_type_V8',
               'engine_type_H4', 'engine_type_V10', 'engine_type_I6', 'engine_type_W8',
               'engine_type_W16', 'wheel_system_display_Four-Wheel Drive', 
               'wheel_system_display_4X2', 'wheel_system_display_All-Wheel Drive',
               'wheel_system_display_Front-Wheel Drive', 'wheel_system_display_Rear-Wheel Drive'],
    outputCol="features"
)
data_features = assembler.transform(df).select("features", "price")

# Split the data into training and testing sets
train_data, test_data = data_features.randomSplit([0.8, 0.2], seed=0)

# Create a Linear Regression model
lr = LinearRegression(labelCol="price")

# Train the model
model = lr.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Display predicted and actual prices
predictions.select("prediction", "price").show()

# Evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction")

rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
# RMSE: square root of the mean squared difference between the predicted and the actual values

mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
# MAE: average absolute difference between the predicted values and the actual values

r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
# R2: proportion of the variance in the dependent variable (price) that can be
# explained by the independent variables in the model

# Print the performance indicators
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)

# 10 minutes to run

+------------------+-------+
|        prediction|  price|
+------------------+-------+
|24537.742287215544|19515.0|
|29704.543048078194|24978.0|
|21201.530313660158|18705.0|
|23062.918474148144|18805.0|
|25998.676950406632|25646.0|
|22689.724652997684|29193.0|
|22689.724652997684|35093.0|
|22689.635537959635|35093.0|
| 22706.34713664197|35302.0|
| 22706.25802160392|35492.0|
|22713.220020264038|31283.0|
|22713.220020264038|35093.0|
| 22713.13090522599|31518.0|
| 22718.90937320888|35488.0|
| 31747.30573217396|22205.0|
|22767.707443685038|29383.0|
| 22834.57887359662|31368.0|
| 22867.20003561885|34943.0|
|22879.851387223694|29108.0|
| 22930.01121845306|31354.0|
+------------------+-------+
only showing top 20 rows

Root Mean Squared Error (RMSE): 11008.264454165022
Mean Absolute Error (MAE): 5763.071965759084
R-squared (R2): 0.6587042420088287


The RMSE of 11008 is the square root of the mean squared difference between the prices predicted by the model and the actual prices; because the errors are squared, large misses weigh more heavily.

The MAE of 5763 indicates that, on average, the predicted value differs from the actual value by $5763.

The R2 of 0.6587 indicates that the regression model explains approximately 65.87% of the variance in car prices. It is a reasonable value, although there is room to improve it.
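
To make the three metrics concrete, here is a minimal plain-Python sketch of their usual definitions. It is not the code Spark runs internally. `regression_metrics` is a name introduced here, and the numbers are toy values chosen for easy arithmetic, not taken from the dataset.

```python
import math

def regression_metrics(actual, predicted):
    """Return (rmse, mae, r2) for two equally long lists of numbers."""
    n = len(actual)
    errors = [p - a for a, p in zip(actual, predicted)]
    mse = sum(e * e for e in errors) / n              # mean squared error
    mae = sum(abs(e) for e in errors) / n             # mean absolute error
    mean_actual = sum(actual) / n
    ss_res = sum(e * e for e in errors)               # residual sum of squares
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)  # total sum of squares
    return math.sqrt(mse), mae, 1 - ss_res / ss_tot

# Toy numbers (hypothetical, not from the used-cars data).
actual = [3.0, 5.0, 7.0]
predicted = [2.0, 5.0, 9.0]
rmse, mae, r2 = regression_metrics(actual, predicted)
print(rmse, mae, r2)
```

Because the errors are squared before averaging, RMSE is never smaller than MAE. In the results above the RMSE (11008) is almost twice the MAE (5763), which is consistent with a number of large individual errors.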

These results show that the model has some ability to predict car prices. The RMSE and MAE values show that the model can still be improved and would benefit from a reduction in error. The R2 value suggests that other factors, not included here, could increase the model's predictive capacity.

As a continuation of this work, it would be interesting to compare these values with alternative models, to keep investigating other features, and to check whether outliers or patterns in the data are influencing the model.
