In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Creating a local Spark session:

In [3]:
# Local Spark session ("local[*]" master) named 'homework';
# getOrCreate() returns the already-running session if there is one.
spark = (
    SparkSession.builder
    .appName('homework')
    .master("local[*]")
    .getOrCreate()
)

In [4]:
# Q-1: Spark version of the running session.
# It is the cell's last expression, so the notebook displays it.
spark.version

'3.5.0'

In [9]:
# Downloading the fhv-2019 data:

!wget -O - https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz | gunzip > fhv_tripdata_2019-10.csv

--2024-02-25 14:56:18--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240225%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240225T145618Z&X-Amz-Expires=300&X-Amz-Signature=09a306489e8b4fa428fa977b823de9a8f6a33aaef25474df59a6e748e92d989e&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-02-25 14:56:18--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [5]:
# Confirm the decompressed CSV is present and check its size in bytes.
!ls -l fhv_tripdata_2019-10.csv

-rw-rw-r-- 1 Kenan Kenan 119796110 Dec  2  2022 fhv_tripdata_2019-10.csv


In [28]:
# Load the raw CSV with pandas to see the dtype pandas infers for each column.
# NOTE(review): this reads the whole ~120 MB file into memory; consider nrows=...
# if only a dtype/row preview is needed.
df_csv = pd.read_csv('fhv_tripdata_2019-10.csv', header=0)
df_csv.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [36]:
# First ten rows of the pandas copy of the CSV.
df_csv.head(n=10)

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014
5,B00021,2019-10-01 00:00:48,2019-10-01 00:07:12,129.0,129.0,,B00021
6,B00021,2019-10-01 00:47:23,2019-10-01 00:53:25,57.0,57.0,,B00021
7,B00021,2019-10-01 00:10:06,2019-10-01 00:19:50,173.0,173.0,,B00021
8,B00021,2019-10-01 00:51:37,2019-10-01 01:06:14,226.0,226.0,,B00021
9,B00021,2019-10-01 00:28:23,2019-10-01 00:34:33,56.0,56.0,,B00021


In [37]:
# Explicit Spark schema for the FHV trip CSV: column name -> Spark type.
# Every field is declared nullable.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.FloatType()),
        ('Affiliated_base_number', types.StringType()),
    )
])

In [38]:
# Read the FHV CSV into Spark using its header row and the explicit `schema`
# (no type inference pass over the file).
df = spark.read.csv(
    "fhv_tripdata_2019-10.csv",
    header=True,
    schema=schema,
)

In [39]:
# Print the first ten rows of the Spark DataFrame read with the explicit schema.
df.show(n=10)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [40]:
# Repartition the data into 6 partitions and save it as parquet,
# replacing any previous output in the target directory.
df = df.repartition(numPartitions=6)
df.write.mode('overwrite').parquet('data-hw/fhv/2019/10/')

In [12]:
# Write the first 1001 lines (header + 1000 data rows) of the CSV to a small sample file.
# NOTE(review): head_2019.csv is not read anywhere in the visible notebook; remove this cell if unused.
!head -n 1001 fhv_tripdata_2019-10.csv > head_2019.csv

In [14]:
!du -sh data-hw/fhv/2019/10/

36M	data-hw/fhv/2019/10/


In [15]:
# Q-3: How many taxi trips were there on the 15th of October?

In [41]:
# Re-load the trips from the parquet output (rebinds `df`, replacing the CSV-backed DataFrame).
df = spark.read.format('parquet').load("data-hw/fhv/2019/10/")

In [42]:
from pyspark.sql import functions as F

In [43]:
# Q-3: number of trips whose pickup timestamp falls on 2019-10-15
# (the timestamp is truncated to a date before comparing).
(
    df
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .where(F.col('pickup_date') == '2019-10-15')
    .count()
)

62610

In [44]:
# Q-4: What is the length of the longest trip in the dataset, in hours?

