In [1]:
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder, VectorAssembler}
import org.apache.spark.ml.regression.GeneralizedLinearRegression
import org.apache.spark.sql.expressions.Window

//create spark session
val spark = SparkSession.builder.appName("SPL Analysis").getOrCreate()
import spark.implicits._

spark = org.apache.spark.sql.SparkSession@68a03605


org.apache.spark.sql.SparkSession@68a03605

In [2]:
//import transformed data
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("data/data_for_model.csv")

df = [season_id: int, season_label: string ... 8 more fields]


[season_id: int, season_label: string ... 8 more fields]

In [3]:
//to partquet for editing
df.write
  .mode("overwrite")
  .parquet("data/model_master")

In [4]:
//current league table
//include goals for, goals against
val currentTable = Seq(
  ("Lion City Sailors", 15, 22, 2),
  ("Geylang Int.", 10, 7, 4),
  ("Tampines Rovers", 8, 8, 4),
  ("Balestier Khalsa", 8, 9, 12),
  ("Albirex Niigata", 7, 8, 5),
  ("Hougang Utd", 3, 4, 10),
  ("Tanjong Pagar Utd.", 3, 7, 15),
  ("Young Lions", 0, 4, 17)
).toDF("team", "pts", "gf", "ga")
currentTable.show(truncate = false)

currentTable = [team: string, pts: int ... 2 more fields]


+------------------+---+---+---+
|team              |pts|gf |ga |
+------------------+---+---+---+
|Lion City Sailors |15 |22 |2  |
|Geylang Int.      |10 |7  |4  |
|Tampines Rovers   |8  |8  |4  |
|Balestier Khalsa  |8  |9  |12 |
|Albirex Niigata   |7  |8  |5  |
|Hougang Utd       |3  |4  |10 |
|Tanjong Pagar Utd.|3  |7  |15 |
|Young Lions       |0  |4  |17 |
+------------------+---+---+---+



[team: string, pts: int ... 2 more fields]

In [5]:
//stream to update current table
// NOTE: Avoid case class encoders in notebooks/REPL (can cause "Unable to generate an encoder for inner class...").
// Use an explicit StructType schema instead.
import org.apache.spark.sql.types._

val eventSchema = StructType(Seq(
  StructField("event_id", LongType, nullable = true),
  StructField("season_year", StringType, nullable = true),
  StructField("home_team", StringType, nullable = true),
  StructField("away_team", StringType, nullable = true),
  StructField("home_score", LongType, nullable = true),
  StructField("away_score", LongType, nullable = true),
  StructField("status_code", LongType, nullable = true),
  StructField("start_timestamp", LongType, nullable = true)
))

eventSchema = StructType(StructField(event_id,LongType,true),StructField(season_year,StringType,true),StructField(home_team,StringType,true),StructField(away_team,StringType,true),StructField(home_score,LongType,true),StructField(away_score,LongType,true),StructField(status_code,LongType,true),StructField(start_timestamp,LongType,true))


StructType(StructField(event_id,LongType,true),StructField(season_year,StringType,true),StructField(home_team,StringType,true),StructField(away_team,StringType,true),StructField(home_score,LongType,true),StructField(away_score,LongType,true),StructField(status_code,LongType,true),StructField(start_timestamp,LongType,true))

In [6]:
val eventStream = spark.readStream
  .schema(eventSchema)
  .option("multiLine", "true")
  .json("data/stream_in/event")
  .withColumn("source_file", input_file_name())

eventStream = [event_id: bigint, season_year: string ... 7 more fields]


[event_id: bigint, season_year: string ... 7 more fields]

In [7]:
//one match will affect 2 teams on the table
val perMatchTeamDelta =
  eventStream
    // Guard against wrong JSON being dropped into the event folder (e.g., stats JSON).
    .filter(
      col("status_code") === 100 &&
      col("home_team").isNotNull && col("away_team").isNotNull &&
      col("home_score").isNotNull && col("away_score").isNotNull
    )
//get pts for home team
    .withColumn(
      "home_pts",
      when(col("home_score") > col("away_score"), lit(3))
        .when(col("home_score") === col("away_score"), lit(1))
        .otherwise(lit(0))
    )
//get pts for away team
    .withColumn(
      "away_pts",
      when(col("away_score") > col("home_score"), lit(3))
        .when(col("away_score") === col("home_score"), lit(1))
        .otherwise(lit(0))
    )
    .select(
      col("event_id"),
      col("source_file"),
      array(
        struct(
          col("home_team").alias("team"),
          col("home_pts").alias("pts_delta"),
          col("home_score").alias("gf_delta"),
          col("away_score").alias("ga_delta")
        ),
        struct(
          col("away_team").alias("team"),
          col("away_pts").alias("pts_delta"),
          col("away_score").alias("gf_delta"),
          col("home_score").alias("ga_delta")
        )
      ).alias("rows")
    )
    .withColumn("row", explode(col("rows")))
    .select(
      col("event_id"),
      col("source_file"),
      col("row.team").alias("team"),
      col("row.pts_delta").alias("pts_delta"),
      col("row.gf_delta").alias("gf_delta"),
      col("row.ga_delta").alias("ga_delta")
    )

