In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    count, when, col, mean, sum, length, to_timestamp, hour, 
    dayofweek, avg, month, year, unix_timestamp, rand, isnan, isnull,
    rand, current_date, expr
)
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
spark = SparkSession.builder \
    .appName('NYC-Yellow-Cab') \
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '4g') \
    .getOrCreate()
spark

In [4]:
file_size = os.path.getsize("C:/Users/sambo/Downloads/bidataproj/nyc_yellow_cabs.csv")
print(f"CSV file size: {file_size / (1024 ** 3):.2f} GB")

CSV file size: 1.40 GB


In [5]:
file_path = "C:/Users/sambo/Downloads/bidataproj/nyc_yellow_cabs.csv"

In [6]:
df = spark.read.csv(file_path, header=True, inferSchema=True)
print("CSV file loaded successfully with headers.")
df.show(5)

CSV file loaded successfully with headers.
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-09-01 00:05:51|  2024-09-01 00:45:03|              1|          9.8|         1|                 N|         138|          48|     

In [7]:
df.cache()

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, RatecodeID: int, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double]

In [8]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [9]:
import os
from google.cloud import storage

# Path to your service account key
service_account_path = "C:/Users/sambo/Downloads/boreal-fort-437820-k4-0b11a4b50131.json"

# Initialize the client with the credentials
client = storage.Client.from_service_account_json(service_account_path)

# Directory to save local CSV files
local_directory = "C:/Users/sambo/Downloads/bidataproj/hive files"

# Define your GCS bucket and file information
bucket_name = "nyc-yellow-cabs"
destination_blob_name = "nyc_yellow_cabs.csv"  # Name of the file in GCS
file_path = "C:/Users/sambo/Downloads/bidataproj/nyc_yellow_cabs.csv"  # Local file path to upload

# Access the bucket
try:
    bucket = client.bucket(bucket_name)
    print(f"Bucket {bucket_name} accessed successfully.")
except Exception as e:
    print(f"Failed to access bucket {bucket_name}: {e}")
    raise

# Upload the file
try:
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(file_path)
    print(f"Uploaded {file_path} to gs://{bucket_name}/{destination_blob_name}")
except FileNotFoundError:
    print(f"Error: The file {file_path} was not found.")
except Exception as e:
    print(f"An error occurred while uploading the file: {e}")
    raise


Bucket nyc-yellow-cabs accessed successfully.
Uploaded C:/Users/sambo/Downloads/bidataproj/nyc_yellow_cabs.csv to gs://nyc-yellow-cabs/nyc_yellow_cabs.csv


# Hive 

In [10]:
df.createOrReplaceTempView("nyc_yellow_cab_data")

## Hourly Analysis

In [11]:
# Busiest hours for yellow cab pickups based on the number of trips.
spark.sql("""
CREATE OR REPLACE TEMP VIEW peak_pickup_hours AS
SELECT 
    hour(tpep_pickup_datetime) AS pickup_hour,
    COUNT(*) AS trip_count
FROM nyc_yellow_cab_data
GROUP BY hour(tpep_pickup_datetime)
ORDER BY pickup_hour
""")

# Retrieve the result as a Spark DataFrame
peak_pickup_hours_df = spark.sql("SELECT * FROM peak_pickup_hours")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = peak_pickup_hours_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\peak_pickup_hours.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "peak_pickup_hours.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\peak_pickup_hours.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\peak_pickup_hours.csv to gs://nyc-yellow-cabs/peak_pickup_hours.csv
Displaying the result table:


Unnamed: 0,pickup_hour,trip_count
0,0,422783
1,1,273453
2,2,183138
3,3,117686
4,4,83612
5,5,88483
6,6,202120
7,7,374422
8,8,510008
9,9,557126


In [12]:
# Average fare amount for yellow cab trips across each hour of the day.
spark.sql("""
CREATE OR REPLACE TEMP VIEW avg_fare_by_hour AS
SELECT 
    hour(tpep_pickup_datetime) AS pickup_hour,
    AVG(fare_amount) AS avg_fare
FROM nyc_yellow_cab_data
GROUP BY hour(tpep_pickup_datetime)
ORDER BY pickup_hour
""")

