In [2]:
from pyspark.sql import SparkSession
import requests
import os
import pandas as pd

In [3]:
#Setup

url_dwnld ='https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/'
file_name = "fhvhv_tripdata_2021-06.csv.gz"
# Download the compressed trip data once; later runs reuse the local copy.
if not os.path.exists(f"./{file_name}"):
    # Write to a temporary name first so an interrupted download is never
    # mistaken for a complete file by the existence check on the next run.
    tmp_name = f"./{file_name}.part"
    # Stream the body so the whole archive is not held in memory at once.
    with requests.get(url_dwnld + file_name, stream=True, timeout=60) as response:
        # Fail loudly on HTTP errors instead of saving an error page as .csv.gz.
        response.raise_for_status()
        with open(tmp_name, "wb") as f_out:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f_out.write(chunk)
    os.replace(tmp_name, f"./{file_name}")
    print(f"Downloaded {file_name}")
else:
    print(f"File {file_name} already exists")


File fhvhv_tripdata_2021-06.csv.gz already exists


#Q1


In [4]:
# Create (or reuse) a Spark session running in local mode.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)
# Spark is working locally here, so its UI can be checked at http://localhost:4040

In [5]:
# Q1: version string of the running Spark session.
spark.version

'3.3.2'

#Q2

In [6]:
import gzip
import shutil

# Decompress the downloaded archive into a plain CSV file next to it.
with gzip.open("./fhvhv_tripdata_2021-06.csv.gz", "rb") as compressed, \
        open("./fhvhv_tripdata_2021-06.csv", "wb") as plain:
    shutil.copyfileobj(compressed, plain)

