# Homework Week 5: Batch Processing
## Question 1: Install Spark and PySpark

In [1]:
# Initialize setup for spark and pyspark
import pandas as pd
from pyspark.sql import SparkSession
import warnings

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('HW5_Solution') \
    .getOrCreate()


In [2]:
# Execute spark.version.
spark.version

'3.3.2'

## Question 2: Yellow October 2024

In [44]:
# Read the October 2024 Yellow into a Spark Dataframe.
!wget --continue -P trip_data https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-06 00:02:20--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 143.204.102.123, 143.204.102.120, 143.204.102.231, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|143.204.102.123|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: 'trip_data/yellow_tripdata_2024-10.parquet'

     0K .......... .......... .......... .......... ..........  0%  237K 4m24s
    50K .......... .......... .......... .......... ..........  0%  573K 3m7s
   100K .......... .......... .......... .......... ..........  0%  414K 2m55s
   150K .......... .......... .......... .......... ..........  0%  780K 2m31s
   200K .......... .......... .......... .......... ..........  0%  631K 2m21s
   250K .......... .......... .......... .......... ..........  0%  726K 2m11s
   300K .......... .......... ...

In [3]:
#  Read all parquet files in the directory as one dataset.
# To read specific data, specify in .load('trip_data/yellow_tripdata_2024-10.parquet')
df = spark.read \
    .format("parquet") \
    .load('trip_data')

In [4]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 08:30:44|  2024-10-01 08:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [11]:
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [6]:
# Change all datatype long to integer
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, LongType, DoubleType, StringType, TimestampType, StructType, StructField