In [47]:
# Print three rows of the parquet-backed DataFrame.
df.show(n=3)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   NULL|                  NULL|
|              B01233|2019-10-08 00:09:01|2019-10-08 00:11:14|         264|          81|   NULL|                B01233|
|              B02688|2019-10-02 10:59:00|2019-10-02 11:34:46|         264|         107|   NULL|                B02688|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
only showing top 3 rows



In [50]:
# Q-4: longest trip per pickup day, in hours, showing the 5 days with the largest maximum.
# Duration = (dropoff - pickup) in epoch seconds / 3600; the first row holds the overall maximum.
# NOTE(review): the top value of 631152.5 h is about 72 years, which suggests bad dropoff
# timestamps in the source data. Confirm before treating it as a real trip.
(
    df
    .withColumn(
        'duration',
        (F.col('dropOff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 3600,
    )
    .withColumn('pickup_date', F.to_date('pickup_datetime'))
    .groupBy('pickup_date')
    .agg(F.max('duration').alias('max_duration_hours'))
    .orderBy(F.desc('max_duration_hours'))
    .limit(5)
    .show()
)

+-----------+------------------+
|pickup_date|max_duration_hours|
+-----------+------------------+
| 2019-10-28|          631152.5|
| 2019-10-11|          631152.5|
| 2019-10-31| 87672.44083333333|
| 2019-10-01| 70128.02805555555|
| 2019-10-17|            8794.0|
+-----------+------------------+



In [51]:
# Q-6: What is the least frequent pickup location zone?

In [52]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-02-25 21:36:12--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240225%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240225T213612Z&X-Amz-Expires=300&X-Amz-Signature=6c820c3cee6213b13e1d98121faecb57995c37d5f9a6194d33164b6225925539&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-02-25 21:36:12--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [54]:
# Load the zone lookup with pandas to see the dtype pandas infers for each column.
zones = pd.read_csv('taxi_zone_lookup.csv', header=0)
zones.dtypes

LocationID       int64
Borough         object
Zone            object
service_zone    object
dtype: object

In [55]:
# Explicit Spark schema for the taxi zone lookup CSV: column name -> Spark type.
# Every field is declared nullable.
schema_zone = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in (
        ('LocationID', types.IntegerType()),
        ('Borough', types.StringType()),
        ('Zone', types.StringType()),
        ('service_zone', types.StringType()),
    )
])

# Read the lookup into Spark using its header row and the schema above.
df_zones = spark.read.csv(
    'taxi_zone_lookup.csv',
    header=True,
    schema=schema_zone,
)

In [56]:
# Print three rows of the zone lookup DataFrame.
df_zones.show(n=3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



In [60]:
# Print three rows of the trips DataFrame before joining it with the zones.
df.show(n=3)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   NULL|                  NULL|
|              B01233|2019-10-08 00:09:01|2019-10-08 00:11:14|         264|          81|   NULL|                B01233|
|              B02688|2019-10-02 10:59:00|2019-10-02 11:34:46|         264|         107|   NULL|                B02688|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
only showing top 3 rows



In [59]:
# Expose both DataFrames to Spark SQL as temporary views: zones -> 'zones', trips -> 'fhv_2019'.
df_zones.createOrReplaceTempView('zones')
df.createOrReplaceTempView('fhv_2019')

In [62]:
# Q-6: pickup zones with the fewest trips.
# Each trip's PUlocationID is matched to a zone name from the lookup. The LEFT JOIN
# keeps trips without a matching zone, and those are grouped under a NULL zone.
# Sorting by trip_count and then by zone name keeps the LIMIT 5 result stable
# when several zones have the same count. Previously, ties made the output
# nondeterministic.
spark.sql("""
SELECT
    pul.Zone AS pickup_zone,
    COUNT(1) AS trip_count
FROM
    fhv_2019 AS fhv
    LEFT JOIN zones AS pul
        ON fhv.PUlocationID = pul.LocationID
GROUP BY
    pul.Zone
ORDER BY
    trip_count ASC,
    pickup_zone ASC
LIMIT 5
""").show()

+--------------------+--------+
|               least|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+

