In [1]:
# Make the project root importable from within the notebook.
# The root is derived from the working directory rather than hard-coded to one
# user's machine. This assumes the notebook is run from a folder directly below
# the project root -- the same assumption the "../data" and "../plots" relative
# paths used in this notebook already rely on.
import sys
from pathlib import Path

project_root = str(Path.cwd().parent)
if project_root not in sys.path:
    # Prepend so the project's own `scripts` package wins over any other
    # package of the same name on the path.
    sys.path.insert(0, project_root)

import scripts

In [2]:
import matplotlib.pyplot as plt
import seaborn as sb
import pandas
import os

In [3]:
# Create the SparkSession through the project helper and bind it to `spark`;
# later cells use it to read the parquet files and to build DataFrames.
spark = scripts.clean_base.create_spark_session()

24/08/28 17:05:08 WARN Utils: Your hostname, THANGs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 100.86.89.69 instead (on interface en0)
24/08/28 17:05:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/28 17:05:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/28 17:05:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Create the top-level output directory for the Uber plots.
# NOTE(review): presumably creates each name from the second list under each
# base path from the first list (here "../plots/uber") -- confirm against
# scripts.download.make_directories.
scripts.download.make_directories(["../plots/"], ["uber"])

In [5]:
# Create one sub-directory per plot type; these names match the output paths
# ("../plots/uber/correlation/", ".../histogram/", ".../daily/", ".../hourly/")
# passed to the plotting helpers in the cells below.
scripts.download.make_directories(["../plots/uber/"], ["correlation", "histogram", "daily", "hourly"])

In [6]:
# Locate the monthly raw Uber parquet datasets and load each one lazily as a
# Spark DataFrame. `uber_files[i]` is the path that `uber_dfs[i]` was read from.
raw_uber_dir = "../data/raw/uber/"
uber_files = scripts.clean_base.list_parquet_directories(raw_uber_dir)
uber_dfs = [spark.read.parquet(parquet_path) for parquet_path in uber_files]

In [7]:
# Show the discovered parquet paths, in the order the listing helper returned them.
uber_files

['../data/raw/uber/2023-08.parquet',
 '../data/raw/uber/2023-11.parquet',
 '../data/raw/uber/2023-10.parquet',
 '../data/raw/uber/2023-09.parquet',
 '../data/raw/uber/2023-06.parquet',
 '../data/raw/uber/2023-07.parquet']

In [8]:
# Use the schema of the first monthly DataFrame as the template for the
# combined dataset.
schema = uber_dfs[0].schema

# Start from an empty DataFrame with that schema; the sampling loop in the
# next cell unions each month's sample into it.
unioned_df = spark.createDataFrame([], schema)

unioned_df # still empty here; it will hold the merged sub-samples of all datasets

dispatching_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_distance,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,total_amount,waiting_time,fare_per_miles


In [9]:
# Plot a correlation heatmap for every monthly dataset and build the combined
# sample `unioned_df` from a sub-sample of each month.

# Reset the accumulator so this cell is idempotent: without this, re-running
# the cell would union the same monthly samples into `unioned_df` again.
unioned_df = spark.createDataFrame([], schema)

# Pair each DataFrame with the path it was read from instead of searching the
# list with `uber_dfs.index(df)` on every iteration.
for file_path, df in zip(uber_files, uber_dfs):
    # e.g. "../data/raw/uber/2023-08.parquet" -> "2023-08"
    date_part = os.path.splitext(os.path.basename(file_path))[0]
    print(date_part)
    # Return value is not used here.
    # NOTE(review): presumably reports per-column min/max -- confirm.
    scripts.manipulate_data.find_min_max_df(df, scripts.clean_high_volume.COLUMNS)
    # NOTE(review): the result of this count is discarded; it triggers a full
    # Spark job over the raw month. Confirm whether it is still needed.
    df.count()
    # NOTE(review): presumably a 5% sample keyed on dispatching_base_num --
    # confirm against scripts.clean_base.sampling_data.
    df = scripts.clean_base.sampling_data(df, "dispatching_base_num", 0.05)
    # NOTE(review): result discarded as well; confirm whether it is needed.
    df.count()
    unioned_df = unioned_df.union(df)
    scripts.plot_data.plot_correlation_heatmap(df, scripts.clean_high_volume.COLUMNS, "uber", "../plots/uber/correlation/", date_part)

2023-08




../plots/uber/correlation/2023-08.png
2023-11


24/08/28 17:05:20 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


../plots/uber/correlation/2023-11.png
2023-10




../plots/uber/correlation/2023-10.png
2023-09




../plots/uber/correlation/2023-09.png
2023-06




../plots/uber/correlation/2023-06.png
2023-07




../plots/uber/correlation/2023-07.png


In [10]:
# Number of rows in the combined sample across all monthly datasets.
unioned_df.count()




4067758

