In [0]:
%sql
-- Create the bronze change-event table for the users demo.
-- Each row is one change event: `operation` is INSERT / UPDATE / DELETE and
-- `sequenceNum` is intended to order events per userId (see the out-of-order
-- batch at the end, which must lose to the earlier-listed, higher-numbered batch).
CREATE OR REPLACE TABLE poc_catalog.poc_demo_impetus.bronze_users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
);

In [0]:
%sql
-- Inspect the raw bronze change events.
SELECT *
FROM poc_catalog.poc_demo_impetus.bronze_users

In [0]:
%sql
-- Append a second batch of change events (sequenceNum 7): a new user 127,
-- a delete of user 124 and an update of user 126.
INSERT INTO poc_catalog.poc_demo_impetus.bronze_users
SELECT userId, name, city, operation, sequenceNum
FROM VALUES
  (127, "ABC",   "XYZ",     "INSERT", 7),
  (124, null,    null,      "DELETE", 7),
  (126, "Lily2", "Cancun2", "UPDATE", 7)
AS src(userId, name, city, operation, sequenceNum);

In [0]:
%sql
-- Direct DML against silver_users (presumably to produce change-feed entries
-- read further down): drop user 125, rename user 126, then re-add user 125.
DELETE FROM poc_catalog.poc_demo_impetus.silver_users
WHERE userId = 125;

UPDATE poc_catalog.poc_demo_impetus.silver_users
SET name = 'Lily3'
WHERE userId = 126;

INSERT INTO poc_catalog.poc_demo_impetus.silver_users
VALUES (125, "PQR", "XYZ");

In [0]:
%sql
-- Synchronise silver_users with a three-row source keyed on userId:
--   * matched ids get name/city overwritten,
--   * unmatched source ids are inserted,
--   * WARNING: every target row whose userId is absent from the source is
--     deleted (WHEN NOT MATCHED BY SOURCE), so afterwards the table holds only
--     userIds 127, 128 and 129.
MERGE INTO poc_catalog.poc_demo_impetus.silver_users AS tgt
USING (
  SELECT col1 AS userId, col2 AS name, col3 AS city
  FROM (
    VALUES
      (128, "PQR1", "XYZ1"),
      (127, "ABC1", "XYZ1"),
      (129, "ZZZ",  "ZZZ")
  )
) AS src
ON src.userId = tgt.userId
WHEN MATCHED THEN
  UPDATE SET tgt.name = src.name, tgt.city = src.city
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

In [0]:
%sql
-- Append a TRUNCATE marker event to bronze_users, then clear the bronze table.
-- The two statements must be separated by ';' — without it this multi-statement
-- cell is a single, unparseable statement.
-- NOTE(review): presumably a downstream flow treats operation = 'TRUNCATE' as a
-- truncate of its target; confirm. Deleting rows from a Delta table that is
-- read as a stream produces a non-append commit, which Delta streaming sources
-- reject unless configured to skip change commits — confirm for the consumer.
INSERT INTO poc_catalog.poc_demo_impetus.bronze_users VALUES (null, null, null, "TRUNCATE", 8);
DELETE FROM poc_catalog.poc_demo_impetus.bronze_users;

-- TRUNCATE TABLE does not work on streaming tables, e.g.:
-- TRUNCATE TABLE silver_target

In [0]:
%sql
-- Current contents of the silver users table.
SELECT *
  FROM poc_catalog.poc_demo_impetus.silver_users;


In [0]:
# Session flag consulted by the change-feed reads in this notebook.
RETURN_PHYSICAL_CDF = "spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf"
spark.conf.set(RETURN_PHYSICAL_CDF, True)

In [0]:
%sql
-- Enable the returnPhysicalCdf flag for this SQL session, then list the
-- silver_users change feed starting at table version 1.
SET spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf = True;

SELECT *
FROM table_changes('poc_catalog.poc_demo_impetus.silver_users', 1);


In [0]:
spark.conf.set("spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf", True)
from pyspark.sql.functions import *
from pyspark.sql import functions as F

