In [70]:
# Notebook parameters. The literals below are the values recorded for this run.
# File name of the change extract to apply; it is looked up under `changes_path`.
change_file = "20230607084651_CREWSSSP_WORK_REQUEST.txt"
# Folder holding the initial snapshot, read as "<dest_table_name>_<init_cdc>.parquet".
init_path = "data_swamp/incoming/crews/init/"
# Name of the destination Delta table (also used for temp views and the catalog entry).
dest_table_name = "CREWSSSP_WORK_REQUEST"
# File name of the schema .ini describing the change file columns; looked up under `changes_path`.
schema_file = "20230607084651_schema.ini"
# CDC watermark of the initial snapshot; compared with the stored `current_cdc`.
init_cdc = 6275191283894
# Folder containing the change file and the schema file.
changes_path = "/data_swamp/incoming/crews/new_changes/"
# Required, non-empty configuration name.
config_name = "crews"
# Folder under which the Delta table directory is created.
table_path = "data_lake/bronze/crews/"
# Primary key column(s); several columns are separated by commas.
primary_key = "WR_NO"
# Comma-separated partition columns. An entry of the form NAME=<sql> defines a generated
# column; '*' in the SQL is replaced by '' for the initial load and by '_OLD' / '_NEW'
# when the expression is evaluated against the change file.
partition_by = "ENTRY_DATE_YEAR=YEAR(CAST(ENTRY_DATE* AS DATE)), ENTRY_DATE_MONTH=MONTH(CAST(ENTRY_DATE* AS DATE))"

StatementMeta(fetrspker01, 91, 2, Finished, Available)

In [None]:
%%pyspark

from pyspark.sql.functions import *
import pandas as pd
import configparser
import os
from delta.tables import *
import json
from datetime import datetime

# ---------------------------------------------------------------------------
# Validate the notebook parameters and resolve them into storage locations.
# ---------------------------------------------------------------------------
config = config_name
base_abfss_path = "abfss://enterprise-reporting@fetrsaentreporting.dfs.core.windows.net"
table_name = dest_table_name

# "".split(",") returns [""], so a length check on the raw split can never detect a
# missing key. Strip each entry and drop empty ones so "A, B" and "A,B" behave the same.
keys = [k.strip() for k in (primary_key or "").split(",") if k.strip()]

# Validate before any path is built so a missing parameter produces the intended
# message instead of a TypeError from the path join.
if not config:
    raise Exception("No config name passed in.  Cannot continue without a config_name")

if not keys:
    raise Exception("Table must have a primary_key passed in. If muliple columns delimit by comma.")

if not table_name:
    raise Exception("dest_table_name must be passed in.")

if not table_path:
    raise Exception("table_path must be passed in.")


def _abfss_join(*parts):
    """Join path segments beneath ``base_abfss_path`` using single '/' separators.

    Leading and trailing slashes on each segment are ignored. os.path.join is not
    used here because a segment starting with '/' (such as changes_path) makes it
    discard every earlier segment, including the abfss:// base.
    """
    cleaned = [str(part).strip("/") for part in parts if part]
    return "/".join([base_abfss_path] + [segment for segment in cleaned if segment])


# Only prefix bare file names; a value that is already a full abfss URL (for example
# after re-running this cell) is left untouched.
if schema_file and not schema_file.startswith("abfss://"):
    schema_file = _abfss_join(changes_path, schema_file)

if change_file and not change_file.startswith("abfss://"):
    change_file = _abfss_join(changes_path, change_file)

delta_table_path = _abfss_join(table_path, table_name)
cdc_file = _abfss_join(table_path, table_name, "_cdc", f"{table_name}_CDC_DONOTREMOVE.txt")
# Per-run scratch location, made unique by the current timestamp.
tmp_area = _abfss_join("data_swamp/temp/", table_name, datetime.now().strftime("%Y%m%d%H%M%S"))

print(f"tmp_area is {tmp_area}")

