In [1]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-12 21:59:36--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:2123:1600:b:20a5:b140:21, 2600:9000:2123:de00:b:20a5:b140:21, 2600:9000:2123:3600:b:20a5:b140:21, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2123:1600:b:20a5:b140:21|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: 'yellow_tripdata_2024-10.parquet'

     0K .......... .......... .......... .......... ..........  0%  587K 1m47s
    50K .......... .......... .......... .......... ..........  0%  854K 90s
   100K .......... .......... .......... .......... ..........  0% 1.74M 72s
   150K .......... .......... .......... .......... ..........  0% 1.56M 64s
   200K .......... .......... .......... .......... ..........  0% 2.72M 55s
   250K .......... .......... .......... .......... .

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
print(pyspark.__version__)

3.5.5


In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("test") \
    .config("spark.ui.port", "4040") \
    .getOrCreate()

In [4]:
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.LongType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.LongType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.LongType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [5]:
df_yellow= spark.read \
    .schema(yellow_schema) \
    .parquet('./yellow_tripdata_2024-10.parquet')

In [6]:
df_yellow \
    .repartition(4) \
    .write.parquet('data/pq/', mode='overwrite')

In [9]:
from pyspark.sql.functions import dayofmonth, month, round
df_yellow = df_yellow.withColumn("pickup_day", dayofmonth("tpep_pickup_datetime"))
df_yellow = df_yellow.withColumn("pickup_month", month("tpep_pickup_datetime"))
df_yellow = df_yellow.withColumn('duration_min', round((df_yellow.tpep_dropoff_datetime.cast('long') - df_yellow.tpep_pickup_datetime.cast('long'))/60, 1))
df_yellow = df_yellow.withColumn('duration_hour', round(df_yellow.duration_min/60, 1))

In [10]:
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'pickup_day',
 'pickup_month',
 'duration_min',
 'duration_hour']

In [11]:
df_yellow.show(20)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------+------------+------------+-------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|pickup_day|pickup_month|duration_min|duration_hour|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------+------------+------------+-------------+
|       2| 2024-09-30 19:30:44|  2024-09-30 19:48:26|              1| 

In [12]:
df_yellow.registerTempTable('yellow')



In [13]:
df_result = spark.sql("""
SELECT
    COUNT(*)
FROM
    yellow
WHERE
    pickup_day = 15 AND pickup_month = 10
""")

In [14]:
df_result.show()

+--------+
|count(1)|
+--------+
|  128892|
+--------+



In [15]:
df_result_2 = spark.sql("""
SELECT
    duration_hour
FROM
    yellow
ORDER BY
    duration_hour DESC
""")

In [16]:
df_result_2.show(20)

+-------------+
|duration_hour|
+-------------+
|        162.6|
|        143.3|
|        137.8|
|        114.8|
|         89.9|
|         89.4|
|         70.3|
|         67.6|
|         66.1|
|         46.4|
|         42.3|
|         38.5|
|         34.0|
|         26.3|
|         25.3|
|         25.2|
|         24.5|
|         24.0|
|         24.0|
|         24.0|
+-------------+
only showing top 20 rows



In [23]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-12 22:52:33--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 2600:9000:2123:9600:b:20a5:b140:21, 2600:9000:2123:cc00:b:20a5:b140:21, 2600:9000:2123:9c00:b:20a5:b140:21, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:2123:9600:b:20a5:b140:21|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: 'taxi_zone_lookup.csv'

     0K .......... ..                                         100% 1.98G=0s

2025-03-12 22:52:35 (1.98 GB/s) - 'taxi_zone_lookup.csv' saved [12331/12331]



In [17]:
df_zones_lookup = spark.read \
    .option("header", True) \
    .csv('taxi_zone_lookup.csv')

In [18]:
df_join = df_yellow.join(df_zones_lookup, on=df_yellow.PULocationID == df_zones_lookup.LocationID, how='inner')

In [19]:
df_join.show(20)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------+------------+------------+-------------+----------+---------+--------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|pickup_day|pickup_month|duration_min|duration_hour|LocationID|  Borough|                Zone|service_zone|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------+-

In [22]:
df_join.registerTempTable('joined')



In [28]:
df_result_3 = spark.sql("""
SELECT
    zone,
    COUNT(1) as cnt
FROM   
    joined
WHERE
    pickup_month = 10
GROUP BY 
    zone
ORDER BY cnt ASC              
""")
df_result_3.show()

+--------------------+---+
|                zone|cnt|
+--------------------+---+
|Governor's Island...|  1|
|       Rikers Island|  2|
|       Arden Heights|  2|
|         Jamaica Bay|  3|
| Green-Wood Cemetery|  3|
|Charleston/Totten...|  4|
|   Rossville/Woodrow|  4|
|       West Brighton|  4|
|Eltingville/Annad...|  4|
|       Port Richmond|  4|
|         Great Kills|  6|
|        Crotona Park|  6|
|Heartland Village...|  7|
|     Mariners Harbor|  7|
|Saint George/New ...|  9|
|             Oakwood|  9|
|       Broad Channel| 10|
|New Dorp/Midland ...| 10|
|         Westerleigh| 12|
|     Pelham Bay Park| 12|
+--------------------+---+
only showing top 20 rows

