In [0]:
from pyspark.sql.functions import col, lag, sum as _sum, when, coalesce, first, max, concat, lit
from pyspark.sql.window import Window
import dlt

# Columns each raw table contributes to the plus-minus computation.
PLAYER_GAME_COLUMNS = ["GameID", "TeamID", "PlayerID", "Team", "Name", "Minutes", "timestamp"]
GAME_COLUMNS = [
    "GameID", "Status", "TimeRemainingMinutes",
    "AwayTeamID", "AwayTeam", "HomeTeam", "HomeTeamID",
    "AwayTeamScore", "HomeTeamScore", "timestamp",
]

# Load the raw per-player and per-game snapshot tables, projected down to the
# columns above.
player_games = spark.read.table("tabular.dataexpert.jw_raw_player_games").select(*PLAYER_GAME_COLUMNS)
games = spark.read.table("tabular.dataexpert.jw_raw_games").select(*GAME_COLUMNS)

# Per-snapshot deltas, ordered by `timestamp`:
#   * scores are compared with the previous snapshot of the same game;
#   * minutes are compared with the previous snapshot of the same player in
#     the same game.
# The first snapshot in each window partition has no predecessor, so its
# Prev* and *Change columns are null.
games_window_spec = Window.partitionBy("GameID").orderBy("timestamp")
games = (
    games
    .withColumn("PrevHomeTeamScore", lag("HomeTeamScore").over(games_window_spec))
    .withColumn("PrevAwayTeamScore", lag("AwayTeamScore").over(games_window_spec))
    .withColumn("HomeTeamScoreChange", col("HomeTeamScore") - col("PrevHomeTeamScore"))
    .withColumn("AwayTeamScoreChange", col("AwayTeamScore") - col("PrevAwayTeamScore"))
)

player_games_window_spec = Window.partitionBy("GameID", "PlayerID").orderBy("timestamp")
player_games = (
    player_games
    .withColumn("PrevMinutes", lag("Minutes").over(player_games_window_spec))
    .withColumn("MinutesChange", col("Minutes") - col("PrevMinutes"))
    # Drop negative minute deltas (presumably stat corrections). Each player's
    # first snapshot (null delta) is dropped as well, because a null
    # comparison never satisfies a filter.
    .filter(col("MinutesChange") >= 0)
)

# Pair every player snapshot with the game snapshot taken at the same moment.
# Joining on the column *names* (not an equality expression) keeps a single
# `GameID` and a single `timestamp` column in the result, so later by-name
# references such as groupBy("GameID") are not ambiguous.
plus_minus = games.join(player_games, on=["GameID", "timestamp"], how="inner")

# Per-interval plus-minus from the player's side: own score change minus the
# opponent's score change. A row whose TeamID matches neither HomeTeamID nor
# AwayTeamID (or is null) gets a null PlusMinusChange instead of silently being
# scored as the away team; _sum skips nulls, and a player with no valid rows
# ends up with a null total that the table's
# "TotalPlusMinusChange IS NOT NULL" expectation will report.
plus_minus = plus_minus.withColumn(
    "PlusMinusChange",
    when(
        col("TeamID") == col("HomeTeamID"),
        col("HomeTeamScoreChange") - col("AwayTeamScoreChange"),
    ).when(
        col("TeamID") == col("AwayTeamID"),
        col("AwayTeamScoreChange") - col("HomeTeamScoreChange"),
    ),
)

# Credit/debit a player only for intervals in which their minutes increased,
# i.e. they were in the game between the two updates.
plus_minus = plus_minus.filter(col("MinutesChange") > 0)

# One row per (PlayerID, GameID): summed plus-minus plus descriptive columns.
# `max` here is pyspark.sql.functions.max (imported at the top, shadowing the
# builtin), so it aggregates the Minutes column.
grouped_df = plus_minus.groupBy("PlayerID", "GameID").agg(
    _sum("PlusMinusChange").alias("TotalPlusMinusChange"),
    first("TeamID").alias("TeamID"),
    first("Team").alias("Team"),
    first("Name").alias("Player_Name"),
    first("HomeTeam").alias("HomeTeam"),
    first("AwayTeam").alias("AwayTeam"),
    max("Minutes").alias("Total_Minutes"),
)

# Append a "Matchup" column of the form "<HomeTeam>_vs._<AwayTeam>".
# concat returns null when either team name is null.
grouped_df = grouped_df.select(
    "*",
    concat(col("HomeTeam"), lit("_vs._"), col("AwayTeam")).alias("Matchup"),
)

In [0]:
# Publish the aggregated plus-minus DataFrame as the `jw_plus_minus` Delta Live Table.
# These are plain `dlt.expect` checks (not expect_or_drop / expect_or_fail),
# which per the DLT docs record violations as quality metrics without dropping rows.
@dlt.table(name="jw_plus_minus")
@dlt.expect("non_null_PlayerID", "PlayerID IS NOT NULL")
@dlt.expect("non_null_GameID", "GameID IS NOT NULL")
@dlt.expect("valid_Minutes", "Total_Minutes >= 0")
@dlt.expect("valid_PlusMinusChange", "TotalPlusMinusChange IS NOT NULL")
def update_table():
    """Return the per-player, per-game plus-minus DataFrame backing the table.

    The query itself is the notebook-level `grouped_df` built in the previous
    cell; this function only hands it to DLT.
    NOTE(review): consider building the query inside this function so the
    dataset definition is self-contained — confirm against pipeline conventions.
    """
    return grouped_df