# Normalised projection of the silver_users change feed:
#   * update pre-images become DELETE events at sequence 2*v, update post-images
#     become INSERT events at 2*v + 1 (v = _commit_version), so a post-image
#     always sorts after its pre-image;
#   * plain insert/delete rows keep sequence 2*v and are upper-cased so every
#     `operation` value uses the same casing (string comparison is case-sensitive).
silver_cdf_columns = [
    col("userId"), col("name"), col("city"),
    when(col("_change_type") == lit("update_postimage"), lit(2) * col("_commit_version") + lit(1))
        .otherwise(lit(2) * col("_commit_version")).alias("_commit_version"),
    when(col("_change_type") == lit("update_preimage"), lit("DELETE"))
        .when(col("_change_type") == lit("update_postimage"), lit("INSERT"))
        .otherwise(upper(col("_change_type"))).alias("operation"),
    col("_commit_timestamp"),
]

df = (spark
      .readStream
      .option("readChangeFeed", "true")
      .option("startingVersion", 1)
      .table("poc_catalog.poc_demo_impetus.silver_users")
      .select(*silver_cdf_columns))
display(df)

# Collecting an aggregate of a streaming DataFrame (agg(...).head()) raises an
# AnalysisException — streaming queries must be started with writeStream — and
# pyspark Row has no .get() method. Compute the bounds from a batch read of the
# same change feed and index the resulting Row by field name instead.
cdf_bounds = (spark
              .read
              .option("readChangeFeed", "true")
              .option("startingVersion", 1)
              .table("poc_catalog.poc_demo_impetus.silver_users")
              .select(*silver_cdf_columns)
              .agg(F.min("_commit_version").alias("min_v"),
                   F.min("_commit_timestamp").alias("min_ts"))
              .head())
min_v = cdf_bounds["min_v"]
min_ts = cdf_bounds["min_ts"]
print(min_v)
print(min_ts)

In [0]:
%sql
-- Current contents of gold_users2.
SELECT *
FROM poc_catalog.poc_demo_impetus.gold_users2;


In [0]:
spark.conf.set("spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf", True)
from pyspark.sql.functions import *

# Change feed of gold_users_append. No startingVersion is given, so (per the
# Delta CDF docs) the stream begins with the table's current snapshot and then
# follows new commits. Update pre/post images are mapped to DELETE/INSERT at
# sequence 2*v / 2*v + 1; plain insert/delete rows are upper-cased so all
# `operation` values share one casing.
df = (spark
      .readStream
      .option("readChangeFeed", "true")
      .table("poc_catalog.poc_demo_impetus.gold_users_append")
      .select(
          col("userId"), col("name"), col("city"),
          when(col("_change_type") == lit("update_postimage"), lit(2) * col("_commit_version") + lit(1))
              .otherwise(lit(2) * col("_commit_version")).alias("_commit_version"),
          when(col("_change_type") == lit("update_preimage"), lit("DELETE"))
              .when(col("_change_type") == lit("update_postimage"), lit("INSERT"))
              .otherwise(upper(col("_change_type"))).alias("operation"),
          col("_commit_timestamp"),
      ))
display(df)




In [0]:
spark.conf.set("spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf", True)
from pyspark.sql.functions import *

# Change feed of gold_users_append, keeping every column, normalised with SQL
# CASE expressions. The _commit_version rewrite must come first because it
# reads the raw _change_type, which is replaced by the next withColumn.
# Plain insert/delete values are upper-cased so all operation values share one
# casing, and the column is named `operation` like the other CDF cells.
df = (spark
      .readStream
      .option("readChangeFeed", "true")
      .table("poc_catalog.poc_demo_impetus.gold_users_append")
      .withColumn("_commit_version", expr("CASE WHEN _change_type = 'update_postimage' THEN 2 * _commit_version +1 ELSE 2 * _commit_version END"))
      .withColumn("_change_type", expr("CASE WHEN _change_type = 'update_preimage' THEN 'DELETE' WHEN _change_type = 'update_postimage' THEN 'INSERT' ELSE upper(_change_type) END"))
      .withColumnRenamed("_change_type", "operation"))
