## 1. Spark Session Configuration with Delta Lake and S3 Support

In [79]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CricketDeltaPipeline") \
    .config("spark.jars", ",".join([
        r"C:\\spark\\spark-3.5.5-bin-hadoop3\\jars\\delta-core_2.12-3.1.0.jar",
        r"C:\\spark\\spark-3.5.5-bin-hadoop3\\jars\\delta-storage-3.1.0.jar",
        r"C:\\spark\\spark-3.5.5-bin-hadoop3\\jars\\hadoop-aws-3.3.4.jar",
        r"C:\\spark\\spark-3.5.5-bin-hadoop3\\jars\\aws-java-sdk-bundle-1.12.430.jar"
    ])) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .getOrCreate()

## 2. Read Raw Parquet from S3

In [80]:
from datetime import datetime
import pyspark.sql.functions as F

now = datetime.utcnow()
s3_path = f"s3a://aws-glue-assets-cricket/raw_cricket_data/year={now.year}/month={now.strftime('%m')}/*/*/"
df = spark.read.parquet(s3_path)
df = df.withColumn("event_time_ts", F.to_timestamp("event_time"))

## 3. Deduplicate Rows by ID + Latest Timestamp

In [81]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("id").orderBy(F.col("event_time_ts").desc())
df = df.withColumn("row_num", F.row_number().over(window_spec))
latest_df = df.filter(F.col("row_num") == 1).drop("row_num")

## 4. Add Match Status Based on JSON Column

In [82]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

json_schema_status = StructType([
    StructField("matchStarted", StringType(), True),
    StructField("matchEnded", StringType(), True),
    StructField("score", ArrayType(StringType()), True)
])

latest_df = latest_df.withColumn("json_data_parsed", F.from_json(F.col("json_data"), json_schema_status))
latest_df = latest_df.withColumn("match_status", F.when(
    (F.col("json_data_parsed.matchStarted") == "true") &
    (F.col("json_data_parsed.matchEnded") == "false") &
    (F.size(F.col("json_data_parsed.score")) > 0), "Live")
    .when((F.col("json_data_parsed.matchStarted") == "true") &
          (F.col("json_data_parsed.matchEnded") == "false") &
          (F.size(F.col("json_data_parsed.score")) == 0), "Upcoming")
    .when((F.col("json_data_parsed.matchEnded") == "true"), "Completed")
    .otherwise("Unknown")
)

In [83]:
latest_df.show(truncate=False)  

+------------------------------------+---------------------------------------------------------------+---------+--------------------------+-----------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## 5. Generic Method to extract Matches (be it Live or Non-Live)

In [84]:
def match_status_wise_filteration (filterText):
    json_schema_teams = StructType([
        StructField("venue", StringType(), True),
        StructField("date", StringType(), True),
        StructField("teamInfo", ArrayType(StructType([
            StructField("name", StringType(), True),
            StructField("shortname", StringType(), True),
            StructField("img", StringType(), True)
        ])), True)
    ])
    if filterText == "Live":
        live_df = latest_df.filter(F.col("match_status") == filterText) \
            .withColumn("json_data_parsed", F.from_json(F.col("json_data"), json_schema_teams)) \
            .select(
                "id", "name", F.upper("matchType").alias("matchType"),
                F.col("json_data_parsed.venue"),
                F.col("json_data_parsed.date"),
                "match_status",
                F.col("json_data_parsed.teamInfo")[0]["name"].alias("team1_name"),
                F.col("json_data_parsed.teamInfo")[0]["shortname"].alias("team1_shortname"),
                F.col("json_data_parsed.teamInfo")[0]["img"].alias("team1_img"),
                F.col("json_data_parsed.teamInfo")[1]["name"].alias("team2_name"),
                F.col("json_data_parsed.teamInfo")[1]["shortname"].alias("team2_shortname"),
                F.col("json_data_parsed.teamInfo")[1]["img"].alias("team2_img"),
                F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss").alias("EffectiveDateTime")
            )
        return live_df
    else:
        non_live_df = latest_df.filter(F.col("match_status") != "Live") \
            .withColumn("json_data_parsed", F.from_json(F.col("json_data"), json_schema_teams)) \
            .select(
                "id", "name", F.upper("matchType").alias("matchType"),
                F.col("json_data_parsed.venue"),
                F.col("json_data_parsed.date"),
                "match_status",
                F.col("json_data_parsed.teamInfo")[0]["name"].alias("team1_name"),
                F.col("json_data_parsed.teamInfo")[0]["shortname"].alias("team1_shortname"),
                F.col("json_data_parsed.teamInfo")[0]["img"].alias("team1_img"),
                F.col("json_data_parsed.teamInfo")[1]["name"].alias("team2_name"),
                F.col("json_data_parsed.teamInfo")[1]["shortname"].alias("team2_shortname"),
                F.col("json_data_parsed.teamInfo")[1]["img"].alias("team2_img"),
                F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss").alias("EffectiveDateTime")
            )
        return non_live_df
            


