# Week 5: Batch Processing with Spark (Homework)

The datasets were downloaded from the following links:
- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('homework') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/23 06:12:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Q1. spark.version

In [3]:
spark.version

'3.3.2'

### Q2. What is the average size of the Parquet Files that were created (in MB)?

In [4]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [5]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/fhvhv_tripdata_2021-06.csv')

In [6]:
df = df.repartition(12)

df.write.parquet('data/homework/fhvhv/2021/06/', mode="overwrite")

23/02/23 06:12:36 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


In [8]:
df = spark.read.parquet('data/homework/fhvhv/2021/06/')

In [9]:
!ls -lh data/homework/fhvhv/2021/06/

total 270M
-rw-r--r-- 1 aditya aditya   0 Feb 23 06:12 _SUCCESS
-rw-r--r-- 1 aditya aditya 23M Feb 23 06:12 part-00000-b5917c75-ab1d-4e58-a921-40db2493dd14-c000.snappy.parquet
-rw-r--r-- 1 aditya aditya 23M Feb 23 06:12 part-00001-b5917c75-ab1d-4e58-a921-40db2493dd14-c000.snappy.parquet
-rw-r--r-- 1 aditya aditya 23M Feb 23 06:12 part-00002-b5917c75-ab1d-4e58-a921-40db2493dd14-c000.snappy.parquet
-rw-r--r-- 1 aditya aditya 23M Feb 23 06:12 part-00003-b5917c75-ab1d-4e58-a921-40db2493dd14-c000.snappy.parquet
-rw-r--r-- 1 aditya aditya 23M Feb 23 06:12 part-00004-b5917c75-ab1d-4e58-a921-40db2493dd14-c000.snappy.parquet
-rw-r--r-- 1 aditya aditya 23M Feb 23 06:12 part-00005-b5917c75-ab1d-4e58-a921-40db2493dd14-c000.snappy.parquet
-rw-r--r-- 1 aditya aditya 23M Feb 23 06:12 part-00006-b5917c75-ab1d-4e58-a921-40db2493dd14-c000.snappy.parquet
-rw-r--r-- 1 aditya aditya 23M Feb 23 06:12 part-00007-b5917c75-ab1d-4e58-a921-40db2493dd14-c000.snappy.parquet
-rw-r--r-- 1 aditya aditya 23M
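
The listing shows each part file at roughly 23M. To compute the average rather than read it off the listing, a minimal sketch in plain Python could look like the following. It assumes the files sit on the local filesystem, and the helper name `average_parquet_size_mb` is illustrative, not part of Spark.

```python
from pathlib import Path


def average_parquet_size_mb(directory):
    """Return the mean size, in MiB, of the *.parquet files in `directory`.

    Marker files such as _SUCCESS are skipped because they do not match
    the *.parquet pattern.
    """
    sizes = [p.stat().st_size for p in Path(directory).glob("*.parquet")]
    if not sizes:
        raise ValueError(f"no .parquet files found in {directory}")
    return sum(sizes) / len(sizes) / (1024 * 1024)
```

Calling `average_parquet_size_mb('data/homework/fhvhv/2021/06/')` after the write step should give a value consistent with the sizes in the listing.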

### Q3. How many taxi trips were there on June 15?

In [10]:
from pyspark.sql import functions as F

In [11]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .filter("pickup_date = '2021-06-15'") \
    .count()

452470

### Q4. Calculate the duration for each trip. How long was the longest trip in Hours?

In [18]:
df = df \
    .withColumn('trip_duration', F.round((F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 3600, 2)) \
    .orderBy(F.col('trip_duration').desc())

In [19]:
df.show(10)



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+
|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|                B02872|        66.88|
|              B02765|2021-06-22 12:09:45|2021-06-23 13:42:44|         188|         198|      N|                B02765|        25.55|
|              B02879|2021-06-27 10:32:29|2021-06-28 06:31:20|          78|         169|      N|                B02879|        19.98|
|              B02800|2021-06-26 22:37:11|2021-06-27 16:49:01|         263|          36|      N|                  null|         18.2|
|              B02682|2021-06-23 20:40:43|2021-06-24 13:08:44|
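
As a sanity check on the `trip_duration` column, the same arithmetic (seconds between dropoff and pickup, divided by 3600 and rounded to two decimals) can be reproduced in plain Python for a single row. This is only a sketch: the helper name is illustrative, and it treats the timestamps as naive values with no time zone handling.

```python
from datetime import datetime


def trip_duration_hours(pickup, dropoff, fmt="%Y-%m-%d %H:%M:%S"):
    """Duration between two timestamp strings, in hours rounded to 2 decimals."""
    start = datetime.strptime(pickup, fmt)
    end = datetime.strptime(dropoff, fmt)
    seconds = (end - start).total_seconds()
    return round(seconds / 3600, 2)


# First row of the output above: the longest trip.
print(trip_duration_hours("2021-06-25 13:55:41", "2021-06-28 08:48:25"))  # 66.88
```

The result matches the first row of the table, so the longest trip lasted 66.88 hours.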

### Q5. On which local port does Spark's user interface, which shows the application's dashboard, run?

**Ans:** 4040

### Q6. Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?

In [20]:
lookup_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

In [21]:
lookup = spark.read \
         .option('header', True) \
         .schema(lookup_schema) \
         .csv('data/taxi_zone_lookup.csv')

lookup.take(5)

[Row(LocationID=1, Borough='EWR', zone='Newark Airport', service_zone='EWR'),
 Row(LocationID=2, Borough='Queens', zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID=3, Borough='Bronx', zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID=4, Borough='Manhattan', zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID=5, Borough='Staten Island', zone='Arden Heights', service_zone='Boro Zone')]

In [22]:
# Register both DataFrames as temporary views so they can be queried with SQL
lookup.createOrReplaceTempView('lookup')
df.createOrReplaceTempView('fhv')

In [27]:
num_of_trips_per_pickup_location_id = spark.sql("""
WITH cte AS 
(
SELECT PULocationID, COUNT(1) AS num_of_trips
FROM fhv
GROUP BY PULocationID
)
SELECT l.zone, cte.num_of_trips
FROM cte LEFT JOIN lookup l ON cte.PULocationID = l.LocationID
-- Sort in the outer query: ordering inside a CTE is not guaranteed to survive the join
ORDER BY cte.num_of_trips DESC
""")

In [28]:
num_of_trips_per_pickup_location_id.take(10)

[Row(zone='Crown Heights North', num_of_trips=231279),
 Row(zone='East Village', num_of_trips=221244),
 Row(zone='JFK Airport', num_of_trips=188867),
 Row(zone='Bushwick South', num_of_trips=187929),
 Row(zone='East New York', num_of_trips=186780),
 Row(zone='TriBeCa/Civic Center', num_of_trips=164344),
 Row(zone='LaGuardia Airport', num_of_trips=161596),
 Row(zone='Union Sq', num_of_trips=158937),
 Row(zone='West Village', num_of_trips=154698),
 Row(zone='Astoria', num_of_trips=152493)]
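
The same aggregate-then-join can also be written with the DataFrame API instead of SQL. The sketch below assumes `trips` and `zones` are Spark DataFrames shaped like `df` and `lookup` above. The function name `trips_per_pickup_zone` is hypothetical and only used for illustration.

```python
def trips_per_pickup_zone(trips, zones):
    """Count trips per pickup location, attach zone names, sort descending.

    `trips` needs a PULocationID column; `zones` needs LocationID and zone
    columns (as in `df` and `lookup` above).
    """
    # One row per pickup location with its trip count.
    counts = (
        trips.groupBy("PULocationID")
        .count()
        .withColumnRenamed("count", "num_of_trips")
    )
    # Left join keeps locations that have no match in the lookup table.
    return (
        counts.join(zones, counts["PULocationID"] == zones["LocationID"], "left")
        .select("zone", "num_of_trips")
        .orderBy("num_of_trips", ascending=False)
    )
```

With the DataFrames from this notebook, `trips_per_pickup_zone(df, lookup).show(10, truncate=False)` would be expected to list the same zones as the SQL query.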