In [1]:
# ==========================================================
# SCD Type 2 for dim_players
# ==========================================================

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def run_dim_players_scd2():
    """
    Run a Slowly Changing Dimension (Type 2) load for the dim_players table.

    Behaviour:
    - If the gold path is not yet a Delta table, create it and load every
      source row as a current row.
    - On subsequent runs:
        1) Detect brand-new players and players whose tracked attributes
           changed (source hash_diff vs. the current gold row's hash_diff).
        2) Expire the current rows of changed players
           (is_current = False, end_date = yesterday).
        3) Insert new current rows for changed and brand-new players.

    Relies on the notebook-provided globals ``spark`` and ``display``.
    Returns None; progress is reported through print().
    """

    # ----------------------------
    # Configuration
    # ----------------------------
    silver_path = "Tables/player_info_silver"
    gold_path   = (
        "abfss://dc478dd4-e53e-4f21-add0-2e376dc173fe@onelake.dfs.fabric.microsoft.com/"
        "2dd626a9-1762-4265-b676-2ccd6b809184/Tables/dim_players"
    )

    today_col = F.current_date()

    business_key_col = "player_id"
    tracked_cols = [
        "first_name", "last_name", "nationality", "birth_date",
        "primary_position", "height_cm", "weight", "shoots_catches"
    ]

    # Single source of truth for column order, so the source projection, the
    # initial write and the MERGE insert mapping cannot drift apart.
    source_cols = [business_key_col, *tracked_cols]
    gold_cols = [
        "player_key", *source_cols,
        "hash_diff", "is_current", "start_date", "end_date"
    ]

    # Matches a source row to the *current* gold version of the same player.
    current_match_condition = (
        f"t.{business_key_col} = s.{business_key_col} AND t.is_current = true"
    )

    # ----------------------------
    # Read source (Silver) and normalise types
    # ----------------------------
    # Every column is cast to string so the hash input is type-stable.
    # NOTE(review): dropDuplicates() removes only fully identical rows; two
    # source rows sharing a player_id but differing in any attribute would
    # both be loaded as current rows — confirm Silver is unique per player_id.
    source_df = (
        spark.read.format("delta").load(silver_path)
        .select(*[F.col(c).cast("string").alias(c) for c in source_cols])
        .dropDuplicates()
    )

    # Build hash across tracked attributes (drives SCD2 change detection).
    # NULL is hashed as "" so a NULL <-> "" flip is not seen as a change.
    # The formula must stay as-is: gold already stores hashes produced by it,
    # and changing it would flag every existing player as changed.
    source_df = source_df.withColumn(
        "hash_diff",
        F.sha2(F.concat_ws("||", *[F.coalesce(F.col(c), F.lit("")) for c in tracked_cols]), 256)
    )

    # ----------------------------
    # Helper: assign surrogate keys to rows being inserted
    # ----------------------------
    def add_surrogate_keys_for_inserts(insert_df, current_max_key):
        """
        Return insert_df with a sequential surrogate key column (player_key).

        Keys start at current_max_key + 1 and follow ascending business-key
        order. The window has no partitionBy, so the numbering is global
        across the whole DataFrame.
        """
        window = Window.orderBy(F.col(business_key_col).asc())
        return (
            insert_df
            .withColumn("row_no_for_key", F.row_number().over(window))
            .withColumn("player_key", F.col("row_no_for_key") + F.lit(int(current_max_key)))
            .drop("row_no_for_key")
        )

    # ----------------------------
    # FIRST RUN → create the table
    # ----------------------------
    if not DeltaTable.isDeltaTable(spark, gold_path):
        initial_df = (
            source_df
            .withColumn("is_current", F.lit(True).cast("boolean"))
            .withColumn("start_date", today_col)
            .withColumn("end_date", F.lit(None).cast("date"))
        )

        initial_df = add_surrogate_keys_for_inserts(initial_df, current_max_key=0)
        initial_df = initial_df.select(*gold_cols)

        initial_df.write.format("delta").mode("overwrite").save(gold_path)
        print(f"✅ Initial SCD2 load complete. Rows inserted: {initial_df.count()}")
        return  # finished first-run path

    # ----------------------------
    # SUBSEQUENT RUNS → two-MERGE pattern
    # ----------------------------
    gold_delta = DeltaTable.forPath(spark, gold_path)
    gold_df    = gold_delta.toDF()

    # Only the current versions in gold take part in the comparison.
    gold_current_df = (
        gold_df
        .filter(F.col("is_current"))
        .select(business_key_col, "hash_diff")
    )

    joined = (
        source_df.alias("s")
        .join(gold_current_df.alias("t"), on=business_key_col, how="left")
    )

    # No current gold row for the key → brand-new player.
    new_keys_df = joined.filter(F.col("t.hash_diff").isNull()).select("s.*")
    # Current gold row exists but its hash differs → attributes changed.
    changed_keys_df = joined.filter(
        (F.col("t.hash_diff").isNotNull()) & (F.col("s.hash_diff") != F.col("t.hash_diff"))
    ).select("s.*")

    new_count = new_keys_df.count()
    changed_count = changed_keys_df.count()
    print(f"Detected new={new_count}, changed={changed_count}")

    if new_count == 0 and changed_count == 0:
        print("No SCD2 updates required.")
    else:
        # MERGE #1: expire current rows for changed keys
        keys_to_expire_df = (
            changed_keys_df
            .select(business_key_col)
            .dropDuplicates()
        )

        # All reporting counts are taken BEFORE the merges run. The frames
        # above are lazy plans joined against the gold table; counting them
        # after a merge may re-evaluate them against the already-mutated
        # table and report a misleading figure (typically 0).
        expired_count = keys_to_expire_df.count()
        inserted_count = new_count + changed_count

        (
            gold_delta.alias("t")
            .merge(keys_to_expire_df.alias("s"), current_match_condition)
            .whenMatchedUpdate(set={
                "is_current": F.lit(False),
                "end_date":  F.date_sub(today_col, 1)
            })
            .execute()
        )
        print(f"Expired {expired_count} current rows (for changed player_ids).")

        # Prepare inserts (brand-new + changed versions).
        # If this plan is re-evaluated after MERGE #1, players expired above
        # no longer have a current row and surface via new_keys_df instead of
        # changed_keys_df, so the union still holds the same set of rows.
        rows_to_insert_df = (
            new_keys_df.unionByName(changed_keys_df)
            .withColumn("is_current", F.lit(True).cast("boolean"))
            .withColumn("start_date", today_col)
            .withColumn("end_date", F.lit(None).cast("date"))
        )

        # Continue the surrogate key sequence from the highest key in gold
        # (all versions, not only current ones).
        current_max_key = gold_df.agg(F.max("player_key").alias("max_key")).collect()[0]["max_key"]
        if current_max_key is None:
            current_max_key = 0

        rows_to_insert_df = add_surrogate_keys_for_inserts(rows_to_insert_df, current_max_key)

        # MERGE #2: insert new current rows. Changed players were just
        # expired and new players have no gold row at all, so no source row
        # matches a current target row and every one is inserted.
        (
            gold_delta.alias("t")
            .merge(rows_to_insert_df.alias("s"), current_match_condition)
            .whenNotMatchedInsert(values={c: f"s.{c}" for c in gold_cols})
            .execute()
        )

        print(f"✅ Inserted {inserted_count} new current rows (brand-new + changed versions).")

    # ----------------------------
    # Verification (optional)
    # ----------------------------
    final_df = spark.read.format("delta").load(gold_path)
    total_rows   = final_df.count()
    current_rows = final_df.filter(F.col("is_current")).count()
    expired_rows = final_df.filter(~F.col("is_current")).count()
    print(f"After SCD2 run → total: {total_rows}, current: {current_rows}, expired: {expired_rows}")
    display(final_df.orderBy(F.col(business_key_col).asc(), F.col("start_date").desc()).limit(20))