# Retrieve the result as a Spark DataFrame
avg_fare_by_hour_df = spark.sql("SELECT * FROM avg_fare_by_hour")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = avg_fare_by_hour_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_hour.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "avg_fare_by_hour.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_hour.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_hour.csv to gs://nyc-yellow-cabs/avg_fare_by_hour.csv
Displaying the result table:


Unnamed: 0,pickup_hour,avg_fare
0,0,19.931356
1,1,18.06157
2,2,16.71446
3,3,17.405353
4,4,22.290715
5,5,26.335161
6,6,22.1002
7,7,19.654227
8,8,19.177416
9,9,18.827957


In [13]:
# Distribution of different payment types used in yellow cab trips across each hour of the day.
spark.sql("""
CREATE OR REPLACE TEMP VIEW payment_method_by_hour AS
SELECT 
    hour(tpep_pickup_datetime) AS pickup_hour,
    payment_type,
    COUNT(*) AS total_count
FROM nyc_yellow_cab_data
GROUP BY hour(tpep_pickup_datetime), payment_type
ORDER BY pickup_hour, payment_type
""")

# Retrieve the result as a Spark DataFrame
payment_method_by_hour_df = spark.sql("SELECT * FROM payment_method_by_hour")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = payment_method_by_hour_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\payment_method_by_hour.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "payment_method_by_hour.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\payment_method_by_hour.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\payment_method_by_hour.csv to gs://nyc-yellow-cabs/payment_method_by_hour.csv
Displaying the result table:


Unnamed: 0,pickup_hour,payment_type,total_count
0,0,0,77099
1,0,1,290049
2,0,2,41807
3,0,3,3292
4,0,4,10536
...,...,...,...
115,23,0,109777
116,23,1,407844
117,23,2,59734
118,23,3,4206


In [14]:
# Total revenue generated by yellow cab trips for each hour of the day.
spark.sql("""
CREATE OR REPLACE TEMP VIEW revenue_by_hour AS
SELECT 
    hour(tpep_pickup_datetime) AS pickup_hour,
    SUM(total_amount) AS total_revenue  -- Use total_amount for revenue calculation
FROM nyc_yellow_cab_data
GROUP BY hour(tpep_pickup_datetime)
ORDER BY pickup_hour
""")

# Retrieve the result as a Spark DataFrame
revenue_by_hour_df = spark.sql("SELECT * FROM revenue_by_hour")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = revenue_by_hour_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\revenue_by_hour.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "revenue_by_hour.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\revenue_by_hour.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\revenue_by_hour.csv to gs://nyc-yellow-cabs/revenue_by_hour.csv
Displaying the result table:


Unnamed: 0,pickup_hour,total_revenue
0,0,11869440.0
1,1,7001747.0
2,2,4379953.0
3,3,2904673.0
4,4,2535483.0
5,5,3161663.0
6,6,6027502.0
7,7,10164410.0
8,8,13617020.0
9,9,14795450.0


## Revenue Analysis

In [15]:
# Frequency of various payment methods used, ordered by the number of trips.
spark.sql("""
CREATE OR REPLACE TEMP VIEW payment_method_frequency_ordered AS
SELECT
    CASE 
        WHEN payment_type = 0 THEN 'Unknown'
        WHEN payment_type = 1 THEN 'Credit Card'
        WHEN payment_type = 2 THEN 'Cash'
        WHEN payment_type = 3 THEN 'No Charge'
        WHEN payment_type = 4 THEN 'Dispute'
        ELSE 'Other'
    END AS payment_method,
    COUNT(*) AS trip_count
FROM nyc_yellow_cab_data
GROUP BY payment_type
ORDER BY trip_count DESC
""")

