In [3]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("MAST30034 Project 1 - Preprocessing 2")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config('spark.driver.memory', '4g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
24/08/24 16:40:02 WARN Utils: Your hostname, smacked resolves to a loopback address: 127.0.1.1; using 172.31.130.26 instead (on interface eth0)
24/08/24 16:40:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 16:40:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
sdf = spark.read.parquet('../data/tlc_data/raw/*')

In [7]:
sdf.count()

39837665

In [8]:
# Calculate trip duration in minutes
sdf = sdf.withColumn('trip_duration', 
                     (unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime')) / 60)

# Convert fare_amount and any other columns from strings to numeric (removing commas first)
sdf = sdf.withColumn('fare_amount', regexp_replace(col('fare_amount'), ',', '').cast('float'))

In [9]:
# Remove rows with any null values
sdf = sdf.dropna()

# Filter for data in the year 2023 and 2024
sdf = sdf.filter(year(col('tpep_pickup_datetime')).isin([2023, 2024]))

# Filter for data in the months 1-6
sdf = sdf.filter(month(col('tpep_pickup_datetime')).between(1, 6))

# Filter for VendorID 1 and 2
sdf = sdf.filter(col('VendorID').isin(1, 2))

# Filter for ratecodeID 1 and 2
sdf = sdf.filter(col('RatecodeID').isin(1, 2))

# Ensure the fare amount starts at $2.50 (standard taxi fare)
sdf = sdf.filter(col('fare_amount') >= 2.50)

# Filter for trip distances greater than zero miles
sdf = sdf.filter(col('trip_distance') > 0)

# Filter for trip durations greater than one minute
sdf = sdf.filter(col('trip_duration') > 1)

# Filter for payment types 1
sdf = sdf.filter(col('payment_type') == 1)

# Filter for Passenger Count 1-6
sdf = sdf.filter(col('passenger_count').between(1, 6))

# Filter for Pickup locations within 1-263
sdf = sdf.filter(col('PULocationID').between(1, 263))

# Filter for Dropoff locations within 1-263
sdf = sdf.filter(col('DOLocationID').between(1, 263))

# Remove instances where pick-up and drop-off times were the same
sdf = sdf.filter(col('tpep_pickup_datetime') != col('tpep_dropoff_datetime'))

# Remove instances of pickup in an airport location
sdf = sdf.filter(~col('pulocationid').isin(132, 138))

In [10]:
num_records = sdf.count()
print(f"Number of records: {num_records}")



Number of records: 26650931


                                                                                

In [11]:
# Finds the lower and upper bounds for each column in the dataframe
def iqr_bounds(columns, N):
    quantiles = sdf.approxQuantile(columns, [0.25, 0.75], 0.1)

    bounds = {}

    for column, (q1, q3) in zip(columns, quantiles):
        iqr = q3 - q1
    
        if N <= 100:
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
        else:
            lower_bound = q1 - (N**0.5 - 0.5) * iqr
            upper_bound = q3 + (N**0.5 - 0.5) * iqr

        bounds[column] = (lower_bound, upper_bound)
    
    return bounds

In [12]:
# Detect and remove outliers from the data
columns_to_check = ['total_amount', 'trip_distance', 'trip_duration']

bounds = iqr_bounds(columns_to_check, num_records)

for column, (lower_bound, upper_bound) in bounds.items():
    sdf = sdf.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))

                                                                                

{'total_amount': (1.8300000000000036, 39.51),
 'trip_distance': (-1.2499999999999998, 4.83),
 'trip_duration': (-8.708333333333334, 33.09166666666667)}

In [14]:
num_records = sdf.count()
print(f"Number of records: {num_records}")



Number of records: 23742056


                                                                                

In [15]:
# Add a new column 'date' that extracts the date from the 'tpep_pickup_datetime' column
# Add a new column 'hour' that extracts the hour from the 'tpep_pickup_datetime' column
# Add a new column 'month' that extracts the month from the 'tpep_pickup_datetime' column
sdf = sdf.withColumn('date', to_date(sdf.tpep_pickup_datetime, 'yyyy-MM-dd HH:mm:ss')) \
         .withColumn('month', month(sdf.tpep_pickup_datetime)) \
         .withColumn('hour', hour(sdf.tpep_pickup_datetime))

# Add a new column 'day_of_week' that extracts the day of the week from the 'date' column
sdf = sdf.withColumn('day_of_week', dayofweek(sdf['date']))

# Extract the day of the month from the 'tpep_pickup_datetime' column
sdf = sdf.withColumn('date_of_month', dayofmonth(sdf.tpep_pickup_datetime))

# Add a new column 'profitability' that calculates the profitability of the trip (total_amount / trip_duration)
sdf = sdf.withColumn('profitability', (col('fare_amount') + col('extra') + col('tip_amount')) / col('trip_duration'))

In [16]:
# Load the weather data into a PySpark DataFrame
weather_sdf = spark.read.csv('../data/weather_data/curated/New_York.csv', header=True, inferSchema=True)

# Perform the join on 'date' and 'hour' columns
sdf = sdf.join(weather_sdf, on=['date', 'hour'], how='left')

In [17]:
sdf.describe().show()

24/08/24 16:40:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------------+----------------+------------------+------------------+--------------------+------------------+------------------+------------------+--------------------+-----------------+------------------+--------------------+------------------+--------------------+---------------------+------------------+--------------------+--------------------+------------------+------------------+------------------+-----------------+-------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|summary|              hour|        vendorid|   passenger_count|     trip_distance|          ratecodeid|store_and_fwd_flag|      pulocationid|      dolocationid|        payment_type|      fare_amount|             extra|             mta_tax|        tip_amount|        tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|         airport_fee|     trip_duration|             month|       day_of_week|    date_

                                                                                

In [18]:
# Separate the data into two years, one for 2023 and one for 2024. 2023 is training, 2024 is testing
year1_sdf = sdf.filter(year(col('tpep_pickup_datetime')) == 2023)
year2_sdf = sdf.filter(year(col('tpep_pickup_datetime')) == 2024)

In [19]:
# List of columns to drop
columns_to_drop = [
    'date',
    'vendorid',
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'ratecodeid',
    'store_and_fwd_flag',
    'dolocationid',
    'payment_type',
    'mta_tax',
    'tolls_amount',
    'improvement_surcharge',
    'congestion_surcharge',
    'airport_fee',
]

# Drop the columns
year1_sdf = sdf.drop(*columns_to_drop)
year2_sdf = sdf.drop(*columns_to_drop)

In [20]:
# List of columns rearranged
reordered_columns = [
    # Date and Time Columns
    'month', 'date_of_month', 'hour', 'day_of_week', 
    
    # Monetary Columns
    'fare_amount', 'extra', 'tip_amount', 'total_amount', 'profitability', 'trip_duration', 'trip_distance', 'passenger_count', 'pulocationid',
    
    # Weather Columns
    'temp', 'dwpt', 'rhum', 'prcp', 'wspd', 'pres'
]

# Select the columns in the desired order
year1_sdf = year1_sdf.select(reordered_columns)
year2_sdf = year2_sdf.select(reordered_columns)

# Show the DataFrame to verify
year1_sdf.show(5)

+-----+-------------+----+-----------+-----------+-----+----------+------------+------------------+------------------+-------------+---------------+------------+----+----+-----+----+----+------+
|month|date_of_month|hour|day_of_week|fare_amount|extra|tip_amount|total_amount|     profitability|     trip_duration|trip_distance|passenger_count|pulocationid|temp|dwpt| rhum|prcp|wspd|  pres|
+-----+-------------+----+-----------+-----------+-----+----------+------------+------------------+------------------+-------------+---------------+------------+----+----+-----+----+----+------+
|    1|            1|   0|          1|        7.9|  1.0|       4.0|        16.9|2.0422163739367964| 6.316666666666666|          1.1|              1|          43| 9.9| 9.9|100.0| 1.0| 7.6|1011.0|
|    1|            1|   0|          1|       14.9|  1.0|      15.0|        34.9|2.4235293818455115|             12.75|         2.51|              1|          48| 9.9| 9.9|100.0| 1.0| 7.6|1011.0|
|    1|            1|   0

In [21]:
year1_sdf \
        .repartition(10) \
        .write \
        .mode('overwrite') \
        .parquet('../data/tlc_data/curated/2023_tlc.parquet')

                                                                                

In [22]:
year2_sdf \
        .repartition(10) \
        .write \
        .mode('overwrite') \
        .parquet('../data/tlc_data/curated/2024_tlc.parquet')

                                                                                

In [23]:
sample = year1_sdf.sample(fraction=0.5)

In [24]:
sample \
        .repartition(10) \
        .write \
        .mode('overwrite') \
        .parquet('../data/tlc_data/curated/2023_tlc_sample.parquet')

                                                                                