# Week 5 homework - Batch processing with Spark

## Import libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

## Create a local Spark session

In [2]:
# Build (or reuse, if one is already active) a Spark session for this homework.
# master "local[*]" runs Spark in-process on the local machine.
session_builder = SparkSession.builder.master("local[*]").appName("dtc_hw")
spark = session_builder.getOrCreate()

23/02/23 19:43:42 WARN Utils: Your hostname, GRAD0365UBUNTU resolves to a loopback address: 127.0.1.1; using 192.168.1.151 instead (on interface wlp0s20f3)
23/02/23 19:43:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/23 19:43:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Question 1
**Check Spark version**

In [3]:
# Display the version string of the Spark runtime behind this session.
spark.version

'3.3.1'

## Read the data and save it to Parquet

For this homework we will be using the FHVHV 2021-06 data found here: [FHVHV Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz).

Read it with Spark using the same schema as we did in the lessons.<br>
Repartition it to 12 partitions and save it to Parquet.<br>



In [4]:
# download file
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz -O ../data/fhvhv_tripdata_2021-06.csv.gz

--2023-02-23 19:43:45--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolviendo github.com (github.com)... 140.82.121.3
Conectando con github.com (github.com)[140.82.121.3]:443... conectado.
Petición HTTP enviada, esperando respuesta... 302 Found
Ubicación: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230223%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230223T184345Z&X-Amz-Expires=300&X-Amz-Signature=9c7f04268141e4157cdc95f7da763c8695e33ab0abf570f98a9793ae22c2fe65&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [siguiente]
--2023-02-23 19:43:45--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/51381494

In [5]:
# First pass: load the CSV using its header row for column names but without an
# explicit schema, then print the column types Spark assigned.
csv_reader = spark.read.option("header", "true")
df = csv_reader.csv("../data/fhvhv_tripdata_2021-06.csv.gz")
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [6]:
# Peek at the first five rows as a list of Row objects.
df.head(5)

[Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:02:41', dropoff_datetime='2021-06-01 00:07:46', PULocationID='174', DOLocationID='18', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:16:16', dropoff_datetime='2021-06-01 00:21:14', PULocationID='32', DOLocationID='254', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:27:01', dropoff_datetime='2021-06-01 00:42:11', PULocationID='240', DOLocationID='127', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime='2021-06-01 00:46:08', dropoff_datetime='2021-06-01 00:53:45', PULocationID='127', DOLocationID='235', SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02510', pickup_datetime='2021-06-01 00:45:42', dropoff_datetime='2021-06-01 01:03:33', PULocationID='144', DOLocationID='146', SR_Flag='N', Affiliated_base_number=Non

In [7]:
# Explicit schema for the FHVHV trip data: (column name, Spark type) pairs,
# listed in the same order as the CSV columns. Every column is nullable.
_fhvhv_columns = [
    ('dispatching_base_num', types.StringType()),
    ('pickup_datetime', types.TimestampType()),
    ('dropoff_datetime', types.TimestampType()),
    ('PULocationID', types.IntegerType()),
    ('DOLocationID', types.IntegerType()),
    ('SR_Flag', types.StringType()),
    ('Affiliated_base_number', types.StringType()),
]
schema = types.StructType(
    [types.StructField(col_name, col_type, True) for col_name, col_type in _fhvhv_columns]
)

In [8]:
# Re-read the CSV, this time applying the schema we want.
df_csv = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv("../data/fhvhv_tripdata_2021-06.csv.gz")
)

# Repartition to 12 partitions and save to Parquet (one output file per partition).
df_csv.repartition(12).write.parquet("../data/fhvhv/2021/06", mode="overwrite")

# Point `df` at the Parquet copy we just wrote. Without this, every later query
# would lazily re-parse the gzipped CSV (read by a single task, see the
# "(0 + 1) / 1" stages) and re-shuffle it, and the Parquet data would never be used.
df = spark.read.parquet("../data/fhvhv/2021/06")

[Stage 2:>                                                          (0 + 1) / 1]

23/02/23 19:44:37 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers


                                                                                

### Question 2
**What is the average size of the Parquet (ending with .parquet extension) files that were created (in MB)?**