# ---------------------------------------------------------------------------
# Determine the CDC watermark to resume from and whether the table must be
# (re)initialised from the snapshot parquet.
# ---------------------------------------------------------------------------
current_cdc = 0

if mssparkutils.fs.exists(cdc_file):
    print(f"cdc file found {cdc_file}")
    cdc_df = pd.read_csv(cdc_file)
    current_cdc = int(cdc_df['current_cdc'].iloc[0])

    # A snapshot whose watermark is newer than the stored one means the table has to
    # be rebuilt from that snapshot.
    init_flag = current_cdc < init_cdc

    if init_flag and init_path:
        # Restart the watermark at the snapshot. Without this the equality test below
        # could never pass for an existing cdc file, so the re-initialisation was
        # silently skipped and the stale watermark file was kept.
        current_cdc = init_cdc
        cdc_df = pd.DataFrame({'init_cdc':[init_cdc], 'current_cdc':[init_cdc]})
else:
    print(f"no cdc file found {cdc_file}...")
    cdc_df = pd.DataFrame({'init_cdc':[init_cdc], 'current_cdc':[init_cdc]})
    init_flag = True

# No usable stored watermark: start from the snapshot watermark.
if current_cdc<=0:
    current_cdc = init_cdc

print(f"init_cdc is {init_cdc}")
print(f"current_cdc is {current_cdc}")
display(cdc_df)

if init_path and init_cdc==current_cdc and init_flag:
    # lstrip keeps a leading '/' in init_path from discarding the abfss base in os.path.join.
    init_data_path = os.path.join(base_abfss_path, init_path.lstrip("/"), f"{table_name}_{init_cdc}.parquet")

    if not mssparkutils.fs.exists(init_data_path):
        raise Exception(f"Init path {init_data_path} does not exist. Cannot initialize table as there is no source file to load from.")
else:
    # Empty string signals "no initial load" to the initialisation cell.
    init_data_path = ""

print(f"init_data_path is {init_data_path}")
print(f"init_flag is {init_flag}")

# READ SCHEMA FILE TO GET DEFINITION OF PASSED IN CONFIG TABLE
# Test for an empty value first so fs.exists is never called with an empty path.
if change_file and not mssparkutils.fs.exists(change_file):
    raise Exception(f"Change file path {change_file} does not exist. Cannot process table as there is no source of changes to load from.")

if schema_file and not mssparkutils.fs.exists(schema_file):
    raise Exception(f"Change schema file {schema_file} does not exist. Cannot process table as there is no schema to distinguish from.")

if not schema_file:
    print("no schema file passed in... will skip cdc process")
    mssparkutils.notebook.exit(0)

config = configparser.ConfigParser()

# Read the schema file through Spark (it lives on abfss) and hand the text to configparser.
schema_file_df = spark.read.text(schema_file)
schema_file_txt = '\n'.join(d['value'] for d in schema_file_df.collect())

config.read_string(schema_file_txt)

headers = "False"
cols = []
types = []

# The column definitions are expected in a section named "<table_name>.txt".
schema_section = f"{table_name}.txt"

if not config.has_section(schema_section):
    raise Exception(f"No schema was found in schema file {schema_file}. Cannot continue.")

# configparser lower-cases option names, hence the lower-case comparisons.
for key, value in config.items(schema_section):
    if key == "colnameheader":
        headers = value
    elif key.startswith("col"):
        # Each value is "<column name> <data type>"; splitting on space assumes
        # column names contain no spaces.
        cols.append(value.split(" "))

# Check before building the DataFrame: Spark cannot infer a schema from an empty
# list, so the check would never be reached if it came afterwards.
if not cols:
    raise Exception(f"No schema was found in schema file {schema_file}. Cannot continue.")

columns_df = spark.createDataFrame(data=cols, schema = ["col_name","col_type"])
columns_df.show(20)

# USE COLUMNS FROM ABOVE TO GET COLUMNS ON CHANGE DATA FILE PASSED IN
# The change file has no header row; columns are named from the schema file below.
df = spark.read.format('csv') \
                .option('header',False) \
                .option('sep', ';') \
                .option("multiLine",True)  \
                .option("escape", "\"") \
                .load(change_file)

