Batch Pipeline: Citibike daily station usage

This notebook builds a batch-processing pipeline that:
- Reads raw Citibike trip data (2024 monthly CSVs)
- Cleans and enriches trips with date, hour and duration features
- Aggregates trips per day and per start station
- Adds day-over-day changes, a rolling 7-day average and a daily station ranking
- Writes the results as Parquet tables for dashboards/BigQuery


In [7]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

spark_conf = SparkConf()
spark_conf.setAppName("Citibike_Batch1")
spark_conf.setMaster("spark://spark-master:7077")  # Spark master from docker-compose

# Modest resource settings for a small Docker cluster (adjust to your machine)
spark_conf.set("spark.driver.memory", "2g")
spark_conf.set("spark.executor.memory", "2g")
spark_conf.set("spark.executor.cores", "1")
spark_conf.set("spark.driver.cores", "1")

spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

spark


In [8]:
df_test = spark.createDataFrame([("Alice", 30), ("Bob", 25)], ["name", "age"])
df_test.show()


+-----+---+
| name|age|
+-----+---+
|Alice| 30|
|  Bob| 25|
+-----+---+



In [9]:
import os

data_root = "/home/jovyan/data"

In [10]:
from pyspark.sql import functions as F

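citibike_path = data_root                        # raw monthly CSVs are mounted here
pattern = os.path.join(citibike_path, "*.csv")   # read every CSV in the folder

df_trips = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")   # costs one extra pass over the data to detect column types
    .load(pattern)
)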

df_trips.printSchema()
df_trips.show(5)


root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- start_lat: double (nullable = true)
 |-- start_lng: double (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lng: double (nullable = true)
 |-- member_casual: string (nullable = true)

+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+-----------------+------------------+-----------------+------------------+-------------+
|         ride_id|rideable_type|          started_at|            ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|        start_lat|         start_lng|       
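Because `inferSchema` makes Spark read the CSVs an extra time just to guess column types, one option for a load of this size is to pass the schema explicitly. A minimal sketch, assuming the column list printed by `printSchema()` stays the same across all monthly files: build a DDL string, which `spark.read.schema(...)` accepts in place of `inferSchema`. The names `TRIP_COLUMNS` and `to_ddl` are hypothetical.

```python
# Column names and types copied from the printSchema() output above
# (assumption: every monthly CSV has exactly these columns).
TRIP_COLUMNS = [
    ("ride_id", "STRING"),
    ("rideable_type", "STRING"),
    ("started_at", "TIMESTAMP"),
    ("ended_at", "TIMESTAMP"),
    ("start_station_name", "STRING"),
    ("start_station_id", "STRING"),
    ("end_station_name", "STRING"),
    ("end_station_id", "STRING"),
    ("start_lat", "DOUBLE"),
    ("start_lng", "DOUBLE"),
    ("end_lat", "DOUBLE"),
    ("end_lng", "DOUBLE"),
    ("member_casual", "STRING"),
]


def to_ddl(columns):
    """Join (name, type) pairs into a DDL schema string such as 'a STRING, b DOUBLE'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


trip_schema_ddl = to_ddl(TRIP_COLUMNS)
print(trip_schema_ddl)
```

In the notebook this would replace the `inferSchema` option, e.g. `spark.read.format("csv").option("header", "true").schema(trip_schema_ddl).load(pattern)`. It is worth checking that `started_at` and `ended_at` still parse to non-null timestamps; if they do not, the CSV `timestampFormat` option can be set to match the files.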

In [11]:
from pyspark.sql import functions as F

# 1. Add derived columns: date, hour, duration (minutes)
df_enriched = (
    df_trips
    .withColumn("date", F.to_date("started_at"))
    .withColumn("start_hour", F.hour("started_at"))
    .withColumn(
        "trip_duration_min",
        (F.col("ended_at").cast("long") - F.col("started_at").cast("long")) / 60.0
    )
)

df_enriched.select(
    "ride_id", "started_at", "ended_at",
    "date", "start_hour", "trip_duration_min", "member_casual"
).show(10)


+----------------+--------------------+--------------------+----------+----------+------------------+-------------+
|         ride_id|          started_at|            ended_at|      date|start_hour| trip_duration_min|member_casual|
+----------------+--------------------+--------------------+----------+----------+------------------+-------------+
|172DBBFC733F03CE|2024-10-10 14:54:...|2024-10-10 15:04:...|2024-10-10|        14| 9.716666666666667|       member|
|D20BBA4860FE736C|2024-10-03 19:20:...|2024-10-03 19:31:...|2024-10-03|        19|11.416666666666666|       casual|
|86F89348995D0E6E|2024-10-20 12:14:...|2024-10-20 12:28:...|2024-10-20|        12|              13.6|       casual|
|AA55A717B7EC1D10|2024-10-20 14:40:...|2024-10-20 14:55:...|2024-10-20|        14|              15.4|       member|
|C72953D91E986DA7|2024-10-20 08:37:...|2024-10-20 08:42:...|2024-10-20|         8|              5.35|       member|
|23A1827EA03A9AC2|2024-10-28 19:20:...|2024-10-28 19:25:...|2024-10-28| 
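The duration column casts both timestamps to whole seconds since the epoch (`cast("long")`), subtracts them and divides by 60. The same arithmetic in plain Python, on a hypothetical trip that is not taken from the dataset, makes the units explicit. In Spark, `F.to_date` and `F.hour` are evaluated in the session time zone (`spark.sql.session.timeZone`); the sketch below simply uses naive local datetimes.

```python
from datetime import datetime


def enrich(started_at: datetime, ended_at: datetime) -> dict:
    """Plain-Python analogue of the date / start_hour / trip_duration_min columns."""
    # Whole seconds since the epoch, mirroring F.col(...).cast("long") on a timestamp
    start_s = int(started_at.timestamp())
    end_s = int(ended_at.timestamp())
    return {
        "date": started_at.date(),
        "start_hour": started_at.hour,
        "trip_duration_min": (end_s - start_s) / 60.0,
    }


# Hypothetical trip: 10 minutes 30 seconds long
row = enrich(datetime(2024, 10, 10, 14, 54, 10), datetime(2024, 10, 10, 15, 4, 40))
print(row)
```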


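In this section we inspect the distribution of trip_duration_min to choose a reasonable
upper bound for filtering clear outliers, such as rides that last many hours because a user
forgot to end the trip. The summary also reveals invalid values (the minimum duration is
negative), which the cleaning step removes as well.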


In [None]:
# Explore the distribution of trip_duration_min to choose a reasonable upper bound for filtering clear outliers

In [14]:
from pyspark.sql import functions as F

# Summary statistics
df_enriched.select("trip_duration_min").summary().show()

# Compare two candidate cutoffs: 180 min (3 h) vs 240 min (4 h)
total_trips = df_enriched.count()
above_180 = df_enriched.filter(F.col("trip_duration_min") >= 180).count()
above_240 = df_enriched.filter(F.col("trip_duration_min") >= 240).count()

print("Total trips:", total_trips)
print("Trips >= 180 min (>= 3h):", above_180, f"({above_180 / total_trips:.4%})")
print("Trips >= 240 min (>= 4h):", above_240, f"({above_240 / total_trips:.4%})")



+-------+------------------+
|summary| trip_duration_min|
+-------+------------------+
|  count|           1052451|
|   mean|10.640391096592568|
| stddev| 39.50522800270042|
|    min|-53.56666666666667|
|    25%| 4.033333333333333|
|    50%| 6.133333333333334|
|    75%| 9.716666666666667|
|    max| 8968.333333333334|
+-------+------------------+

Total trips: 1052451
Trips >= 180 min (>= 3h): 2669 (0.2536%)
Trips >= 240 min (>= 4h): 2107 (0.2002%)


In [15]:
# 2. Basic cleaning: remove trips with missing, non-positive or overly long duration
df_clean = (
    df_enriched
    .filter(F.col("trip_duration_min").isNotNull())
    .filter(F.col("trip_duration_min") > 0)
    .filter(F.col("trip_duration_min") < 180)  # keep trips shorter than 3 hours (180 min)
)

df_clean.select("trip_duration_min").summary().show()


+-------+--------------------+
|summary|   trip_duration_min|
+-------+--------------------+
|  count|             1049760|
|   mean|   9.160629667733572|
| stddev|  11.501112713169428|
|    min|0.016666666666666666|
|    25%|   4.033333333333333|
|    50%|   6.133333333333334|
|    75%|   9.666666666666666|
|    max|              179.85|
+-------+--------------------+
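The two `summary()` outputs account for every dropped row. Since the `count` in the first summary equals `total_trips`, no duration is null, so the cleaning step removes only the trips of 3 hours or more plus the trips with zero or negative duration. Checking that arithmetic with the figures printed above:

```python
# Figures copied from the outputs above
total_trips = 1_052_451          # df_enriched.count()
non_null_durations = 1_052_451   # "count" row of the first summary()
trips_ge_180 = 2_669             # trip_duration_min >= 180
kept = 1_049_760                 # "count" row of the df_clean summary()

null_durations = total_trips - non_null_durations
removed = total_trips - kept
non_positive = removed - trips_ge_180 - null_durations

print(f"removed={removed}, >=180 min={trips_ge_180}, <=0 min={non_positive}, null={null_durations}")
print(f"share removed: {removed / total_trips:.4%}")
```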



In [16]:
from pyspark.sql import functions as F

daily_station_stats = (
    df_clean
    .groupBy("date", "start_station_id", "start_station_name")
    .agg(
        F.count("*").alias("n_trips"),
        F.avg("trip_duration_min").alias("avg_trip_duration_min"),
        F.expr("percentile_approx(trip_duration_min, 0.5)").alias("median_trip_duration_min"),
        F.sum(F.when(F.col("member_casual") == "member", 1).otherwise(0)).alias("n_member"),
        F.sum(F.when(F.col("member_casual") == "casual", 1).otherwise(0)).alias("n_casual")
    )
    .withColumn("share_member", F.col("n_member") / F.col("n_trips"))
    .withColumn("share_casual", F.col("n_casual") / F.col("n_trips"))
)

daily_station_stats.orderBy("date", "start_station_id").show(20, truncate=False)
daily_station_stats.printSchema()


+----------+----------------+--------------------------------------------+-------+---------------------+------------------------+--------+--------+-------------------+-------------------+
|date      |start_station_id|start_station_name                          |n_trips|avg_trip_duration_min|median_trip_duration_min|n_member|n_casual|share_member       |share_casual       |
+----------+----------------+--------------------------------------------+-------+---------------------+------------------------+--------+--------+-------------------+-------------------+
|2024-01-01|HB101           |Hoboken Terminal - Hudson St & Hudson Pl    |32     |7.080729166666666    |6.566666666666666       |17      |15      |0.53125            |0.46875            |
|2024-01-01|HB102           |Hoboken Terminal - River St & Hudson Pl     |75     |11.722666666666669   |8.116666666666667       |37      |38      |0.49333333333333335|0.5066666666666667 |
|2024-01-01|HB103           |South Waterfront Walkway - Sina
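The `groupBy`/`agg` step can be mirrored in plain Python on a handful of hypothetical trips, which makes the share columns easy to verify: `share_member` is just `n_member / n_trips`, so `share_member + share_casual` equals 1 only when `member_casual` holds no values other than "member" and "casual". The trip records and the helper `aggregate_daily` are hypothetical, and the median column is left out because `percentile_approx` is an approximation.

```python
from collections import defaultdict

# Hypothetical trips: (date, start_station_id, trip_duration_min, member_casual)
trips = [
    ("2024-01-01", "HB101", 6.0, "member"),
    ("2024-01-01", "HB101", 8.0, "casual"),
    ("2024-01-01", "HB101", 10.0, "member"),
    ("2024-01-01", "HB102", 12.0, "casual"),
]


def aggregate_daily(trips):
    """Group by (date, station) and compute counts, mean duration and member/casual shares."""
    groups = defaultdict(list)
    for date, station, duration, user_type in trips:
        groups[(date, station)].append((duration, user_type))

    stats = {}
    for key, rows in groups.items():
        n_trips = len(rows)
        n_member = sum(1 for _, t in rows if t == "member")
        n_casual = sum(1 for _, t in rows if t == "casual")
        stats[key] = {
            "n_trips": n_trips,
            "avg_trip_duration_min": sum(d for d, _ in rows) / n_trips,
            "n_member": n_member,
            "n_casual": n_casual,
            "share_member": n_member / n_trips,
            "share_casual": n_casual / n_trips,
        }
    return stats


daily = aggregate_daily(trips)
print(daily[("2024-01-01", "HB101")])
```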

In [17]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Window ordered by date for each station
w_station = Window.partitionBy("start_station_id").orderBy("date")

# Rolling window over the current row and the 6 preceding rows. This covers the
# last 7 calendar days only if the station has a row for every date.
w_station_7 = w_station.rowsBetween(-6, 0)

daily_station_enriched = (
    daily_station_stats
    # trips on the station's previous recorded day (previous row, not necessarily yesterday)
    .withColumn("prev_n_trips", F.lag("n_trips").over(w_station))
    # absolute change vs previous day
    .withColumn("delta_n_trips", F.col("n_trips") - F.col("prev_n_trips"))
    # relative change vs previous day
    .withColumn(
        "delta_n_trips_pct",
        F.when(F.col("prev_n_trips").isNotNull() & (F.col("prev_n_trips") > 0),
               F.col("delta_n_trips") / F.col("prev_n_trips"))
    )
    # rolling average of n_trips over the last 7 rows per station (7 days when no dates are missing)
    .withColumn("n_trips_7day_avg", F.avg("n_trips").over(w_station_7))
)

daily_station_enriched.orderBy("date", "start_station_id").show(20, truncate=False)
daily_station_enriched.printSchema()


+----------+----------------+--------------------------------------------+-------+---------------------+------------------------+--------+--------+-------------------+-------------------+------------+-------------+-----------------+----------------+
|date      |start_station_id|start_station_name                          |n_trips|avg_trip_duration_min|median_trip_duration_min|n_member|n_casual|share_member       |share_casual       |prev_n_trips|delta_n_trips|delta_n_trips_pct|n_trips_7day_avg|
+----------+----------------+--------------------------------------------+-------+---------------------+------------------------+--------+--------+-------------------+-------------------+------------+-------------+-----------------+----------------+
|2024-01-01|HB101           |Hoboken Terminal - Hudson St & Hudson Pl    |32     |7.080729166666666    |6.566666666666666       |17      |15      |0.53125            |0.46875            |NULL        |NULL         |NULL             |32.0            |
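`rowsBetween(-6, 0)` counts rows, not days, and `lag` returns the previous row. Both therefore line up with calendar days only when a station has a row for every date; after a day without trips, the "7-day" average reaches further back in time. A plain-Python sketch of the difference on hypothetical counts with a gap: the calendar-based variant (in Spark this would typically be a `rangeBetween` window over a numeric day column) averages only the rows dated within the last 7 days.

```python
from datetime import date, timedelta

# Hypothetical daily counts for one station; there are no rows for Jan 3-8
daily_counts = [
    (date(2024, 1, 1), 10),
    (date(2024, 1, 2), 20),
    (date(2024, 1, 9), 30),
]


def rolling_avg_rows(rows, k=7):
    """Mean of the current row and up to k-1 preceding rows (like rowsBetween(-6, 0))."""
    out = []
    for i in range(len(rows)):
        window = [n for _, n in rows[max(0, i - k + 1): i + 1]]
        out.append(sum(window) / len(window))
    return out


def rolling_avg_calendar(rows, k=7):
    """Mean of the rows dated within the last k calendar days, including the current date."""
    out = []
    for current, _ in rows:
        start = current - timedelta(days=k - 1)
        window = [n for d, n in rows if start <= d <= current]
        out.append(sum(window) / len(window))
    return out


print(rolling_avg_rows(daily_counts))
print(rolling_avg_calendar(daily_counts))
```

Both variants still ignore days with no row; if such days should count as zero trips, the window sum would be divided by 7 instead of by the number of rows.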


In [19]:
from pyspark.sql import functions as F

daily_station_enriched.filter(F.col("start_station_id") == "HB101") \
    .orderBy("date") \
    .select("date", "n_trips", "prev_n_trips", "delta_n_trips", 
            "delta_n_trips_pct", "n_trips_7day_avg") \
    .show(15, truncate=False)


+----------+-------+------------+-------------+--------------------+------------------+
|date      |n_trips|prev_n_trips|delta_n_trips|delta_n_trips_pct   |n_trips_7day_avg  |
+----------+-------+------------+-------------+--------------------+------------------+
|2024-01-01|32     |NULL        |NULL         |NULL                |32.0              |
|2024-01-02|56     |32          |24           |0.75                |44.0              |
|2024-01-03|56     |56          |0            |0.0                 |48.0              |
|2024-01-04|46     |56          |-10          |-0.17857142857142858|47.5              |
|2024-01-05|58     |46          |12           |0.2608695652173913  |49.6              |
|2024-01-06|18     |58          |-40          |-0.6896551724137931 |44.333333333333336|
|2024-01-07|38     |18          |20           |1.1111111111111112  |43.42857142857143 |
|2024-01-08|59     |38          |21           |0.5526315789473685  |47.285714285714285|
|2024-01-09|32     |59          
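As a cross-check of the window logic, the lag, delta and 7-row average columns for HB101 can be recomputed in plain Python from the first eight `n_trips` values above (2024-01-01 to 2024-01-08, consecutive days, so rows and days coincide). The results match the printed `prev_n_trips`, `delta_n_trips`, `delta_n_trips_pct` and `n_trips_7day_avg` columns.

```python
# n_trips for station HB101 on 2024-01-01 ... 2024-01-08, copied from the output above
n_trips = [32, 56, 56, 46, 58, 18, 38, 59]

# F.lag("n_trips"): previous row, None for the first one
prev = [None] + n_trips[:-1]
# absolute change vs previous row
delta = [None if p is None else n - p for n, p in zip(n_trips, prev)]
# relative change; None when there is no previous row or it is 0 (like F.when without otherwise)
delta_pct = [None if p is None or p == 0 else (n - p) / p for n, p in zip(n_trips, prev)]
# mean of the current row and up to 6 preceding rows (rowsBetween(-6, 0))
avg_7 = [
    sum(n_trips[max(0, i - 6): i + 1]) / len(n_trips[max(0, i - 6): i + 1])
    for i in range(len(n_trips))
]

for row in zip(n_trips, prev, delta, delta_pct, avg_7):
    print(row)
```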

In [18]:
# Rank stations within each day, busiest first; station id breaks ties so the top-10 cut is reproducible
w_day = Window.partitionBy("date").orderBy(F.col("n_trips").desc(), F.col("start_station_id"))

daily_ranked = (
    daily_station_enriched
    .withColumn("rank_by_date", F.row_number().over(w_day))
)

# Keep full table with rank
daily_ranked.show(20, truncate=False)

# Also create a separate table with only the top 10 stations per day
top10_per_day = daily_ranked.filter(F.col("rank_by_date") <= 10)

top10_per_day.orderBy("date", "rank_by_date").show(20, truncate=False)


+----------+----------------+--------------------------------------------+-------+---------------------+------------------------+--------+--------+------------------+-------------------+------------+-------------+--------------------+----------------+------------+
|date      |start_station_id|start_station_name                          |n_trips|avg_trip_duration_min|median_trip_duration_min|n_member|n_casual|share_member      |share_casual       |prev_n_trips|delta_n_trips|delta_n_trips_pct   |n_trips_7day_avg|rank_by_date|
+----------+----------------+--------------------------------------------+-------+---------------------+------------------------+--------+--------+------------------+-------------------+------------+-------------+--------------------+----------------+------------+
|2024-01-02|HB102           |Hoboken Terminal - River St & Hudson Pl     |100    |6.8285000000000045   |5.583333333333333       |78      |22      |0.78              |0.22               |75          |25    
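`row_number()` assigns distinct ranks, so `rank_by_date <= 10` keeps exactly 10 rows per day whenever at least 10 stations are active. Stations tied on `n_trips` still have to be ordered somehow: unless the window orders by a second key, Spark breaks ties arbitrarily and the cut can vary between runs, while `F.rank()` would keep all tied stations and could return more than 10 rows. A minimal plain-Python sketch of top-k per day, assuming `start_station_id` as the tie-breaker; the rows and the helper `top_k_per_day` are hypothetical.

```python
from collections import defaultdict

# Hypothetical (date, start_station_id, n_trips) rows; B and C are tied on 2024-01-01
rows = [
    ("2024-01-01", "A", 50),
    ("2024-01-01", "C", 40),
    ("2024-01-01", "B", 40),
    ("2024-01-01", "D", 10),
    ("2024-01-02", "A", 5),
    ("2024-01-02", "B", 7),
]


def top_k_per_day(rows, k):
    """Busiest k stations per date; ties on n_trips broken by station id (ascending)."""
    by_day = defaultdict(list)
    for day, station, n in rows:
        by_day[day].append((station, n))

    result = []
    for day in sorted(by_day):
        ranked = sorted(by_day[day], key=lambda sn: (-sn[1], sn[0]))
        for rank, (station, n) in enumerate(ranked[:k], start=1):
            result.append((day, rank, station, n))
    return result


for r in top_k_per_day(rows, k=2):
    print(r)
```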

In [23]:
# Paths inside the container (mapped to ~/data/output on the VM)
output_daily_enriched = "/home/jovyan/data/output/citibike_station_daily_enriched"
output_top10 = "/home/jovyan/data/output/citibike_station_daily_top10"

# Write full enriched daily table
(
    daily_station_enriched
    .repartition(1)              # optional: fewer files, easier to inspect
    .write
    .mode("overwrite")
    .format("parquet")           # or "csv" if you prefer
    .save(output_daily_enriched)
)

# Write top-10-per-day table
(
    top10_per_day
    .repartition(1)
    .write
    .mode("overwrite")
    .format("parquet")
    .save(output_top10)
)
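After a successful `save`, Spark's default Hadoop file committer leaves a `_SUCCESS` marker in the output directory next to the `part-*` data files, and with `.repartition(1)` there should be a single `part-*.parquet` file. A small hypothetical helper, `check_output_dir`, can confirm that from the notebook before anything downstream reads the folder; it assumes the output directory is on the local filesystem, as the container paths above are.

```python
from pathlib import Path


def check_output_dir(path):
    """Return the Parquet part files in a Spark output folder; raise if the write looks incomplete."""
    out = Path(path)
    if not (out / "_SUCCESS").exists():
        raise RuntimeError(f"{out} has no _SUCCESS marker; the write may have failed")
    parts = sorted(out.glob("part-*.parquet"))
    if not parts:
        raise RuntimeError(f"{out} contains no part-*.parquet files")
    return parts


# In the notebook, for example:
# for p in check_output_dir(output_top10):
#     print(p.name)
```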



In [24]:
df_top10 = spark.read.parquet(output_top10)  # read the written table back to verify it
df_top10.show(20, truncate=False)
df_top10.count()


+----------+----------------+--------------------------------------------+-------+---------------------+------------------------+--------+--------+-------------------+-------------------+------------+-------------+-------------------+----------------+------------+
|date      |start_station_id|start_station_name                          |n_trips|avg_trip_duration_min|median_trip_duration_min|n_member|n_casual|share_member       |share_casual       |prev_n_trips|delta_n_trips|delta_n_trips_pct  |n_trips_7day_avg|rank_by_date|
+----------+----------------+--------------------------------------------+-------+---------------------+------------------------+--------+--------+-------------------+-------------------+------------+-------------+-------------------+----------------+------------+
|2024-01-01|JC115           |Grove St PATH                               |84     |9.616666666666665    |5.216666666666667       |67      |17      |0.7976190476190477 |0.20238095238095238|NULL        |NULL 

3660
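The count of 3660 rows is consistent with the top-10 filter: 2024 is a leap year with 366 days, and 366 × 10 = 3660, which is what one would expect if every date in the data has at least ten active start stations. A quick check of that arithmetic, assuming the dates span exactly calendar year 2024:

```python
import calendar
from datetime import date

year = 2024
# Number of days in the year, computed from the calendar rather than hard-coded
days_in_year = (date(year + 1, 1, 1) - date(year, 1, 1)).days
# row_number() <= 10 keeps at most 10 stations per day
expected_rows = days_in_year * 10

print(calendar.isleap(year), days_in_year, expected_rows)
```

A stricter check in the notebook would compare `df_top10.count()` with `df_top10.select("date").distinct().count() * 10`.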