In [None]:
%sql
-- Create the gold fact table (run once or if schema changes needed).
-- IF NOT EXISTS makes this a no-op when the table is already present.
-- The intended grain appears to be one row per (match_id, player_sk), matching
-- the DataFrame assembled later in this notebook.
-- NOTE(review): the write cell below uses mode("overwrite") with
-- overwriteSchema=true, so the schema of the written DataFrame replaces the
-- one declared here; keep both in sync.
CREATE TABLE IF NOT EXISTS gold.ipl_players_matchs (
  -- Match identifiers
  match_id INT,
  season INT,

  -- Player and the team the player appeared for in this match
  player_sk BIGINT,
  player_name STRING,
  team_sk BIGINT,

  -- Batting figures
  runs_scored INT,
  balls_faced INT,
  fours INT,
  sixes INT,

  -- Bowling figures (overs_bowled is written as counted balls / 6.0, i.e. a
  -- decimal fraction of overs rather than "overs.balls" notation)
  overs_bowled DOUBLE,
  runs_conceded INT,
  wickets INT,

  -- Fielding figures
  catches INT,
  stumpings INT,
  run_outs INT,

  -- TRUE when the player is the match's player_of_match_sk
  player_of_match BOOLEAN,

  -- Timestamp taken when the gold DataFrame is built
  ingestion_ts TIMESTAMP
)
USING DELTA
TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact = true
);

In [None]:
from pyspark.sql import functions as F

# Load the three silver-layer inputs used throughout this notebook:
# ball-by-ball deliveries, match-level facts, and the player dimension.
fact_deliveries, fact_matches, dim_player = (
    spark.table(table_name)
    for table_name in (
        "silver.fact_deliveries",
        "silver.fact_matches",
        "silver.dim_player",
    )
)
dim_player = spark.table("silver.dim_player")

In [None]:
# Step 1: Determine each player's team per match
def _role_rows(player_col, team_col):
    """Return (match_id, player_sk, team_sk) rows for one role column.

    Rows where `player_col` is NULL are dropped, `player_col` is exposed as
    `player_sk` and `team_col` as `team_sk`.
    """
    return (
        fact_deliveries
        .filter(F.col(player_col).isNotNull())
        .select(
            "match_id",
            F.col(player_col).alias("player_sk"),
            F.col(team_col).alias("team_sk"),
        )
    )


# Batting-side roles take the batting team; bowler and fielder take the
# bowling team. union() is positional, and every branch emits the same three
# columns in the same order.
player_roles = (
    _role_rows("batsman_sk", "batting_team_sk")
    .union(_role_rows("non_striker_sk", "batting_team_sk"))
    .union(_role_rows("bowler_sk", "bowling_team_sk"))
    .union(_role_rows("fielder_sk", "bowling_team_sk"))
    .union(_role_rows("player_dismissed_sk", "batting_team_sk"))
)

# Collapse to one row per (match, player).
# F.first() depends on row order after the shuffle and may return a NULL
# team_sk if that happens to be the first row seen; F.min() is deterministic
# and ignores NULLs. When every row for a player in a match carries the same
# team (the expected case) the result is unchanged.
player_team = player_roles.groupBy("match_id", "player_sk").agg(
    F.min("team_sk").alias("team_sk")
)

In [None]:
# Step 2: Batting statistics
# One row per (match_id, striker). Aggregates are kept as *_raw and cast to
# INT when the final DataFrame is assembled.
#
# balls_faced: every delivery that is not a wide. A no-ball is NOT a legal
# delivery for the bowler's over, but it does count as a ball faced by the
# striker, so no-balls must not be excluded here.
# NOTE(review): fours/sixes are inferred from batsman_runs being exactly 4/6;
# an all-run four would be counted as a boundary - confirm whether silver
# exposes a boundary flag.
batting_stats = fact_deliveries.groupBy("match_id", F.col("batsman_sk").alias("player_sk")).agg(
    F.sum("batsman_runs").alias("runs_scored_raw"),
    F.sum(F.when(F.col("wide_runs") == 0, 1).otherwise(0)).alias("balls_faced_raw"),
    F.sum(F.when(F.col("batsman_runs") == 4, 1).otherwise(0)).alias("fours_raw"),
    F.sum(F.when(F.col("batsman_runs") == 6, 1).otherwise(0)).alias("sixes_raw")
)