In [11]:
# Plot the distribution of each continuous column of the combined sample.
histogram_dir = "../plots/uber/histogram/"
for column_name in scripts.clean_high_volume.COLUMNS:
    # NOTE(review): the trailing 50 is presumably the number of bins -- confirm
    # against scripts.plot_data.plot_histogram.
    scripts.plot_data.plot_histogram(unioned_df, column_name, "uber", histogram_dir, 50)



0 134.16
../plots/uber/histogram/trip_distance_histogram.png




0 180.75
../plots/uber/histogram/trip_time_histogram.png




0 510.95
../plots/uber/histogram/base_passenger_fare_histogram.png




0.0 56.85
../plots/uber/histogram/tolls_histogram.png




0.0 24.07
../plots/uber/histogram/bcf_histogram.png




0.0 45.08
../plots/uber/histogram/sales_tax_histogram.png




0.0 15.5
../plots/uber/histogram/congestion_surcharge_histogram.png




0.0 15.0
../plots/uber/histogram/airport_fee_histogram.png




0.0 66.78
../plots/uber/histogram/tips_histogram.png




0.0 307.52
../plots/uber/histogram/driver_pay_histogram.png




0 535.82
../plots/uber/histogram/total_amount_histogram.png




0 85.38333333333334
../plots/uber/histogram/waiting_time_histogram.png




0 43.261904761904766
../plots/uber/histogram/fare_per_miles_histogram.png


Plot the continuous columns against time: their average per day and per hour of the day.

In [12]:
from pyspark.sql.functions import avg, to_date, hour

# Add a calendar-date column derived from the pickup timestamp so rows can be
# grouped per day. Despite its name, `daily_avg_df` holds no averages yet;
# those are computed per column in the next cell.
pickup_date_col = to_date(unioned_df["pickup_datetime"])
daily_avg_df = unioned_df.withColumn("pickup_date", pickup_date_col)


In [13]:
# Daily: average each continuous column per pickup date and plot it over time.
rows_per_day = daily_avg_df.groupBy("pickup_date")
for column_name in scripts.clean_high_volume.COLUMNS:
    # One row per pickup date holding that day's mean of the column.
    average_df = rows_per_day.agg(avg(column_name).alias(column_name))
    # NOTE(review): result discarded; confirm whether this count is needed.
    average_df.count()
    scripts.plot_data.scatter_plot(average_df, "pickup_date", column_name, "uber", "../plots/uber/daily/")



../plots/uber/daily/pickup_date_vs_trip_distance.png




../plots/uber/daily/pickup_date_vs_trip_time.png




../plots/uber/daily/pickup_date_vs_base_passenger_fare.png




../plots/uber/daily/pickup_date_vs_tolls.png




../plots/uber/daily/pickup_date_vs_bcf.png




../plots/uber/daily/pickup_date_vs_sales_tax.png




../plots/uber/daily/pickup_date_vs_congestion_surcharge.png




../plots/uber/daily/pickup_date_vs_airport_fee.png




../plots/uber/daily/pickup_date_vs_tips.png




../plots/uber/daily/pickup_date_vs_driver_pay.png




../plots/uber/daily/pickup_date_vs_total_amount.png




../plots/uber/daily/pickup_date_vs_waiting_time.png




../plots/uber/daily/pickup_date_vs_fare_per_miles.png




In [14]:
# Hourly: average each continuous column per pickup hour of the day and plot it.
# The averages are computed over the combined sample `unioned_df`, the same
# data the daily plots use. The variable `df` must not be used here: it is
# only the leftover loop variable of the sampling cell and holds just the last
# month's sample.
for col in scripts.clean_high_volume.COLUMNS:
    # Extract the hour from the pickup timestamp and group by it.
    hourly_avg_df = unioned_df.groupBy(hour(unioned_df["pickup_datetime"]).alias("hour")).agg(avg(col).alias(col))
    scripts.plot_data.scatter_plot(hourly_avg_df, "hour", col, "uber", "../plots/uber/hourly/")



../plots/uber/hourly/hour_vs_trip_distance.png




../plots/uber/hourly/hour_vs_trip_time.png




../plots/uber/hourly/hour_vs_base_passenger_fare.png
../plots/uber/hourly/hour_vs_tolls.png
../plots/uber/hourly/hour_vs_bcf.png
../plots/uber/hourly/hour_vs_sales_tax.png
../plots/uber/hourly/hour_vs_congestion_surcharge.png
../plots/uber/hourly/hour_vs_airport_fee.png
../plots/uber/hourly/hour_vs_tips.png
../plots/uber/hourly/hour_vs_driver_pay.png
../plots/uber/hourly/hour_vs_total_amount.png
../plots/uber/hourly/hour_vs_waiting_time.png
../plots/uber/hourly/hour_vs_fare_per_miles.png


In [16]:
# Persist the combined sample as the curated Uber dataset.
# NOTE(review): overwrite/append behaviour is decided inside
# scripts.clean_base.write_data -- confirm before re-running.
scripts.clean_base.write_data(unioned_df, "../data/curated/uber/uber.parquet")