## Fetch LIVE Matches

In [85]:
live_df = match_status_wise_filteration("Live")

## Fetch Non-Live Matches

In [86]:
non_live_df = match_status_wise_filteration("NonLive")

In [87]:
non_live_df.show()

+--------------------+--------------------+---------+--------------------+----------+------------+--------------------+---------------+--------------------+--------------------+---------------+--------------------+-------------------+
|                  id|                name|matchType|               venue|      date|match_status|          team1_name|team1_shortname|           team1_img|          team2_name|team2_shortname|           team2_img|  EffectiveDateTime|
+--------------------+--------------------+---------+--------------------+----------+------------+--------------------+---------------+--------------------+--------------------+---------------+--------------------+-------------------+
|3a0341c7-6df0-4b5...|Chepauk Super Gil...|      T20|NPR College Groun...|2025-07-01|   Completed|Chepauk Super Gil...|            CSG|https://g.cricapi...|IDream Tiruppur T...|            ITT|https://g.cricapi...|2025-07-02 00:13:38|
|b10d079c-2321-47f...|Texas Super Kings...|      T20|Grand P

## 6. Define Delta Upsert Function (foreachBatch) - LIVE Matches

In [88]:
from pyspark.sql.functions import year, month, dayofmonth, col

# Define S3 paths
target_path = "s3a://aws-glue-assets-cricket/output_cricket/live/cricket_data"
checkpoint_path = "s3a://aws-glue-assets-cricket/output_cricket/live/cricket_data/checkpoints"

# Add partition columns to live_df
live_df = live_df.withColumn("year", year(col("EffectiveDateTime"))) \
                 .withColumn("month", month(col("EffectiveDateTime"))) \
                 .withColumn("day", dayofmonth(col("EffectiveDateTime")))

# Define upsert function using Delta SQL
def upsert_to_delta(microBatchDF, batchId):

    # Check if Delta table exists
    try:
        spark.read.format("delta").load(target_path)
        table_exists = True
    except Exception as e:
        table_exists = False

    if not table_exists:
        # Create the Delta table if it does not exist
        microBatchDF.write \
            .format("delta") \
            .partitionBy("year", "month", "day") \
            .mode("overwrite") \
            .save(target_path)
    else:
        # Perform upsert (merge) if table exists
        microBatchDF.createOrReplaceTempView("source_table")
        spark.sql(f"""
            MERGE INTO delta.`{target_path}` AS target
            USING source_table AS source
            ON target.id = source.id AND target.date = source.date
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)


# Call for batch upsert
upsert_to_delta(live_df, 0)


## 6. Define Delta Upsert Function (foreachBatch) - NON LIVE Matches

In [89]:
from pyspark.sql.functions import year, month, dayofmonth, col

# Define S3 paths
target_path = "s3a://aws-glue-assets-cricket/output_cricket/nonlive/cricket_data"
checkpoint_path = "s3a://aws-glue-assets-cricket/output_cricket/nonlive/cricket_data/checkpoints"

# Add partition columns to live_df
non_live_df = non_live_df.withColumn("year", year(col("EffectiveDateTime"))) \
                 .withColumn("month", month(col("EffectiveDateTime"))) \
                 .withColumn("day", dayofmonth(col("EffectiveDateTime")))

# Define upsert function using Delta SQL
def upsert_to_delta(microBatchDF, batchId):

    # Check if Delta table exists
    try:
        spark.read.format("delta").load(target_path)
        table_exists = True
    except Exception as e:
        table_exists = False

    if not table_exists:
        # Create the Delta table if it does not exist
        microBatchDF.write \
            .format("delta") \
            .partitionBy("year", "month", "day") \
            .mode("overwrite") \
            .save(target_path)
    else:
        # Perform upsert (merge) if table exists
        microBatchDF.createOrReplaceTempView("source_table")
        spark.sql(f"""
            MERGE INTO delta.`{target_path}` AS target
            USING source_table AS source
            ON target.id = source.id AND target.date = source.date
            WHEN MATCHED THEN UPDATE SET *
            WHEN NOT MATCHED THEN INSERT *
        """)


# Call for batch upsert
upsert_to_delta(non_live_df, 0)