# Define the original schema
original_schema = StructType([
    StructField('VendorID', IntegerType(), True),
    StructField('tpep_pickup_datetime', TimestampType(), True),
    StructField('tpep_dropoff_datetime', TimestampType(), True),
    StructField('passenger_count', LongType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', LongType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', IntegerType(), True),
    StructField('DOLocationID', IntegerType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
    StructField('Airport_fee', DoubleType(), True)
])

# Load the DataFrame with the original schema
df = spark.read.schema(original_schema).parquet('trip_data')

# Function to convert LongType columns to IntegerType
def convert_long_to_int(df):
    for field in df.schema.fields:
        if isinstance(field.dataType, LongType):
            df = df.withColumn(field.name, col(field.name).cast(IntegerType()))
    return df

# Convert LongType columns to IntegerType
df_modified = convert_long_to_int(df)

# Show the schema of the modified DataFrame
df_modified.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampType(), True), StructField('tpep_dropoff_datetime', TimestampType(), True), StructField('passenger_count', IntegerType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', IntegerType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', IntegerType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [7]:
df_modified.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



### Repartition the Dataframe to 4 partitions and save it to parquet.

In [8]:
df_modified = df_modified.repartition(4)

In [13]:
df_modified.write.parquet('report/', mode="overwrite")

In [14]:
!ls -lah ./report

total 98M
drwxr-xr-x 1 AbdulHafeez 197609    0 Mar  9 17:13 .
drwxr-xr-x 1 AbdulHafeez 197609    0 Mar  9 17:14 ..
-rw-r--r-- 1 AbdulHafeez 197609    8 Mar  9 17:13 ._SUCCESS.crc
-rw-r--r-- 1 AbdulHafeez 197609 194K Mar  9 17:13 .part-00000-5f785a55-b0ae-4100-91a9-c5f628f449b4-c000.snappy.parquet.crc
-rw-r--r-- 1 AbdulHafeez 197609 194K Mar  9 17:13 .part-00001-5f785a55-b0ae-4100-91a9-c5f628f449b4-c000.snappy.parquet.crc
-rw-r--r-- 1 AbdulHafeez 197609 194K Mar  9 17:13 .part-00002-5f785a55-b0ae-4100-91a9-c5f628f449b4-c000.snappy.parquet.crc
-rw-r--r-- 1 AbdulHafeez 197609 194K Mar  9 17:13 .part-00003-5f785a55-b0ae-4100-91a9-c5f628f449b4-c000.snappy.parquet.crc
-rw-r--r-- 1 AbdulHafeez 197609    0 Mar  9 17:13 _SUCCESS
-rw-r--r-- 1 AbdulHafeez 197609  25M Mar  9 17:13 part-00000-5f785a55-b0ae-4100-91a9-c5f628f449b4-c000.snappy.parquet
-rw-r--r-- 1 AbdulHafeez 197609  25M Mar  9 17:13 part-00001-5f785a55-b0ae-4100-91a9-c5f628f449b4-c000.snappy.parquet
-rw-r--r-- 1 AbdulHafeez 197609  2

## Question 3: Count records 

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

In [15]:
yellow_df = spark.read.parquet('./report')

In [16]:
yellow_df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [19]:
yellow_df.createOrReplaceTempView('yellow_taxi_trips')

In [26]:
query = """
        SELECT COUNT(*) AS num_trips
        FROM yellow_taxi_trips
        WHERE CAST(tpep_pickup_datetime AS DATE) = '2024-10-15'
        """

yellow_df = spark.sql(query)
yellow_df.show()

+---------+
|num_trips|
+---------+
|   116396|
+---------+



## Question 4: Longest trip

What is the length of the longest trip in the dataset in hours?

In [33]:
query = """
    SELECT
        tpep_pickup_datetime AS pickup_datetime,
        tpep_dropoff_datetime AS dropoff_datetime,
        (UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime)) / 3600 AS trip_duration_hours
    FROM yellow_taxi_trips
    ORDER BY trip_duration_hours DESC
    """

yellow_df = spark.sql(query)
yellow_df.show()

+-------------------+-------------------+-------------------+
|    pickup_datetime|   dropoff_datetime|trip_duration_hours|
+-------------------+-------------------+-------------------+
|2024-10-16 21:03:49|2024-10-23 15:40:53| 162.61777777777777|
|2024-10-04 02:47:25|2024-10-10 02:06:55|            143.325|
|2024-10-23 00:00:55|2024-10-28 17:46:33| 137.76055555555556|
|2024-10-18 17:53:32|2024-10-23 12:43:37| 114.83472222222223|
|2024-10-21 08:36:24|2024-10-25 02:30:18|  89.89833333333333|
|2024-10-20 21:30:52|2024-10-24 14:57:38|  89.44611111111111|
|2024-10-23 00:04:52|2024-10-25 22:22:49|  70.29916666666666|
|2024-10-13 03:32:51|2024-10-15 23:07:15|  67.57333333333334|
|2024-10-18 01:58:18|2024-10-20 20:02:18|  66.06666666666666|
|2024-10-21 22:28:21|2024-10-23 20:53:42|            46.4225|
|2024-10-21 02:58:28|2024-10-22 21:17:00|  42.30888888888889|
|2024-10-25 04:29:58|2024-10-26 18:58:25|  38.47416666666667|
|2024-10-24 07:52:02|2024-10-25 17:49:06|  33.95111111111111|
|2024-10

Hint:
When you subtract two timestamps in SQL, the result is a `TIMESTAMP` data type that represents the difference between the two dates and times. This difference is measured in seconds.

In the expression `(dropoff_datetime - pickup_datetime)`, the result is a `TIMESTAMP` that represents the duration of the trip in seconds.
Same goes to `(UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime))`.

The reason you need to divide by 3600 is that there are 3600 seconds in an hour. By dividing the result by 3600, you are effectively converting the duration from seconds to hours.

To illustrate this, consider a trip that lasts for 1 hour and 30 minutes. In seconds, this duration is:

1 hour = 3600 seconds
30 minutes = 1800 seconds
Total duration = 3600 + 1800 = 5400 seconds

If you divide this duration by 3600, you get:

5400 / 3600 = 1.5 hours

So, dividing by 3600 converts the duration from seconds to hours.

## Question 5: User Interface

Spark’s User Interface which shows the application's dashboard runs on which local port?

- [ ] 80
- [ ] 443
- [X] 4040
- [ ] 8080

## Question 6: Least frequent pickup location zone

Load the zone lookup data into a temp view in Spark:

```bash
wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
```

Using the zone lookup data and the Yellow October 2024 data, what is the name of the LEAST frequent pickup location Zone?

In [36]:
## Download zone lookup data
!wget --continue -P trip_data https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-09 18:24:15--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 3.160.226.161, 3.160.226.111, 3.160.226.85, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|3.160.226.161|:443... connected.
HTTP request sent, awaiting response... 416 Requested Range Not Satisfiable

    The file is already fully retrieved; nothing to do.



In [38]:
## Create lookup_zone dataframe
lookup_zone_df = spark.read \
    .option('header', True) \
    .csv('trip_data/taxi_zone_lookup.csv')

In [40]:
lookup_zone_df.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [43]:
lookup_zone_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [42]:
## Create Temp View in Spark:
lookup_zone_df.createOrReplaceTempView('taxi_zone_lookup')

In [47]:
query = """
        SELECT
            COUNT(*) AS cnt,
            y.PULocationID,
            z.Borough,
            z.Zone
        FROM yellow_taxi_trips AS y
        INNER JOIN taxi_zone_lookup AS z
            ON y.PULocationID = z.LocationID
        GROUP BY
            y.PULocationID,
            z.Borough,
            z.Zone
        ORDER BY
            cnt
        LIMIT 5
        """
least_frequent_pickup_location_Zone = spark.sql(query)
least_frequent_pickup_location_Zone.show()

+---+------------+-------------+--------------------+
|cnt|PULocationID|      Borough|                Zone|
+---+------------+-------------+--------------------+
|  1|         105|    Manhattan|Governor's Island...|
|  2|         199|        Bronx|       Rikers Island|
|  2|           5|Staten Island|       Arden Heights|
|  3|         111|     Brooklyn| Green-Wood Cemetery|
|  3|           2|       Queens|         Jamaica Bay|
+---+------------+-------------+--------------------+

