In [44]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F


In [2]:
# Build (or reuse) a Spark session running in local mode with master "local[*]"
session_builder = SparkSession.builder.master("local[*]").appName('test')
spark = session_builder.getOrCreate()

24/03/07 12:37:38 WARN Utils: Your hostname, MacBook-Pro-de-Rafael.local resolves to a loopback address: 127.0.0.1; using 10.1.10.144 instead (on interface en0)
24/03/07 12:37:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/07 12:37:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Question 1
Pyspark version

In [3]:
# Version string of the running Spark session (answer to Question 1)
spark.version

'3.5.1'

24/03/07 12:37:50 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Question 2
Load data and average file size

In [5]:
# Download the gzip-compressed FHV trip data CSV for October 2019 into the working directory
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-07 13:30:12--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.3
Connecting to github.com (github.com)|140.82.121.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240307%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240307T123012Z&X-Amz-Expires=300&X-Amz-Signature=b336d7b04906c07c25d0da032e4e8a8d47b1d9de30f64d324dbdd6c187eb66dd&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-07 13:30:12--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [7]:
# Decompress the download in place (replaces the .csv.gz with the plain .csv).
# -f overwrites an existing fhv_tripdata_2019-10.csv, so re-running the notebook
# does not stall or fail on gzip's "already exists" check.
!gzip -df fhv_tripdata_2019-10.csv.gz

In [9]:
# Count the lines in the decompressed CSV (the header line is included in this count)
!wc -l fhv_tripdata_2019-10.csv

 1897494 fhv_tripdata_2019-10.csv


In [14]:
# Read the trips CSV: first line is the header, column types are inferred from the data
trips_reader = spark.read.option("header", "true").option("inferSchema", "true")
df = trips_reader.csv('fhv_tripdata_2019-10.csv')

                                                                                

In [15]:
# Preview the first rows of the trips DataFrame
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [41]:
# Inspect the column types produced by schema inference
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [19]:
# Redistribute the data into 6 partitions; the parquet write below then emits 6 part files
df = df.repartition(6)

In [20]:
# Write the repartitioned trips as parquet files.
# mode('overwrite') replaces any output left by a previous run; with the default
# save mode the write raises an error as soon as 'fhv/2019/10/' already exists,
# which breaks Restart & Run All.
df.write.mode('overwrite').parquet('fhv/2019/10/')

                                                                                