# Ordered column names from the schema, kept as a pandas Series named 'col_name'.
cols=columns_df.select(columns_df.col_name).toPandas()['col_name']

print(len(df.columns))
print(len(cols))

diff = len(df.columns)-len(cols)

if diff>0:
    # Drop the trailing surplus columns in one step. Dropping them one at a time
    # while recomputing len(df.columns) on the shrinking frame removes the wrong
    # columns as soon as more than one has to go.
    surplus_cols = df.columns[len(cols):]

    for offset in range(len(surplus_cols)):
        print(f"dropping col index as schema doesn't match change file... dropping {len(cols)+offset}")

    df = df.drop(*surplus_cols)
elif diff<0:
    # Fewer data columns than schema columns cannot be renamed positionally.
    raise Exception(f"Change file {change_file} has {len(df.columns)} columns but schema file {schema_file} defines {len(cols)}. Cannot continue.")

# rename columns positionally to the schema names
df = df.toDF(*list(cols))

df.createOrReplaceTempView(f"{table_name}_change_data")
display(df.limit(10))

# Columns tracked by the change feed are those with a "<name>_NEW" column in the
# change file; keep the base name. Match and strip "_NEW" as a suffix only, so a
# name that merely contains "_NEW" elsewhere is not mangled.
new_suffix = "_NEW"
tracked_cols = pd.DataFrame(cols)
tracked_cols = tracked_cols[tracked_cols.col_name.str.endswith(new_suffix)]
tracked_cols = pd.DataFrame(tracked_cols['col_name'].str[:-len(new_suffix)])

# Defaults so later cells can use these names even when no partitioning is requested.
partition_sql = ""
partition_change_sql = ""
partitionBy = []
partitionBy_change = []

# create delta tables partition by syntax and col list
if partition_by:
    # `x in series` tests the index labels, not the values, so compare against a set.
    tracked_names = set(tracked_cols['col_name'])
    partition_select = []

    # Entries are comma separated; a generated column is written NAME=<valid sql>.
    # Because of the comma split, the SQL expression itself cannot contain a comma.
    for entry in partition_by.split(","):
        entry = entry.strip()

        if not entry:
            continue

        if "=" in entry and entry not in tracked_names:
            # Split on the first '=' only so the expression may contain '=' itself.
            target, expression = (piece.strip() for piece in entry.split("=", 1))
            # '*' is the column-suffix placeholder: empty for the initial load,
            # _OLD / _NEW when evaluated against the change file.
            partition_select.append(f"{expression.replace('*','')} as {target}")
            partition_change_sql += f",{expression.replace('*','_OLD')} AS {target}_OLD,{expression.replace('*','_NEW')} AS {target}_NEW"
            partitionBy.append(target)
        else:
            partitionBy.append(entry)

    partition_sql = ",".join(partition_select)

    print(f"partition_sql is {partition_sql}")
    print(f"partitionBy is {partitionBy}")
    print(f"partition_change_sql is {partition_change_sql}")

    # Partition columns are tracked too, so they are carried through the merge.
    tracked_partition_by_cols = pd.DataFrame(partitionBy, columns=["col_name"])
    tracked_cols = pd.concat([tracked_cols,tracked_partition_by_cols]).drop_duplicates().reset_index(drop=True)

