### Change Data Feed demo
- reference link: https://docs.gcp.databricks.com/en/_extras/notebooks/source/delta/cdf-demo.html

In [None]:
# Seed rows for the silver table, one tuple per country in the order given by `columns`.
countries = [
    ("USA", 10000, 20000),
    ("India", 1000, 1500),
    ("UK", 7000, 10000),
    ("Canada", 500, 700),
]
columns = ["Country", "NumVaccinated", "AvailableDoses"]

# (Re)create silverTable as a Delta table holding the seed rows.
(
    spark.createDataFrame(data=countries, schema=columns)
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("silverTable")
)

In [None]:
import pyspark.sql.functions as F

# Build the gold Delta table from silver: keep Country and replace the two raw
# counts with the derived VaccinationRate = NumVaccinated / AvailableDoses.
(
    spark.read.format("delta")
    .table("silverTable")
    .withColumn("VaccinationRate", F.col("NumVaccinated") / F.col("AvailableDoses"))
    .drop("NumVaccinated", "AvailableDoses")
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("goldTable")
)

In [None]:
%sql
-- View the data in the gold table
SELECT * FROM goldTable

In [None]:
%sql
-- Enable Change Data Feed on silverTable by setting its table property
ALTER TABLE silverTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

In [None]:
# Append one new country row to silverTable, reusing the `columns` schema
# defined when the table was first created.
new_countries = [("Australia", 100, 3000)]
(
    spark.createDataFrame(data=new_countries, schema=columns)
    .write.format("delta")
    .mode("append")
    .saveAsTable("silverTable")
)

In [None]:
%sql
-- Delete one record (UK) from the silver table
DELETE FROM silverTable WHERE Country = 'UK'

In [None]:
%sql
-- In total, one row has been inserted (Australia) and one deleted (UK)
SELECT * FROM silverTable

In [None]:
%sql
-- View the row-level changes using the table_changes SQL function.
-- Start at table version 2 and leave the end version open so that every
-- commit up to the latest one is returned.
SELECT * FROM table_changes('silverTable', 2) ORDER BY _commit_timestamp


In [None]:
# Read the same change feed through the PySpark DataFrame API, starting at
# table version 2 with no end version (reads up to the latest commit).
# "readChangeFeed" is the documented option name for change data feed reads.
changes_df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 2)
    .table("silverTable")
)
display(changes_df)

In [None]:
%sql
-- Collect only the latest change for each country.
-- Changes are read from table version 2 up to the latest commit (no end version).
-- update_preimage rows are excluded because they carry the old values of an update.
-- Rows are ranked per Country by _commit_version (newest first) and only rank 1 is kept.
CREATE OR REPLACE TEMPORARY VIEW silverTable_latest_version as
SELECT * 
    FROM 
         (SELECT *, rank() over (partition by Country order by _commit_version desc) as rank
          FROM table_changes('silverTable', 2)
          WHERE _change_type !='update_preimage')
    WHERE rank=1

In [None]:

%sql
-- Merge the latest change per country into gold:
--   * delete                     -> remove the country from gold
--   * insert / update_postimage  -> refresh VaccinationRate for a country already in gold
--   * not yet in gold            -> insert it, unless the latest change is a delete
--     (a deleted country must not be re-created in gold)
MERGE INTO goldTable t USING silverTable_latest_version s ON s.Country = t.Country
        WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
        WHEN MATCHED AND s._change_type IN ('insert', 'update_postimage') THEN UPDATE SET VaccinationRate = s.NumVaccinated/s.AvailableDoses
        WHEN NOT MATCHED AND s._change_type != 'delete' THEN INSERT (Country, VaccinationRate) VALUES (s.Country, s.NumVaccinated/s.AvailableDoses)

In [None]:
%sql
-- Inspect the gold table after the merge
SELECT *
FROM goldTable

In [None]:
### Summary
  # Enabling Change Data Feed on a Delta table keeps a row-level history of the changes made to its records.
  # In this example the key is Country, and we want to compute VaccinationRate for each country.
  # Because silverTable has Change Data Feed enabled, we can find the most recent change for each country using _change_type, _commit_version and _commit_timestamp.