In [22]:
# File sizes in bash: byte count of every parquet part file, plus a grand total on the last line
!wc -c fhv/2019/10/*.parquet

 6298655 fhv/2019/10/part-00000-b621dbc5-8462-47ec-9b8d-4ab0a16a6ed5-c000.snappy.parquet
 6290542 fhv/2019/10/part-00001-b621dbc5-8462-47ec-9b8d-4ab0a16a6ed5-c000.snappy.parquet
 6298835 fhv/2019/10/part-00002-b621dbc5-8462-47ec-9b8d-4ab0a16a6ed5-c000.snappy.parquet
 6292819 fhv/2019/10/part-00003-b621dbc5-8462-47ec-9b8d-4ab0a16a6ed5-c000.snappy.parquet
 6288895 fhv/2019/10/part-00004-b621dbc5-8462-47ec-9b8d-4ab0a16a6ed5-c000.snappy.parquet
 6305788 fhv/2019/10/part-00005-b621dbc5-8462-47ec-9b8d-4ab0a16a6ed5-c000.snappy.parquet
 37775534 total


In [24]:
# Number of files in bash: list the parquet part files and count the lines of the listing
!ls fhv/2019/10/*.parquet | wc -l

       6


In [31]:
# Divide total between number of files to get average file size.
# NOTE: 37775534 (total bytes) and 6 (file count) are copied by hand from the two
# shell outputs above, so they must be updated if the parquet files are rewritten.
37775534/6/1024/1024 # Divide two times by 1024 to get MB

6.004259427388509

In [40]:
# Same average, computed in Python from the files on disk instead of hand-copied numbers
import glob
import os

path = 'fhv/2019/10/*.parquet'
files = glob.glob(path)

# Sum the on-disk size of every matched part file, then convert bytes -> MB and average
total_bytes = sum(os.path.getsize(parquet_file) for parquet_file in files)
average_file_size = total_bytes / 1024 / 1024 / len(files)

print(f'The average file size of the .parquet files is {average_file_size:.2f} MB')

The average file size of the .parquet files is 6.00 MB


## Question 3
Count the trips that started on the 15th of October 2019

There are two ways to do this: via SQL or directly in PySpark.

### Option 1: Directly in pyspark

In [48]:
# pickup_datetime is a timestamp, so reduce it to a calendar date before matching the day
pickup_day = F.to_date(df.pickup_datetime)
(
    df
    .withColumn('pickup_date', pickup_day)
    .filter("pickup_date = '2019-10-15'")
    .count()
)

62610

### Option 2: SQL in Pyspark

In [49]:
# Expose the trips DataFrame to Spark SQL under the name 'trips_data'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to re-run (it replaces an existing view).
df.createOrReplaceTempView('trips_data')



In [50]:
# Count the rows of trips_data whose pickup timestamp falls on 2019-10-15
trips_on_day_query = """
    SELECT
        count(1)
    FROM
        trips_data
    WHERE
        DATE(pickup_datetime) = '2019-10-15'
    """
spark.sql(trips_on_day_query).show()


+--------+
|count(1)|
+--------+
|   62610|
+--------+



## Question 4
Length of the longest trip in hours

### Option 1: Directly in Pyspark

In [69]:
# Trip length in hours: cast both timestamps to long, subtract, and divide by 3600
pickup_seconds = df.pickup_datetime.cast('long')
dropoff_seconds = df.dropOff_datetime.cast('long')

# Keep only the duration column and show the single largest value
(
    df
    .withColumn('duration', (dropoff_seconds - pickup_seconds) / 3600)
    .select('duration')
    .orderBy('duration', ascending=False)
    .limit(1)
    .show()
)



+--------+
|duration|
+--------+
|631152.5|
+--------+



                                                                                

### Option 2: SQL in Pyspark

In [64]:
# Longest trip via SQL: compute the pickup -> drop-off difference in hours for every
# trip, sort descending by that first (and only) selected column, and keep the top row.
# NOTE: in the recorded run this prints a whole number of hours (631152), whereas the
# DataFrame version above prints the fractional value (631152.5).
spark.sql("""
    SELECT
        TIMESTAMPDIFF(HOUR, pickup_datetime, dropOff_datetime)
    FROM
        trips_data
    ORDER BY
        1 DESC
    LIMIT 1
    """).show()



+------------------------------------------------------+
|timestampdiff(HOUR, pickup_datetime, dropOff_datetime)|
+------------------------------------------------------+
|                                                631152|
+------------------------------------------------------+



                                                                                

## Question 5

Spark’s User Interface, which shows the application's dashboard, runs on local port 4040.



## Question 6
Least frequent pickup location zone

In [70]:
# Read the taxi zone lookup CSV: first line is the header, column types are inferred
zones_reader = spark.read.option("header", "true").option("inferSchema", "true")
df_zones = zones_reader.csv('../code/taxi+_zone_lookup.csv')

In [71]:
# Preview the first rows of the zone lookup table
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [73]:
# Inspect the inferred column types of the zone lookup table
df_zones.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [74]:
# Expose the zone lookup DataFrame to Spark SQL under the name 'zones'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to re-run (it replaces an existing view).
df_zones.createOrReplaceTempView('zones')



In [77]:
# Least frequent pickup zone: join each trip to its pickup zone, count trips per zone
# name (GROUP BY 1 = z.Zone), sort ascending by that count (ORDER BY 2) and keep one row.
# The inner join drops trips whose PUlocationID has no match in zones, and zones with
# no trips at all never appear in the result.
# NOTE(review): if several zones share the lowest count, which one LIMIT 1 returns is
# presumably not guaranteed; confirm before relying on the exact zone name.
spark.sql("""
    SELECT
        z.Zone,
        count(1)
    FROM
        trips_data t
    JOIN
        zones z on z.LocationID = t.PUlocationID
    GROUP BY
        1
    ORDER BY
        2 ASC
    LIMIT 1
    """).show()

+-----------+--------+
|       Zone|count(1)|
+-----------+--------+
|Jamaica Bay|       1|
+-----------+--------+

