In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/26 10:31:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.conf.set("spark.sql.session.timeZone", "Europe/Berlin")

#### 1. Check Spark version

In [4]:
spark.version

'3.4.4'

#### 2. Check the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)

In [5]:
# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

In [6]:
df = spark.read \
    .option("header", "true") \
    .parquet('yellow_tripdata_2024-10.parquet')

                                                                                

In [7]:
df.count()

3833771

In [8]:
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [9]:
schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True), 
    types.StructField('tpep_pickup_datetime', types.TimestampType(), True), 
    types.StructField('tpep_dropoff_datetime', types.TimestampType(), True), 
    types.StructField('passenger_count', types.LongType(), True), 
    types.StructField('trip_distance', types.DoubleType(), True), 
    types.StructField('RatecodeID', types.LongType(), True), 
    types.StructField('store_and_fwd_flag', types.StringType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('payment_type', types.LongType(), True), 
    types.StructField('fare_amount', types.DoubleType(), True), 
    types.StructField('extra', types.DoubleType(), True), 
    types.StructField('mta_tax', types.DoubleType(), True), 
    types.StructField('tip_amount', types.DoubleType(), True), 
    types.StructField('tolls_amount', types.DoubleType(), True), 
    types.StructField('improvement_surcharge', types.DoubleType(), True), 
    types.StructField('total_amount', types.DoubleType(), True), 
    types.StructField('congestion_surcharge', types.DoubleType(), True), 
    types.StructField('Airport_fee', types.DoubleType(), True)])

In [10]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .parquet('yellow_tripdata_2024-10.parquet')

In [11]:
output_path = f'data/pq/yellow/2024/10/'

df \
.repartition(4) \
.write.parquet(output_path, mode='overwrite')

                                                                                

In [12]:
df_yellow = spark.read.parquet('data/pq/yellow/2024/10')

In [13]:
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [14]:
df_yellow.count()

3833771

In [15]:
tz = spark.conf.get("spark.sql.session.timeZone")
df_yellow = df_yellow \
.withColumn('pickup_date', F.to_date(df_yellow.tpep_pickup_datetime)) \
.withColumn('dropoff_date', F.to_date(df_yellow.tpep_dropoff_datetime)) \
.select('tpep_pickup_datetime','pickup_date', 'tpep_dropoff_datetime','dropoff_date', 'PULocationID', 'DOLocationID') \
.sort(['tpep_pickup_datetime'], ascending= True)

In [16]:
df_yellow \
.filter((F.col('pickup_date') == F.lit('2024-10-15')) & (F.col('dropoff_date') != F.lit('2024-10-15'))) \
.sort(['tpep_pickup_datetime'], ascending= False) \
.show(10)

[Stage 11:>                                                         (0 + 4) / 4]

+--------------------+-----------+---------------------+------------+------------+------------+
|tpep_pickup_datetime|pickup_date|tpep_dropoff_datetime|dropoff_date|PULocationID|DOLocationID|
+--------------------+-----------+---------------------+------------+------------+------------+
| 2024-10-15 23:59:59| 2024-10-15|  2024-10-16 00:10:46|  2024-10-16|          79|         162|
| 2024-10-15 23:59:58| 2024-10-15|  2024-10-16 00:05:17|  2024-10-16|         142|         143|
| 2024-10-15 23:59:58| 2024-10-15|  2024-10-16 00:06:52|  2024-10-16|         114|         231|
| 2024-10-15 23:59:57| 2024-10-15|  2024-10-16 00:08:34|  2024-10-16|         239|         151|
| 2024-10-15 23:59:57| 2024-10-15|  2024-10-16 00:24:50|  2024-10-16|         161|          79|
| 2024-10-15 23:59:57| 2024-10-15|  2024-10-16 00:32:20|  2024-10-16|         230|          49|
| 2024-10-15 23:59:57| 2024-10-15|  2024-10-16 00:14:23|  2024-10-16|         138|         233|
| 2024-10-15 23:59:57| 2024-10-15|  2024

                                                                                

#### 3. Check how many taxi trips were there on the 15th of October

In [17]:
tz = spark.conf.get("spark.sql.session.timeZone")
df_yellow \
.select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
.filter((F.col('pickup_date') == F.lit('2024-10-15')) & (F.col('dropoff_date') == F.lit('2024-10-15'))) \
.count()

                                                                                

123221

In [18]:
#### 4. Check the length of the longest trip in the dataset in hours

In [19]:
df_yellow \
    .select('tpep_pickup_datetime','tpep_dropoff_datetime') \
    .withColumn('DiffInSeconds',F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')) \
    .withColumn('DiffInHours',F.col('DiffInSeconds')/3600) \
    .sort('DiffInHours', ascending=False) \
    .show(5)



+--------------------+---------------------+-------------+------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|DiffInSeconds|       DiffInHours|
+--------------------+---------------------+-------------+------------------+
| 2024-10-16 15:03:49|  2024-10-23 09:40:53|       585424|162.61777777777777|
| 2024-10-03 20:47:25|  2024-10-09 20:06:55|       515970|           143.325|
| 2024-10-22 18:00:55|  2024-10-28 10:46:33|       495938|137.76055555555556|
| 2024-10-18 11:53:32|  2024-10-23 06:43:37|       413405|114.83472222222223|
| 2024-10-21 02:36:24|  2024-10-24 20:30:18|       323634| 89.89833333333333|
+--------------------+---------------------+-------------+------------------+
only showing top 5 rows



                                                                                

In [20]:
df_look = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [21]:
df_look.head(3)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone')]

#### 5. Using the zone lookup data and the Yellow October 2024 data, what is the name of the LEAST frequent pickup location Zone?

In [22]:
df_look.createOrReplaceTempView('lookup_data')

In [23]:
df_lookup = spark.sql("select LocationID, Zone from lookup_data")

In [24]:
df_result = df_yellow.join(df_lookup, df_yellow.PULocationID == df_lookup.LocationID)

In [25]:
df_result \
.drop('PULocationID','LocationID') \
.select('Zone') \
.groupBy('Zone') \
.count() \
.sort(F.col('count')) \
.show(5, False)

[Stage 19:>                                                         (0 + 4) / 4]

+---------------------------------------------+-----+
|Zone                                         |count|
+---------------------------------------------+-----+
|Governor's Island/Ellis Island/Liberty Island|1    |
|Rikers Island                                |2    |
|Arden Heights                                |2    |
|Jamaica Bay                                  |3    |
|Green-Wood Cemetery                          |3    |
+---------------------------------------------+-----+
only showing top 5 rows



                                                                                