display(df)

In [0]:
%sql
-- Remove every row from gold_users2 (no WHERE clause).
DELETE FROM poc_catalog.poc_demo_impetus.gold_users2;

In [0]:
spark.conf.set("spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf", True)
from pyspark.sql.functions import *

# Change feed of input_data_table2 from version 1. Update pre/post images are
# mapped to DELETE/INSERT at sequence 2*v / 2*v + 1; plain insert/delete rows are
# upper-cased so all `operation` values share one casing.
df = (spark
      .readStream
      .option("readChangeFeed", "true")
      .option("startingVersion", 1)
      .table("poc_catalog.poc_demo_impetus.input_data_table2")
      .select(
          col("id"), col("name"), col("city"),
          when(col("_change_type") == lit("update_postimage"), lit(2) * col("_commit_version") + lit(1))
              .otherwise(lit(2) * col("_commit_version")).alias("_commit_version"),
          when(col("_change_type") == lit("update_preimage"), lit("DELETE"))
              .when(col("_change_type") == lit("update_postimage"), lit("INSERT"))
              .otherwise(upper(col("_change_type"))).alias("operation"),
          col("_commit_timestamp"),
      ))
display(df)


In [0]:
%sql
-- Recreate the three bronze flow tables from scratch.
-- IF EXISTS keeps the cell runnable on a fresh workspace (a plain DROP TABLE
-- fails when the table is missing). NOTE(review): the DROP presumably exists to
-- reset Delta history/version numbers, which CREATE OR REPLACE alone preserves.
DROP TABLE IF EXISTS poc_catalog.poc_demo_impetus.bronze_flow1;
CREATE OR REPLACE TABLE poc_catalog.poc_demo_impetus.bronze_flow1
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  (1, "User1",   "Oaxaca",      "INSERT", 1),
  (2, "User2",   "Oaxaca",      "INSERT", 1),
  (3, "User3",   "Oaxaca",      "INSERT", 1),
  (4, "User4",   "Oaxaca",      "INSERT", 1)
);

DROP TABLE IF EXISTS poc_catalog.poc_demo_impetus.bronze_flow2;
CREATE OR REPLACE TABLE poc_catalog.poc_demo_impetus.bronze_flow2
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  (3, "User3_2", "Oaxaca",      "UPDATE", 2),
  (4, "User4_2", "Oaxaca",      "UPDATE", 2),
  (5, "User5", "Oaxaca",      "INSERT", 2),
  (6, "User6", "Oaxaca",      "INSERT", 2)
);

DROP TABLE IF EXISTS poc_catalog.poc_demo_impetus.bronze_flow3;
CREATE OR REPLACE TABLE poc_catalog.poc_demo_impetus.bronze_flow3
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
    (5, "User5_2", "Oaxaca",   "UPDATE", 3),
    (6, "User6_2", "Oaxaca",   "UPDATE", 3),
    (7, "User7",   "Oaxaca",   "INSERT", 3),
    (8, "User8",   "Oaxaca",   "INSERT", 3)
);

In [0]:
%sql
-- Inspect each of the three bronze flow tables.
SELECT * FROM poc_catalog.poc_demo_impetus.bronze_flow1;
SELECT * FROM poc_catalog.poc_demo_impetus.bronze_flow2;
SELECT * FROM poc_catalog.poc_demo_impetus.bronze_flow3;


In [0]:
%sql
-- Current contents of silver_flows.
SELECT *
  FROM poc_catalog.poc_demo_impetus.silver_flows;


In [0]:
%sql
-- Current contents of gold_users_override.
SELECT *
  FROM poc_catalog.poc_demo_impetus.gold_users_override;

In [0]:
%sql
-- Current contents of gold_users_append.
SELECT *
  FROM poc_catalog.poc_demo_impetus.gold_users_append;

In [0]:
%sql
-- Current contents of gold_users_override2.
SELECT *
  FROM poc_catalog.poc_demo_impetus.gold_users_override2;