# ---- Execute the SCD2 load for dim_players when this cell runs ----
run_dim_players_scd2()

StatementMeta(, e1f0e175-df21-4e60-ba83-c0998cb51620, 3, Finished, Available, Finished)

In [1]:
"""
# ==========================================================
# SCD Type 2 for dim_players using two Delta MERGE operations
# (No temp views; uses DataFrame directly in MERGE)
# ==========================================================

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# ----------------------------
# Configuration
# ----------------------------
silver_path = "Tables/player_info_silver"
gold_path   = "abfss://dc478dd4-e53e-4f21-add0-2e376dc173fe@onelake.dfs.fabric.microsoft.com/2dd626a9-1762-4265-b676-2ccd6b809184/Tables/dim_players"

# Today (used for SCD2 dating)
today_col = F.current_date()

# Columns that make up the natural/business key and tracked attributes
business_key_col = "player_id"
tracked_cols = [
    "first_name", "last_name", "nationality", "birth_date",
    "primary_position", "height_cm", "weight", "shoots_catches"
]

# ----------------------------
# Read source (Silver) and normalise types
# ----------------------------
source_df = (
    spark.read.format("delta").load(silver_path)
    .select(
        F.col("player_id").cast("string").alias("player_id"),
        F.col("first_name").cast("string").alias("first_name"),
        F.col("last_name").cast("string").alias("last_name"),
        F.col("nationality").cast("string").alias("nationality"),
        F.col("birth_date").cast("string").alias("birth_date"),
        F.col("primary_position").cast("string").alias("primary_position"),
        F.col("height_cm").cast("string").alias("height_cm"),
        F.col("weight").cast("string").alias("weight"),
        F.col("shoots_catches").cast("string").alias("shoots_catches")
    )
    .dropDuplicates()
)

# Build a stable hash fingerprint across all tracked attributes.
# Any change to one of the tracked columns will change this hash.
source_df = source_df.withColumn(
    "hash_diff",
    F.sha2(F.concat_ws("||", *[F.coalesce(F.col(c), F.lit("")) for c in tracked_cols]), 256)
)

# ----------------------------
# Helper: assign surrogate keys for rows we are going to INSERT
# ----------------------------
def add_surrogate_keys_for_inserts(insert_df, current_max_key):
    window = Window.orderBy(F.col(business_key_col).asc())
    return (
        insert_df
        .withColumn("row_no_for_key", F.row_number().over(window))
        .withColumn("player_key", F.col("row_no_for_key") + F.lit(int(current_max_key)))
        .drop("row_no_for_key")
    )

# ----------------------------
# FIRST RUN (table does not exist) → create full SCD2 table
# ----------------------------
if not DeltaTable.isDeltaTable(spark, gold_path):

    initial_df = (
        source_df
        .withColumn("is_current", F.lit(True).cast("boolean"))
        .withColumn("start_date", today_col)
        .withColumn("end_date", F.lit(None).cast("date"))
    )

    initial_df = add_surrogate_keys_for_inserts(initial_df, current_max_key=0)

    initial_df = initial_df.select(
        "player_key",
        "player_id",
        "first_name",
        "last_name",
        "nationality",
        "birth_date",
        "primary_position",
        "height_cm",
        "weight",
        "shoots_catches",
        "hash_diff",
        "is_current",
        "start_date",
        "end_date"
    )

    (initial_df.write.format("delta").mode("overwrite").save(gold_path))
    print(f"✅ Initial SCD2 load complete. Rows inserted: {initial_df.count()}")

# ----------------------------
# SUBSEQUENT RUNS → two MERGE pattern
# ----------------------------
else:
    gold_delta = DeltaTable.forPath(spark, gold_path)
    gold_df    = gold_delta.toDF()

    # Consider only the current versions in Gold for comparison
    gold_current_df = (
        gold_df
        .filter(F.col("is_current") == True)
        .select("player_id", "hash_diff")
    )

    # Determine brand-new and changed keys
    joined = (
        source_df.alias("s")
        .join(gold_current_df.alias("t"), on="player_id", how="left")
    )

    new_keys_df = joined.filter(F.col("t.hash_diff").isNull()).select("s.*")
    changed_keys_df = joined.filter(
        (F.col("t.hash_diff").isNotNull()) & (F.col("s.hash_diff") != F.col("t.hash_diff"))
    ).select("s.*")

    new_count = new_keys_df.count()
    changed_count = changed_keys_df.count()
    print(f"Detected new={new_count}, changed={changed_count}")

    # If nothing changed, we are done
    if new_count == 0 and changed_count == 0:
        print("No SCD2 updates required.")
    else:
        # --------------------------------------------
        # MERGE #1: EXPIRE current rows for CHANGED keys
        # --------------------------------------------
        keys_to_expire_df = (
            changed_keys_df
            .select("player_id")
            .dropDuplicates()
            .withColumn("merge_marker", F.lit(1))
        )

        (
            gold_delta.alias("t")
            .merge(
                keys_to_expire_df.alias("s"),
                "t.player_id = s.player_id AND t.is_current = true"
            )
            .whenMatchedUpdate(set={
                "is_current": F.lit(False),
                # If you prefer same-day boundary, use today_col instead of date_sub
                "end_date":  F.date_sub(today_col, 1)
            })
            .execute()
        )
        print(f"Expired {keys_to_expire_df.count()} current rows (for changed player_ids).")

        # --------------------------------------------
        # Prepare rows to INSERT (brand-new + changed-versions)
        # --------------------------------------------
        rows_to_insert_df = (
            new_keys_df.unionByName(changed_keys_df)
            .withColumn("is_current", F.lit(True).cast("boolean"))
            .withColumn("start_date", today_col)
            .withColumn("end_date", F.lit(None).cast("date"))
        )

        # Assign surrogate keys only to rows being inserted
        current_max_key = gold_df.agg(F.max("player_key").alias("max_key")).collect()[0]["max_key"]
        if current_max_key is None:
            current_max_key = 0

        rows_to_insert_df = add_surrogate_keys_for_inserts(rows_to_insert_df, current_max_key)

        # --------------------------------------------
        # MERGE #2: INSERT new current rows (DataFrame source; no temp view)
        # --------------------------------------------
        (
            gold_delta.alias("t")
            .merge(
                rows_to_insert_df.alias("s"),
                "t.player_id = s.player_id AND t.is_current = true"
            )
            .whenNotMatchedInsert(values={
                "player_key":       "s.player_key",
                "player_id":        "s.player_id",
                "first_name":       "s.first_name",
                "last_name":        "s.last_name",
                "nationality":      "s.nationality",
                "birth_date":       "s.birth_date",
                "primary_position": "s.primary_position",
                "height_cm":        "s.height_cm",
                "weight":           "s.weight",
                "shoots_catches":   "s.shoots_catches",
                "hash_diff":        "s.hash_diff",
                "is_current":       "s.is_current",
                "start_date":       "s.start_date",
                "end_date":         "s.end_date"
            })
            .execute()
        )

        inserted_count = rows_to_insert_df.count()
        print(f"✅ Inserted {inserted_count} new current rows (brand-new + changed versions).")

# ----------------------------
# Verify
# ----------------------------
final_df = spark.read.format("delta").load(gold_path)
total_rows   = final_df.count()
current_rows = final_df.filter(F.col("is_current") == True).count()
expired_rows = final_df.filter(F.col("is_current") == False).count()

print(f"After SCD2 run → total: {total_rows}, current: {current_rows}, expired: {expired_rows}")
display(
    final_df.orderBy(F.col("player_id").asc(), F.col("start_date").desc()).limit(20)
)
"""


