In [114]:
import pyspark
from pyspark.sql import types
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [115]:
pyspark.__file__

'/home/duc/spark/spark-3.3.2-bin-hadoop3/python/pyspark/__init__.py'

In [116]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [117]:
spark.version

'3.3.2'

In [118]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-02 05:55:42--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.84.160.213, 52.84.160.116, 52.84.160.84, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.84.160.213|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-02 05:55:43 (74.3 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [None]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

In [119]:
df = spark.read \
    .option("header", "true") \
    .parquet('yellow_tripdata_2024-10.parquet')

In [120]:
df.write.csv('data/csv/yellow/2024/10', mode='overwrite')

                                                                                

In [121]:
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True),
    types.StructField("Airport_fee", types.DoubleType(), True)
])

In [122]:
year = 2024
month = 10
input_path = f'yellow_tripdata_{year}-{month:02d}.csv'
df_yellow = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

df_yellow \
    .repartition(4) \
    .write.parquet(f'data/pq/yellow/{year}/{month:02d}/', mode='overwrite')

25/03/02 05:58:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2, 2024-10-10T20:28:12.000Z, 2024-10-10T20:31:40.000Z, 2, 0.44, 1, N, 239, 142, 1, 5.1, 1.0, 0.5, 2.52, 0.0, 1.0, 12.62, 2.5, 0.0
 Schema: VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID, DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount, congestion_surcharge, Airport_fee
Expected: VendorID but found: 2
CSV file: file:///home/duc/de-zoomcamp-homework/05-batch/code/yellow_tripdata_2024-10.csv/part-00001-40c66d59-b3b1-4559-8e67-dc6634109a81-c000.csv
25/03/02 05:58:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 2, 2024-10-29T14:15:13.000Z, 2024-10-29T14:22:03.000Z, 1, 0.41, 1, N, 236, 237, 1, 7.9, 0.0, 0.5, 2.38, 0.0, 1.0, 14.28, 2.5, 0.0
 Schema: VendorID, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count

                                                                                

In [123]:
df_yellow = spark.read.parquet('data/pq/yellow/*/*')

In [124]:
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [125]:
df_yellow.registerTempTable('yellow_trip_data')

In [126]:
df_yellow \
    .withColumn('pickup_date', F.to_date(df_yellow.pickup_datetime)) \
    .filter("pickup_date = '2024-10-15'") \
    .count()

                                                                                

128893

In [129]:
spark.sql("""
SELECT
    CAST(pickup_datetime AS DATE) as date,
    count(*)
FROM
    yellow_trip_data
GROUP BY date
""").show()

[Stage 86:>                                                         (0 + 4) / 4]

+----------+--------+
|      date|count(1)|
+----------+--------+
|2024-10-24|  137337|
|2024-10-02|  113906|
|2024-10-25|  136066|
|2024-09-30|      12|
|2024-10-22|  121106|
|2024-10-18|  133213|
|2024-10-08|  121402|
|2024-10-10|  143205|
|2024-10-20|  117128|
|2024-10-01|  119117|
|2024-10-04|  112431|
|2024-10-15|  128893|
|2024-10-28|  110595|
|2024-10-29|  126930|
|2024-10-17|  136330|
|2024-10-31|  129394|
|2024-10-07|  102014|
|2024-11-01|      26|
|2024-10-11|  128821|
|2024-10-16|  134891|
+----------+--------+
only showing top 20 rows



                                                                                

In [130]:
spark.sql("""
SELECT
    ROUND((unix_timestamp(dropoff_datetime)-unix_timestamp(pickup_datetime)) / 3600,2) as diff
FROM
    yellow_trip_data
ORDER BY diff DESC
""").show()

[Stage 89:>                                                         (0 + 4) / 4]

+------+
|  diff|
+------+
|162.62|
|143.33|
|137.76|
|114.83|
|  89.9|
| 89.45|
|  70.3|
| 67.57|
| 66.07|
| 46.42|
| 42.31|
| 38.47|
| 33.95|
|  26.3|
| 25.29|
| 25.24|
| 24.47|
|  24.0|
|  24.0|
| 23.99|
+------+
only showing top 20 rows



                                                                                

In [131]:
zones = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [132]:
zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [133]:
df_result = df_yellow.join(zones, df_yellow.PULocationID == zones.LocationID)


In [134]:
df_result.registerTempTable('yellow_trip_data_with_zones')

In [136]:
spark.sql("""
SELECT
    Zone,
    count(*) AS trip_count
FROM
    yellow_trip_data_with_zones
GROUP BY Zone
ORDER BY trip_count
""").show()

+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|Governor's Island...|         1|
|       Arden Heights|         2|
|       Rikers Island|         2|
| Green-Wood Cemetery|         3|
|         Jamaica Bay|         3|
|Charleston/Totten...|         4|
|       Port Richmond|         4|
|   Rossville/Woodrow|         4|
|Eltingville/Annad...|         4|
|       West Brighton|         4|
|         Great Kills|         6|
|        Crotona Park|         6|
|     Mariners Harbor|         7|
|Heartland Village...|         7|
|Saint George/New ...|         9|
|             Oakwood|         9|
|New Dorp/Midland ...|        10|
|       Broad Channel|        10|
|         Westerleigh|        12|
|     Pelham Bay Park|        12|
+--------------------+----------+
only showing top 20 rows



                                                                                