# 0. Preparations

In [1]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F

In [2]:
# Start (or reuse) a Spark session in local mode using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('ha5')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/06 22:54:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [4]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-06 22:54:10--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 143.204.102.43, 143.204.102.231, 143.204.102.120, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|143.204.102.43|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-06 22:54:12 (49.1 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



# 1. Questions

## 1.1 Spark version

In [5]:
# Version string of the Spark runtime backing this session.
spark.version

'3.3.2'

## 1.2 Partitions Yellow October 2024

In [6]:
# Load the downloaded trip data. Parquet files carry their own schema, so no
# reader options are needed (the CSV-only "header" option has no effect on the
# Parquet source and was removed to avoid suggesting otherwise).
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

                                                                                

In [7]:
# Redistribute the rows across 4 partitions; the Parquet write that follows
# produces one part file per partition.
df = df.repartition(4)

In [8]:
# Export the repartitioned data. Spark's default save mode raises when the
# target path already exists, which breaks "Restart & Run All" on any second
# run; overwrite makes this cell idempotent.
df.write.mode('overwrite').parquet('yellow/2024/10/')

                                                                                

In [9]:
# List the written part files with human-readable sizes.
!ls -lh yellow/2024/10/

total 97M
-rw-r--r-- 1 m1sk0 m1sk0   0 Mar  6 22:54 _SUCCESS
-rw-r--r-- 1 m1sk0 m1sk0 25M Mar  6 22:54 part-00000-75625f18-74be-4097-8fbb-54fc2b665a47-c000.snappy.parquet
-rw-r--r-- 1 m1sk0 m1sk0 25M Mar  6 22:54 part-00001-75625f18-74be-4097-8fbb-54fc2b665a47-c000.snappy.parquet
-rw-r--r-- 1 m1sk0 m1sk0 25M Mar  6 22:54 part-00002-75625f18-74be-4097-8fbb-54fc2b665a47-c000.snappy.parquet
-rw-r--r-- 1 m1sk0 m1sk0 25M Mar  6 22:54 part-00003-75625f18-74be-4097-8fbb-54fc2b665a47-c000.snappy.parquet


## 1.3 Count Records

In [10]:
# Print the column names and types of the trip DataFrame.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [11]:
# Derive calendar-date columns from the pickup/dropoff timestamps so that
# later queries can filter on a whole day.
df = (
    df
    .withColumn('pickup_date', F.to_date(df['tpep_pickup_datetime']))
    .withColumn('dropoff_date', F.to_date(df['tpep_dropoff_datetime']))
)

In [12]:
# Register the DataFrame as the temp view `df` so it can be queried via spark.sql.
df.createOrReplaceTempView('df')

In [13]:
# Count the trips whose pickup date is 2024-10-15 (the dropoff date is not constrained).
spark.sql("""
select count(1)
from df
where pickup_date = date('2024-10-15')
""").head()

                                                                                

Row(count(1)=128893)

In [14]:
# Stricter variant: count only trips that both started and ended on 2024-10-15.
spark.sql("""
select count(1)
from df
where pickup_date = date('2024-10-15')
    and dropoff_date = date('2024-10-15')
""").head()

                                                                                

Row(count(1)=127993)

## 1.4 Longest Trip

In [15]:
# Length of the longest trip in whole hours: rows are ordered by
# (dropoff - pickup) descending and only the top row is kept. Its duration is
# converted as days * 24 + hours, so the minute and second parts of the
# interval are not included in the result.
spark.sql("""
select (
    extract(day from tpep_dropoff_datetime - tpep_pickup_datetime) * 24
    + extract(hour from tpep_dropoff_datetime - tpep_pickup_datetime)
) as longest_trip_hours
from df
order by tpep_dropoff_datetime - tpep_pickup_datetime desc
limit 1
""").head()

                                                                                

Row(longest_trip_hours=162)

## 1.5 User Interface

The `Spark UI` link in the session summary below points to port 4040, the default port of Spark's web UI.

In [16]:
# Render the session summary, which includes the Spark UI link.
spark

## 1.6 Least Frequent Pickup Location Zone

In [17]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-06 22:54:50--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 143.204.102.123, 143.204.102.120, 143.204.102.231, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|143.204.102.123|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-06 22:54:50 (113 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [18]:
# Load the zone lookup table, treating the first CSV line as column names.
zones_reader = spark.read.option("header", "true")
df_zones = zones_reader.csv('taxi_zone_lookup.csv')

In [19]:
# Inspect the lookup schema; no schema or inferSchema option was given when
# reading, and every column (including LocationID) is typed as string.
df_zones.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [20]:
# Register the lookup table as the temp view `df_zones` for use in spark.sql.
df_zones.createOrReplaceTempView('df_zones')

In [21]:
# Alternative, kept for reference only: join with the DataFrame API first, then aggregate in SQL.

# df_taxi_2024_10 = df.join(
#     df_zones,
#     df.PULocationID == df_zones.LocationID,
#     how='left'
# )
# df_taxi_2024_10.createOrReplaceTempView('df_taxi_2024_10')
# spark.sql("""
# select
#     Zone
#     , count(1)  as rides
# from df_taxi_2024_10
# group by Zone
# order by rides
# limit 1
# """).head()

In [22]:
# Least frequent pickup zone: attach the zone name to every trip via its
# pickup location id, count trips per zone and keep the smallest group.
# The left join keeps trips whose PULocationID has no lookup entry; those are
# grouped under a NULL zone. Ties on the ride count are broken by zone name so
# that the selected row is deterministic across runs.
spark.sql("""
select
    df_zones.Zone
    , count(1)  as rides
from df
    left join df_zones on df_zones.LocationID = df.PULocationID
group by df_zones.Zone
order by rides, df_zones.Zone
limit 1
""").head()

                                                                                

Row(Zone="Governor's Island/Ellis Island/Liberty Island", rides=1)

In [23]:
# Shut down the Spark session now that all queries have run.
spark.stop()