StatementMeta(, 41d98384-60d9-4cc6-82eb-88bc4a4b4947, 3, Finished, Available, Finished)

Detected new=0, changed=0
No SCD2 updates required.
After SCD2 run → total: 3925, current: 3925, expired: 0


SynapseWidget(Synapse.DataFrame, 93478e14-08e7-4046-9405-864bafe41289)

In [1]:
"""
# ==========================================================
# SCD Type 2 for dim_players using two Delta MERGE operations
# ==========================================================

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# ----------------------------
# Configuration
# ----------------------------
silver_path = "Tables/player_info_silver"
gold_path   = "abfss://dc478dd4-e53e-4f21-add0-2e376dc173fe@onelake.dfs.fabric.microsoft.com/2dd626a9-1762-4265-b676-2ccd6b809184/Tables/dim_players"

today_col = F.current_date()

# Columns that make up the natural/business key and tracked attributes
business_key_col = "player_id"
tracked_cols = [
    "first_name", "last_name", "nationality", "birth_date",
    "primary_position", "height_cm", "weight", "shoots_catches"
]

# ----------------------------
# Read source (Silver) and normalise
# ----------------------------
source_df = (
    spark.read.format("delta").load(silver_path)
    .select(
        F.col("player_id").cast("string").alias("player_id"),
        F.col("first_name").cast("string").alias("first_name"),
        F.col("last_name").cast("string").alias("last_name"),
        F.col("nationality").cast("string").alias("nationality"),
        F.col("birth_date").cast("string").alias("birth_date"),
        F.col("primary_position").cast("string").alias("primary_position"),
        F.col("height_cm").cast("string").alias("height_cm"),
        F.col("weight").cast("string").alias("weight"),
        F.col("shoots_catches").cast("string").alias("shoots_catches")
    )
    .dropDuplicates()
)

# Build a stable hash fingerprint across all tracked attributes
source_df = source_df.withColumn(
    "hash_diff",
    F.sha2(F.concat_ws("||", *[F.coalesce(F.col(c), F.lit("")) for c in tracked_cols]), 256)
)

# ----------------------------
# Helper: assign surrogate keys for rows we are going to INSERT
# ----------------------------
def add_surrogate_keys_for_inserts(insert_df, current_max_key):
    window = Window.orderBy(F.col(business_key_col).asc())
    return (
        insert_df
        .withColumn("row_no_for_key", F.row_number().over(window))
        .withColumn("player_key", F.col("row_no_for_key") + F.lit(int(current_max_key)))
        .drop("row_no_for_key")
    )

# ----------------------------
# FIRST RUN (table does not exist) → create full SCD2 table
# ----------------------------
if not DeltaTable.isDeltaTable(spark, gold_path):

    initial_df = (
        source_df
        .withColumn("is_current", F.lit(True).cast("boolean"))
        .withColumn("start_date", today_col)
        .withColumn("end_date", F.lit(None).cast("date"))
    )

    initial_df = add_surrogate_keys_for_inserts(initial_df, current_max_key=0)

    initial_df = initial_df.select(
        "player_key",
        "player_id",
        "first_name",
        "last_name",
        "nationality",
        "birth_date",
        "primary_position",
        "height_cm",
        "weight",
        "shoots_catches",
        "hash_diff",
        "is_current",
        "start_date",
        "end_date"
    )

    (initial_df.write.format("delta").mode("overwrite").save(gold_path))
    print(f"✅ Initial SCD2 load complete. Rows inserted: {initial_df.count()}")

# ----------------------------
# SUBSEQUENT RUNS → two MERGE pattern
# ----------------------------
else:
    gold_delta = DeltaTable.forPath(spark, gold_path)
    gold_df    = gold_delta.toDF()

    # Consider only the current versions in Gold for comparison
    gold_current_df = gold_df.filter(F.col("is_current") == True) \
                             .select("player_id", "hash_diff")

    # Determine brand-new and changed keys
    joined = (
        source_df.alias("s")
        .join(gold_current_df.alias("t"), on="player_id", how="left")
    )

    new_keys_df = joined.filter(F.col("t.hash_diff").isNull()).select("s.*")
    changed_keys_df = joined.filter(
        (F.col("t.hash_diff").isNotNull()) & (F.col("s.hash_diff") != F.col("t.hash_diff"))
    ).select("s.*")

    new_count = new_keys_df.count()
    changed_count = changed_keys_df.count()
    print(f"Detected new={new_count}, changed={changed_count}")

    # If nothing changed, we are done
    if new_count == 0 and changed_count == 0:
        print("No SCD2 updates required.")
    else:
        # --------------------------------------------
        # MERGE #1: EXPIRE current rows for CHANGED keys
        # --------------------------------------------
        keys_to_expire_df = (
            changed_keys_df.select("player_id").dropDuplicates()
                           .withColumn("merge_marker", F.lit(1))
        )

        (
            gold_delta.alias("t")
            .merge(
                keys_to_expire_df.alias("s"),
                "t.player_id = s.player_id AND t.is_current = true"
            )
            .whenMatchedUpdate(set={
                "is_current": F.lit(False),
                # If you prefer same-day boundary, use today_col instead of date_sub
                "end_date":  F.date_sub(today_col, 1)
            })
            .execute()
        )
        print(f"Expired {keys_to_expire_df.count()} current rows (for changed player_ids).")

        # --------------------------------------------
        # Prepare rows to INSERT (brand-new + changed-versions)
        # --------------------------------------------
        rows_to_insert_df = (
            new_keys_df.unionByName(changed_keys_df)
            .withColumn("is_current", F.lit(True).cast("boolean"))
            .withColumn("start_date", today_col)
            .withColumn("end_date", F.lit(None).cast("date"))
        )

        # Assign surrogate keys only to rows being inserted
        current_max_key = gold_df.agg(F.max("player_key").alias("max_key")).collect()[0]["max_key"]
        if current_max_key is None:
            current_max_key = 0

        rows_to_insert_df = add_surrogate_keys_for_inserts(rows_to_insert_df, current_max_key)

        # Create a temp view for the MERGE source (readable in both SQL and Python APIs)
        temp_view_name = "rows_to_insert_dim_players_tmp"
        rows_to_insert_df.createOrReplaceTempView(temp_view_name)

        # --------------------------------------------
        # MERGE #2: INSERT new current rows
        # Note: We match only against current rows in target. Because we have just
        # expired changed rows, and brand-new keys do not exist at all, every row
        # in the temp view will be "NOT MATCHED" and therefore inserted.
        # --------------------------------------------
        (
            gold_delta.alias("t")
            .merge(
                spark.table(temp_view_name).alias("s"),
                "t.player_id = s.player_id AND t.is_current = true"
            )
            .whenNotMatchedInsert(values={
                "player_key":       "s.player_key",
                "player_id":        "s.player_id",
                "first_name":       "s.first_name",
                "last_name":        "s.last_name",
                "nationality":      "s.nationality",
                "birth_date":       "s.birth_date",
                "primary_position": "s.primary_position",
                "height_cm":        "s.height_cm",
                "weight":           "s.weight",
                "shoots_catches":   "s.shoots_catches",
                "hash_diff":        "s.hash_diff",
                "is_current":       "s.is_current",
                "start_date":       "s.start_date",
                "end_date":         "s.end_date"
            })
            .execute()
        )

        inserted_count = spark.table(temp_view_name).count()
        print(f"✅ Inserted {inserted_count} new current rows (brand-new + changed versions).")

# ----------------------------
# Verify
# ----------------------------
final_df = spark.read.format("delta").load(gold_path)
total_rows   = final_df.count()
current_rows = final_df.filter(F.col("is_current") == True).count()
expired_rows = final_df.filter(F.col("is_current") == False).count()

print(f"After SCD2 run → total: {total_rows}, current: {current_rows}, expired: {expired_rows}")
display(
    final_df.orderBy(F.col("player_id").asc(), F.col("start_date").desc()).limit(20)
)
"""