perMatchTeamDelta = [event_id: bigint, source_file: string ... 4 more fields]


[event_id: bigint, source_file: string ... 4 more fields]

In [8]:
// Aggregate changes (batch-level).
// NOTE: We do this inside foreachBatch so we don't need streaming state/checkpoints.
def aggregateDelta(perMatchTeamDeltaBatch: DataFrame): DataFrame = {
  perMatchTeamDeltaBatch
    .groupBy(col("team"))
    .agg(
      sum(col("pts_delta")).alias("pts_add"),
      sum(col("gf_delta")).alias("gf_add"),
      sum(col("ga_delta")).alias("ga_add")
    )
}

aggregateDelta: (perMatchTeamDeltaBatch: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [9]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// NO CHECKPOINTING (demo mode): do all aggregation/update per micro-batch.
val q = perMatchTeamDelta.writeStream
  .outputMode("append")
  .foreachBatch { (batchRows: DataFrame, batchId: Long) =>

    println(s"[EVENT] batchId=$batchId | inputRows=${batchRows.count()}")
    println("[EVENT] files:")
    batchRows.select("source_file").distinct().show(50, truncate = false)

    // If someone drops stats JSON into event folder, it will be filtered out in perMatchTeamDelta.
    val batchDelta = aggregateDelta(batchRows)
    println(s"[EVENT] deltaTeams=${batchDelta.count()}")

    val baseTable =
      if (spark.catalog.tableExists("current_table_live")) spark.table("current_table_live")
      else currentTable

    val updated =
      baseTable.as("c")
        .join(batchDelta.as("d"), Seq("team"), "left")
        .select(
          col("team"),
          (col("c.pts") + coalesce(col("d.pts_add"), lit(0))).alias("pts"),
          (col("c.gf")  + coalesce(col("d.gf_add"),  lit(0))).alias("gf"),
          (col("c.ga")  + coalesce(col("d.ga_add"),  lit(0))).alias("ga")
        )
        .orderBy(desc("pts"), (col("gf") - col("ga")).desc, desc("gf"))

    updated.createOrReplaceTempView("current_table_live")
    println(s"=== batchId=$batchId ===")
    updated.show(50, truncate = false)
  }
  .start()

q = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1bd92154


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1bd92154

[EVENT] batchId=0 | inputRows=4
[EVENT] files:
+------------------------------------------------------------------------------+
|source_file                                                                   |
+------------------------------------------------------------------------------+
|file:///workspace/PowerRanking/data/stream_in/event/event_14195502%20copy.json|
|file:///workspace/PowerRanking/data/stream_in/event/event_14195502.json       |
+------------------------------------------------------------------------------+

[EVENT] deltaTeams=2
=== batchId=0 ===
+------------------+---+---+---+
|team              |pts|gf |ga |
+------------------+---+---+---+
|Lion City Sailors |15 |22 |2  |
|Balestier Khalsa  |14 |13 |14 |
|Geylang Int.      |10 |7  |4  |
|Tampines Rovers   |8  |8  |4  |
|Albirex Niigata   |7  |8  |5  |
|Hougang Utd       |3  |4  |10 |
|Tanjong Pagar Utd.|3  |9  |19 |
|Young Lions       |0  |4  |17 |
+------------------+---+---+---+



In [11]:
//stats streaming
// NOTE: Avoid case class encoders in notebooks/REPL; use explicit schema.
val statsSchema = StructType(Seq(
  StructField("event_id", LongType, nullable = true),
  StructField("home_xg", DoubleType, nullable = true),
  StructField("away_xg", DoubleType, nullable = true)
))

val statsStream = spark.readStream
  .schema(statsSchema)
  .option("multiLine", "true")
  .option("ignoreMissingFiles", "true")
  .option("ignoreCorruptFiles", "true")
  .json("data/stream_in/stats")
  .withColumn("source_file", input_file_name())

statsSchema = StructType(StructField(event_id,LongType,true),StructField(home_xg,DoubleType,true),StructField(away_xg,DoubleType,true))
statsStream = [event_id: bigint, home_xg: double ... 2 more fields]


[event_id: bigint, home_xg: double ... 2 more fields]

In [8]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.hadoop.fs.{FileSystem, Path}

def upsertStatsIntoMaster(statsBatch: DataFrame): Unit = {

  val masterPath = "data/model_master"

  // Initialize master parquet if it doesn't exist yet (seed from the base CSV df).
  // In this project we use a parquet folder at `data/model_master` as the streaming-updated master.
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val p = new Path(masterPath)

  val exists = fs.exists(p)
  val empty = exists && {
    val st = fs.listStatus(p)
    st == null || st.isEmpty
  }

  if (!exists || empty) {
    if (empty) {
      fs.delete(p, true)
      println(s"[INIT] $masterPath existed but was empty -> recreating from df")
    } else {
      println(s"[INIT] $masterPath not found -> creating from df (data_for_model.csv)")
    }
    df.write.mode("overwrite").parquet(masterPath)
  }

  // Rename incoming columns to avoid duplicate column names after join.
  // We only want to fill nulls in the master (not overwrite existing xG).
  val updates = statsBatch
    .select(
      col("event_id"),
      col("home_xg").cast("double").alias("u_home_expected_goals"),
      col("away_xg").cast("double").alias("u_away_expected_goals")
    )
    .dropDuplicates("event_id")

  val master = spark.read.parquet(masterPath)

  // Debug: verify whether incoming event_ids actually match master.
  val incomingUnique = updates.select("event_id").count()
  val matchedInMaster =
    master.select("event_id").dropDuplicates()
      .join(updates.select("event_id"), Seq("event_id"), "inner")
      .count()
  val labeledBefore = master
    .filter(col("home_expected_goals").isNotNull && col("away_expected_goals").isNotNull)
    .count()

  println(s"[DATA] stats incoming unique=$incomingUnique | matched_in_master=$matchedInMaster | unmatched=${incomingUnique - matchedInMaster} | labeled_rows_before=$labeledBefore")

  val updated =
    master.as("m")
      .join(updates.as("u"), Seq("event_id"), "left")
      .withColumn(
        "home_expected_goals",
        coalesce(col("m.home_expected_goals"), col("u.u_home_expected_goals"))
      )
      .withColumn(
        "away_expected_goals",
        coalesce(col("m.away_expected_goals"), col("u.u_away_expected_goals"))
      )
      .withColumn(
        "is_future_fixture",
        when(col("u.event_id").isNotNull, lit(0)) // stats arrived â†’ no longer future
          .otherwise(col("m.is_future_fixture"))
      )
      .drop("u_home_expected_goals", "u_away_expected_goals")

  updated.write.mode("overwrite").parquet(masterPath)
}


upsertStatsIntoMaster: (statsBatch: org.apache.spark.sql.DataFrame)Unit


In [9]:
//pipeline refit function for home + away GLM
def refitHomeAwayPipelines(trainRaw: DataFrame): (PipelineModel, PipelineModel) = {
  // IMPORTANT: GLM label columns are home_xg / away_xg
  // but the master parquet stores them as home_expected_goals / away_expected_goals.
  val train = trainRaw.select(
    col("home_team"),
    col("away_team"),
    col("home_expected_goals").cast("double").alias("home_xg"),
    col("away_expected_goals").cast("double").alias("away_xg"),
    lit(1.0).alias("home_adv")
  )

  val home_idx = new StringIndexer()
    .setInputCol("home_team")
    .setOutputCol("home_team_idx")
    .setHandleInvalid("keep")

  val away_idx = new StringIndexer()
    .setInputCol("away_team")
    .setOutputCol("away_team_idx")
    .setHandleInvalid("keep")

  val enc = new OneHotEncoder()
    .setInputCols(Array("home_team_idx", "away_team_idx"))
    .setOutputCols(Array("home_team_ohe", "away_team_ohe"))

  val assembler = new VectorAssembler()
    .setInputCols(Array("home_team_ohe", "away_team_ohe", "home_adv"))
    .setOutputCol("features")

  val glm_home = new GeneralizedLinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("home_xg")
    .setFamily("gaussian")
    .setLink("identity")
    .setMaxIter(50)
    .setRegParam(0.0)

  val glm_away = new GeneralizedLinearRegression()
    .setFeaturesCol("features")
    .setLabelCol("away_xg")
    .setFamily("gaussian")
    .setLink("identity")
    .setMaxIter(50)
    .setRegParam(0.0)

  val pipe_home = new Pipeline().setStages(Array(home_idx, away_idx, enc, assembler, glm_home))
  val pipe_away = new Pipeline().setStages(Array(home_idx, away_idx, enc, assembler, glm_away))

  val model_home = pipe_home.fit(train)
  val model_away = pipe_away.fit(train)

  (model_home, model_away)
}

// -----------------------------
// Pattern B helpers: load-or-train + refresh exports
// -----------------------------
// (PipelineModel is imported in Cell 0)
import org.apache.spark.ml.feature.{StringIndexerModel, OneHotEncoderModel}
import org.apache.spark.ml.regression.GeneralizedLinearRegressionModel

val MASTER_PATH = "data/model_master"
val MODEL_HOME_PATH = "data/models/glm_home_pipeline_latest"
val MODEL_AWAY_PATH = "data/models/glm_away_pipeline_latest"
val DASH_OUT = "data/dashboard_table_csv"
val FIXT_OUT = "data/match_level_fixtures"

// Keep these small-ish for "live" refresh. You can bump them if runtime is OK.
val DEFAULT_MC_SIMS = 20000

@volatile var dashboardBootstrapped: Boolean = false

import org.apache.hadoop.fs.{FileSystem, Path}

def ensureMasterParquet(): Unit = {
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val p = new Path(MASTER_PATH)
  if (!fs.exists(p)) {
    println(s"[INIT] $MASTER_PATH not found -> creating from df (data_for_model.csv)")
    df.write.mode("overwrite").parquet(MASTER_PATH)
  }
}

def tryLoadPipeline(path: String): Option[PipelineModel] = {
  try Some(PipelineModel.load(path))
  catch { case _: Throwable => None }
}

def loadOrTrainPipelines(): (PipelineModel, PipelineModel) = {
  val mh = tryLoadPipeline(MODEL_HOME_PATH)
  val ma = tryLoadPipeline(MODEL_AWAY_PATH)
  if (mh.isDefined && ma.isDefined) return (mh.get, ma.get)

  ensureMasterParquet()
  val master = spark.read.parquet(MASTER_PATH)
  val modelDF = master
    .filter(col("home_expected_goals").isNotNull && col("away_expected_goals").isNotNull)
    .dropDuplicates("event_id")

  val (h, a) = refitHomeAwayPipelines(modelDF)
  h.write.overwrite().save(MODEL_HOME_PATH)
  a.write.overwrite().save(MODEL_AWAY_PATH)
  (h, a)
}

def getLiveTableSafe(): DataFrame = {
  if (spark.catalog.tableExists("current_table_live")) spark.table("current_table_live")
  else currentTable
}

def powerRankFromHomePipeline(modelHome: PipelineModel): DataFrame = {
  // Avoid Seq(...).toDF + zip/:+ patterns (Scala REPL can get unstable after re-runs).
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  val stages = modelHome.stages
  val homeIdx = stages(0).asInstanceOf[StringIndexerModel]
  val awayIdx = stages(1).asInstanceOf[StringIndexerModel]
  val ohe = stages(2).asInstanceOf[OneHotEncoderModel]
  val glm = stages(4).asInstanceOf[GeneralizedLinearRegressionModel]

  val coeff = glm.coefficients.toArray
  val homeCats = ohe.categorySizes(0)
  val awayCats = ohe.categorySizes(1)
  val dropLast = ohe.getDropLast
  val homeDim = if (dropLast) homeCats - 1 else homeCats
  val awayDim = if (dropLast) awayCats - 1 else awayCats

  val homeAttackCoef = coeff.slice(0, homeDim)
  val awayDefenseCoef = coeff.slice(homeDim, homeDim + awayDim)

  val homeTeams = if (homeIdx.getHandleInvalid == "keep") homeIdx.labels :+ "UNKNOWN" else homeIdx.labels
  val awayTeams = if (awayIdx.getHandleInvalid == "keep") awayIdx.labels :+ "UNKNOWN" else awayIdx.labels

  def attachBaselineRows(teams: Array[String], coef: Array[Double]): Seq[Row] = {
    val out = new Array[Row](teams.length)
    var i = 0
    while (i < teams.length) {
      val v = if (i < coef.length) coef(i) else 0.0
      out(i) = Row(teams(i), v)
      i += 1
    }
    out.toSeq
  }

  def pairDF(rows: Seq[Row], valueCol: String): DataFrame = {
    val schema = StructType(Seq(
      StructField("team", StringType, nullable = false),
      StructField(valueCol, DoubleType, nullable = false)
    ))
    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  }

  val homeAttack = pairDF(attachBaselineRows(homeTeams, homeAttackCoef), "home_attack")
  val awayDefWeak = pairDF(attachBaselineRows(awayTeams, awayDefenseCoef), "away_def_weak")

  homeAttack
    .join(awayDefWeak, "team")
    .withColumn("power_score", col("home_attack") - col("away_def_weak"))
    .orderBy(desc("power_score"))
}

// Full refresh: score fixtures -> compute odds/MC/SPI -> export CSVs for Streamlit
// This is what makes Pattern B non-redundant: predictions always come from the latest saved pipeline models.
def refreshDashboard(seasonLabel: String = "25/26", mcSims: Int = DEFAULT_MC_SIMS): Unit = {
  // Avoid relying on spark.implicits / .toDF in notebooks (REPL can get unstable after re-runs).
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  val liveTable = getLiveTableSafe().cache()

  val (modelHome, modelAway) = loadOrTrainPipelines()
  ensureMasterParquet()
  val master = spark.read.parquet(MASTER_PATH)

  // 1) Score future fixtures
  val fixtures = master
    .filter(col("is_future_fixture") === 1 && col("season_label") === seasonLabel)
    .select(
      col("event_id"),
      col("match_date"),
      trim(col("home_team")).alias("home_team"),
      trim(col("away_team")).alias("away_team"),
      lit(1.0).alias("home_adv")
    )
    .dropDuplicates("event_id")

  // IMPORTANT: modelHome and modelAway are both Pipelines that create the same intermediate columns
  // (home_team_idx, away_team_idx, *ohe, features). If we feed the full output of modelHome into modelAway,
  // the second transform fails with: "Output column home_team_idx already exists".
  val predHomeFull = modelHome.transform(fixtures)
    .withColumnRenamed("prediction", "lambda_home")

  // Drop intermediate pipeline columns by selecting only what modelAway needs.
  val predHome = predHomeFull.select(
    col("event_id"),
    col("match_date"),
    col("home_team"),
    col("away_team"),
    col("home_adv"),
    col("lambda_home")
  )

  val predBothFull = modelAway.transform(predHome)
    .withColumnRenamed("prediction", "lambda_away")

  val predBoth = predBothFull.select(
    col("event_id"),
    col("match_date"),
    col("home_team"),
    col("away_team"),
    col("lambda_home"),
    col("lambda_away")
  )

  // 2) W/D/L probabilities (local UDF to avoid global symbol clashes)
  import scala.math.{exp, pow}

  def poissonPmf(k: Int, lambda: Double): Double = {
    if (k < 0) 0.0
    else {
      var fact = 1.0
      var i = 2
      while (i <= k) { fact *= i; i += 1 }
      exp(-lambda) * pow(lambda, k) / fact
    }
  }

  case class WDL(win: Double, draw: Double, loss: Double)

  def wdlFromLambdas(lhRaw: Double, laRaw: Double, maxGoals: Int = 10): WDL = {
    val lh = math.max(lhRaw, 0.0)
    val la = math.max(laRaw, 0.0)

    val ph = Array.tabulate(maxGoals + 1)(k => poissonPmf(k, lh))
    val pa = Array.tabulate(maxGoals + 1)(k => poissonPmf(k, la))

    var mass = 0.0
    var i = 0
    while (i <= maxGoals) {
      var j = 0
      while (j <= maxGoals) { mass += ph(i) * pa(j); j += 1 }
      i += 1
    }
    val denom = if (mass > 0) mass else 1.0

    var win = 0.0
    var draw = 0.0
    i = 0
    while (i <= maxGoals) {
      var j = 0
      while (j <= maxGoals) {
        val p = (ph(i) * pa(j)) / denom
        if (i > j) win += p
        else if (i == j) draw += p
        j += 1
      }
      i += 1
    }
    WDL(win, draw, 1.0 - win - draw)
  }

  val wdlUdfLocal = udf((lh: Double, la: Double) => {
    val r = wdlFromLambdas(lh, la, maxGoals = 10)
    (r.win, r.draw, r.loss)
  })

  val matchProbs =
    predBoth
      .withColumn("wdl", wdlUdfLocal(col("lambda_home"), col("lambda_away")))
      .withColumn("p_home_win", col("wdl").getField("_1"))
      .withColumn("p_draw",     col("wdl").getField("_2"))
      .withColumn("p_away_win", col("wdl").getField("_3"))
      .drop("wdl")
      .withColumn("exp_pts_home", lit(3.0) * col("p_home_win") + lit(1.0) * col("p_draw"))
      .withColumn("exp_pts_away", lit(3.0) * col("p_away_win") + lit(1.0) * col("p_draw"))
      .withColumnRenamed("lambda_home", "xg_home")
      .withColumnRenamed("lambda_away", "xg_away")

  // 3) Monte Carlo season sims (using predicted lambdas)
  import scala.util.Random

  case class TeamState(var pts: Int, var gf: Int, var ga: Int)

  def poisson(lambdaRaw: Double, rng: Random): Int = {
    val lambda = math.max(lambdaRaw, 0.0)
    val L = math.exp(-lambda)
    var k = 0
    var p = 1.0
    do {
      k += 1
      p *= rng.nextDouble()
    } while (p > L && k < 25)
    k - 1
  }

  def goalDiff(s: TeamState): Int = s.gf - s.ga

  def simulateSeasonOnce(
    teams: Array[String],
    base: Map[String, TeamState],
    matches: Array[MatchSim],
    seed: Int
  ): Map[String, TeamState] = {

    val rng = new Random(seed)

    val st = scala.collection.mutable.Map[String, TeamState]()
    teams.foreach { t =>
      val b = base(t)
      st(t) = TeamState(b.pts, b.gf, b.ga)
    }

    matches.foreach { m =>
      val gh = poisson(m.lamH, rng)
      val ga = poisson(m.lamA, rng)

      st(m.h).gf += gh; st(m.h).ga += ga
      st(m.a).gf += ga; st(m.a).ga += gh

      if (gh > ga) st(m.h).pts += 3
      else if (gh < ga) st(m.a).pts += 3
      else { st(m.h).pts += 1; st(m.a).pts += 1 }
    }

    st.toMap
  }

  val teams: Array[String] =
    liveTable.select("team").collect().map(r => r.getString(0))

  val baseTable: Map[String, TeamState] =
    liveTable.select("team", "pts", "gf", "ga").collect()
      .map { r =>
        val t = r.getString(0)
        val p = r.getAs[Number](1).longValue()
        val gf = r.getAs[Number](2).longValue()
        val ga = r.getAs[Number](3).longValue()
        t -> TeamState(p.toInt, gf.toInt, ga.toInt)
      }
      .toMap

  final case class MatchSim(h: String, a: String, lamH: Double, lamA: Double)

  val matchesArr: Array[MatchSim] =
    predBoth.select("home_team", "away_team", "lambda_home", "lambda_away")
      .collect()
      .map { r =>
        MatchSim(
          r.getString(0),
          r.getString(1),
          r.getAs[Number](2).doubleValue(),
          r.getAs[Number](3).doubleValue()
        )
      }

  val winLeague = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
  val makeACL   = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
  val totalFinalPts = scala.collection.mutable.Map[String, Double]().withDefaultValue(0.0)

  val N = math.max(mcSims, 1)

  for (i <- 0 until N) {
    val end = simulateSeasonOnce(teams, baseTable, matchesArr, seed = 1234 + i)

    val ranked = teams.sortBy { t =>
      val s = end(t)
      (-s.pts, -goalDiff(s), -s.gf, t)
    }

    winLeague(ranked(0)) += 1
    ranked.take(2).foreach(t => makeACL(t) += 1)
    teams.foreach { t => totalFinalPts(t) += end(t).pts }
  }

  val mcPtsRows = teams.toSeq.map { t =>
    val expFinal = totalFinalPts(t) / N
    Row(t, expFinal)
  }
  val mcPtsSchema = StructType(Seq(
    StructField("team", StringType, nullable = false),
    StructField("exp_pts_final_mc", DoubleType, nullable = false)
  ))
  val mcPtsDF = spark.createDataFrame(spark.sparkContext.parallelize(mcPtsRows), mcPtsSchema)

  val mcRemainingDF =
    mcPtsDF
      .join(liveTable.select($"team", $"pts"), Seq("team"))
      .withColumn("exp_pts_remaining_mc", col("exp_pts_final_mc") - col("pts"))

  val probsRows = teams.toSeq.map { t =>
    val pWin = winLeague(t).toDouble / N
    val pACL = makeACL(t).toDouble / N
    Row(t, pWin * 100.0, pACL * 100.0)
  }
  val probsSchema = StructType(Seq(
    StructField("team", StringType, nullable = false),
    StructField("win_league_pct", DoubleType, nullable = false),
    StructField("make_acl_pct", DoubleType, nullable = false)
  ))
  val probsDF = spark.createDataFrame(spark.sparkContext.parallelize(probsRows), probsSchema)

  // 4) SPI from power rank
  val powerRank = powerRankFromHomePipeline(modelHome)
  val w = Window.partitionBy()
  val spiDF =
    powerRank
      .withColumn("mu", avg($"power_score").over(w))
      .withColumn("sd", stddev($"power_score").over(w))
      .withColumn("spi", lit(75.0) + lit(10.0) * (($"power_score" - $"mu") / $"sd"))
      .select("team", "spi")

  // Debug views (so you can show() them in separate cells)
  powerRank.createOrReplaceTempView("power_rank_live")
  spiDF.createOrReplaceTempView("spi_live")

  // 5) Dashboard table export
  val core =
    liveTable
      .select(
        col("team"),
        col("pts").cast("double").alias("pts"),
        col("gf").cast("int").alias("gf"),
        col("ga").cast("int").alias("ga")
      )

  val finalTable =
    core
      .join(mcRemainingDF.select("team", "exp_pts_remaining_mc"), Seq("team"), "left")
      .join(mcPtsDF.select("team", "exp_pts_final_mc"), Seq("team"), "left")
      .join(probsDF.select("team", "win_league_pct", "make_acl_pct"), Seq("team"), "left")
      .join(spiDF.select("team", "spi"), Seq("team"), "left")
      .na.fill(0.0, Seq("exp_pts_remaining_mc", "exp_pts_final_mc", "win_league_pct", "make_acl_pct", "spi"))
      .orderBy(desc("exp_pts_final_mc"))

  // Debug view
  finalTable.createOrReplaceTempView("dashboard_live")

  finalTable
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(DASH_OUT)

  // 6) Match-level fixtures export (team/opponent rows)
  val homeView =
    matchProbs.select(
      col("event_id"),
      col("match_date"),
      col("home_team").alias("team"),
      col("away_team").alias("opponent"),
      lit("H").alias("venue"),
      col("xg_home").alias("xg_for"),
      col("xg_away").alias("xg_against"),
      col("p_home_win").alias("p_win"),
      col("p_draw").alias("p_draw"),
      col("p_away_win").alias("p_loss"),
      col("exp_pts_home").alias("exp_pts")
    )

  val awayView =
    matchProbs.select(
      col("event_id"),
      col("match_date"),
      col("away_team").alias("team"),
      col("home_team").alias("opponent"),
      lit("A").alias("venue"),
      col("xg_away").alias("xg_for"),
      col("xg_home").alias("xg_against"),
      col("p_away_win").alias("p_win"),
      col("p_draw").alias("p_draw"),
      col("p_home_win").alias("p_loss"),
      col("exp_pts_away").alias("exp_pts")
    )

  val teamFixtures =
    homeView
      .unionByName(awayView)
      .withColumn("p_win", round(col("p_win") * 100, 2))
      .withColumn("p_draw", round(col("p_draw") * 100, 2))
      .withColumn("p_loss", round(col("p_loss") * 100, 2))
      .withColumn("xg_for", round(col("xg_for"), 2))
      .withColumn("xg_against", round(col("xg_against"), 2))
      .withColumn("exp_pts", round(col("exp_pts"), 2))

  // Debug view
  teamFixtures.createOrReplaceTempView("fixtures_live")

  teamFixtures
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(FIXT_OUT)

  dashboardBootstrapped = true
  liveTable.unpersist()
}

// -----------------------------
// Offline bootstrap: always produce outputs at least once from df/master.
// -----------------------------
// This ensures you get dashboard + fixtures CSVs even if:
// - no streaming files arrive yet, or
// - only the event stream is running.
val AUTO_BOOTSTRAP_DASHBOARD = true

if (AUTO_BOOTSTRAP_DASHBOARD && !dashboardBootstrapped) {
  println("[AUTO] bootstrapping dashboard from df/master -> refreshDashboard()")
  ensureMasterParquet()
  refreshDashboard(seasonLabel = "25/26", mcSims = DEFAULT_MC_SIMS)

  // Print the key views so the initial (no-stream) run is visibly doing work.
  if (spark.catalog.tableExists("dashboard_live")) {
    println("[AUTO] dashboard_live (top 10)")
    spark.table("dashboard_live").show(10, truncate = false)
  }
  if (spark.catalog.tableExists("spi_live")) {
    println("[AUTO] spi_live (top 10)")
    spark.table("spi_live").orderBy(desc("spi")).show(10, truncate = false)
  }
  if (spark.catalog.tableExists("fixtures_live")) {
    println("[AUTO] fixtures_live (top 10)")
    spark.table("fixtures_live").orderBy("match_date").show(10, truncate = false)
  }

  println("[AUTO] bootstrap done")
}


[AUTO] bootstrapping dashboard from df/master -> refreshDashboard()
[AUTO] dashboard_live (top 10)
+------------------+----+---+---+--------------------+----------------+--------------+------------+------------------+
|team              |pts |gf |ga |exp_pts_remaining_mc|exp_pts_final_mc|win_league_pct|make_acl_pct|spi               |
+------------------+----+---+---+--------------------+----------------+--------------+------------+------------------+
|Lion City Sailors |15.0|22 |2  |37.7883             |52.7883         |96.64         |99.89       |89.47321462247646 |
|Tampines Rovers   |8.0 |8  |4  |31.90965            |39.90965        |3.145         |76.33       |88.7796109400253  |
|Albirex Niigata   |7.0 |8  |5  |27.139200000000002  |34.1392         |0.2           |20.21       |82.79783898992036 |
|Geylang Int.      |10.0|7  |4  |17.8014             |27.8014         |0.005         |2.045       |74.22054475013204 |
|Balestier Khalsa  |8.0 |9  |12 |18.26305            |26.26305      

MASTER_PATH = data/model_master
MODEL_HOME_PATH = data/models/glm_home_pipeline_latest
MODEL_AWAY_PATH = data/models/glm_away_pipeline_latest
DASH_OUT = data/dashboard_table_csv
FIXT_OUT = data/match_level_fixtures
DEFAULT_MC_SIMS = 20000
dashboardBootstrapped = true


refitHomeAwayPipelines: (trainRaw: org.apache.spark.sql.DataFrame)(org.apache.spark.ml.PipelineModel, org.apache.spark.ml.PipelineModel)
ensureMasterParquet: ()Unit
tryLoadPipeline: (path: String)Option[org.apache.spark.ml.PipelineModel]
loadOrTrainPipelines: ()(org.apache.spark.m...


true

In [12]:
import org.apache.spark.sql.streaming.Trigger

@volatile var statsAccum: Long = 0L

val mlQ = statsStream.writeStream
  .outputMode("append")
  // Poll for new files every 2s so "drag in JSON" demos reliably show new batches.
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .foreachBatch { (statsBatch: DataFrame, batchId: Long) =>

    val inputRows = statsBatch.count()
    println(s"[STATS] batchId=$batchId | inputRows=$inputRows")

    if (inputRows == 0) {
      // keep-alive tick
    } else {
      if (statsBatch.columns.contains("source_file")) {
        println("[STATS] files:")
        statsBatch.select("source_file").distinct().show(50, truncate = false)
      }

      val newUnique = statsBatch.select("event_id").distinct().count()
      println(s"=== stats batchId=$batchId | newUnique=$newUnique ===")

      // 1) upsert stats into the master parquet
      upsertStatsIntoMaster(statsBatch)
      println(s"[DATA] upserted stats into $MASTER_PATH")

      // 2) update cumulative counter for periodic refit
      statsAccum += newUnique
      println(s"[TRIGGER] accum=$statsAccum (refit when >= 4)")

      val LIVE_MC_SIMS = 2000
      val threshold = 4L

      // 3) refit periodically
      if (statsAccum >= threshold) {
        statsAccum -= threshold
        println(s"[ML] TRIGGER refit | carry remainder=$statsAccum")

        ensureMasterParquet()
        val masterNow = spark.read.parquet(MASTER_PATH)

        val modelDFnew = masterNow
          .filter(col("home_expected_goals").isNotNull && col("away_expected_goals").isNotNull)
          .dropDuplicates("event_id")

        val trainRows = modelDFnew.count()
        println(s"[ML] training rows = $trainRows")

        if (trainRows == 0) {
          println("[ML] WARNING: 0 training rows (no rows in master with both xG labels).")
          println("[ML] This usually means: (1) event_id mismatch (check matched_in_master), or (2) master parquet was overwritten with null xG.")
          println("[ML] Skipping refit and keeping existing models.")
        } else {
          val (model_home, model_away) = refitHomeAwayPipelines(modelDFnew)
          model_home.write.overwrite().save(MODEL_HOME_PATH)
          model_away.write.overwrite().save(MODEL_AWAY_PATH)
          println("[ML] refit complete + models saved")
        }
      } else {
        println("[ML] skip refit")
      }

      // 4) Always re-score + export so the dashboard reacts to new stats
      refreshDashboard(seasonLabel = "25/26", mcSims = LIVE_MC_SIMS)

      // 5) Print key views for demo
      if (spark.catalog.tableExists("dashboard_live")) {
        println("[DEMO] dashboard_live (top 10)")
        spark.table("dashboard_live").show(10, truncate = false)
      }
      if (spark.catalog.tableExists("spi_live")) {
        println("[DEMO] spi_live (top 10)")
        spark.table("spi_live").orderBy(desc("spi")).show(10, truncate = false)
      }
    }
  }
  // NO CHECKPOINTING (demo mode)
  .start()


statsAccum = 0
mlQ = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7cba14f6


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7cba14f6

[STATS] batchId=0 | inputRows=7
[STATS] files:
+-------------------------------------------------------------------------------------+
|source_file                                                                          |
+-------------------------------------------------------------------------------------+
|file:///workspace/PowerRanking/data/stream_in/stats/stats_14195512_1766202822386.json|
|file:///workspace/PowerRanking/data/stream_in/stats/stats_14195507_1766202822381.json|
|file:///workspace/PowerRanking/data/stream_in/stats/stats_14195509_1766202822383.json|
|file:///workspace/PowerRanking/data/stream_in/stats/stats_14195508_1766202822382.json|
|file:///workspace/PowerRanking/data/stream_in/stats/stats_14195510_1766202822384.json|
|file:///workspace/PowerRanking/data/stream_in/stats/stats_14195506_1766202822380.json|
|file:///workspace/PowerRanking/data/stream_in/stats/stats_14195511_1766202822385.json|
+------------------------------------------------------------------------