# Week 5 Homework 

In this homework, we'll put what we learned about Spark into practice.

For this homework, we will use the FHV 2019-10 data, available here: [FHV Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz)

## Question 1: 

**Install Spark and PySpark** 

- Install Spark
- Run PySpark
- Create a local spark session
- Execute `spark.version`.

What's the output?

> [!NOTE]
> To install PySpark follow this [guide](https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/05-batch/setup/pyspark.md)

### Notes:

:white_check_mark: '3.5.1'

```python
import pyspark

from typing import TYPE_CHECKING

from pyspark.sql import SparkSession
from pyspark.sql.functions import max  # Spark's aggregate max(); this shadows Python's built-in max
from pyspark.sql.types import *
```

```python
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.version
```

```
'3.5.1'
```
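
You can also confirm which PySpark package is installed without starting a Spark session (and its JVM) by reading the package metadata with the standard library. This is a minimal sketch that assumes PySpark was installed with pip under the distribution name `pyspark`. With a pip install, the version normally matches `spark.version`, but it may differ if you run against a separate Spark installation. The helper name `installed_pyspark_version` is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_pyspark_version() -> Optional[str]:
    """Return the version of the installed 'pyspark' distribution, or None if it is missing."""
    try:
        return version("pyspark")
    except PackageNotFoundError:
        return None


print(installed_pyspark_version())
```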

## Question 2: 

**FHV October 2019**

Read the October 2019 FHV data into a Spark DataFrame with a schema, as we did in the lessons.

Repartition the DataFrame to 6 partitions and save it to Parquet.

What is the average size (in MB) of the Parquet files (those ending with the .parquet extension) that were created? Select the answer that most closely matches.

- 1MB
- 6MB
- 25MB
- 87MB

### Notes:

:white_check_mark: 6MB

```python
!wget -nc https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
```

```
File 'fhv_tripdata_2019-10.csv.gz' already there; not retrieving.
```

```python
# Explicit schema for the FHV CSV; fields must be listed in the same order as the CSV columns
schema = StructType([
    StructField('dispatching_base_num', StringType()),
    StructField('pickup_datetime', TimestampType()),
    StructField('dropOff_datetime', TimestampType()),
    StructField('PUlocationID', ShortType()),
    StructField('DOlocationID', ShortType()),
    StructField('SR_Flag', FloatType()),
    StructField('Affiliated_base_number', StringType()),
])

# Read the gzipped CSV with the schema, shuffle it into 6 partitions,
# and write the result as Parquet (typically one file per partition)
(spark.read
    .csv(
        "./fhv_tripdata_2019-10.csv.gz",
        header=True,
        schema=schema,
    )
    .repartition(6)
    .write
    .parquet("./fhv_tripdata_2019-10/", mode="overwrite")
)
```
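
Called without column arguments, `repartition(6)` uses round-robin partitioning. That is why the six Parquet files come out roughly the same size. The sketch below is a simplified pure-Python model of the idea. It ignores the shuffle itself and details such as each input task's starting offset, and `round_robin` is a hypothetical helper, not a Spark API.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def round_robin(rows: Sequence[T], num_partitions: int) -> List[List[T]]:
    """Deal rows into num_partitions buckets one at a time, like dealing cards."""
    partitions: List[List[T]] = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


sizes = [len(p) for p in round_robin(list(range(20)), 6)]
print(sizes)  # → [4, 4, 3, 3, 3, 3]
```

Row counts per bucket differ by at most one. The file sizes therefore stay close, although compression can still make them differ slightly.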

```python
import statistics

from pathlib import Path


root_directory = Path("./fhv_tripdata_2019-10/")
# Sizes (in bytes) of the data files only; Spark also writes _SUCCESS and .crc files
pq_sizes_bytes: list[int] = [f.stat().st_size for f in root_directory.glob("*.parquet") if f.is_file()]

pq_mean_size_bytes: float = statistics.fmean(pq_sizes_bytes)
pq_mean_size_megabytes: float = pq_mean_size_bytes / (1024 * 1024)  # bytes -> MiB
print("Q2 Answer: The average size of parquet files is {:.2f} MB".format(pq_mean_size_megabytes))
```

```
Q2 Answer: The average size of parquet files is 6.36 MB
```
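
The cell above divides by 1024 × 1024, so its "MB" figure is really mebibytes (MiB). The question only says MB. A small sketch that reports both units shows that the choice does not change the answer here: 6.36 MiB is about 6.67 MB (10^6 bytes), and both are closest to 6MB. The helper `mean_file_size` is hypothetical, and the demo runs on temporary files rather than the real output directory.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def mean_file_size(directory: Path, pattern: str = "*.parquet") -> Tuple[float, float]:
    """Return the mean size of files matching `pattern` as (MiB, MB)."""
    sizes = [f.stat().st_size for f in directory.glob(pattern) if f.is_file()]
    if not sizes:
        raise ValueError(f"no files matching {pattern!r} in {directory}")
    mean_bytes = sum(sizes) / len(sizes)
    return mean_bytes / 1024**2, mean_bytes / 1000**2


# Demo on two fake "parquet" files (1 MiB and 3 MiB); _SUCCESS is ignored by the pattern
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp)
    (demo / "a.parquet").write_bytes(b"\0" * 1_048_576)
    (demo / "b.parquet").write_bytes(b"\0" * 3_145_728)
    (demo / "_SUCCESS").write_bytes(b"")
    demo_mib, demo_mb = mean_file_size(demo)

print(demo_mib, demo_mb)  # → 2.0 2.097152
```

On the homework output, you would call `mean_file_size(Path("./fhv_tripdata_2019-10/"))`.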


## Question 3: 

**Count records** 

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

- 108,164
- 12,856
- 452,470
- 62,610

> [!IMPORTANT]
> Be aware of the column order when defining the schema.

### Notes:

:white_check_mark: 62,610

```python
if TYPE_CHECKING:
    from pyspark.sql import DataFrame


fhv: "DataFrame" = spark.read.parquet("./fhv_tripdata_2019-10/")
fhv.show()
```

```
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02111|2019-10-27 05:35:21|2019-10-27 05:38:04|          92|          80|   NULL|                B02111|
|              B01437|2019-10-08 09:41:27|2019-10-08 09:46:52|         264|         197|   NULL|                B01437|
|              B02107|2019-10-09 16:53:55|2019-10-09 17:06:05|         264|         167|   NULL|                B02107|
|              B01653|2019-10-15 06:47:13|2019-10-15 06:59:36|         264|         264|   NULL|                B01653|
|              B00850|2019-10-13 03:57:31|2019-10-13 04:12:36|         264|         177|   NULL|                B00850|
|              B00756|2019-10-08 23:21:0
```
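
The question warns about column order because, when `schema=` is passed together with `header=True`, Spark's CSV reader by default assigns schema fields to columns by position and only skips the header row. A mis-ordered schema therefore silently mislabels data. Below is a minimal standard-library sketch that compares the header of a gzipped CSV with the schema's field names position by position, case-insensitively. The helper `header_mismatches` is hypothetical, and the demo uses a tiny made-up file.

```python
import csv
import gzip
import tempfile
from pathlib import Path
from typing import List, Sequence, Tuple


def header_mismatches(csv_gz: Path, schema_names: Sequence[str]) -> List[Tuple[int, str, str]]:
    """Return (position, header_name, schema_name) for each case-insensitive mismatch."""
    with gzip.open(csv_gz, "rt", newline="") as f:
        header = next(csv.reader(f))
    if len(header) != len(schema_names):
        raise ValueError(f"header has {len(header)} columns, schema has {len(schema_names)}")
    return [
        (i, h, s)
        for i, (h, s) in enumerate(zip(header, schema_names))
        if h.strip().lower() != s.lower()
    ]


with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "demo.csv.gz"
    with gzip.open(demo, "wt", newline="") as f:
        f.write("dispatching_base_num,pickup_datetime,dropOff_datetime\n"
                "B00001,2019-10-01 00:00:00,2019-10-01 00:10:00\n")
    ok = header_mismatches(demo, ["dispatching_base_num", "pickup_datetime", "dropoff_datetime"])
    swapped = header_mismatches(demo, ["dispatching_base_num", "dropOff_datetime", "pickup_datetime"])

print(ok)       # → []
print(swapped)  # → [(1, 'pickup_datetime', 'dropOff_datetime'), (2, 'dropOff_datetime', 'pickup_datetime')]
```

For the homework file, you would pass `Path("fhv_tripdata_2019-10.csv.gz")` and `schema.fieldNames()`. Spark can also run a similar positional check itself if you pass `enforceSchema=False` to `spark.read.csv`.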

```python
trip_count: int = fhv.where("cast(pickup_datetime as date) = '2019-10-15'").count()
print("Q3 Answer: There were {:,} taxi trips on the 15th of October".format(trip_count))
```

```
Q3 Answer: There were 62,610 taxi trips on the 15th of October
```
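
The filter keeps a trip only if its pickup falls on 2019-10-15. Drop-off time plays no part, so a trip that starts at 23:50 on the 14th and ends on the 15th is excluded. Here is a pure-Python sketch of the same rule on a few made-up timestamps (not from the dataset). It also shows the equivalent half-open range form, [00:00 on the 15th, 00:00 on the 16th).

```python
from datetime import date, datetime, timedelta
from typing import Iterable


def started_on(pickups: Iterable[datetime], day: date) -> int:
    """Count pickups whose calendar date equals `day` (like cast(... as date) = day)."""
    return sum(1 for ts in pickups if ts.date() == day)


def started_in_range(pickups: Iterable[datetime], day: date) -> int:
    """Same count via a half-open range [day 00:00, next day 00:00)."""
    start = datetime.combine(day, datetime.min.time())
    end = start + timedelta(days=1)
    return sum(1 for ts in pickups if start <= ts < end)


sample = [
    datetime(2019, 10, 14, 23, 50),      # starts on the 14th: excluded even if it ends on the 15th
    datetime(2019, 10, 15, 0, 0),        # midnight counts as the 15th
    datetime(2019, 10, 15, 23, 59, 59),
    datetime(2019, 10, 16, 0, 0),        # excluded
]
print(started_on(sample, date(2019, 10, 15)), started_in_range(sample, date(2019, 10, 15)))  # → 2 2
```

In Spark, the range form would be `fhv.where("pickup_datetime >= '2019-10-15' AND pickup_datetime < '2019-10-16'")`. It compares the raw column instead of a cast, which may let Spark push the filter down to the Parquet reader. Both forms should return the same count.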


## Question 4: 

**Longest trip for each day** 

What is the length of the longest trip in the dataset in hours?

- 631,152.50 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours

### Notes:

:white_check_mark: 631,152.50 Hours

```python
if TYPE_CHECKING:
    from datetime import timedelta


# Subtracting two timestamps yields an interval, which PySpark collects as a datetime.timedelta
td: "timedelta" = (fhv.select(max(fhv.dropOff_datetime - fhv.pickup_datetime))
    .first()[0]
)
longest_trip_in_hours: float = td.total_seconds() / (60 * 60)
print("Q4 Answer: The length of the longest trip in the dataset is {:,.2f} hours".format(longest_trip_in_hours))
```

```
Q4 Answer: The length of the longest trip in the dataset is 631,152.50 hours
```
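
`timedelta.total_seconds()` already combines days, seconds, and microseconds, so dividing by 3600 gives the duration in hours. The sketch below also converts the answer into years. At roughly 72 years, 631,152.50 hours cannot be a real trip and almost certainly reflects bad timestamps in the source data. The helper names are hypothetical, and 365.25 days per year is an averaging assumption used only for this sanity check.

```python
from datetime import timedelta

HOURS_PER_DAY = 24
DAYS_PER_YEAR = 365.25  # average year length; fine for a rough sanity check


def hours(td: timedelta) -> float:
    """Duration of a timedelta in hours."""
    return td.total_seconds() / 3600


def years_from_hours(h: float) -> float:
    """Convert hours to (average) years."""
    return h / HOURS_PER_DAY / DAYS_PER_YEAR


print(hours(timedelta(days=1, hours=2, minutes=30)))  # → 26.5
print(round(years_from_hours(631_152.50), 1))          # → 72.0
```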


## Question 5: 

**User Interface**

Spark's User Interface, which shows the application's dashboard, runs on which local port?

- 80
- 443
- 4040
- 8080

### Notes:

:white_check_mark: 4040

```python
display(spark.sparkContext)  # renders the SparkContext summary, including a link to the Spark UI
spark.sparkContext.uiWebUrl
```

```
'http://host.docker.internal:4040'
```
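
You can read the port directly from `uiWebUrl` instead of by eye. 4040 is Spark's default (`spark.ui.port`). If that port is already taken, for example by a second running session, Spark tries the following ports (4041, 4042, ...), so the URL is the more reliable source. Here is a minimal sketch using the standard library's `urllib.parse`; the helper name `ui_port` is hypothetical.

```python
from typing import Optional
from urllib.parse import urlparse


def ui_port(ui_web_url: str) -> Optional[int]:
    """Extract the port from a Spark UI URL such as spark.sparkContext.uiWebUrl.

    Returns None if the URL carries no explicit port.
    """
    return urlparse(ui_web_url).port


print(ui_port("http://host.docker.internal:4040"))  # → 4040
```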

## Question 6: 

**Least frequent pickup location zone**

Load the zone lookup data into a temp view in Spark:<br>
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

- East Chelsea
- Jamaica Bay
- Union Sq
- Crown Heights North

### Notes:

:white_check_mark: Jamaica Bay

```python
!wget -nc https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
```

```
File 'taxi_zone_lookup.csv' already there; not retrieving.
```

```python
schema = StructType([
    StructField('LocationID', ShortType()),
    StructField('Borough', StringType()),
    StructField('Zone', StringType()),
    StructField('service_zone', StringType()),
])
(spark.read
    .csv(
        "./taxi_zone_lookup.csv",
        header=True,
        schema=schema,
    )
    .createOrReplaceTempView("taxi_zones")
)
spark.sql("SELECT * FROM taxi_zones").show()
```

```
+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly
```

```python
(fhv.groupBy("PUlocationID")
    .count()
    .orderBy("count")
    .createOrReplaceTempView("pickup_loc_freq")
)
spark.sql("SELECT * FROM pickup_loc_freq").show()
```

```
+------------+-----+
|PUlocationID|count|
+------------+-----+
|           2|    1|
|         105|    2|
|         111|    5|
|          30|    8|
|         120|   14|
|          12|   15|
|         207|   23|
|          27|   25|
|         154|   26|
|           8|   29|
|         128|   39|
|         253|   47|
|          96|   53|
|          34|   57|
|          59|   62|
|          58|   77|
|          99|   89|
|         190|   98|
|          54|  105|
|         217|  110|
+------------+-----+
only showing top 20 rows
```

```python
if TYPE_CHECKING:
    from pyspark.sql import DataFrame


taxi_zones_extended: "DataFrame" = spark.sql("""
    SELECT *
    FROM taxi_zones
    JOIN pickup_loc_freq
        ON LocationID = PUlocationID
    ORDER BY count;
""")
taxi_zones_extended.show()
```

```
+----------+-------------+--------------------+------------+------------+-----+
|LocationID|      Borough|                Zone|service_zone|PUlocationID|count|
+----------+-------------+--------------------+------------+------------+-----+
|         2|       Queens|         Jamaica Bay|   Boro Zone|           2|    1|
|       105|    Manhattan|Governor's Island...| Yellow Zone|         105|    2|
|       111|     Brooklyn| Green-Wood Cemetery|   Boro Zone|         111|    5|
|        30|       Queens|       Broad Channel|   Boro Zone|          30|    8|
|       120|    Manhattan|     Highbridge Park|   Boro Zone|         120|   14|
|        12|    Manhattan|        Battery Park| Yellow Zone|          12|   15|
|       207|       Queens|Saint Michaels Ce...|   Boro Zone|         207|   23|
|        27|       Queens|Breezy Point/Fort...|   Boro Zone|          27|   25|
|       154|     Brooklyn|Marine Park/Floyd...|   Boro Zone|         154|   26|
|         8|       Queens|        Astori
```

```python
least_freq_zone: str = taxi_zones_extended.first()["Zone"]
print("Q6 Answer: The name of the LEAST frequent pickup location Zone is {}".format(least_freq_zone))
```

```
Q6 Answer: The name of the LEAST frequent pickup location Zone is Jamaica Bay
```
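
Here is the same "least frequent pickup zone" logic without Spark: count pickups per location ID, then pick the zone with the smallest count. Like the inner `JOIN` above, it only considers zones that appear at least once in the trip data, and it skips IDs missing from the lookup (such as nulls). This is a pure-Python sketch on made-up pickups rather than the real counts. The helper `least_frequent_zone` is hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, Optional


def least_frequent_zone(pickup_ids: Iterable[Optional[int]], zones: Dict[int, str]) -> str:
    """Return the zone name whose location ID has the fewest pickups.

    IDs that are None or absent from `zones` are ignored, mirroring an inner join.
    Ties resolve to the ID encountered first.
    """
    counts = Counter(i for i in pickup_ids if i is not None and i in zones)
    if not counts:
        raise ValueError("no pickups match the zone lookup")
    location_id, _ = min(counts.items(), key=lambda kv: kv[1])
    return zones[location_id]


zones = {1: "Newark Airport", 2: "Jamaica Bay", 4: "Alphabet City"}
pickups = [4, 4, 1, 2, 1, None, 999]
print(least_frequent_zone(pickups, zones))  # → Jamaica Bay
```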