# Retrieve the result as a Spark DataFrame
payment_method_data_ordered = spark.sql("SELECT * FROM payment_method_frequency_ordered")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = payment_method_data_ordered.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\payment_method_frequency_ordered.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "payment_method_frequency_ordered.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\payment_method_frequency_ordered.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\payment_method_frequency_ordered.csv to gs://nyc-yellow-cabs/payment_method_frequency_ordered.csv
Displaying the result table:


Unnamed: 0,payment_method,trip_count
0,Credit Card,9956175
1,Cash,1809878
2,Unknown,1636910
3,Dispute,281520
4,No Charge,99953


In [16]:
# Average fare amount per passenger count to provide insights into fare trends across different group sizes.
spark.sql("""
CREATE OR REPLACE TEMP VIEW avg_fare_by_passenger AS
SELECT
    passenger_count,
    AVG(fare_amount) AS avg_fare_per_passenger
FROM nyc_yellow_cab_data
GROUP BY passenger_count
ORDER BY passenger_count
""")

# Retrieve the result as a Spark DataFrame
avg_fare_by_passenger_df = spark.sql("SELECT * FROM avg_fare_by_passenger")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = avg_fare_by_passenger_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_passenger.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "avg_fare_by_passenger.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_passenger.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_passenger.csv to gs://nyc-yellow-cabs/avg_fare_by_passenger.csv
Displaying the result table:


Unnamed: 0,passenger_count,avg_fare_per_passenger
0,,19.748085
1,0.0,17.326188
2,1.0,19.138006
3,2.0,22.351348
4,3.0,21.823336
5,4.0,24.680444
6,5.0,19.871004
7,6.0,18.653622
8,7.0,54.0352
9,8.0,71.252041


In [17]:
# fare amounts vary across different distance ranges, providing insights into both total and average fares.
spark.sql("""
CREATE OR REPLACE TEMP VIEW fare_amount_by_distance AS
SELECT 
    CASE
        WHEN trip_distance >= 0 AND trip_distance <= 1 THEN '0-1 miles'
        WHEN trip_distance > 1 AND trip_distance <= 2 THEN '1-2 miles'
        WHEN trip_distance > 2 AND trip_distance <= 5 THEN '2-5 miles'
        WHEN trip_distance > 5 AND trip_distance <= 10 THEN '5-10 miles'
        WHEN trip_distance > 10 AND trip_distance <= 20 THEN '10-20 miles'
        WHEN trip_distance > 20 THEN '>20 miles'
        ELSE 'Unknown'
    END AS distance_range,
    SUM(fare_amount) AS total_fare,
    AVG(fare_amount) AS avg_fare
FROM nyc_yellow_cab_data
WHERE trip_distance IS NOT NULL AND fare_amount IS NOT NULL
GROUP BY distance_range
ORDER BY 
    CASE
        WHEN distance_range = '0-1 miles' THEN 1
        WHEN distance_range = '1-2 miles' THEN 2
        WHEN distance_range = '2-5 miles' THEN 3
        WHEN distance_range = '5-10 miles' THEN 4
        WHEN distance_range = '10-20 miles' THEN 5
        WHEN distance_range = '>20 miles' THEN 6
        ELSE 7
    END
""")

# Retrieve the result as a Spark DataFrame
fare_by_distance_df = spark.sql("SELECT * FROM fare_amount_by_distance")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = fare_by_distance_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\fare_amount_by_distance.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "fare_amount_by_distance.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\fare_amount_by_distance.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\fare_amount_by_distance.csv to gs://nyc-yellow-cabs/fare_amount_by_distance.csv
Displaying the result table:


Unnamed: 0,distance_range,total_fare,avg_fare
0,0-1 miles,31432510.0,9.458217
1,1-2 miles,51671460.0,12.06037
2,2-5 miles,72656480.0,19.452722
3,5-10 miles,42775440.0,33.927057
4,10-20 miles,61049210.0,59.366293
5,>20 miles,13257060.0,86.90304


## Weekly Analysis