In [72]:
# IF INIT PATH and INIT FLAG SPECIFIED from logic above INITIALIZE DELTA TABLE
if len(init_data_path)>0 and init_flag:
    if mssparkutils.fs.exists(delta_table_path):
        # The existing table directory is removed so the snapshot can be written fresh.
        print(f"Found {delta_table_path} and init flag set... will remove contents.")
        mssparkutils.fs.rm(delta_table_path, True)

    # read in init table load
    init_df = spark.read.load(init_data_path, format='parquet')

    # Add generated partition columns only when there are any; with plain partition
    # columns partition_sql is empty and "select ,* from ..." would be invalid SQL.
    if partition_by and partition_sql:
        init_df.createOrReplaceTempView(f"{table_name}_pre_partition")
        init_df=spark.sql(f"select {partition_sql},* from {table_name}_pre_partition")

    display(init_df.limit(10))

    # write it out as delta table to adls
    delta_writer = init_df.write.format("delta")

    if partition_by and partitionBy:
        print(partitionBy)
        delta_writer = delta_writer.partitionBy(partitionBy)

    delta_writer.save(delta_table_path)

    # Persist the watermark file after the table has been written.
    cdc_df.to_csv(cdc_file)
    print(f"Wrote table and cdc file to {delta_table_path}")

    # catalog it
    spark.sql(f"DROP TABLE IF EXISTS {table_name};")
    spark.sql(f"CREATE TABLE {table_name} USING DELTA LOCATION '{delta_table_path}';")
    display(spark.sql(f"DESCRIBE EXTENDED {table_name}"))
else:
    print(f"Table has already been initialized at {delta_table_path}... will skip initialization.")


StatementMeta(fetrspker01, 91, 4, Finished, Available)

Table has already been initialized at abfss://enterprise-reporting@fetrsaentreporting.dfs.core.windows.net/data_lake/bronze/crews/CREWSSSP_WORK_REQUEST... will skip initialization.


In [None]:
# AGGREGATE UPDATES THAT CAN BE (ON ISLAND ... BETWEEN I/D OPERATIONS) SO AS NOT TO HAVE TO LOOP IF 500 UPDATES FOR 1 ID ... 500 TIMES
#
# This cell builds the SQL fragments used by the queries below and by the final
# MERGE (joinOp, updateOp, insertOpValues, insertOpOrdinal), stages the filtered
# change rows in a temporary Delta table, and collapses consecutive updates of
# the same key into a single row.

# Partition column names; empty when the notebook runs without partitioning.
partition_cols = partitionBy if partition_by else []
keys_csv = ','.join(keys)

# Window covering one whole "island" of consecutive updates for a key.
island_window = f"over (partition by OP_CODE, {keys_csv}, update_batch_order order by run_order ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)"

key_sql = partition_change_sql if partition_by else ""
col_sql = ""
part_sql = ""
update_last_sql = ""

join_terms = []
update_terms = []
insert_value_terms = []
insert_column_terms = []
coalesced_merge_terms = []
self_merge_terms = []

# create needed sql for each col
for col in tracked_cols['col_name']:
    if col in keys:
        key_sql += f", COALESCE({col}_NEW, {col}_OLD) as {col}"
        join_terms.append(f"p.{col} = u.{col}")
        insert_value_terms.append(col)
    elif col in partition_cols:
        part_sql += f", COALESCE({col}_NEW, {col}_OLD) as {col}"
        coalesced_merge_terms.append(f"{col}_OLD = COALESCE(u.{col}_NEW, u.{col}_OLD, p.{col})")
        self_merge_terms.append(f"{col}_OLD = COALESCE(u.{col}_OLD, p.{col}_NEW)")
        insert_value_terms.append(col)
        update_terms.append(f"{col} =  coalesce(u.{col},p.{col})")
    else:
        col_sql += f", {col}_OLD, {col}_NEW "
        # last(..., true) ignores NULLs, so an explicit change to NULL (OLD set, NEW
        # NULL) is carried through the window as the sentinel 'NULLm30ut' and turned
        # back into NULL by nullif afterwards.
        update_last_sql += f""", last({col}_OLD, true) {island_window} AS {col}_OLD
    , nullif(last(if({col}_OLD is not null and {col}_NEW is null,'NULLm30ut',{col}_NEW), true) {island_window},'NULLm30ut') AS {col}_NEW
    """
        update_terms.append(f"{col} = case when u.{col}_NEW is NULL AND u.{col}_OLD is NOT NULL THEN NULL ELSE coalesce(u.{col}_NEW,p.{col}) END")
        insert_value_terms.append(f"{col}_NEW")

    insert_column_terms.append(col)

