In [None]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [None]:
# ── CONFIG ─────────────────────────────────────────────────────────────────────
# ABFSS paths to the Lakehouse database on each side of the sync:
# East is the source, West is the target.
# The <container> / <...-account> parts look like placeholders — fill them in before running.
west_path = "abfss://<container>@<west-account>.dfs.core.windows.net/lakehouse/mydb.db"
east_path = "abfss://<container>@<east-account>.dfs.core.windows.net/lakehouse/mydb.db"

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Last sync timestamp: the load cell keeps only rows whose modified_date is greater than this.
# Hard-coded for now — persist it somewhere durable (Key Vault, file, table) and read it back
# at the start of each run.
# NOTE(review): the trailing "Z" suggests UTC — confirm modified_date is stored in UTC too.
last_sync = "2025-07-20T12:00:00Z"

In [None]:
# ── LOAD DELTA FROM EAST ────────────────────────────────────────────────────────
# Read East's my_table as a Delta table, then keep only the rows changed since the last sync.
east_reader = spark.read.format("delta")
east_table = east_reader.load(f"{east_path}/my_table")

# NOTE(review): last_sync is a plain string, so this comparison relies on how Spark
# compares it with the modified_date column — confirm that column's type.
changed_since_last_sync = col("modified_date") > last_sync
east_df = east_table.where(changed_since_last_sync)


In [None]:
# ── UPSERT INTO WEST ───────────────────────────────────────────────────────────
# Apply the changed East rows held in east_df to West's copy of my_table.
# The target path is built once and reused by the existence check, the merge and the write.
west_table_path = f"{west_path}/my_table"

if DeltaTable.isDeltaTable(spark, west_table_path):
    # Target already exists: rows with a matching id are updated, all others are inserted.
    # NOTE(review): a Delta MERGE errors out when several source rows match the same
    # target row; this assumes `id` is unique within east_df — confirm, or de-duplicate first.
    dt = DeltaTable.forPath(spark, west_table_path)
    (
        dt.alias("tgt")
        .merge(east_df.alias("src"), "tgt.id = src.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # No Delta table at the target path yet: create it from east_df.
    # NOTE(review): east_df only holds rows with modified_date > last_sync, so a first-time
    # create copies just that slice rather than the whole East table — confirm this is intended.
    east_df.write.format("delta").mode("overwrite").save(west_table_path)




In [None]:
# ── UPDATE last_sync (to now) ──────────────────────────────────────────────────
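# e.g. write the new timestamp to Blob Storage or Key Vault so the next run resumes from it.
# NOTE(review): a "now" taken after the merge can skip rows modified in East while this run
# was in progress; consider capturing the timestamp before the East read instead.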