In [18]:
# variation of total revenue by day of the week
spark.sql("""
CREATE OR REPLACE TEMP VIEW revenue_by_day AS
SELECT
    CASE 
        WHEN dayofweek(tpep_pickup_datetime) = 1 THEN 'Sunday'
        WHEN dayofweek(tpep_pickup_datetime) = 2 THEN 'Monday'
        WHEN dayofweek(tpep_pickup_datetime) = 3 THEN 'Tuesday'
        WHEN dayofweek(tpep_pickup_datetime) = 4 THEN 'Wednesday'
        WHEN dayofweek(tpep_pickup_datetime) = 5 THEN 'Thursday'
        WHEN dayofweek(tpep_pickup_datetime) = 6 THEN 'Friday'
        WHEN dayofweek(tpep_pickup_datetime) = 7 THEN 'Saturday'
        ELSE 'Unknown'
    END AS day_of_week,
    SUM(fare_amount + tip_amount + extra + mta_tax + tolls_amount + improvement_surcharge + congestion_surcharge + Airport_fee) AS total_revenue
FROM nyc_yellow_cab_data
GROUP BY dayofweek(tpep_pickup_datetime)
ORDER BY dayofweek(tpep_pickup_datetime)
""")

# Retrieve the result as a Spark DataFrame
result_df = spark.sql("SELECT * FROM revenue_by_day")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = result_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\revenue_by_day.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "revenue_by_day.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\revenue_by_day.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\revenue_by_day.csv to gs://nyc-yellow-cabs/revenue_by_day.csv
Displaying the result table:


Unnamed: 0,day_of_week,total_revenue
0,Sunday,49963750.0
1,Monday,50169600.0
2,Tuesday,48446570.0
3,Wednesday,50235170.0
4,Thursday,56888200.0
5,Friday,51332000.0
6,Saturday,50278490.0


In [19]:
# average fare amount for yellow cab trips grouped by each day of the week to provide insights into daily fare trends.
spark.sql("""
CREATE OR REPLACE TEMP VIEW avg_fare_by_day AS
SELECT
    CASE
        WHEN dayofweek(tpep_pickup_datetime) = 1 THEN 'Sunday'
        WHEN dayofweek(tpep_pickup_datetime) = 2 THEN 'Monday'
        WHEN dayofweek(tpep_pickup_datetime) = 3 THEN 'Tuesday'
        WHEN dayofweek(tpep_pickup_datetime) = 4 THEN 'Wednesday'
        WHEN dayofweek(tpep_pickup_datetime) = 5 THEN 'Thursday'
        WHEN dayofweek(tpep_pickup_datetime) = 6 THEN 'Friday'
        WHEN dayofweek(tpep_pickup_datetime) = 7 THEN 'Saturday'
        ELSE 'Unknown'
    END AS day_of_week,
    AVG(fare_amount) AS avg_fare
FROM nyc_yellow_cab_data
GROUP BY dayofweek(tpep_pickup_datetime)
ORDER BY dayofweek(tpep_pickup_datetime)
""")

# Retrieve the result as a Spark DataFrame
avg_fare_df = spark.sql("SELECT * FROM avg_fare_by_day")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = avg_fare_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_day.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "avg_fare_by_day.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_day.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\avg_fare_by_day.csv to gs://nyc-yellow-cabs/avg_fare_by_day.csv
Displaying the result table:


Unnamed: 0,day_of_week,avg_fare
0,Sunday,20.687858
1,Monday,20.507998
2,Tuesday,19.645509
3,Wednesday,19.516563
4,Thursday,19.985984
5,Friday,19.422495
6,Saturday,18.915325


