### Import Libraries

In [29]:
# Import libraries.
import os
import shutil
from datetime import datetime
from pyspark.sql import types
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

### Start Spark Session

In [30]:
# Start spark session.
spark = SparkSession.builder.master("local[*]").appName('test').getOrCreate()

In [31]:
# Check Spark Version.
spark.version

'3.3.2'

### Repartition the June 2021 HVFHV Data into 12 partitions and save it to Parquet. What is the average size of the Parquet Files?

In [32]:
# Download the FHVHV 2021-06 data from here: (https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz).
if not os.path.exists('./data/raw/fhvhv_tripdata_2021-06.csv.gz'):
    os.makedirs('./data/raw')

    # Download data.
    !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz

    # Move data.
    shutil.move('./fhvhv_tripdata_2021-06.csv.gz', './data/raw/')

--2023-03-06 17:30:05--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e-a6da-4923-ad6f-35ff02446a51?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230306T173006Z&X-Amz-Expires=300&X-Amz-Signature=4df5978779e5d69b3cfe49dca5dd6b3b428c0a4a47212446d10599b9e5b44758&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhvhv_tripdata_2021-06.csv.gz&response-content-type=application%2Foctet-stream [following]
--2023-03-06 17:30:06--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/4564ad9e

In [33]:
# Read dataframe.
df_fhvhv = spark.read.option("header", "true").csv('./data/raw/fhvhv_tripdata_2021-06.csv.gz')

In [34]:
# Check schema.
df_fhvhv.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [35]:
# Check schema.
df_fhvhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [36]:
# Define schema.
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),
])

In [37]:
# Read dataframe with schema defined.
df_fhvhv = spark.read.option("header", "true").schema(schema).csv('./data/raw/fhvhv_tripdata_2021-06.csv.gz')

In [38]:
# Check schema.
df_fhvhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [39]:
# Repartition.
df_fhvhv = df_fhvhv.repartition(12)

In [40]:
# Write partitions to parquet files.
df_fhvhv.write.parquet('./data/processed/fhvhv/2021/06/', mode='overwrite')

                                                                                

In [41]:
# Check average size of the parquet files.
!ls -lh  ./data/processed/fhvhv/2021/06

total 284M
-rw-r--r-- 1 famou famou   0 Mar  6 17:31 _SUCCESS
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00000-7253ab7b-882c-4ced-8ec0-aa4169e138d1-c000.snappy.parquet
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00001-7253ab7b-882c-4ced-8ec0-aa4169e138d1-c000.snappy.parquet
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00002-7253ab7b-882c-4ced-8ec0-aa4169e138d1-c000.snappy.parquet
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00003-7253ab7b-882c-4ced-8ec0-aa4169e138d1-c000.snappy.parquet
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00004-7253ab7b-882c-4ced-8ec0-aa4169e138d1-c000.snappy.parquet
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00005-7253ab7b-882c-4ced-8ec0-aa4169e138d1-c000.snappy.parquet
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00006-7253ab7b-882c-4ced-8ec0-aa4169e138d1-c000.snappy.parquet
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00007-7253ab7b-882c-4ced-8ec0-aa4169e138d1-c000.snappy.parquet
-rw-r--r-- 1 famou famou 24M Mar  6 17:31 part-00008-7253a

### How many taxi trips were there on June 15?

In [42]:
# Read data.
df_fhvhv = spark.read.parquet('./data/processed/fhvhv/2021/06/')

In [43]:
# Check schema.
df_fhvhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [44]:
# View data.
df_fhvhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02889|2021-06-04 20:51:44|2021-06-04 21:10:12|         239|         158|      N|                B02889|
|              B02800|2021-06-04 15:50:15|2021-06-04 16:19:29|          75|         116|      N|                  null|
|              B02510|2021-06-02 21:03:38|2021-06-02 21:10:12|         167|         168|      N|                  null|
|              B02867|2021-06-02 12:51:57|2021-06-02 13:05:09|         151|         142|      N|                B02867|
|              B02869|2021-06-21 09:51:45|2021-06-21 10:09:17|         106|          65|      N|                B02869|
|              B02764|2021-06-02 13:27:0

In [45]:
# Register temporary table.
df_fhvhv.registerTempTable('06_trips_data')



In [46]:
# Sql query to count the number of taxi trips on the 15th of June.
spark.sql("""
SELECT
    COUNT(*) AS taxi_trip_06_15
FROM
    06_trips_data AS t
WHERE 
    EXTRACT(DAY FROM t.pickup_datetime) = 15
""").show()

[Stage 6:>                                                          (0 + 4) / 4]

+---------------+
|taxi_trip_06_15|
+---------------+
|         452470|
+---------------+



                                                                                

### How long is the longest trip in the dataset? 

In [47]:
# Function to calculate the duration in hours.
def duration(pickup_datetime: datetime, dropoff_datetime: datetime):
    duration = dropoff_datetime - pickup_datetime
    duration_hour = duration.total_seconds() / 3600
    return duration_hour

In [48]:
# Test the function.
pickup_datetime = datetime.strptime('2021-06-25 15:20:01', '%Y-%m-%d %H:%M:%S')
dropoff_datetime = datetime.strptime('2021-06-25 17:00:00', '%Y-%m-%d %H:%M:%S')
duration(pickup_datetime, dropoff_datetime)

