In [1]:
import os

# Spark needs a JDK. The path below is a machine-specific Homebrew OpenJDK
# location, so it is only used as a fallback: setdefault leaves a JAVA_HOME
# that is already set in the environment untouched instead of overwriting it.
os.environ.setdefault(
    'JAVA_HOME',
    '/opt/homebrew/Cellar/openjdk/22.0.2/libexec/openjdk.jdk/Contents/Home',
)

## Importing libraries

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql import functions as F

In [2]:
# Initialize spark session
# Options are applied to the builder one by one, in the order listed here.
spark_options = {
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
    "spark.driver.memory": "16g",
    "spark.executor.memory": "16g",
    "spark.driver.maxResultSize": "2g",
}

spark_builder = SparkSession.builder.appName("MAST30034 Project 1 curated data transformation")
for option_key, option_value in spark_options.items():
    spark_builder = spark_builder.config(option_key, option_value)

# Reuse an already-running session if there is one, otherwise start a new one.
spark = spark_builder.getOrCreate()

24/08/16 17:06:07 WARN Utils: Your hostname, Phams-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 172.16.47.168 instead (on interface en0)
24/08/16 17:06:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/16 17:06:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/16 17:06:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/08/16 17:06:07 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/08/16 17:06:07 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


## Mapping pickup and drop-off location IDs to boroughs (`PUZone` / `DOZone`) using the lookup table

In [4]:
# Load the taxi zone lookup table and discard every row that has a missing value,
# i.e. location IDs with no specific zone (outliers in the lookup table).
lookup = pd.read_csv('../data/taxi_zones/taxi_zone_lookup.csv').dropna()

In [5]:
# Distinct boroughs present in the cleaned lookup table.
boroughs = set(lookup['Borough'])

# borough -> list of the LocationIDs that belong to it
zone_mapping = {borough: [] for borough in boroughs}

# Walk the two columns in parallel instead of indexing with range(len(...)):
# `lookup` keeps its original row labels after dropna(), so `lookup['Borough'][i]`
# is a label lookup that raises KeyError as soon as a dropped row is not at the
# very end of the table.
for location_id, borough in zip(lookup['LocationID'], lookup['Borough']):
    zone_mapping[borough].append(location_id)

# Inverted view: LocationID -> borough
location_mapping = {
    location_id: borough
    for borough, location_ids in zone_mapping.items()
    for location_id in location_ids
}


def map_location_id(location_id):
    """Return the borough recorded for ``location_id`` in ``location_mapping``.

    Any ID that is not a key of ``location_mapping`` (including ``None``)
    maps to the string "Unknown".
    """
    return location_mapping.get(location_id, "Unknown")


# Spark UDF wrapper so the lookup can be applied to DataFrame columns.
map_location_udf = F.udf(map_location_id, StringType())

## Additional feature selection and data filtering

1. Drop entries with PUZone or DOZone outside of NYC
2. Drop the original datetime columns since we have retrieved needed information
3. Keep only records with payment_type 1 (credit card), since cash tips are not recorded, and then drop the column because all remaining records have the same value

In [6]:
# Load the trip records.
processed_df = spark.read.parquet('../data/raw/')

# Label each trip with its pickup / drop-off borough, keep only trips whose
# both ends are known, remove the original datetime columns, and restrict to
# payment_type 1 before dropping that (now constant) column.
mapped_df = (
    processed_df
    .withColumn("PUZone", map_location_udf(F.col("PULocationID")))
    .withColumn("DOZone", map_location_udf(F.col("DOLocationID")))
    .filter((F.col('PUZone') != 'Unknown') & (F.col('DOZone') != 'Unknown'))
    .drop('tpep_pickup_datetime', 'tpep_dropoff_datetime')
    .filter(F.col('payment_type') == 1)
    .drop('payment_type')
)

# Preview the first rows of the result.
mapped_df.limit(20)

                                                                                

VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_duration_minutes,pickup_date,pickup_hour,dropoff_date,PUZone,DOZone
1,2.0,8.1,1.0,N,138,262,35.9,10.25,0.5,9.5,0.0,1.0,57.15,2.5,1.75,28.48333333333333,2023-05-01,0,2023-05-01,Queens,Manhattan
1,2.0,9.1,1.0,N,138,141,35.2,10.25,0.5,10.7,6.55,1.0,64.2,2.5,1.75,17.083333333333332,2023-05-01,0,2023-05-01,Queens,Manhattan
2,1.0,8.21,1.0,N,138,140,33.1,6.0,0.5,2.24,0.0,1.0,47.09,2.5,1.75,19.33333333333333,2023-05-01,0,2023-05-01,Queens,Manhattan
2,1.0,9.05,1.0,N,138,116,38.0,6.0,0.5,10.76,6.55,1.0,64.56,0.0,1.75,21.58333333333333,2023-05-01,0,2023-05-01,Queens,Manhattan
1,1.0,0.7,1.0,N,161,48,6.5,3.5,0.5,2.85,0.0,1.0,14.35,2.5,0.0,4.2,2023-05-01,0,2023-05-01,Manhattan,Manhattan
2,1.0,14.29,1.0,N,138,13,54.1,6.0,0.5,12.82,0.0,1.0,78.67,2.5,1.75,24.866666666666667,2023-05-01,0,2023-05-01,Queens,Manhattan
1,1.0,11.3,1.0,N,132,129,45.0,1.0,0.5,6.0,0.0,1.0,53.5,0.0,0.0,24.78333333333333,2023-05-01,0,2023-05-01,Queens,Queens
2,1.0,1.19,1.0,N,230,100,7.2,1.0,0.5,2.44,0.0,1.0,14.64,2.5,0.0,2.966666666666667,2023-05-01,0,2023-05-01,Manhattan,Manhattan
2,1.0,2.3,1.0,N,170,140,12.1,1.0,0.5,2.0,0.0,1.0,19.1,2.5,0.0,7.95,2023-05-01,0,2023-05-01,Manhattan,Manhattan
2,2.0,12.81,1.0,N,138,181,49.9,6.0,0.5,17.74,0.0,1.0,76.89,0.0,1.75,22.183333333333334,2023-05-01,0,2023-05-01,Queens,Brooklyn


## Merging processed trip records dataset with processed weather dataset

In [7]:
# Cast the trip pickup date to a timestamp so it is comparable with the weather date.
taxi_data = mapped_df.withColumn('pickup_date', F.to_timestamp('pickup_date'))

# Read the hourly weather table, drop its index column and cast its date to a timestamp.
weather_data = (
    spark.read.csv('../data/weather/hourly_weather.csv', header=True, inferSchema=True)
    .drop(F.col('_c0'))
    .withColumn('date', F.to_timestamp('date'))
)

# Inner join on (date, hour); a list of Column conditions is combined with AND.
merged_data = taxi_data.join(
    weather_data,
    on=[
        taxi_data['pickup_date'] == weather_data['date'],
        taxi_data['pickup_hour'] == weather_data['hour'],
    ],
    how='inner',
)

Writing the merged data to the `data/curated` directory

In [None]:
# Destination directory for the curated (merged) dataset.
output_dir = '../data/curated/'

# exist_ok=True creates the directory (and any missing parents) when absent and
# is a no-op when it already exists, without a separate check-then-create step
# that could race with another process.
os.makedirs(output_dir, exist_ok=True)

In [8]:
# Persist the merged dataset as Parquet in output_dir, overwriting any existing output.
merged_data.write.mode("overwrite").parquet(output_dir)

24/08/16 17:06:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/08/16 17:06:24 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                