In [0]:
# Text widgets choosing where the bronze/silver tables are written.
# Both default to an empty string.
dbutils.widgets.text(name="catalog", defaultValue="", label="Catalog")
dbutils.widgets.text(name="schema", defaultValue="", label="Schema")

In [0]:
# Read the target namespace from the widgets and echo it, one value per line.
catalog, schema = map(dbutils.widgets.get, ("catalog", "schema"))

print(catalog, schema, sep="\n")

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import *

# Snapshot 1: the initial load of five people, tagged with snapshot number 1.
data = [
    Row(id=person_id, name=person_name, salary=person_salary)
    for person_id, person_name, person_salary in [
        (1, "Alice", 70000),
        (2, "Bob", 80000),
        (3, "Charlie", 75000),
        (4, "David", 72000),
        (5, "Eve", 78000),
    ]
]

bronze_df = spark.createDataFrame(data).select("*", lit(1).alias("snapshot"))
display(bronze_df)

In [0]:
# Fully qualified table names. With a catalog (e.g. Unity Catalog) the names are
# catalog.schema.table; leave the catalog widget blank to get two-part
# schema.table names instead (e.g. for the legacy Hive metastore).
# Fail fast here rather than letting a blank widget produce an invalid name
# such as "..bronze_person" that only errors at write time.
if not schema:
    raise ValueError("The 'schema' widget must be set before running this notebook.")

namespace = "{}.{}".format(catalog, schema) if catalog else schema
bronze_table = "{}.bronze_person".format(namespace)
silver_table = "{}.silver_person".format(namespace)


In [0]:
# (Re)create bronze from snapshot 1. overwriteSchema replaces the table schema as
# well as its data. With mergeSchema, a re-run would keep the `delete` column
# added by the later snapshot-2 append, so the notebook would not be idempotent.
(
    bronze_df.write.format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable(bronze_table)
)

In [0]:

# Seed silver from the current bronze contents. Selecting the business columns
# explicitly keeps bronze-only columns (`snapshot`, and `delete` if bronze still
# carries it from an earlier run) out of silver. overwriteSchema also discards
# any extra columns a previous run may have left on silver.
(
    spark.read.table(bronze_table)
    .select("id", "name", "salary")
    .write.format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable(silver_table)
)

In [0]:
# Snapshot 2: a change set tagged with snapshot number 2. It introduces the
# boolean `delete` column; only id 3 is flagged True, and id 6 was not in snapshot 1.
data_2 = [
    Row(id=person_id, name=person_name, salary=person_salary, delete=is_deleted)
    for person_id, person_name, person_salary, is_deleted in [
        (1, "Alice", 75000, False),
        (2, "Bob X", 80000, False),
        (3, "John", 178000, True),
        (6, "John", 178000, False),
    ]
]

bronze_df_2 = spark.createDataFrame(data_2).select("*", lit(2).alias("snapshot"))
display(bronze_df_2)

In [0]:
# Append snapshot 2 to bronze. mergeSchema allows the append to add the new
# `delete` column to the existing bronze schema.
(
    bronze_df_2.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(bronze_table)
)

In [0]:
# Inspect bronze after the append. Reading through `bronze_table` honors the
# widget-selected namespace instead of a hard-coded catalog/schema, and avoids
# building SQL text from widget values.
bronze = spark.table(bronze_table)
display(bronze)

In [0]:
# Make the widget-selected namespace the session default. The unqualified table
# names in the SQL cells below (bronze_person, silver_person) then resolve to the
# tables this notebook wrote, instead of a hard-coded usa.scott_stafford_merge.
# Names are back-quoted (with embedded back-quotes doubled) so they are parsed as
# identifiers. The catalog is skipped when blank, matching two-part table names.
if catalog:
    spark.sql("USE CATALOG `{}`".format(catalog.replace("`", "``")))
spark.sql("USE SCHEMA `{}`".format(schema.replace("`", "``")))



In [0]:
%sql
-- Latest bronze snapshot only: rows whose snapshot number equals the highest
-- snapshot number present in bronze_person. Used as the MERGE source below.
CREATE OR REPLACE TEMPORARY VIEW person_snapshot AS
WITH latest AS (
  SELECT max(snapshot) AS max_snapshot
  FROM bronze_person
)
SELECT bp.*
FROM bronze_person AS bp
INNER JOIN latest
  ON bp.snapshot = latest.max_snapshot

In [0]:
%sql

-- Preview the rows the MERGE below will apply to silver.
SELECT snap.*
FROM person_snapshot AS snap

In [0]:
%sql

-- Apply the latest bronze snapshot to silver:
--   * a matching row flagged `delete` removes the silver row;
--   * any other matching row overwrites name/salary (id is the join key, so it
--     is not re-assigned);
--   * an unmatched id is inserted unless it arrives already flagged for deletion.
-- `delete` is qualified and back-quoted because it is also a SQL keyword. Without
-- the qualifier it would be ambiguous if silver_person ever gained a column of
-- that name. A NULL flag (e.g. rows written before the column existed) is
-- treated as "not deleted".
-- Only the last WHEN MATCHED clause may omit its condition, hence the order.
MERGE INTO silver_person
USING person_snapshot
ON person_snapshot.id = silver_person.id
WHEN MATCHED AND coalesce(person_snapshot.`delete`, false) THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET
    name = person_snapshot.name,
    salary = person_snapshot.salary
WHEN NOT MATCHED AND NOT coalesce(person_snapshot.`delete`, false) THEN
  INSERT (
    id,
    name,
    salary
  )
  VALUES (
    person_snapshot.id,
    person_snapshot.name,
    person_snapshot.salary
  )



In [0]:
%sql
-- Final silver state, ordered by id. ORDER BY sorts the whole result; SORT BY
-- would only sort rows within each partition, so the display order could be
-- wrong.
SELECT *
FROM silver_person
ORDER BY id ASC