In [7]:
# First pass: read the CSV with its header row and no explicit schema.
df = (
    spark.read
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [8]:
# Inspect the schema of the read above; without an explicit schema every
# column comes back as a string (see the output).
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [9]:
# Copy the header plus the first 1000 data rows into head.csv for a quick look
# with pandas. Done in pure Python (instead of a PowerShell one-liner) so the
# cell also runs on Linux and macOS.
from itertools import islice

with open('fhvhv_tripdata_2021-06.csv', 'r', encoding='utf-8', newline='') as src, \
        open('head.csv', 'w', encoding='utf-8', newline='') as dst:
    dst.writelines(islice(src, 1001))


In [10]:
# Load the small sample with pandas so it can infer column types.
df_pandas = pd.read_csv('head.csv')

In [11]:
# Types inferred by pandas for the sample; used as a guide for the Spark schema.
df_pandas.dtypes

dispatching_base_num      object
pickup_datetime           object
dropoff_datetime          object
PULocationID               int64
DOLocationID               int64
SR_Flag                   object
Affiliated_base_number    object
dtype: object

In [12]:
from pyspark.sql import types

# Explicit schema for the trip data: timestamps for the two datetime columns,
# integers for the location ids, strings for the rest; every field is nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [13]:
# Second pass: read the same CSV again, this time applying the explicit schema.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [14]:
# Peek at the first rows to confirm the timestamps and ids were parsed.
df.head(5)

[Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 2, 41), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 7, 46), PULocationID=174, DOLocationID=18, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 16, 16), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 21, 14), PULocationID=32, DOLocationID=254, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 27, 1), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 42, 11), PULocationID=240, DOLocationID=127, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 6, 1, 0, 46, 8), dropoff_datetime=datetime.datetime(2021, 6, 1, 0, 53, 45), PULocationID=127, DOLocationID=235, SR_Flag='N', Affiliated_base_number='B02764'),
 Row(dispatching_base_num='B02510', pickup_datetime=datetime.d

In [15]:
# Q2: redistribute the data into 12 partitions before writing it out.
df = df.repartition(12)

In [16]:
# Save the DataFrame as parquet under output/q2/, replacing any previous output.
df.write.mode('overwrite').parquet('output/q2/')

#Q3

In [17]:
import datetime
# The Q3 target day as a date object (the same literal is used in the filter below).
datetime.date(2021, 6, 15)

datetime.date(2021, 6, 15)

In [18]:
# NOTE: comparing the timestamp column with a date only matches pickups at
# exactly 00:00:00 on that day (see the two rows in the output); it does not
# select every trip that started on 2021-06-15.
(
    df.select('dispatching_base_num', 'pickup_datetime')
    .where(df['pickup_datetime'] == datetime.date(2021, 6, 15))
    .show()
)

+--------------------+-------------------+
|dispatching_base_num|    pickup_datetime|
+--------------------+-------------------+
|              B02510|2021-06-15 00:00:00|
|              B02510|2021-06-15 00:00:00|
+--------------------+-------------------+



In [19]:
# Expose the DataFrame to Spark SQL. createOrReplaceTempView is the supported
# replacement for the deprecated registerTempTable and is safe to re-run.
df.createOrReplaceTempView('trips_june21')
# NOTE: this equality matches only pickups at exactly 2021-06-15 00:00:00,
# which is why the count is 2; the per-day count is computed in the next cell
# by comparing on to_date(pickup_datetime).
spark.sql("""
SELECT
    count(*)
FROM
    trips_june21
WHERE
    pickup_datetime = '2021-06-15'
""").show()



+--------+
|count(1)|
+--------+
|       2|
+--------+



In [20]:
from pyspark.sql import functions as F

# Q3: derive the calendar date of each pickup, then count the trips that
# started on 2021-06-15.
(
    df
    .withColumn('pickup_date', F.to_date(df['pickup_datetime']))
    .where("pickup_date = '2021-06-15'")
    .count()
)

452470

#Q4

In [25]:
#METHOD 1
# Add a trip_duration column: both timestamps are cast to long (seconds),
# subtracted, and divided by 3600 to express the difference in hours.
df1 = df.withColumn(
    'trip_duration',
    (F.col('dropoff_datetime').cast('long') - F.col('pickup_datetime').cast('long')) / 3600,
)
# Show the single row with the longest trip.
df1.orderBy(F.desc('trip_duration')).show(1)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|   trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+
|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|                B02872|66.8788888888889|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------------+
only showing top 1 row



In [26]:
#METHOD 2: using UDF
from pyspark.sql import functions as F


def get_duration(row):
    """Return the trip length in hours for one row.

    row: a struct/Row exposing `pickup_datetime` and `dropoff_datetime`.
    Returns None when either timestamp is missing.
    """
    # A null timestamp arrives as None; subtracting it would raise TypeError
    # inside the UDF and fail the whole job.
    if row.pickup_datetime is None or row.dropoff_datetime is None:
        return None
    return (row.dropoff_datetime - row.pickup_datetime).total_seconds() / 3600


# FloatType is single precision, so the displayed value carries fewer digits
# than the METHOD 1 result.
get_duration_udf = F.udf(get_duration, returnType=types.FloatType())

# Only the two timestamp columns the UDF reads are packed into the struct.
(
    df
    .withColumn('trip_duration', get_duration_udf(F.struct('pickup_datetime', 'dropoff_datetime')))
    .orderBy(F.col('trip_duration').desc())
    .show(1)
)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|trip_duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+
|              B02872|2021-06-25 13:55:41|2021-06-28 08:48:25|          98|         265|      N|                B02872|     66.87889|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+
only showing top 1 row



#Q5 -> 4040 (local port of the Spark UI, i.e. http://localhost:4040)

#Q6

In [29]:
# Read the taxi zone lookup table (header row, no explicit schema).
df_zones = (
    spark.read
    .option("header", "true")
    .csv('taxi_zone_lookup.csv')
)


In [30]:
# Peek at the lookup table; LocationID is read as a string here (see the output).
df_zones.head(5)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

In [31]:
# Peek at the trip data again to see the PULocationID column used below.
df.head(5)

[Row(dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 6, 1, 14, 6, 57), dropoff_datetime=datetime.datetime(2021, 6, 1, 14, 26, 39), PULocationID=132, DOLocationID=117, SR_Flag='N', Affiliated_base_number=None),
 Row(dispatching_base_num='B02869', pickup_datetime=datetime.datetime(2021, 6, 1, 16, 18, 58), dropoff_datetime=datetime.datetime(2021, 6, 1, 16, 41, 22), PULocationID=198, DOLocationID=196, SR_Flag='N', Affiliated_base_number='B02869'),
 Row(dispatching_base_num='B02887', pickup_datetime=datetime.datetime(2021, 6, 2, 17, 48, 58), dropoff_datetime=datetime.datetime(2021, 6, 2, 18, 5, 18), PULocationID=33, DOLocationID=144, SR_Flag='N', Affiliated_base_number='B02887'),
 Row(dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 6, 3, 16, 12, 3), dropoff_datetime=datetime.datetime(2021, 6, 3, 16, 25, 53), PULocationID=39, DOLocationID=72, SR_Flag='N', Affiliated_base_number=None),
 Row(dispatching_base_num='B02764', pickup_datetime=datetime.d

In [32]:
# Q6: count trips per pickup location id and show only the most frequent one.
(
    df.groupBy('PULocationID')
    .count()
    .orderBy(F.desc('count'))
    .show(1)
)

+------------+------+
|PULocationID| count|
+------------+------+
|          61|231279|
+------------+------+
only showing top 1 row



In [34]:
# Look up the zone name for LocationID 61, the most frequent pickup id found above.
(
    df_zones
    .where(F.col('LocationID') == 61)
    .select('LocationID', 'Zone')
    .show()
)

+----------+-------------------+
|LocationID|               Zone|
+----------+-------------------+
|        61|Crown Heights North|
+----------+-------------------+