if not join_terms:
    raise Exception(f"None of the primary_key columns {keys} were found in the change file schema. Cannot build the merge join.")

# Join the fragments with explicit separators. Appending "... AND" and trimming the
# tail produced "ANDp.col" (invalid SQL) as soon as there was more than one key.
joinOp = " AND ".join(join_terms)
updateOp = "\n    ,".join(update_terms)
insertOpValues = ",".join(insert_value_terms)
insertOpOrdinal = ",".join(insert_column_terms)
part_sql_coalesced_merge = ",".join(coalesced_merge_terms)
part_sql_self_merge = ",".join(self_merge_terms)

deltaTable_df = DeltaTable.forPath(spark, delta_table_path).toDF()
deltaTable_df.createOrReplaceTempView(f"{table_name}_tmp")

# create base view to be used to agg update statements on same "island" and needed rows for deletes and inserts
# NOTE(review): operations are ordered by OP_NUM_IN_TX only. If a change file can span
# several commits, ordering probably needs OP_CMT_SCN first - confirm against the source.
q = f"""with u as (select OP_CODE, cast(OP_CMT_SCN as BIGINT) OP_CMT_SCN, CAST(OP_NUM_IN_TX AS BIGINT) OP_NUM_IN_TX
        , MIN(cast(OP_CMT_SCN as BIGINT)) over (PARTITION BY NULL) MIN_SCN
        , MAX(cast(OP_CMT_SCN as BIGINT)) over (PARTITION BY NULL) MAX_SCN
        {key_sql}
        {col_sql}
        from {table_name}_change_data
        where CAST(OP_CMT_SCN AS BIGINT)>{current_cdc}
        )
        select u.*
        , row_number() over (partition by {keys_csv} order by CAST(OP_NUM_IN_TX AS BIGINT)) run_order
        , case when OP_CODE='U' then dense_rank() over (partition by OP_CODE,{keys_csv} order by CAST(OP_NUM_IN_TX AS BIGINT)) end update_order
        from u
        ORDER BY {keys_csv}, CAST(OP_NUM_IN_TX AS BIGINT)
"""

if mssparkutils.fs.exists(tmp_area):
    print(f"Found {tmp_area}... will remove contents.")
    mssparkutils.fs.rm(tmp_area, True)

# Stage the filtered changes as a Delta table so they can be the target of MERGE below.
# NOTE(review): tmp_area is not removed anywhere in the visible notebook - confirm cleanup.
df_base_changes = spark.sql(q)
df_base_changes.write.format("delta").save(tmp_area)
df_base_changes = DeltaTable.forPath(spark, tmp_area).toDF()
df_base_changes.createOrReplaceTempView(f"{table_name}_base_change_data")

# Only run the partition back-fill merges when there are partition columns; with
# none the SET clause would be empty and the MERGE statement invalid.
if coalesced_merge_terms:
    # update partition columns as they may be empty in updates if not changed in source system.. only need to set _OLD column as will be coalesced down the line
    # without this duplication will occur for ops that are updates...
    q = f"""MERGE INTO {table_name}_base_change_data u
        USING {table_name} p ON {joinOp}
        WHEN MATCHED THEN UPDATE set {part_sql_coalesced_merge}
"""
    df_base_changes = spark.sql(q)

    # update partition columns from their previous insert in same change file
    q = f"""MERGE INTO {table_name}_base_change_data u
        USING (select * from {table_name}_base_change_data where OP_CODE='I') p ON {joinOp}
        WHEN MATCHED THEN UPDATE set {part_sql_self_merge}
"""
    df_base_changes = spark.sql(q)

