# Preprocessing Stage

### Some of the code used in this notebook draws on the Spark documentation
https://spark.apache.org/docs/latest/api/python/index.html

### and Akira's tutorial

https://github.com/akiratwang/MAST30034_Python/blob/main/advanced_tutorials/Spark%20Tutorial.ipynb

In [3]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col
import pyspark.sql.functions as F

# Attach to an already-running Spark session, or start a new one.
spark = SparkSession.builder.getOrCreate()

# Notebook-friendly runtime settings: Arrow for PySpark data transfer and
# eager (table) rendering of a DataFrame left as a cell's last expression.
notebook_confs = {
    'spark.sql.execution.arrow.pyspark.enabled': True,
    'spark.sql.repl.eagerEval.enabled': True,
}
for conf_key, conf_value in notebook_confs.items():
    spark.conf.set(conf_key, conf_value)


In [4]:
# Build an explicit schema for the raw trip CSVs instead of letting Spark infer it.

columns = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count',
           'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID',
           'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
           'improvement_surcharge', 'total_amount', 'congestion_surcharge']
ints = ('VendorID', 'passenger_count', 'RatecodeID', 'PULocationID', 'DOLocationID',
        'payment_type')
doubles = ('trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
           'improvement_surcharge', 'total_amount', 'congestion_surcharge')
strings = ('store_and_fwd_flag',)
dtimes = ('tpep_pickup_datetime', 'tpep_dropoff_datetime')

# Column name -> Spark type, filled group by group.
type_groups = (
    (ints, IntegerType),
    (doubles, DoubleType),
    (strings, StringType),
    (dtimes, TimestampType),
)
dtypes = {name: spark_type() for group, spark_type in type_groups for name in group}

# All fields are nullable. Field order follows `columns`; with a user-supplied
# schema Spark applies it to the CSV columns by position, so this list must
# match the order of the files' header.
schema = StructType([StructField(name, dtypes[name], True) for name in columns])

# Read the 2019 trip files and give a few columns shorter / consistent names.
sdf = (
    spark.read.csv("../raw_data/2019", header=True, schema=schema)
    .withColumnRenamed("RatecodeID", "RateCodeID")
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_time')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_time')
)

sdf.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_time: timestamp (nullable = true)
 |-- dropoff_time: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [5]:
"{:,} rows".format(sdf.count())




'84,399,019 rows'

In [6]:
# Calendar features derived from the pickup time, plus trip duration in seconds
# (difference of the two unix timestamps).
sdf = (
    sdf.withColumn("Month", F.month("pickup_time"))
    .withColumn("hour", F.hour("pickup_time"))
    .withColumn("dayofmonth", F.dayofmonth("pickup_time"))
    .withColumn("weekday", F.dayofweek("pickup_time"))  # Spark: 1 = Sunday ... 7 = Saturday
    .withColumn("duration", F.unix_timestamp("dropoff_time") - F.unix_timestamp("pickup_time"))
)

# The raw timestamps and these descriptive columns are not used downstream.
sdf = sdf.drop("pickup_time", "dropoff_time", "store_and_fwd_flag", "VendorID")

# only look at trips paid with cash or card, remove "dispute","voided trips", etc.
sdf = sdf.filter(F.col("payment_type").isin(1, 2))
# according to the fare directory: drop fares at or below $2.50
sdf = sdf.filter(F.col("fare_amount") > 2.5)
# According to the zone lookup csv, zones 264 and 265 are unknown zones.
# Keep a trip only if NEITHER its pickup nor its dropoff is an unknown zone,
# so the per-location aggregates below never contain these zones.
UNKNOWN_ZONES = [264, 265]
sdf = sdf.filter(
    ~F.col("PULocationID").isin(UNKNOWN_ZONES) & ~F.col("DOLocationID").isin(UNKNOWN_ZONES)
)
# NOTE(review): trips with zero or negative `duration` are not filtered out;
# confirm whether the duration-based aggregates should exclude them.


In [7]:
## Check the key numeric columns for missing (null) values.
# sdf.where(col("total_amount").isNull())
# sdf.where(col("trip_distance").isNull())
## Result: no missing values were found, so no null handling is applied below.

### Aggregation starts here
### Results needed for later analysis are saved as Parquet files

In [8]:
from pyspark.sql.functions import sum, mean
# NOTE: this import shadows Python's built-in sum() for the rest of the notebook.

# Aggregates per pickup zone and time slot (month, weekday, hour) kept for later
# analysis: total and average fare, summed distance and summed duration.
pickup_aggs = [
    sum("fare_amount").alias("sum_fare_amount"),
    mean("fare_amount").alias("avg_fare_amount"),
    sum("trip_distance").alias("total_trip_distance"),
    sum("duration").alias("total_duration"),
]
pickup_results = sdf.groupBy('PULocationID', 'Month', 'weekday', 'hour').agg(*pickup_aggs)

In [9]:
# save the results to the disk as a parquet file
from shutil import rmtree
from os import path

fpath = '../data/aggregated_results1.parquet/'
# Spark's default save mode errors if the target already exists, so remove
# output left by a previous run first (rmtree only handles local paths).
if path.exists(fpath):
    rmtree(fpath)
# Write to exactly the path that was just cleared.
pickup_results.write.format('parquet').save(fpath)

21/08/17 02:11:41 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
21/08/17 02:11:41 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
21/08/17 02:11:41 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 75.08% for 9 writers
21/08/17 02:11:41 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 67.58% for 10 writers
21/08/17 02:11:41 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 61.43% for 11 writers
21/08/17 02:11:41 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 56.31% for 12 writers
21/08/17 02:11:42 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,

In [None]:
# Trip counts for every (pickup zone, dropoff zone, hour) combination.
byhour_results = sdf.groupBy('PULocationID', 'DOLocationID', 'hour').count()

fpath = '../data/aggregated_results2.parquet/'
# Clear output from a previous run; the default save mode refuses existing paths.
if path.exists(fpath):
    rmtree(fpath)
byhour_results.write.parquet(fpath)

In [1]:
# Import what this cell needs directly instead of relying on imports made in
# other cells (which may not have run yet in a fresh kernel).
from os import path
from shutil import rmtree

# Daily aggregates per pickup zone (month + day of month); presumably meant to be
# joined with daily temperature data later — confirm with the downstream notebook.
# F.sum / F.mean are used explicitly, so the cell does not depend on an earlier
# cell having rebound the built-in `sum`.
results_for_temp = (
    sdf.groupBy('PULocationID', 'Month', 'dayofmonth')
    .agg(F.sum("fare_amount").alias("sum_fare_amount"),
         F.mean("fare_amount").alias("avg_fare_amount"),
         F.sum("trip_distance").alias("total_trip_distance"),
         F.sum("duration").alias("total_duration"))
)

fpath = '../data/aggregated_results3.parquet/'
# Spark's default save mode errors if the target exists, so clear a previous run first.
if path.exists(fpath):
    rmtree(fpath)
results_for_temp.write.format('parquet').save(fpath)