In [None]:
# Step 3: Bowling statistics
# One row per (match_id, bowler).
#
# runs_conceded: only runs debited against the bowler, i.e. runs off the bat
#   plus wides and no-balls. Summing total_runs would charge the bowler with
#   every run on the delivery (presumably including byes / leg-byes / penalty
#   runs - confirm against the silver schema), which inflates bowling figures.
#   Each component is coalesced so a NULL in one column does not null the sum.
# overs_bowled: legal deliveries (neither wide nor no-ball) divided by 6.0,
#   i.e. a decimal fraction of overs, not "overs.balls" notation.
# wickets: dismissals credited to the bowler; run outs, retirements and
#   obstructing the field are excluded.
bowling_stats = fact_deliveries.groupBy("match_id", F.col("bowler_sk").alias("player_sk")).agg(
    F.sum(
        F.coalesce(F.col("batsman_runs"), F.lit(0))
        + F.coalesce(F.col("wide_runs"), F.lit(0))
        + F.coalesce(F.col("noball_runs"), F.lit(0))
    ).alias("runs_conceded_raw"),
    (F.sum(F.when((F.col("wide_runs") == 0) & (F.col("noball_runs") == 0), 1).otherwise(0)) / 6.0).alias("overs_bowled"),
    F.sum(
        F.when(
            F.col("player_dismissed_sk").isNotNull() &
            ~F.col("dismissal_kind").isin("run out", "retired hurt", "retired out", "obstructing the field"),
            1
        ).otherwise(0)
    ).alias("wickets_raw")
)

In [None]:
# Step 4: Fielding statistics
# Only deliveries that have both a dismissed player and a fielder are
# considered; they are tallied per (match_id, fielder) by dismissal kind.
# NOTE(review): only the exact kind "caught" is counted as a catch; a
# "caught and bowled" value, if present in the data, is not - confirm intent.
dismissals_with_fielder = fact_deliveries.where(
    F.col("fielder_sk").isNotNull() & F.col("player_dismissed_sk").isNotNull()
)

fielding_stats = (
    dismissals_with_fielder
    .groupBy("match_id", F.col("fielder_sk").alias("player_sk"))
    .agg(
        # count() over a when() without otherwise() counts the matching rows
        # and yields 0 when none match.
        F.count(F.when(F.col("dismissal_kind") == "caught", 1)).alias("catches_raw"),
        F.count(F.when(F.col("dismissal_kind") == "stumped", 1)).alias("stumpings_raw"),
        F.count(F.when(F.col("dismissal_kind") == "run out", 1)).alias("run_outs_raw"),
    )
)

In [None]:
# Step 5: Player of the Match
# One (match_id, player_sk) row per match that has a player_of_match_sk,
# flagged with a constant TRUE so a left join marks the winner.
pom_stats = (
    fact_matches
    .where(F.col("player_of_match_sk").isNotNull())
    .select(
        F.col("match_id"),
        F.col("player_of_match_sk").alias("player_sk"),
        F.lit(True).alias("player_of_match"),
    )
)

In [None]:
# Step 6: Assemble final DataFrame with explicit INT casts
def _zero_filled_int(raw_name, out_name):
    """Replace NULL in `raw_name` with 0, cast to int and expose as `out_name`."""
    return F.coalesce(F.col(raw_name), F.lit(0)).cast("int").alias(out_name)


match_seasons = fact_matches.select("match_id", "season")
player_names = dim_player.select("player_sk", "player_name")
stat_keys = ["match_id", "player_sk"]

# Inner join: a (match, player) row survives only if its match is in fact_matches.
base = player_team.join(match_seasons, "match_id")

gold_df = (
    base
    # Inner join: players missing from dim_player are dropped.
    .join(player_names, "player_sk")
    # Stats are optional per player, hence left joins; gaps are zero-filled below.
    .join(batting_stats, stat_keys, "left")
    .join(bowling_stats, stat_keys, "left")
    .join(fielding_stats, stat_keys, "left")
    .join(pom_stats, stat_keys, "left")
    .select(
        "match_id",
        "season",
        "player_sk",
        "player_name",
        "team_sk",
        # Batting
        _zero_filled_int("runs_scored_raw", "runs_scored"),
        _zero_filled_int("balls_faced_raw", "balls_faced"),
        _zero_filled_int("fours_raw", "fours"),
        _zero_filled_int("sixes_raw", "sixes"),
        # Bowling (overs_bowled is left uncast)
        F.coalesce(F.col("overs_bowled"), F.lit(0.0)).alias("overs_bowled"),
        _zero_filled_int("runs_conceded_raw", "runs_conceded"),
        _zero_filled_int("wickets_raw", "wickets"),
        # Fielding
        _zero_filled_int("catches_raw", "catches"),
        _zero_filled_int("stumpings_raw", "stumpings"),
        _zero_filled_int("run_outs_raw", "run_outs"),
        # Player of the match: absent from pom_stats means False
        F.coalesce(F.col("player_of_match"), F.lit(False)).alias("player_of_match"),
        F.current_timestamp().alias("ingestion_ts"),
    )
)

In [None]:
# Write to gold table
# Full refresh: the existing table contents are replaced, and with
# overwriteSchema enabled the table schema is replaced by gold_df's schema.
gold_writer = (
    gold_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
gold_writer.saveAsTable("gold.ipl_players_matchs")

In [None]:
# Optimize
# Run OPTIMIZE on the gold table with Z-ordering on (season, player_sk).
optimize_stmt = "OPTIMIZE gold.ipl_players_matchs ZORDER BY (season, player_sk)"
spark.sql(optimize_stmt)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,