In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, current_timestamp, date_format, dense_rank, lit, to_date
from pyspark.sql.window import Window

In [2]:
# Build (or reuse, via getOrCreate) the notebook's SparkSession. It points at
# the master at spark://spark-master:7077 and has Hive support enabled so the
# Hive tables below can be read and written.
spark = (
    SparkSession.builder
    .appName("spark-nb")
    .master("spark://spark-master:7077")
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# Tables used by this notebook, written as `database.table` names.
data_table = "local_db.sample_hive_table"  # source: the trip records to analyse
output_table = "local_db.daily_topfive_taxi_zone"  # destination of the top-five ranking

In [4]:
# Execution date: "today" as evaluated by Spark SQL's current_date().
execution_date_query = "SELECT current_date() as current_date"
execution_date = spark.sql(execution_date_query).collect()[0]["current_date"]

# Target date: one day before the execution date; used as the cut-off when
# filtering the trip records.
target_date_query = f"SELECT date_sub('{execution_date}', 1) as target_date"
target_date = spark.sql(target_date_query).collect()[0]["target_date"]

In [5]:
# Load the input table as a DataFrame.
data = spark.read.table(data_table)

In [9]:
# Keep only trips whose pickup date (lpep_pickup_datetime truncated to a date)
# falls strictly before the target date.
# NOTE(review): `<` keeps every earlier trip, not just one day's worth; confirm
# this matches what the "daily" output table is meant to contain.
pickup_day = to_date(col("lpep_pickup_datetime"))
filtered_data = data.where(pickup_day < lit(target_date))

In [10]:
# Step 1: count trips per pickup zone (PULocationID).
zone_trip_counts = filtered_data.groupBy("PULocationID").agg(
    count("*").alias("trip_count")
)

# Step 2: rank zones from most to fewest trips. The window has no partitionBy,
# so the ranking is computed over all zones together.
# NOTE(review): dense_rank gives tied zones the same rank, so more than five
# rows can pass the `rank <= 5` filter when trip counts tie; confirm intended.
busiest_first = Window.orderBy(col("trip_count").desc())
ranked_zones = zone_trip_counts.withColumn("rank", dense_rank().over(busiest_first))

# Step 3: keep ranks 1-5 and shape the output columns: the zone id, its rank,
# and the calculation time formatted as a "yyyy-MM-dd HH:mm:ss" string.
top_five_zones = ranked_zones.where(col("rank") <= 5).select(
    col("PULocationID").alias("taxi_zone_id"),
    col("rank"),
    date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss").alias("calculated_at"),
)

In [11]:
# Save the ranking to the output table in Hive format; "overwrite" mode
# replaces any data the table already holds.
top_five_zones.write.saveAsTable(output_table, format="hive", mode="overwrite")

In [12]:
# Preview the first five rows of the input table. The name comes from
# `data_table` rather than a repeated literal, so the preview always shows the
# table the pipeline actually read.
spark.sql(f"SELECT * FROM {data_table} LIMIT 5").toPandas()

  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):
  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2024-02-15 23:37:22,2024-02-15 23:45:56,N,1.0,95,95,1.0,1.36,9.3,1.0,0.5,0.0,0.0,,1.0,11.8,2.0,1.0,0.0
1,2,2024-02-15 23:34:45,2024-02-15 23:45:17,N,1.0,75,151,1.0,2.1,12.1,1.0,0.5,0.0,0.0,,1.0,14.6,2.0,1.0,0.0
2,2,2024-02-15 23:31:24,2024-02-15 23:41:46,N,1.0,74,151,1.0,2.34,12.8,1.0,0.5,0.76,0.0,,1.0,16.06,1.0,1.0,0.0
3,2,2024-02-15 23:29:00,2024-02-15 23:41:00,,,243,247,,3.65,17.61,0.0,0.0,3.72,0.0,,1.0,22.33,,,
4,2,2024-02-15 23:18:00,2024-02-15 23:36:00,,,43,116,,3.66,20.95,0.0,0.0,3.29,0.0,,1.0,25.24,,,


In [13]:
# Preview up to five rows of the output table. The name comes from
# `output_table` rather than a repeated literal, so the preview always shows
# the table that was just written.
spark.sql(f"SELECT * FROM {output_table} LIMIT 5").toPandas()

Unnamed: 0,taxi_zone_id,rank,calculated_at
0,74,1,2024-12-11 14:17:09
1,75,2,2024-12-11 14:17:09
2,95,3,2024-12-11 14:17:09
3,166,4,2024-12-11 14:17:09
4,43,5,2024-12-11 14:17:09


In [14]:
# Stop the Spark session created at the top of the notebook. Run this last:
# cells that use `spark` are not expected to work once the session is stopped.
spark.stop()