In [0]:
%sql
-- (Re)create input_data_table with the Delta change data feed enabled and four
-- seed rows.
CREATE OR REPLACE TABLE poc_catalog.poc_demo_impetus.input_data_table
TBLPROPERTIES ('delta.enableChangeDataFeed'='true')
AS SELECT id, name, city
FROM VALUES
  (1, "U1", "City1"),
  (2, "U2", "City2"),
  (3, "U3", "City1"),
  (4, "U4", "City2")
AS seed(id, name, city);



In [0]:
%sql
-- Commit history of the poc_schema copy of input_data_table
-- (schema poc_schema, not poc_demo_impetus).
DESCRIBE HISTORY poc_catalog.poc_schema.input_data_table;

In [0]:
%sql


In [0]:
%sql
-- Full reload of poc_schema.input_data_table: empty it, insert six new rows
-- (positionally), then show the result.
TRUNCATE TABLE poc_catalog.poc_schema.input_data_table;

INSERT INTO poc_catalog.poc_schema.input_data_table
SELECT id, name, city
FROM VALUES
  (5,  "U5",  "City1"),
  (6,  "U6",  "City2"),
  (7,  "U7",  "City1"),
  (8,  "U8",  "City2"),
  (9,  "U9",  "City2"),
  (10, "U10", "City2")
AS batch(id, name, city);

SELECT * FROM poc_catalog.poc_schema.input_data_table;

In [0]:
spark.conf.set("spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf", True)
from pyspark.sql.functions import *

# Change feed of input_data_table2 from version 4. Update pre/post images are
# mapped to DELETE/INSERT at sequence 2*v / 2*v + 1; plain insert/delete rows are
# upper-cased so all `operation` values share one casing.
df = (spark
      .readStream
      .option("readChangeFeed", "true")
      .option("startingVersion", 4)
      .table("poc_catalog.poc_demo_impetus.input_data_table2")
      .select(
          col("id"), col("name"), col("city"),
          when(col("_change_type") == lit("update_postimage"), lit(2) * col("_commit_version") + lit(1))
              .otherwise(lit(2) * col("_commit_version")).alias("_commit_version"),
          when(col("_change_type") == lit("update_preimage"), lit("DELETE"))
              .when(col("_change_type") == lit("update_postimage"), lit("INSERT"))
              .otherwise(upper(col("_change_type"))).alias("operation"),
          col("_commit_timestamp"),
      ))
display(df)


In [0]:
spark.conf.set(
    "spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf",
    True,
)
from pyspark.sql.functions import *

# Raw (un-normalised) change feed of intermediate_update_table from version 0.
cdf_reader = (
    spark.readStream
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
)
df = cdf_reader.table("poc_catalog.poc_demo_impetus.intermediate_update_table")
display(df)

In [0]:
%sql
-- Single-row table holding the timestamp at which this cell ran.
CREATE OR REPLACE TABLE poc_catalog.poc_demo_impetus.temp
AS SELECT streaming_load_dt
FROM VALUES (current_timestamp) AS load_marker(streaming_load_dt);

In [0]:
%sql
-- Contents of the poc_schema copy of input_data_table.
SELECT *
  FROM poc_catalog.poc_schema.input_data_table

In [0]:
%sql
-- Contents of input_data_table2.
SELECT *
  FROM poc_catalog.poc_demo_impetus.input_data_table2

In [0]:
%sql
-- Contents of the poc_demo_impetus copy of input_data_table.
SELECT *
  FROM poc_catalog.poc_demo_impetus.input_data_table

In [0]:
%sql
-- Business columns only of the poc_schema output table.
SELECT
  id,
  name,
  city
FROM poc_catalog.poc_schema.output_data_table

In [0]:
%sql
-- All columns of the poc_demo_impetus output table.
SELECT *
  FROM poc_catalog.poc_demo_impetus.output_data_table

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.functions import col
# Step 2.1: TRUNCATE marker events, one per row streamed from the `temp` table.
# `temp` is created with a `streaming_load_dt` column, not `meta_load_dt`, so the
# original col("meta_load_dt") could not resolve. Rename it so the column exists
# and lines up with df2 in the union. drop/withColumnRenamed are no-ops when the
# named column is absent, so this also works if `temp` already has meta_load_dt.
df1 = spark.readStream.format("delta").table("poc_catalog.poc_demo_impetus.temp")
df1 = (df1.drop("id")
       .withColumnRenamed("streaming_load_dt", "meta_load_dt")
       .withColumn("operation", lit("TRUNCATE")))
