In [1]:
import json
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pprint import pprint

import psycopg2
import requests
from pyspark.sql import SparkSession, Row
# from pyspark.sql.functions import col
# from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, IntegerType,
    TimestampType, ArrayType
)
# from pyspark.sql.functions import col, to_timestamp, lit, coalesce, from_json, explode
from pyspark.sql.functions import (
    col, from_json, explode, max as spark_max, min as spark_min,
    regexp_replace, struct,
)


In [2]:
# Capture today's date once (DD-MM-YYYY). `date_str_api` is a plain alias and
# `date_str_table` is the same date with underscores, used as a table-name suffix.
date_str = f"{datetime.now():%d-%m-%Y}"
date_str_api = date_str
date_str_table = "_".join(date_str.split("-"))
print(date_str, date_str_table)

20-08-2025 20_08_2025


In [3]:
# Local-mode Spark session; the PostgreSQL JDBC driver and the Kafka source
# connector are requested through `spark.jars.packages`.
spark = SparkSession.builder \
    .appName("ReadLiveStatusFromKafka") \
    .master("local[*]") \
    .config(
        "spark.jars.packages",
        ",".join([
            "org.postgresql:postgresql:42.7.4",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
        ]),
    ) \
    .getOrCreate()

In [4]:
# PostgreSQL connection settings for the JDBC sink.
#
# Each value can be overridden through an environment variable so credentials
# do not have to live in the notebook. The literals are kept only as
# backward-compatible fallbacks.
# SECURITY TODO: the fallback password below is committed in clear text -
# rotate it, supply RAILWAY_DB_PASSWORD from the environment, and then remove
# the literal.
JDBC_URL = os.environ.get(
    "RAILWAY_JDBC_URL", "jdbc:postgresql://postgres:5432/railway"
)
DB_PROPERTIES = {
    "user": os.environ.get("RAILWAY_DB_USER", "postgres"),
    "password": os.environ.get(
        "RAILWAY_DB_PASSWORD", "iaCkmHPhuyhFLEBDGdwxQGGqlHvdgWJA"
    ),
    "driver": "org.postgresql.Driver",
}

In [5]:
# Streaming read of the `live-train-status` Kafka topic (headers included).
raw_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": "live-train-status",
        "includeHeaders": "true",
    })
    .load()
)

In [6]:
# Kafka key/value are cast to strings: the key is exposed as the train number
# and the value as the raw stations JSON text.
decoded_df = raw_df.select(
    col("key").cast("string").alias("train_number"),
    col("value").cast("string").alias("stations_json"),
)

In [7]:
# Schema of the message value: an array of station records in which every
# field is a nullable string.
station_schema = ArrayType(
    StructType([
        StructField(field_name, StringType(), True)
        for field_name in (
            "index", "station", "arr", "dep", "delay", "status", "current",
        )
    ])
)

In [8]:
# Parse the JSON text into an array-of-struct column named `stations`.
parsed_df = decoded_df.withColumn(
    "stations",
    from_json(col("stations_json"), station_schema),
)

In [9]:
# `parsed_df` already carries the parsed `stations` array, so it is exploded
# directly instead of running from_json on `stations_json` a second time.
# explode() yields one row per array element, exposed as the `station` struct.
exploded_df = parsed_df.withColumn("station", explode(col("stations")))

In [10]:
# Flatten the exploded `station` struct into the columns the sink needs.
station_df = exploded_df.select(
    col("train_number"),
    col("station").getField("station").alias("station"),
    col("station").getField("status").alias("status"),
    # Keep only the digits of the delay text, then cast to a number.
    regexp_replace(col("station").getField("delay"), "[^0-9]", "")
        .cast("bigint")
        .alias("delay_minutes"),
    col("station").getField("index").cast("int").alias("index"),
)

In [12]:
def write_to_postgres(batch_df, batch_id):
    """Summarise one micro-batch per train and append the result to PostgreSQL.

    For every ``train_number`` in the batch this computes:

    * ``last_crossed``  - the ``crossed`` station with the highest ``index``
    * ``next_upcoming`` - the ``upcoming`` station with the lowest ``index``
    * ``delay_minutes`` - the largest ``delay_minutes`` over all of the
      train's rows in the batch

    The rows are appended to ``live_status_<date_str_table>`` using the
    notebook-level ``JDBC_URL`` and ``DB_PROPERTIES`` settings.

    Args:
        batch_df: Micro-batch DataFrame with the columns ``train_number``,
            ``station``, ``status``, ``delay_minutes`` and ``index``.
        batch_id: Micro-batch id passed in by ``foreachBatch``; not used.
    """
    # Rows whose index did not survive the int cast cannot be ordered, so they
    # are left out of the first/last station selection (they still count
    # towards delay_minutes below).
    positioned_df = batch_df.filter(col("index").isNotNull())

    # Structs compare field by field, so max/min over (index, station) selects
    # the station at the extreme index instead of the alphabetical extreme of
    # the station name.
    # NOTE(review): assumes `index` is the station's position along the route -
    # confirm against the Kafka producer.
    station_by_index = struct(col("index"), col("station"))

    last_crossed_df = (
        positioned_df.filter("status = 'crossed'")
        .groupBy("train_number")
        .agg(spark_max(station_by_index).getField("station").alias("last_crossed"))
    )

    next_upcoming_df = (
        positioned_df.filter("status = 'upcoming'")
        .groupBy("train_number")
        .agg(spark_min(station_by_index).getField("station").alias("next_upcoming"))
    )

    delay_df = batch_df.groupBy("train_number").agg(
        spark_max("delay_minutes").alias("delay_minutes")
    )

    # NOTE(review): the join is anchored on last_crossed_df, so a train with no
    # 'crossed' row in the batch produces no output row. Kept as-is; confirm
    # this is intended.
    final_df = (
        last_crossed_df
        .join(next_upcoming_df, "train_number", "left")
        .join(delay_df, "train_number", "left")
    )

    # Reuse the shared connection settings rather than repeating the URL and
    # credentials inline.
    final_df.write.jdbc(
        url=JDBC_URL,
        table=f"live_status_{date_str_table}",
        mode="append",
        properties=DB_PROPERTIES,
    )

In [13]:
# Start the streaming query: every micro-batch of station_df is handed to
# write_to_postgres, with the checkpoint directory keyed by today's date.
query = (
    station_df.writeStream
    .foreachBatch(write_to_postgres)
    .outputMode("append")
    .option("checkpointLocation", f"/home/jupyter/work/checkpoint/train_status_{date_str}/")
    .start()
)

In [None]:
# Wait on the streaming query started above. With no timeout argument this
# call is expected to block until the query stops or fails, so cells placed
# after it will not run during a top-to-bottom execution.
query.awaitTermination()

In [None]:
# Diagnostics: show which jars/packages the session was configured with and
# check that the PostgreSQL JDBC driver class can be loaded in the JVM.
# NOTE(review): `_jvm` is a private SparkContext attribute; this is a debugging
# aid rather than part of the pipeline.
print(spark.sparkContext.getConf().get("spark.jars.packages"))
print(spark.sparkContext.getConf().get("spark.jars"))
print(spark.sparkContext._jvm.java.lang.Class.forName("org.postgresql.Driver"))