## Week 5 Homework 

In this homework we'll put what we learned about Spark into practice.

For this homework we will use the FHV 2019-10 data: [FHV Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz)


### Question 1: 

**Install Spark and PySpark** 

- Install Spark
- Run PySpark
- Create a local spark session
- Execute `spark.version`.

What's the output?

> [!NOTE]
> To install PySpark follow this [guide](https://github.com/DataTalksClub/data-engineering-zoomcamp/blob/main/05-batch/setup/pyspark.md)

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

# Q1 - Answer 

In [2]:
#pyspark.__version__
spark.version

'3.5.1'

### Question 2: 

**FHV October 2019**

Read the October 2019 FHV data into a Spark DataFrame with a schema, as we did in the lessons.

Repartition the DataFrame to 6 partitions and save it to Parquet.

What is the average size (in MB) of the Parquet files (those ending with the `.parquet` extension) that were created? Select the answer that most closely matches.

- 1MB
- 6MB
- 25MB
- 87MB

In [3]:
year = 2019
month = 10
type = 'fhv'

In [4]:
!sh download_data.sh {type} {year} {month}

downloading https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz to data/raw/fhv/2019/10/fhv_tripdata_2019_10.csv.gz

In [None]:
import pandas as pd

df_fhv_pd = pd.read_csv(f'data/raw/{type}/{year}/{month:02d}/{type}_tripdata_{year}_{month:02d}.csv.gz', nrows=100)

schema_ls = spark.createDataFrame(df_fhv_pd).schema
# save the inferred schema as text, to copy into an explicit StructType below
with open('_fhv_schema_def_', 'w') as f:
    f.write(str(schema_ls))

In [6]:
from pyspark.sql import types

fhv_schema = types.StructType([
     types.StructField('dispatching_base_num', types.StringType(), True)
    ,types.StructField('pickup_datetime', types.TimestampType(), True)
    ,types.StructField('dropoff_datetime', types.TimestampType(), True)
    ,types.StructField('PULocationID', types.IntegerType(), True)
    ,types.StructField('DOLocationID', types.IntegerType(), True)
    ,types.StructField('SR_Flag', types.DoubleType(), True)
    ,types.StructField('Affiliated_base_number', types.StringType(), True)
])

in_fhv_path = f'data/raw/{type}/{year}/{month:02d}/'


df_fhv = spark.read \
    .option("header", "true") \
    .schema(fhv_schema) \
    .csv(in_fhv_path)

df_fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: double (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [7]:
out_fhv_path = f'data/parquet/{type}/{year}/{month:02d}/'

df_fhv \
    .repartition(6) \
    .write \
    .mode('overwrite') \
    .parquet(out_fhv_path)


# Q2 - Answer

In [8]:
!ls -lh {out_fhv_path} | grep '.parquet' 

-rw-r--r--  1 peter  staff   6.4M Mar  5 17:27 part-00000-f4616939-261a-4ae4-875e-8ca011547c8f-c000.snappy.parquet
-rw-r--r--  1 peter  staff   6.3M Mar  5 17:27 part-00001-f4616939-261a-4ae4-875e-8ca011547c8f-c000.snappy.parquet
-rw-r--r--  1 peter  staff   6.4M Mar  5 17:27 part-00002-f4616939-261a-4ae4-875e-8ca011547c8f-c000.snappy.parquet
-rw-r--r--  1 peter  staff   6.4M Mar  5 17:27 part-00003-f4616939-261a-4ae4-875e-8ca011547c8f-c000.snappy.parquet
-rw-r--r--  1 peter  staff   6.4M Mar  5 17:27 part-00004-f4616939-261a-4ae4-875e-8ca011547c8f-c000.snappy.parquet
-rw-r--r--  1 peter  staff   6.4M Mar  5 17:27 part-00005-f4616939-261a-4ae4-875e-8ca011547c8f-c000.snappy.parquet
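
The `ls -lh` listing leaves the averaging to the reader. Going by the rounded sizes, (5 × 6.4 + 6.3) / 6 ≈ 6.4 MB, so the closest option is 6MB. A minimal sketch that computes the average directly, assuming a local output directory such as `out_fhv_path` and treating MB as 2^20 bytes (the unit `ls -h` uses); the helper name `average_parquet_size_mb` is hypothetical:

```python
from pathlib import Path


def average_parquet_size_mb(directory):
    """Return the mean size, in MiB, of the *.parquet files directly inside `directory`.

    Spark's companion files (_SUCCESS and the hidden .crc checksums) do not
    end in '.parquet', so the glob skips them.
    """
    sizes = [p.stat().st_size for p in Path(directory).glob("*.parquet")]
    if not sizes:
        raise FileNotFoundError(f"no .parquet files found in {directory!r}")
    return sum(sizes) / len(sizes) / 2**20


# Hypothetical usage in the notebook, with the path from the write cell:
# average_parquet_size_mb(out_fhv_path)
```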


### Question 3: 

**Count records** 

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

- 108,164
- 12,856
- 452,470
- 62,610

> [!IMPORTANT]
> Be aware of column order when defining the schema.
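
Why column order matters: with a user-supplied schema, Spark's CSV reader by default (`enforceSchema=true`) applies the schema by position and ignores the names in the header row. The plain-Python sketch below imitates that positional binding on a hypothetical one-row CSV; it does not call Spark, and `apply_schema_positionally` is a made-up helper for illustration. In Spark itself, values that land in a field of the wrong type typically fail to parse and come back as nulls (in the default PERMISSIVE mode), while same-typed columns are swapped silently.

```python
import csv
import io

# Hypothetical CSV whose header follows the FHV column order.
CSV_TEXT = """dispatching_base_num,pickup_datetime,PULocationID
B00001,2019-10-01 00:23:00,264
"""


def apply_schema_positionally(csv_text, schema_names):
    """Mimic enforceSchema=true: skip the header and bind values to schema names by position."""
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # the header row is consumed, but its names are ignored
    return [dict(zip(schema_names, row)) for row in reader]


# Schema listed in the file's order: every value lands in the right field.
good = apply_schema_positionally(
    CSV_TEXT, ["dispatching_base_num", "pickup_datetime", "PULocationID"])

# Two fields swapped in the schema: values end up under the wrong names.
bad = apply_schema_positionally(
    CSV_TEXT, ["dispatching_base_num", "PULocationID", "pickup_datetime"])

print(good[0]["PULocationID"])  # 264
print(bad[0]["PULocationID"])   # 2019-10-01 00:23:00
```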

In [None]:
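# Parquet files carry their own schema, so no header option or explicit schema is needed
fhv_2019_10 = spark.read.parquet(out_fhv_path)

# temp view for SQL queries (createOrReplaceTempView replaces the deprecated registerTempTable)
fhv_2019_10.createOrReplaceTempView('fhv_2019_10')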

# Q3 - Answer

In [10]:
spark.sql("""
select
    '2019-10-15'  as trip_date
    ,count(1)     as trip_total
  from fhv_2019_10
 where cast(pickup_datetime as date) = date('2019-10-15')
;
""").show()

+----------+----------+
| trip_date|trip_total|
+----------+----------+
|2019-10-15|     62610|
+----------+----------+
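
The `cast(pickup_datetime as date) = date('2019-10-15')` filter keeps every pickup from midnight on the 15th up to, but not including, midnight on the 16th. A small pure-Python sketch of the same half-open-interval test on hypothetical timestamps (no Spark involved; `count_trips_started_on` is a made-up helper), showing which edge cases count:

```python
from datetime import date, datetime, time, timedelta


def count_trips_started_on(pickups, day):
    """Count pickups in the half-open interval [day 00:00, next day 00:00)."""
    start = datetime.combine(day, time.min)
    end = start + timedelta(days=1)
    return sum(1 for ts in pickups if ts is not None and start <= ts < end)


# Hypothetical pickup timestamps; None stands in for a missing value.
pickups = [
    datetime(2019, 10, 14, 23, 59, 59),  # previous day: excluded
    datetime(2019, 10, 15, 0, 0, 0),     # midnight: included
    datetime(2019, 10, 15, 23, 50, 0),   # included even if the trip ends on the 16th
    datetime(2019, 10, 16, 0, 0, 0),     # next midnight: excluded
    None,                                # null pickup: never matches, as in SQL
]

print(count_trips_started_on(pickups, date(2019, 10, 15)))  # 2
```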



### Question 4: 

**Longest trip for each day** 

What is the length of the longest trip in the dataset in hours?

- 631,152.50 Hours
- 243.44 Hours
- 7.68 Hours
- 3.32 Hours

# Q4 - Answer

In [11]:
spark.sql("""
with trip_duration as (
select
     datepart('day', pickup_datetime) as trip_day
    ,max(
      round(
        timestampdiff(minute,
          pickup_datetime,
          dropoff_datetime)
        / 60, 2))                     as duration_hours
  from fhv_2019_10
 group by trip_day
)

select max(duration_hours) as longest_trip_hrs
  from trip_duration
;
""").show()


+----------------+
|longest_trip_hrs|
+----------------+
|        631152.5|
+----------------+
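
631,152.5 hours is about 26,298 days, or roughly 72 years, so this maximum comes from a record with an implausible dropoff timestamp rather than a real trip; the question only asks for the maximum, which is why it matches the first option. The sketch below redoes the `timestampdiff(minute, ...) / 60` arithmetic in plain Python (hypothetical helper `duration_hours`, hypothetical timestamps) to make the unit conversion explicit. Like `timestampdiff`, it truncates the fractional minute before dividing. One assumption to keep in mind: Python's `round` rounds exact ties to even, while Spark's `round` rounds half up, so the two can differ on ties.

```python
from datetime import datetime, timedelta


def duration_hours(pickup, dropoff):
    """Whole minutes between the timestamps (fraction truncated toward zero), / 60, rounded to 2 dp."""
    micros = (dropoff - pickup) // timedelta(microseconds=1)  # exact integer microseconds
    whole_minutes = abs(micros) // 60_000_000
    if micros < 0:
        whole_minutes = -whole_minutes
    return round(whole_minutes / 60, 2)


pickup = datetime(2019, 10, 15, 10, 0, 0)  # hypothetical
print(duration_hours(pickup, pickup + timedelta(minutes=199, seconds=40)))  # 3.32

# A dropoff 26,298 days and 30 minutes after pickup reproduces the reported maximum.
outlier = duration_hours(pickup, pickup + timedelta(days=26_298, minutes=30))
print(outlier)                       # 631152.5
print(round(outlier / 24 / 365.25))  # 72 (years)
```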



### Question 5: 

**User Interface**

On which local port does Spark's user interface (the application's dashboard) run?

- 80
- 443
- 4040
- 8080

# Q5 - Answer

In [12]:
spark.sparkContext.uiWebUrl
# port extraction .split(':')[-1]

'http://spcr:4040'
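
The comment in the cell suggests slicing the URL string; the standard library's `urllib.parse.urlsplit` extracts the port without depending on the string layout. Spark's UI listens on `spark.ui.port`, which defaults to 4040; if several SparkContexts run on the same host, they bind to successive ports (4041, 4042, and so on), so the value to trust is the one `uiWebUrl` reports. A minimal sketch using the URL printed above (`ui_port` is a hypothetical helper):

```python
from urllib.parse import urlsplit


def ui_port(ui_web_url):
    """Return the port from a Spark UI URL such as spark.sparkContext.uiWebUrl."""
    port = urlsplit(ui_web_url).port
    if port is None:
        raise ValueError(f"no explicit port in {ui_web_url!r}")
    return port


print(ui_port("http://spcr:4040"))  # 4040
```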

### Question 6: 

**Least frequent pickup location zone**

Load the zone lookup data ([Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)) into a temp view in Spark.

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

- East Chelsea
- Jamaica Bay
- Union Sq
- Crown Heights North

In [55]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-05 17:50:47--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [21]:
df_lkp_zone = pd.read_csv('taxi_zone_lookup.csv', nrows=100)
df_lkp_zone.head()

|   | LocationID | Borough | Zone | service_zone |
|---|---|---|---|---|
| 0 | 1 | EWR | Newark Airport | EWR |
| 1 | 2 | Queens | Jamaica Bay | Boro Zone |
| 2 | 3 | Bronx | Allerton/Pelham Gardens | Boro Zone |
| 3 | 4 | Manhattan | Alphabet City | Yellow Zone |
| 4 | 5 | Staten Island | Arden Heights | Boro Zone |


In [None]:
schema_lkpz = spark.createDataFrame(df_lkp_zone).schema
# save the inferred schema as text, to copy into an explicit StructType below
with open('_lkpz_schema_def_', 'w') as f:
    f.write(str(schema_lkpz))

In [None]:
lkpz_schema = types.StructType([
     types.StructField('LocationID', types.IntegerType(), True)
    ,types.StructField('Borough', types.StringType(), True)
    ,types.StructField('Zone', types.StringType(), True)
    ,types.StructField('service_zone', types.StringType(), True)
])

df_lkpz = spark.read \
    .option("header", "true") \
    .schema(lkpz_schema) \
    .csv('taxi_zone_lookup.csv')

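# temp view for SQL queries (createOrReplaceTempView replaces the deprecated registerTempTable)
df_lkpz.createOrReplaceTempView('df_lkpz')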

# Q6 - Answer

In [54]:
import time

start = time.perf_counter()
spark.sql("""
with pu_total as (
select
     PULocationID
    ,count(*) as pickup_total
  from fhv_2019_10
 group by PULocationID
)

-- several LocationIDs can share a Zone name, so add their counts
select Zone, sum(pickup_total) as PickUp
  from pu_total a
  left join df_lkpz b
    on a.PULocationID = b.LocationID
 group by Zone
 order by 2
 limit 1
;
""").show()
elapsed = time.perf_counter() - start

+-----------+------+
|       Zone|PickUp|
+-----------+------+
|Jamaica Bay|     1|
+-----------+------+
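
The lookup table can assign the same `Zone` name to more than one `LocationID`, which is why the per-zone figure should add up the per-ID totals rather than take their minimum. A plain-Python sketch of that aggregation (hypothetical helper `least_frequent_zone` and toy data; no Spark), including the LEFT JOIN behaviour where an ID missing from the lookup falls into a `None` zone:

```python
from collections import Counter


def least_frequent_zone(pickup_ids, zone_by_id):
    """Return (zone, pickups) for the zone with the fewest pickups.

    Counts pickups per LocationID, maps each ID to its zone (None if the ID is
    missing from the lookup, as a LEFT JOIN would), then sums the counts of IDs
    that share a zone name. Ties are broken by zone name for a stable result.
    """
    per_id = Counter(pickup_ids)
    per_zone = Counter()
    for location_id, n in per_id.items():
        per_zone[zone_by_id.get(location_id)] += n
    return min(per_zone.items(), key=lambda kv: (kv[1], str(kv[0])))


# Toy data: IDs 10 and 11 share one hypothetical zone name; ID 99 is not in the lookup.
zone_by_id = {1: "Newark Airport", 2: "Jamaica Bay", 10: "Shared Zone", 11: "Shared Zone"}
pickups = [1, 1, 1, 2, 2, 10, 11, 11, 11, 99, 99, 99, 99, 99]

# Taking the minimum per zone instead would wrongly report "Shared Zone" (ID 10 has 1 pickup).
print(least_frequent_zone(pickups, zone_by_id))  # ('Jamaica Bay', 2)
```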

