In [15]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [3]:
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/25 23:16:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Show which Spark version this notebook's outputs were produced with.
spark.version

'3.4.2'

In [8]:
# Target schema for the normalized FHV trip records; every field is nullable.
df_fhv_schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PUlocationID', types.IntegerType(), True)
    .add('DOlocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [12]:
# Convert each month of raw 2019 FHV parquet into a normalized parquet copy:
# columns are projected and cast according to `df_fhv_schema`, then written
# out as 6 partitions per month.
year = 2019

for month in range(1, 13):
    print(f'Processing data for {year}/{month}...')
    input_path = f'data/raw/fhv/{year}/{month:02d}/'
    output_path = f'data/pq/fhv/{year}/{month:02d}/'

    # Parquet files carry their own schema; 'header' is a CSV-only option and
    # was silently ignored here, so it is not passed.
    df_fhv = spark.read.parquet(input_path)

    # Derive the projection from the declared schema so the column list and
    # types live in exactly one place. The alias pins the output names to the
    # schema's spelling.
    df_fhv = df_fhv.select([
        F.col(field.name).cast(field.dataType).alias(field.name)
        for field in df_fhv_schema.fields
    ])

    # The default save mode ('errorifexists') makes the cell fail as soon as
    # output_path exists; 'overwrite' keeps Restart & Run All working.
    df_fhv.repartition(6) \
        .write.mode('overwrite').parquet(output_path)

Processing data for 2019/1...


                                                                                

Processing data for 2019/2...


                                                                                

Processing data for 2019/3...


                                                                                

Processing data for 2019/4...


                                                                                

Processing data for 2019/5...


                                                                                

Processing data for 2019/6...


                                                                                

Processing data for 2019/7...


                                                                                

Processing data for 2019/8...


                                                                                

Processing data for 2019/9...


                                                                                

Processing data for 2019/10...


                                                                                

Processing data for 2019/11...


                                                                                

Processing data for 2019/12...


                                                                                

In [16]:
# Load the normalized October 2019 FHV trips.
df = spark.read.parquet('data/pq/fhv/2019/10')

In [17]:
# Add calendar-date versions of the pickup/dropoff timestamps so trips can be
# filtered and grouped by day.
df = (
    df
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .withColumn('dropoff_date', F.to_date(F.col('dropOff_datetime')))
)

In [20]:
# Inspect the schema, including the derived pickup_date / dropoff_date columns.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- dropoff_date: date (nullable = true)



In [23]:
# Q3: Number of trips picked up on 15 October 2019 (pickup_date is the
# calendar date of pickup_datetime).
df.where(F.col('pickup_date') == '2019-10-15').count()

62629

In [24]:
# Expose `df` to Spark SQL as the temp view `fhv`. registerTempTable is
# deprecated since Spark 2.0 (it emits a FutureWarning); createOrReplaceTempView
# is its replacement and also replaces an existing view, so re-running is safe.
df.createOrReplaceTempView('fhv')



In [51]:
# Q4: Longest trip in the dataset, in hours. Every trip in the `fhv` view is
# ranked by duration, longest first; this is NOT a per-day maximum (there is
# no grouping by pickup_date). The next cell takes the top row.
# NOTE(review): the variable name says "daily ... distance", but the value is
# an overall duration ranking. The name is kept because later cells use it.
# NOTE(review): rows with corrupt timestamps (e.g. a dropoff in 2091) dominate
# this ranking; consider filtering implausible durations before trusting it.
df_fhv_daily_longest_trip_distance = spark.sql("""
select
    pickup_date,
    pickup_datetime,
    dropOff_datetime,
    datediff(second, pickup_datetime, dropOff_datetime) / 3600.0 as trip_duration_hour
from
    fhv
order by
    trip_duration_hour desc
""")

In [52]:
# Fetch the single longest trip (the query sorts by duration, descending).
df_fhv_daily_longest_trip_distance.take(1)

                                                                                

[Row(pickup_date=datetime.date(2019, 10, 28), pickup_datetime=datetime.datetime(2019, 10, 28, 9, 0), dropOff_datetime=datetime.datetime(2091, 10, 28, 9, 30), trip_duration_hour=Decimal('631152.500000'))]

In [53]:
# Load the taxi zone lookup and expose it to Spark SQL as the view `zones`.
# createOrReplaceTempView replaces the deprecated registerTempTable and
# overwrites any existing view of the same name, so re-running is safe.
df_zones = spark.read.parquet('zones/')
df_zones.createOrReplaceTempView('zones')

In [54]:
# Pickup counts per zone, least frequent first.
# The LEFT JOIN keeps trips whose PUlocationID is null or has no match in
# `zones`; those trips are counted under a NULL zone.
# Columns are referenced by name rather than position. Ties on trip_count are
# broken by zone name so the ordering, and hence the "least frequent" answer
# read from the top rows, is deterministic.
df_fhv_trip_count_by_zone = spark.sql("""
select
    zone,
    count(*) as trip_count
from
    fhv f
left join
    zones z
on
    f.PUlocationID = z.LocationID
group by
    zone
order by
    trip_count,
    zone
""")

In [58]:
# Q6: Least frequent pickup zone. Rows are sorted by ascending trip_count, so
# the first row answers the question; the next four give context.
df_fhv_trip_count_by_zone.head(5)

[Row(zone='Jamaica Bay', trip_count=1),
 Row(zone="Governor's Island/Ellis Island/Liberty Island", trip_count=2),
 Row(zone='Green-Wood Cemetery', trip_count=5),
 Row(zone='Broad Channel', trip_count=8),
 Row(zone='Highbridge Park', trip_count=14)]