StatementMeta(, f3062a4d-8e45-4612-9aed-7b3a5c3e331a, 3, Finished, Available, Finished)

Detected new=0, changed=0
No SCD2 updates required.
After SCD2 run → total: 3925, current: 3925, expired: 0


SynapseWidget(Synapse.DataFrame, 3b4c6256-9542-4a58-bd85-0aeb9189ce0a)

In [2]:
"""
# ============================
# Silver -> Gold Load to Lake with Surrogate Key (player_key)
# ============================

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# ---------------------------------------
# PART 1: Read Silver and Select Only Needed Columns
# ---------------------------------------

# Read the Silver table into a Spark DataFrame
df = spark.read.format("delta").load("Tables/player_info_silver")

# Select only the business columns we want (exclude surrogate for now)
source_df = (
    df
    .select(
        F.col("player_id").cast("string"),
        F.col("first_name").cast("string"),
        F.col("last_name").cast("string"),
        F.col("nationality").cast("string"),
        F.col("birth_date").cast("string"),
        F.col("primary_position").cast("string"),
        F.col("height_cm").cast("string"),
        F.col("weight").cast("string"),
        F.col("shoots_catches").cast("string")
    )
    .dropDuplicates()  # remove exact duplicates
)

# ---------------------------------------
# PART 2: Define Gold Target Path
# ---------------------------------------

target_path = "abfss://dc478dd4-e53e-4f21-add0-2e376dc173fe@onelake.dfs.fabric.microsoft.com/2dd626a9-1762-4265-b676-2ccd6b809184/Tables/dim_players"

# Helper: add surrogate key column player_key
def attach_surrogate_keys(input_df, start_offset):
    window_no_partition = Window.orderBy(F.col("player_id").asc())
    return input_df.withColumn("player_key", F.row_number().over(window_no_partition) + F.lit(start_offset))

# ---------------------------------------
# PART 3: Write to Gold
# ---------------------------------------

if DeltaTable.isDeltaTable(spark, target_path):
    # Table exists: load it
    existing_delta = DeltaTable.forPath(spark, target_path)
    existing_df = existing_delta.toDF()

    # Get existing keys and current max surrogate key
    existing_keys_df = existing_df.select("player_id").distinct()
    current_max_sk = existing_df.agg(F.max("player_key").alias("max_key")).collect()[0]["max_key"]
    if current_max_sk is None:
        current_max_sk = 0

    # Identify only new rows
    new_rows_df = source_df.join(existing_keys_df, on="player_id", how="left_anti")

    if new_rows_df.rdd.isEmpty():
        print("No new rows to append. dim_players is already up to date.")
    else:
        # Add surrogate keys starting from current max
        new_rows_with_sk_df = attach_surrogate_keys(new_rows_df, current_max_sk)

        # Reorder columns: player_key first
        new_rows_with_sk_df = new_rows_with_sk_df.select(
            "player_key", "player_id", "first_name", "last_name", "nationality", "birth_date", "primary_position", "height_cm", "weight", "shoots_catches"
        )

        (new_rows_with_sk_df
            .write
            .format("delta")
            .mode("append")
            .save(target_path))

        print(f"✅ Incremental load complete: {new_rows_with_sk_df.count()} new rows appended to dim_players in Lakehouse_Gold.")

else:
    # Table does not exist: create it
    initial_rows_with_sk_df = attach_surrogate_keys(source_df, start_offset=0)

    # Reorder columns: player_key first
    initial_rows_with_sk_df = initial_rows_with_sk_df.select(
        "player_key", "player_id", "first_name", "last_name", "nationality", "birth_date", "primary_position", "height_cm", "weight", "shoots_catches"
    )

    (initial_rows_with_sk_df
        .write
        .format("delta")
        .mode("overwrite")
        .save(target_path))

    print(f"✅ Initial load complete: dim_players created with {initial_rows_with_sk_df.count()} rows.")

# ---------------------------------------
# PART 4: Verify
# ---------------------------------------

result_df = spark.read.format("delta").load(target_path)
print(f"Rows in Gold after write: {result_df.count()}")
display(result_df.orderBy(F.col("player_key").asc()).limit(10))
"""


StatementMeta(, fd407dec-737d-466b-8a37-97725aafe7cb, 4, Finished, Available, Finished)

✅ Initial load complete: dim_players created with 3925 rows.
Rows in Gold after write: 3925


SynapseWidget(Synapse.DataFrame, afe88ba0-58df-4d72-b57d-896fa01b3618)