1.666388888888889

In [49]:
# User defined function.
duration_udf = F.udf(duration, returnType=types.FloatType())

In [50]:
# Check columns.
df_fhvhv.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number']

In [51]:
# Apply function to create a duration in hours column.
df_duration = df_fhvhv.withColumn('duration', duration_udf(df_fhvhv.pickup_datetime, df_fhvhv.dropoff_datetime))

In [52]:
# View data.
df_duration.show()

[Stage 9:>                                                          (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|   duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+
|              B02889|2021-06-04 20:51:44|2021-06-04 21:10:12|         239|         158|      N|                B02889|  0.3077778|
|              B02800|2021-06-04 15:50:15|2021-06-04 16:19:29|          75|         116|      N|                  null| 0.48722222|
|              B02510|2021-06-02 21:03:38|2021-06-02 21:10:12|         167|         168|      N|                  null| 0.10944445|
|              B02867|2021-06-02 12:51:57|2021-06-02 13:05:09|         151|         142|      N|                B02867|       0.22|
|              B02869|2021-06-21 09:51:45|2021-06-21 10:09:17|         106| 

                                                                                

In [53]:
# View schema
df_duration.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
 |-- duration: float (nullable = true)



In [54]:
# Register temporary table.
df_duration.registerTempTable('06_trips_data_duration')

In [55]:
# SQL query to check the trip duration (in hours) from the highest to the lowest.
spark.sql("""
SELECT
    pickup_datetime,
    dropoff_datetime,
    duration AS duration_in_hours
FROM
    06_trips_data_duration AS t
ORDER BY 
    duration DESC
""").show()



+-------------------+-------------------+-----------------+
|    pickup_datetime|   dropoff_datetime|duration_in_hours|
+-------------------+-------------------+-----------------+
|2021-06-25 13:55:41|2021-06-28 08:48:25|         66.87889|
|2021-06-22 12:09:45|2021-06-23 13:42:44|        25.549723|
|2021-06-27 10:32:29|2021-06-28 06:31:20|        19.980833|
|2021-06-26 22:37:11|2021-06-27 16:49:01|        18.197222|
|2021-06-23 20:40:43|2021-06-24 13:08:44|        16.466944|
|2021-06-23 22:03:31|2021-06-24 12:19:39|        14.268888|
|2021-06-24 23:11:00|2021-06-25 13:05:35|        13.909722|
|2021-06-04 20:56:02|2021-06-05 08:36:14|            11.67|
|2021-06-27 07:45:19|2021-06-27 19:07:16|        11.365833|
|2021-06-20 17:05:12|2021-06-21 04:04:16|        10.984445|
|2021-06-01 12:25:29|2021-06-01 22:41:32|          10.2675|
|2021-06-01 12:01:46|2021-06-01 21:59:45|         9.966389|
|2021-06-28 13:13:59|2021-06-28 23:11:58|         9.966389|
|2021-06-27 03:52:14|2021-06-27 13:30:30

                                                                                

### What is the name of the most frequent pickup location zone?

In [57]:
if not os.path.exists('./data/raw/taxi+_zone_lookup.csv'):

    # Download data.
    !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

    # Move data.
    shutil.move('./taxi_zone_lookup.csv', './data/raw/')

--2023-03-06 17:34:20--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230306%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230306T173420Z&X-Amz-Expires=300&X-Amz-Signature=cc194d8dd03c64f2d07a63e53ea08a40c456766021629169c59b5bed7cc15218&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-06 17:34:20--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [58]:
# Read data.
df_zones = spark.read.option("header", "true").csv('./data/raw/taxi_zone_lookup.csv')

In [59]:
# View data.
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [60]:
# Join fhvhv and taxi zone lookup dataframes.
df_join = df_fhvhv.join(df_zones, df_fhvhv.PULocationID == df_zones.LocationID)

In [61]:
# View data.
df_join.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|LocationID|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|              B02889|2021-06-04 20:51:44|2021-06-04 21:10:12|         239|         158|      N|                B02889|       239|Manhattan|Upper West Side S...| Yellow Zone|
|              B02800|2021-06-04 15:50:15|2021-06-04 16:19:29|          75|         116|      N|                  null|        75|Manhattan|   East Harlem South|   Boro Zone|
|              B02510|2021-06-02 21:03:38|2021-06-02 21:10:12|         167|         168|      N|                  null|      

In [62]:
# Register temporary table.
df_join.registerTempTable('06_trips_data')

In [63]:
# SQL query to check the most frequent pickup location zone.
spark.sql("""
SELECT
    zone,
    COUNT(*) AS total
FROM
    06_trips_data AS t
GROUP BY 
    zone
ORDER BY
    total DESC
""").show()

[Stage 16:>                                                         (0 + 4) / 4]

+--------------------+------+
|                zone| total|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
|     Lower East Side|151020|
|        East Chelsea|147673|
|Central Harlem North|146402|
|Williamsburg (Nor...|143683|
|          Park Slope|143594|
|  Stuyvesant Heights|141427|
|        Clinton East|139611|
|West Chelsea/Huds...|139431|
|             Bedford|138428|
|         Murray Hill|137879|
+--------------------+------+
only showing top 20 rows



                                                                                

In [64]:
# Stop spark session.
spark.stop()