In [None]:
from datetime import datetime
from pathlib import Path
from pyspark.sql import (
    Row,
    SparkSession,
)
from pyspark.sql.functions import lit

# Name under which the Iceberg catalog is registered with Spark; it is also
# made the session's default catalog below.
CATALOG_NAME = "scd_catalog"
# Warehouse root for the Hadoop-type catalog, resolved against the directory
# the kernel was started in.
WAREHOUSE_PATH = Path.cwd() / "warehouse"

# Build (or reuse) the session with a Hadoop-backed Iceberg catalog.
# NOTE(review): the Iceberg runtime jar is presumably supplied outside this
# notebook (e.g. via spark-defaults or --packages) — confirm.
spark = (
    SparkSession.builder
    .appName("Iceberg SCD Type 2")
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}",
        "org.apache.iceberg.spark.SparkCatalog",
    )
    .config(f"spark.sql.catalog.{CATALOG_NAME}.type", "hadoop")
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}.warehouse",
        str(WAREHOUSE_PATH),
    )
    .config("spark.sql.defaultCatalog", CATALOG_NAME)
    .getOrCreate()
)



In [None]:
# Timestamp stamped on every row of the initial load (ISO-8601 text,
# second resolution, local time of the kernel).
ut = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# Seed rows for the products table.
product = [
    {"id": "00001", "name": "Heater", "price": 250, "category": "Electronics", "updated_at": ut},
    {"id": "00002", "name": "Thermostat", "price": 400, "category": "Electronics", "updated_at": ut},
    {"id": "00003", "name": "Television", "price": 600, "category": "Electronics", "updated_at": ut},
]

# Materialise the rows as a list (rather than a one-shot generator) and expose
# them as a temp view so the CTAS statement below can select from them.
df = spark.createDataFrame([Row(**x) for x in product])
df.createOrReplaceTempView('tmp')

# Create the Iceberg table from the staged rows; a no-op if it already exists.
# No LOCATION clause: a Hadoop-type catalog derives the table path from the
# warehouse root and the identifier (<warehouse>/db/products) and rejects any
# other custom location. The previous statement pointed LOCATION at the
# warehouse root itself, which is not the table's path.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.db.products USING iceberg
AS SELECT * FROM tmp
""")


In [None]:
# Stamp the change batch with its own timestamp instead of reusing `ut` from
# the initial load, so a changed row's `updated_at` is later than the version
# it supersedes. Same ISO-8601, second-resolution format as the seed rows.
# NOTE(review): the downstream SCD merge is not visible here — confirm it keys
# validity intervals off `updated_at`.
updated_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# Change set: one existing product with a new price and one new product.
updates = [
    {"id": "00001", "name": "Heater", "price": 500, "category": "Electronics", "updated_at": updated_ts},  # update
    {"id": "00004", "name": "Chair", "price": 50, "category": "Furniture", "updated_at": updated_ts},  # insert
]

# Materialise the rows as a list and expose them as the `products_upsert`
# temp view for SQL access.
df_updates = spark.createDataFrame([Row(**x) for x in updates])
df_updates.createOrReplaceTempView("products_upsert")