In [20]:
# number of yellow cab trips on each day of the week
spark.sql("""
CREATE OR REPLACE TEMP VIEW trips_by_day AS
SELECT
    CASE
        WHEN dayofweek(tpep_pickup_datetime) = 1 THEN 'Sunday'
        WHEN dayofweek(tpep_pickup_datetime) = 2 THEN 'Monday'
        WHEN dayofweek(tpep_pickup_datetime) = 3 THEN 'Tuesday'
        WHEN dayofweek(tpep_pickup_datetime) = 4 THEN 'Wednesday'
        WHEN dayofweek(tpep_pickup_datetime) = 5 THEN 'Thursday'
        WHEN dayofweek(tpep_pickup_datetime) = 6 THEN 'Friday'
        WHEN dayofweek(tpep_pickup_datetime) = 7 THEN 'Saturday'
        ELSE 'Unknown'
    END AS day_of_week,
    COUNT(*) AS total_trips
FROM nyc_yellow_cab_data
GROUP BY dayofweek(tpep_pickup_datetime)
ORDER BY dayofweek(tpep_pickup_datetime)
""")

# Retrieve the result as a Spark DataFrame
trips_df = spark.sql("SELECT * FROM trips_by_day")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = trips_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\trips_by_day.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "trips_by_day.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\trips_by_day.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\trips_by_day.csv to gs://nyc-yellow-cabs/trips_by_day.csv
Displaying the result table:


Unnamed: 0,day_of_week,total_trips
0,Sunday,1954374
1,Monday,1796036
2,Tuesday,1830566
3,Wednesday,1897246
4,Thursday,2135966
5,Friday,2003554
6,Saturday,2166694


In [21]:
# total tips earned on each day of the week
spark.sql("""
CREATE OR REPLACE TEMP VIEW tips_by_day AS
SELECT
    CASE
        WHEN dayofweek(tpep_pickup_datetime) = 1 THEN 'Sunday'
        WHEN dayofweek(tpep_pickup_datetime) = 2 THEN 'Monday'
        WHEN dayofweek(tpep_pickup_datetime) = 3 THEN 'Tuesday'
        WHEN dayofweek(tpep_pickup_datetime) = 4 THEN 'Wednesday'
        WHEN dayofweek(tpep_pickup_datetime) = 5 THEN 'Thursday'
        WHEN dayofweek(tpep_pickup_datetime) = 6 THEN 'Friday'
        WHEN dayofweek(tpep_pickup_datetime) = 7 THEN 'Saturday'
        ELSE 'Unknown'
    END AS day_of_week,
    SUM(tip_amount) AS total_tips
FROM nyc_yellow_cab_data
GROUP BY dayofweek(tpep_pickup_datetime)
ORDER BY dayofweek(tpep_pickup_datetime)
""")

# Retrieve the result as a Spark DataFrame
tips_df = spark.sql("SELECT * FROM tips_by_day")

# Convert the Spark DataFrame to a Pandas DataFrame
pandas_df = tips_df.toPandas()

# Define the local file path
local_csv_path = r"C:\Users\sambo\Downloads\bidataproj\hive files\tips_by_day.csv"

# Save the DataFrame as a CSV file
pandas_df.to_csv(local_csv_path, index=False)
print(f"CSV file saved locally at {local_csv_path}")

# Upload the file to GCS
service_account_path = r"C:\Users\sambo\Downloads\boreal-fort-437820-k4-0b11a4b50131.json"
client = storage.Client.from_service_account_json(service_account_path)

bucket_name = "nyc-yellow-cabs"
destination_blob_name = "tips_by_day.csv"

bucket = client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

# Upload the file to GCS
blob.upload_from_filename(local_csv_path)
print(f"Uploaded {local_csv_path} to gs://{bucket_name}/{destination_blob_name}")

# Display the result in the notebook
print("Displaying the result table:")
pandas_df


CSV file saved locally at C:\Users\sambo\Downloads\bidataproj\hive files\tips_by_day.csv
Uploaded C:\Users\sambo\Downloads\bidataproj\hive files\tips_by_day.csv to gs://nyc-yellow-cabs/tips_by_day.csv
Displaying the result table:


Unnamed: 0,day_of_week,total_tips
0,Sunday,6281209.0
1,Monday,6313445.0
2,Tuesday,6174430.0
3,Wednesday,6466256.0
4,Thursday,7314453.0
5,Friday,6457406.0
6,Saturday,6271814.0


In [22]:
spark.stop()