df1 = df1.withColumn("meta_load_ts", unix_timestamp(col("meta_load_dt")))


# Step 2.2: every row of input_data_table as an INSERT event stamped with the
# processing time.
df2 = spark.readStream.format("delta").table("poc_catalog.poc_demo_impetus.input_data_table")
df2 = df2.withColumn("operation", lit("INSERT"))
df2 = df2.withColumn("meta_load_dt", current_timestamp()) \
    .withColumn("meta_load_ts", unix_timestamp(col("meta_load_dt")))

# Step 2.3: union both streams (missing columns become null) and add a SHA-256
# row id over all columns joined with "||".
# NOTE(review): concat_ws skips nulls, so rows differing only in which column is
# null can hash identically; confirm this is acceptable for meta_sha2_id.
df = df1.unionByName(df2, allowMissingColumns=True)

df = df.withColumn("meta_sha2_id", sha2(concat_ws("||", *df.columns), 256))
display(df)


In [0]:
# Enable the returnPhysicalCdf flag before the change-feed reads below.
spark.conf.set(key="spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf", value=True)

from pyspark.sql.functions import *
from pyspark.sql.functions import col

def get_table_starting_version(table_name, default_version=1):
    """Return the latest Delta version of ``table_name`` written by a data-loading commit.

    Runs ``DESCRIBE HISTORY`` on the table and takes ``MAX(version)`` over commits
    whose operation is WRITE, an Append-mode STREAMING UPDATE, CREATE OR REPLACE
    TABLE AS SELECT, or REPLACE TABLE AS SELECT. The notebook uses the result as
    a change-feed ``startingVersion``.

    Args:
        table_name: Fully qualified table name. It is interpolated verbatim into
            the SQL text, so it must be a trusted identifier.
        default_version: Returned when no commit matches (MAX over zero rows is
            NULL). Defaults to 1, the fallback this function always initialised.

    Returns:
        The matching version number, or ``default_version``.
    """
    # NOTE(review): plain 'CREATE TABLE AS SELECT' commits are not in this list;
    # confirm whether tables created that way should also qualify.
    query = """
        SELECT MAX(version) as max_version 
        FROM (
            DESCRIBE HISTORY {table_name}
        ) WHERE operation='WRITE' or (operation='STREAMING UPDATE' and operationParameters.outputMode = 'Append') or operation='CREATE OR REPLACE TABLE AS SELECT' or operation='REPLACE TABLE AS SELECT'
    """.format(table_name=table_name)

    # An aggregate without GROUP BY returns exactly one row, so the previous
    # "start at 1, overwrite per row" loop never used its fallback: with no
    # matching commits it returned None (a null startingVersion). Treat a NULL
    # maximum (or, defensively, no rows at all) as "use the default".
    rows = spark.sql(query).collect()
    max_version = rows[0]["max_version"] if rows else None
    return default_version if max_version is None else max_version

# Source table for the INSERT events. Named input_table rather than `input`,
# which shadowed the Python builtin for the rest of the notebook session.
input_table = "poc_catalog.poc_demo_impetus.input_data_table2"
max_version = get_table_starting_version(input_table)
print(max_version)

# The returnPhysicalCdf flag is already set at the top of this cell.
reader_config_options = {
    "readChangeFeed": "true",
    "startingVersion": max_version,
}