# run_order - update_order is constant within a run of consecutive updates for a key,
# so ranking it yields one update_batch_order per island. Each island collapses to a
# single row placed at the island's first run_order; inserts and deletes pass through.
q = f"""with a as (select *, run_order - update_order as seq_id from {table_name}_base_change_data where OP_CODE='U')
        , b as (
        select *, dense_rank() over (partition by OP_CODE, {keys_csv} order by seq_id) AS update_batch_order
        from a
        )
        select DISTINCT OP_CODE, MIN_SCN, MAX_SCN, {keys_csv}
        {part_sql}
        {update_last_sql}
        , min(run_order) over (partition by OP_CODE, {keys_csv}, update_batch_order) AS run_order
        from b
        union all
        select DISTINCT OP_CODE, MIN_SCN, MAX_SCN, {keys_csv}
        {part_sql}
        {col_sql}
        , run_order
        from {table_name}_base_change_data
        where OP_CODE<>'U'
        order by {keys_csv}, run_order
"""

# Left lazy on purpose: collecting here pulled every change row to the driver only
# to discard it. The following cells trigger evaluation via count().
agg_changes_df = spark.sql(q)

In [None]:
# Register the aggregated changes as a temp view and print two sanity summaries:
# row counts per operation/run_order, and any key appearing more than once within
# the same operation and run_order.
has_changes = agg_changes_df.count()>0

if has_changes:
    agg_view = f"{table_name}_agg_change_data"
    key_list = ','.join(keys)

    agg_changes_df.createOrReplaceTempView(agg_view)

    per_run_sql = f"select OP_CODE, run_order, count(*) from {agg_view} group by OP_CODE, run_order"
    duplicate_sql = f"select OP_CODE, run_order, {key_list}, count(*) from {agg_view} group by OP_CODE, run_order, {key_list} having count(*)>1"

    spark.sql(per_run_sql).show()
    spark.sql(duplicate_sql).show()

In [96]:
# RUN THE MERGE (LOOP THROUGH RUN_ORDER TO APPLY CUMULATIVE CHANGES) IF DATA 

from delta.tables import *
import json

if agg_changes_df.count()>0:
    # Register the view here as well so this cell does not depend on the diagnostic
    # summary cell having been run first.
    agg_view = f"{table_name}_agg_change_data"
    agg_changes_df.createOrReplaceTempView(agg_view)

    deltaTable_df = DeltaTable.forPath(spark, delta_table_path).toDF()
    deltaTable_df.createOrReplaceTempView(f"{table_name}_tmp")

    # Only the run_order values that actually occur are processed. Collapsing updates
    # leaves gaps in the sequence, and a MERGE for a missing run_order is wasted work.
    pending_runs = sorted({int(r[0]) for r in spark.sql(f"select distinct CAST(run_order AS BIGINT) from {agg_view}").collect()})
    max_run_order = pending_runs[-1]

    # Apply the changes one run_order at a time so each key sees its operations in order.
    for run in pending_runs:
        print(f"Running {run} of {max_run_order}...")
        mergeSql = f"""MERGE INTO {table_name}_tmp AS p
                            USING (select * from {agg_view} where run_order={run}) AS u
                            ON {joinOp}
                            WHEN MATCHED AND u.OP_CODE='U' THEN
                            UPDATE SET
                                {updateOp}
                            WHEN MATCHED AND u.OP_CODE='D' THEN DELETE
                            WHEN NOT MATCHED AND u.OP_CODE='I'
                            THEN INSERT (
                                {insertOpOrdinal}
                            )
                            VALUES (
                                {insertOpValues}
                            )

        """
        spark.sql(mergeSql)

    # Highest commit SCN in the applied change set becomes the new watermark.
    new_cdc = int(spark.sql(f"select MAX(CAST(MAX_SCN AS BIGINT)) from {agg_view}").first()[0])

    if new_cdc>current_cdc:
        cdc_df = pd.DataFrame({'init_cdc':[init_cdc], 'current_cdc':[new_cdc]})
        cdc_df.to_csv(cdc_file)
        print(f"Wrote new cdc file with new current_cdc {new_cdc}")
else:
    print("no changes to apply")

StatementMeta(fetrspker01, 91, 28, Finished, Available)

Running 1 of 3...
Running 2 of 3...
Running 3 of 3...
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fb4e2b7e940>
Wrote new cdc file with new current_cdc 6276067685761