In [9]:
# List the Parquet part files written above with human-readable sizes (-h).
!ls -lh ../data/fhvhv/2021/06/*.parquet

-rw-r--r-- 1 sgrodriguez usuarios del dominio 24M feb 23 19:44 ../data/fhvhv/2021/06/part-00000-8040fd6d-ba69-4dd1-9eb9-388e3477b1f0-c000.snappy.parquet
-rw-r--r-- 1 sgrodriguez usuarios del dominio 24M feb 23 19:44 ../data/fhvhv/2021/06/part-00001-8040fd6d-ba69-4dd1-9eb9-388e3477b1f0-c000.snappy.parquet
-rw-r--r-- 1 sgrodriguez usuarios del dominio 24M feb 23 19:44 ../data/fhvhv/2021/06/part-00002-8040fd6d-ba69-4dd1-9eb9-388e3477b1f0-c000.snappy.parquet
-rw-r--r-- 1 sgrodriguez usuarios del dominio 24M feb 23 19:44 ../data/fhvhv/2021/06/part-00003-8040fd6d-ba69-4dd1-9eb9-388e3477b1f0-c000.snappy.parquet
-rw-r--r-- 1 sgrodriguez usuarios del dominio 24M feb 23 19:44 ../data/fhvhv/2021/06/part-00004-8040fd6d-ba69-4dd1-9eb9-388e3477b1f0-c000.snappy.parquet
-rw-r--r-- 1 sgrodriguez usuarios del dominio 24M feb 23 19:44 ../data/fhvhv/2021/06/part-00005-8040fd6d-ba69-4dd1-9eb9-388e3477b1f0-c000.snappy.parquet
-rw-r--r-- 1 sgrodriguez usuarios del dominio 24M feb 23 19:44 ../data/fhvhv

As we can see from the previous command, all the Parquet files have a size of **24MB**, so the average file size is also **24MB**.

### Question 3
**How many taxi trips were there on June 15? Consider only trips that started on June 15.**

In [10]:
# Derive the calendar date of each pickup, keep only trips that started on
# 2021-06-15, and count them.
pickup_day = F.to_date(F.col("pickup_datetime"))
trips_june_15 = (
    df
    .withColumn("pickup_date", pickup_day)
    .filter("pickup_date == '2021-06-15'")
)
trips_june_15.count()

                                                                                

452470

Let's solve it also via Spark SQL.

In [11]:
# Register the trips dataframe as a temporary view named "fhvhv_table" so it can
# be queried with spark.sql (replaces any existing view with that name).
df.createOrReplaceTempView("fhvhv_table")

In [12]:
# Same question as above, answered with Spark SQL against the fhvhv_table view:
# count the trips whose pickup date is 2021-06-15.
june_15_count_query = """
SELECT COUNT(1) AS trips_count
FROM fhvhv_table
WHERE TO_DATE(pickup_datetime) = '2021-06-15';
"""
spark.sql(june_15_count_query).show()

[Stage 11:>                                                         (0 + 1) / 1]

+-----------+
|trips_count|
+-----------+
|     452470|
+-----------+



                                                                                

### Question 4
**Longest trip for each day.**  
**Calculate the duration for each trip. How long was the longest trip in Hours?**

In [13]:
# Casting a timestamp to long yields epoch seconds, so the difference divided by
# 3600 is the trip duration in hours.
pickup_secs = F.col("pickup_datetime").cast("long")
dropoff_secs = F.col("dropoff_datetime").cast("long")

trip_durations = df.select(
    F.to_date("pickup_datetime").alias("pickup_date"),
    ((dropoff_secs - pickup_secs) / 3600).alias("trip_duration"),
)

# Longest trip per pickup day; show the five days with the longest trips.
longest_per_day = (
    trip_durations
    .groupBy("pickup_date")
    .agg(F.max("trip_duration").alias("longest_trip_duration"))
)
longest_per_day.orderBy(F.col("longest_trip_duration").desc()).limit(5).show()



+-----------+---------------------+
|pickup_date|longest_trip_duration|
+-----------+---------------------+
| 2021-06-25|     66.8788888888889|
| 2021-06-22|   25.549722222222222|
| 2021-06-27|   19.980833333333333|
| 2021-06-26|   18.197222222222223|
| 2021-06-23|   16.466944444444444|
+-----------+---------------------+



                                                                                

In [14]:
# Spark SQL version: per pickup date, the longest trip in hours
# (epoch-second difference / 3600), top five days by that maximum.
longest_trip_query = """
SELECT 
    TO_DATE(pickup_datetime) AS pickup_date,
    MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 3600) AS longest_trip_duration
FROM fhvhv_table
GROUP BY pickup_date
ORDER BY longest_trip_duration DESC
LIMIT 5;
"""
spark.sql(longest_trip_query).show()



+-----------+---------------------+
|pickup_date|longest_trip_duration|
+-----------+---------------------+
| 2021-06-25|     66.8788888888889|
| 2021-06-22|   25.549722222222222|
| 2021-06-27|   19.980833333333333|
| 2021-06-26|   18.197222222222223|
| 2021-06-23|   16.466944444444444|
+-----------+---------------------+





Therefore, the longest trip lasted **66.88 hours**.

### Question 5
**User interface.**  
**Spark’s User Interface which shows application's dashboard runs on which local port?**  

By default, Spark's UI runs at **`localhost:4040`** (if that port is already in use, Spark tries the next one: 4041, 4042, …).

### Question 6

**Most frequent pickup location zone**  

Load the zone lookup data into a temp view in Spark: [Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)  

**What is the name of the most frequent pickup location zone?**

In [15]:
# download zone lookup data
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv -O ../data/taxi_zone_lookup.csv

# read data into a dataframe
df_zones = spark.read \
    .option("header", True) \
    .csv("../data/taxi_zone_lookup.csv")
df_zones.show(5)

--2023-02-23 19:46:23--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolviendo github.com (github.com)... 140.82.121.4
Conectando con github.com (github.com)[140.82.121.4]:443... conectado.
Petición HTTP enviada, esperando respuesta... 302 Found
Ubicación: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230223%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230223T184624Z&X-Amz-Expires=300&X-Amz-Signature=a1436d73d78f2f84c000780d65f835bf5b8470d48ac2294e49e4ff7beeda6737&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [siguiente]
--2023-02-23 19:46:24--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-458

In [16]:
# Attach the zone name to every trip via its pickup location. The left join
# keeps trips whose PULocationID has no match in the lookup table.
trips_with_zone = df.join(
    df_zones,
    on=df.PULocationID == df_zones.LocationID,
    how="left",
)

# Count trips per pickup zone and show the ten busiest zones.
pickups_per_zone = trips_with_zone.groupBy("Zone").count()
pickups_per_zone.orderBy(F.desc("count")).limit(10).show()



+--------------------+------+
|                Zone| count|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
+--------------------+------+





In [17]:
# Register the zone lookup dataframe as a temporary view named "zones" for Spark SQL.
df_zones.createOrReplaceTempView("zones")

In [18]:
# Spark SQL version of the pickup-zone ranking.
# COUNT(1) counts every trip row, matching the DataFrame API's groupBy().count();
# COUNT(trips.PULocationID) would skip rows with a NULL pickup location.
# Grouping by zones.Zone explicitly avoids the ambiguity between the `zone`
# alias and the underlying column of the same (case-insensitive) name.
spark.sql("""
SELECT 
    zones.Zone AS zone,
    COUNT(1) AS number_trips
FROM fhvhv_table AS trips
LEFT JOIN zones
ON trips.PULocationID = zones.LocationID
GROUP BY zones.Zone
ORDER BY number_trips DESC
LIMIT 10;
""").show()



+--------------------+------------+
|                zone|number_trips|
+--------------------+------------+
| Crown Heights North|      231279|
|        East Village|      221244|
|         JFK Airport|      188867|
|      Bushwick South|      187929|
|       East New York|      186780|
|TriBeCa/Civic Center|      164344|
|   LaGuardia Airport|      161596|
|            Union Sq|      158937|
|        West Village|      154698|
|             Astoria|      152493|
+--------------------+------------+



                                                                                

As we can see, the most frequent pickup location zone is **Crown Heights North**.