# Step 2.1: marker stream. rate-micro-batch with rowsPerBatch=1 yields one row
# per micro-batch; its columns are dropped and it becomes a TRUNCATE event.
# NOTE(review): this emits a TRUNCATE marker in every micro-batch, even when no
# new inserts arrive — confirm that is the intended semantics downstream.
df1 = spark.readStream.format("rate-micro-batch").option("rowsPerBatch", 1).load()
df1 = (df1.drop("timestamp").drop("value")
       .withColumn("operation", lit("TRUNCATE"))
       .withColumn("streaming_load_dt", current_timestamp())
       .withColumn("streaming_load_ts", unix_timestamp(col("streaming_load_dt"))))

# Step 2.2: inserted rows from the change feed since max_version. The +1 on
# streaming_load_ts presumably makes them sort after the TRUNCATE marker
# stamped in the same micro-batch — confirm.
df2 = (spark.readStream.options(**reader_config_options).table(input_table)
       .filter("_change_type == 'insert'")
       .withColumn("operation", lit("INSERT")))

df2 = (df2.withColumn("streaming_load_dt", current_timestamp())
       .withColumn("streaming_load_ts", unix_timestamp(col("streaming_load_dt")) + 1))

# Step 2.3: union (missing columns become null) plus a SHA-256 row id over all
# columns joined with "||".
# NOTE(review): concat_ws skips nulls, so rows differing only in which column is
# null can hash identically; confirm this is acceptable for streaming_sha2_id.
df = df2.unionByName(df1, allowMissingColumns=True)
df = df.withColumn("streaming_sha2_id", sha2(concat_ws("||", *df.columns), 256))
display(df)






In [0]:
%sql
-- Source change log: each row is an operation on a user record; `version`
-- groups rows into batches (1 = initial load, 2 = update/delete, 3 = more data).
CREATE OR REPLACE TABLE poc_catalog.poc_schema.source_data_load
AS SELECT col1 AS id, col2 AS name, col3 AS address, col4 AS operation, col5 AS version
FROM (
  VALUES
    -- initial data
    (1, "User1", "address1",   "INSERT", 1),
    (2, "User2", "address2",   "INSERT", 1),
    (3, "User3", "address3",   "INSERT", 1),
    -- some update and delete
    (2, "User2", "address2",   "DELETE", 2),
    (1, "User1", "address1_1", "UPDATE", 2),
    -- add more data
    (4, "User4", "address4",   "INSERT", 3)
);

SELECT *
FROM poc_catalog.poc_schema.source_data_load;

In [0]:
spark.conf.set(
    "spark.databricks.sql.streamingTable.cdf.applyChanges.returnPhysicalCdf",
    True,
)
from pyspark.sql.functions import *

# Raw change feed of final_update_table, starting at table version 2.
final_cdf_reader = (
    spark.readStream
    .option("readChangeFeed", "true")
    .option("startingVersion", 2)
)
df = final_cdf_reader.table("poc_catalog.poc_demo_impetus.final_update_table")
display(df)

In [0]:
%sql
-- Current contents of final_update_table.
SELECT *
  FROM poc_catalog.poc_demo_impetus.final_update_table;

In [0]:
%sql
-- Replay the source_data_load operations as direct DML on my_streaming_table.
-- Fixes vs. the previous version: INSERT INTO takes a query directly (no `AS`),
-- and inline VALUES tables name their columns col1..colN, not c1..cN.
-- NOTE(review): the version labels below assume the table had no prior commits
-- beyond its creation; verify with DESCRIBE HISTORY.

-- SQL1 : version 0
INSERT INTO poc_catalog.poc_schema.my_streaming_table
SELECT
  col1 AS id,
  col2 AS name,
  col3 AS address
FROM (
  VALUES
    -- initial data
    (1, "User1", "address1"),
    (2, "User2", "address2"),
    (3, "User3", "address3")
);
-- SQL2 : version 1
DELETE FROM poc_catalog.poc_schema.my_streaming_table WHERE id = 2;

-- SQL3 : version 2
UPDATE poc_catalog.poc_schema.my_streaming_table
SET  address  = 'address1_1'
WHERE id =1;

-- SQL4 :version 3
INSERT INTO poc_catalog.poc_schema.my_streaming_table
SELECT
  col1 AS id,
  col2 AS name,
  col3 AS address
FROM (
  VALUES
    (4, "User4", "address4")
);

