# <center> Week 5 Homework </center>
---

## Setup Homework

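In this homework we'll put what we learned about Spark into practice.

For this homework we will use the FHV 2019-10 data, available here: [FHV Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz)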

---
---

In [1]:
import os
import glob
import shutil
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

In [2]:
# Compile homework questions container
questions = {
    1: "What's the spark version?",
    2: "What is the average size of the parquet (ending with .parquet extension) files that were created (in MB)?",
    3: "How many taxi trips were there on the 15th of October? NOTE: Consider only trips that started on the 15th of October.",
    4: "What is the length of the longest trip in the dataset in hours?",
    5: "Spark’s User Interface which shows the application's dashboard runs on which local port?",
    6: "Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location zone?",
}

# Compile the homework solutions container
solutions = {}

In [3]:
# Set the path to the raw data and the output directory
url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz'
file_name = 'fhv_tripdata_2019-10.csv.gz'
input_path = '../data/raw/fhv/2019/10/' + file_name
output_path = '../data/pq/fhv/2019/10/'

# If the data is not present, download it
if not os.path.exists(input_path):
    # Log the event
    print('Recreating the data folders and downloading the data')

    # Delete the data folders
    shutil.rmtree("../data", ignore_errors=True)

    # Re-create the folder that will hold the raw file (not a folder named after the file)
    os.makedirs(os.path.dirname(input_path), exist_ok=True)

    # Download the data
    os.system(f'wget -q {url}')

    # Move the data to the correct folder
    os.system(f"mv {file_name} {input_path}")

else:
    # Log the event
    print('FHV file already exists')

Recreating the data folders and downloading the data
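
The download-if-missing step above can also be sketched without shelling out to `wget` and `mv`. This is only a minimal illustration: `download_if_missing`, `fake_fetch`, and the `example.invalid` URL are hypothetical names introduced here, and the stand-in fetcher exists so the logic can be run offline.

```python
import os
import tempfile
from urllib.request import urlretrieve


def download_if_missing(url, dest_path, fetch=urlretrieve):
    """Download url to dest_path unless the file is already there.

    Returns True when a download happened, False when it was skipped.
    `fetch` is any callable taking (url, dest_path); it defaults to
    urllib.request.urlretrieve and can be swapped out for testing.
    """
    if os.path.exists(dest_path):
        return False
    # Create the parent folder, not a folder named after the file itself
    parent = os.path.dirname(dest_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    fetch(url, dest_path)
    return True


# Offline demonstration with a stand-in fetcher that writes a tiny file
def fake_fetch(url, dest_path):
    with open(dest_path, "w") as handle:
        handle.write("dispatching_base_num\n")


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "raw", "fhv", "sample.csv")
    first_call = download_if_missing("https://example.invalid/sample.csv", target, fetch=fake_fetch)
    second_call = download_if_missing("https://example.invalid/sample.csv", target, fetch=fake_fetch)

print(first_call, second_call)  # True False
```

The second call is skipped because the file already exists, which is the behaviour the comment in the cell above describes.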


---
---

## Questions
---

#### Question 1: Install `Spark` and `PySpark`

Install Spark and PySpark using these [guides](https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/05-batch/setup/pyspark.md)

> What is the spark version being used?

In [4]:
# Create a SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('homework-05') \
    .getOrCreate()

# Display spark UI
print(f"\nSpark UI running on http://localhost:{spark.sparkContext.uiWebUrl.split(':')[-1]}")

# Record the current spark version
solutions[1] = f'spark version: {spark.version}'

# Display the spark version
print()
print(f'Question 1:\t{questions[1]}')
print(f'Solution 1:\t{solutions[1]}')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/02 07:54:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Spark UI running on http://localhost:4040

Question 1:	What's the spark version?
Solution 1:	spark version: 3.3.2


---

#### Question 2

Steps:
1. Read the October 2019 FHV data into a Spark DataFrame with a schema, as we did in the lessons.
2. Repartition the DataFrame to 6 partitions and save it to parquet.

> What is the average size of the parquet (ending with .parquet extension) files that were created (in MB)?

In [5]:
# Define the schema for the FHV data
fhv_schema = T.StructType([
    T.StructField('dispatching_base_num', T.StringType(), True),
    T.StructField('pickup_datetime', T.TimestampType(), True),
    T.StructField('dropoff_datetime', T.TimestampType(), True),
    T.StructField('PULocationID', T.IntegerType(), True),
    T.StructField('DOLocationID', T.IntegerType(), True),
    T.StructField('SR_Flag', T.StringType(), True),
    T.StructField('Affiliated_base_number', T.StringType(), True),
])

# Read the October 2019 FHV into a Spark Dataframe with a schema as we did in the lessons.
df = spark.read \
    .option('header', 'true') \
    .schema(fhv_schema) \
    .csv(input_path)

# Display the dataframe
df.show()


+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [6]:

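# Repartition the DataFrame to 6 partitions and save it to parquet.
# mode='overwrite' keeps the cell re-runnable when the output folder already exists.
df \
    .repartition(6) \
    .write.parquet(output_path, mode='overwrite')

# Format a size in bytes as a human-readable string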
def format_filesize(size: float) -> str:
    """Formats a filesize in bytes to a human-readable string.

    Args:
        size: The size of the file in bytes.

    Returns:
        A human-readable string representing the size of the file.
    """
    suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    index = 0

    while size >= 1024 and index < len(suffixes) - 1:
        size /= 1024
        index += 1

    return f"{size:.1f} {suffixes[index]}"

# Get the size of the parquet files
parquet_files = glob.glob(os.path.join(output_path, '*.parquet'))
parquet_sizes = [os.path.getsize(path) for path in parquet_files]
avg_parquet_size = round(sum(parquet_sizes) / len(parquet_files))

# Record the average size of the parquet files
solutions[2] = format_filesize(avg_parquet_size)

# Display the solution
print()
print('Question 2:', questions[2])
print('Solution 2:', solutions[2])




Question 2: What is the average size of the parquet (ending with .parquet extension) files that were created (in MB)?
Solution 2: 6.4 MB
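
The averaging step can be checked in isolation on files of known size. The sketch below is an assumption-laden illustration, not the notebook's method: `average_file_size` is a hypothetical helper, and the two zero-filled files only stand in for real parquet output.

```python
import glob
import os
import tempfile


def average_file_size(folder, pattern="*.parquet"):
    """Return the mean size in bytes of files in `folder` matching `pattern`."""
    paths = glob.glob(os.path.join(folder, pattern))
    if not paths:
        raise FileNotFoundError(f"no files matching {pattern} in {folder}")
    return sum(os.path.getsize(path) for path in paths) / len(paths)


with tempfile.TemporaryDirectory() as tmp_dir:
    # Two stand-in "parquet" files of 1 MiB and 3 MiB
    for name, size in [("part-0.parquet", 1024 * 1024), ("part-1.parquet", 3 * 1024 * 1024)]:
        with open(os.path.join(tmp_dir, name), "wb") as handle:
            handle.write(b"\0" * size)
    # A marker file that the *.parquet pattern should ignore
    open(os.path.join(tmp_dir, "_SUCCESS"), "w").close()

    avg_bytes = average_file_size(tmp_dir)

avg_mb = avg_bytes / (1024 * 1024)
print(f"{avg_mb:.1f} MB")  # 2.0 MB
```

Restricting the glob to `*.parquet` matters because the output folder also holds non-data files such as `_SUCCESS`, which would otherwise drag the average down.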



---

#### Question 3: Count records

> How many taxi trips were there on the 15th of October?
>
> NOTE: Consider only trips that started on the 15th of October.

In [7]:
# Filter the data by the pickup date
pickup_date = '2019-10-15'
filtered_df = df.filter(F.col('pickup_datetime').cast('date') == pickup_date)
record_count = filtered_df.count()

# Record the number of trips on the 15th of October
solutions[3] = '{:,} trips'.format(record_count)

# Display the solution
print()
print(f'Question 3:\t{questions[3]}')
print(f'Solution 3:\t{solutions[3]}')


Question 3:	How many taxi trips were there on the 15th of October? NOTE: Consider only trips that started on the 15th of October.
Solution 3:	62,610 trips
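
To make the "started on the 15th" rule concrete, here is a small plain-Python sketch of the same filter. The three trips are made-up illustrative values, not rows from the dataset.

```python
from datetime import date, datetime

# Hypothetical (pickup, dropoff) pairs chosen to sit around the day boundary
trips = [
    (datetime(2019, 10, 14, 23, 50), datetime(2019, 10, 15, 0, 10)),  # starts on the 14th
    (datetime(2019, 10, 15, 0, 0), datetime(2019, 10, 15, 0, 30)),    # starts on the 15th
    (datetime(2019, 10, 15, 23, 59), datetime(2019, 10, 16, 0, 20)),  # starts on the 15th
]

target_day = date(2019, 10, 15)

# Only the pickup timestamp decides whether a trip counts
trips_on_target_day = sum(1 for pickup, _ in trips if pickup.date() == target_day)

print(trips_on_target_day)  # 2
```

The first trip ends on the 15th but is excluded, and the last one ends on the 16th but is included, which mirrors casting `pickup_datetime` to a date before comparing.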



---

#### Question 4: Longest trip for each day

> What is the length of the longest trip in the dataset in hours?

In [8]:
# Calculate trip duration in seconds
df = df.withColumn("duration_seconds", F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime"))

# Convert duration to hours and get the trip with the longest duration
longest_trip = df.select("duration_seconds").orderBy(F.col("duration_seconds").desc()).first()[0] / 3600

# Optionally, filter the original DataFrame to get the details of the longest trip
# (longest_trip is a plain number of hours, so it cannot be joined against)
# longest_trip_details = df.filter(F.col("duration_seconds") == longest_trip * 3600)

# Record the length of the longest trip in the dataset
solutions[4] = '{:,} hours'.format(longest_trip)

# Display the solution
print()
print(f'Question 4:\t{questions[4]}')
print(f'Solution 4:\t{solutions[4]}')


Question 4:	What is the length of the longest trip in the dataset in hours?
Solution 4:	631,152.5 hours
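
The seconds-to-hours conversion can be sanity-checked by hand. The sketch below applies it to the first row shown in the Question 2 output and then converts the reported maximum into years; `trip_hours` is a hypothetical helper, and the year conversion assumes 365.25 days per year.

```python
from datetime import datetime


def trip_hours(pickup, dropoff, fmt="%Y-%m-%d %H:%M:%S"):
    """Return the trip duration in hours from two timestamp strings."""
    start = datetime.strptime(pickup, fmt)
    end = datetime.strptime(dropoff, fmt)
    return (end - start).total_seconds() / 3600


# First row of the displayed DataFrame: a 12-minute trip
first_row_hours = trip_hours("2019-10-01 00:23:00", "2019-10-01 00:35:00")

# The reported maximum, expressed in years (assuming 365.25 days per year)
longest_trip_years = 631_152.5 / (24 * 365.25)

print(round(first_row_hours, 2), round(longest_trip_years, 1))  # 0.2 72.0
```

A trip of roughly 72 years presumably reflects a bad dropoff timestamp in the source data rather than a real journey, so the figure answers the question as asked but should not be read as a genuine trip length.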



---

#### Question 5: Spark UI

> Spark’s User Interface, which shows the application's dashboard, runs on which local port?

In [9]:
# Spark UI port
solutions[5] = spark.sparkContext.uiWebUrl.split(':')[-1]

# Display the solution
print()
print(f'Question 5:\t{questions[5]}')
print(f'Solution 5:\t{solutions[5]}')


Question 5:	Spark’s User Interface which shows the application's dashboard runs on which local port?
Solution 5:	4040
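
Splitting the UI URL on `:` works for a bare `http://host:port` string. A slightly more defensive sketch uses the standard library URL parser instead; `ui_port` is a hypothetical helper, and the URLs below are illustrative strings rather than values read from a live Spark session.

```python
from urllib.parse import urlparse


def ui_port(ui_url):
    """Return the port of a URL as an int, or None if it has no explicit port."""
    return urlparse(ui_url).port


plain_port = ui_port("http://localhost:4040")
with_path_port = ui_port("http://localhost:4040/jobs/")

# The naive split keeps the trailing path when one is present
naive_with_path = "http://localhost:4040/jobs/".split(":")[-1]

print(plain_port, with_path_port, naive_with_path)  # 4040 4040 4040/jobs/
```

For the URL printed in this notebook both approaches give 4040; the parser only matters if the URL ever carries a path.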


---

#### Question 6: Least frequent pickup location zone

Load the FHV data and the [taxi zone lookup data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv) into temp views in Spark.

> Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location zone?

In [10]:
# Set the taxi zone lookup data arguments
zone_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv'
zone_filename = 'taxi_zone_lookup.csv'
zone_filepath = '../data/' + zone_filename

# If the data is not present, download it
if not os.path.exists(zone_filepath):
    # Log the event
    print('Downloading the taxi zone lookup data')

    # Download the data
    os.system(f'wget -q {zone_url}')

    # Move the data to the correct folder
    os.system(f"mv {zone_filename} {zone_filepath}")

else:
    # Log the event
    print('Taxi zone lookup data already exists')

# Load the taxi zone lookup data
zone_df = spark.read.option('header', 'true').csv(zone_filepath)

# Create temporary views for the zone data and the FHV data
zone_df.createOrReplaceTempView('zones')
df.createOrReplaceTempView('fhv')

# Join the FHV data with the zone lookup data
new_df = spark.sql("""
    SELECT
        dispatching_base_num,
        pickup_datetime,
        dropoff_datetime,
        pu.Zone as pickup_zone,
        pu.Borough as pickup_borough,
        do.Zone as dropoff_zone,
        do.Borough as dropoff_borough,
        SR_Flag,
        Affiliated_base_number
    FROM
        fhv
    LEFT JOIN
        zones pu ON fhv.PULocationID = pu.LocationID
    LEFT JOIN
        zones do ON fhv.DOLocationID = do.LocationID
""")

# Group by the pickup zone and count the number of trips
pickup_zone_counts = new_df.groupBy('pickup_zone').count()

# Order the counts in ascending order and get the first row
least_frequent_zone = pickup_zone_counts.orderBy(F.col('count').asc()).first()[0]

# Record the name of the least frequent pickup location zone
solutions[6] = least_frequent_zone

# Display the solution
print()
print(f'Question 6:\t{questions[6]}')
print(f'Solution 6:\t{solutions[6]}')

Downloading the taxi zone lookup data



Question 6:	Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location zone?
Solution 6:	Jamaica Bay
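
The join, group, and sort steps can be mirrored on toy data in plain Python to see what the query computes. Everything below is hypothetical: the zone names and pickup IDs are invented for illustration and do not come from the lookup file.

```python
from collections import Counter

# Hypothetical zone lookup: LocationID -> Zone
zones = {1: "Zone A", 2: "Zone B", 3: "Zone C"}

# Hypothetical pickup location IDs, one per trip
pickup_ids = [1, 1, 2, 3, 3, 3, 1]

# Equivalent of the LEFT JOIN plus groupBy('pickup_zone').count();
# an ID missing from the lookup would be counted under None, like a null zone
zone_counts = Counter(zones.get(location_id) for location_id in pickup_ids)

# Equivalent of ordering by count ascending and taking the first row
least_zone, least_count = min(zone_counts.items(), key=lambda item: item[1])

print(least_zone, least_count)  # Zone B 1
```

When several zones tie for the lowest count, which one comes first is not guaranteed in either version, so adding a secondary sort key would make the result deterministic.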



---

## Answers

In [11]:
for idx in questions:
    print()
    print(f'Question {idx}:\t{questions[idx]}')
    print(f'Solution {idx}:\t{solutions[idx]}')
    print()


Question 1:	What's the spark version?
Solution 1:	spark version: 3.3.2


Question 2:	What is the average size of the parquet (ending with .parquet extension) files that were created (in MB)?
Solution 2:	6.4 MB


Question 3:	How many taxi trips were there on the 15th of October? NOTE: Consider only trips that started on the 15th of October.
Solution 3:	62,610 trips


Question 4:	What is the length of the longest trip in the dataset in hours?
Solution 4:	631,152.5 hours


Question 5:	Spark’s User Interface which shows the application's dashboard runs on which local port?
Solution 5:	4040


Question 6:	Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location zone?
Solution 6:	Jamaica Bay



---
---
---