In [0]:
# - - - - - - - - - - -
# Inject from DLT Job
# - - - - - - - - - - -
# Fallback values used when the DLT job has not injected a setting
# (for example when the notebook is run outside the pipeline).
_KDI_PIPELINE_DEFAULTS = {
  "kdiPipeline.KAFKA_BROKER_ADDRESS": "kafka:9092",
  "kdiPipeline.KAFKA_CONSUMER_NAME_SELF": "kdi-java-1",
  "kdiPipeline.KAFKA_TOPIC_LIST": "server1.dbo.customers|server1.dbo.products_on_hand",
  "kdiPipeline.PRIMARY_KEY_LIST": "id|product_id",
}

# Fill in only the keys that are actually missing. Passing an explicit default
# to spark.conf.get avoids relying on an exception for the "not set" case, so
# no blanket `except` is needed and a value that WAS injected is never
# overwritten just because a sibling key is absent.
for _conf_key, _conf_default in _KDI_PIPELINE_DEFAULTS.items():
  if spark.conf.get(_conf_key, None) is None:
    spark.conf.set(_conf_key, _conf_default)

# - - - - -
# Injected
# - - - - -
# Single
KAFKA_BROKER_ADDRESS = spark.conf.get("kdiPipeline.KAFKA_BROKER_ADDRESS")
KAFKA_CONSUMER_NAME_SELF = spark.conf.get("kdiPipeline.KAFKA_CONSUMER_NAME_SELF")

# List ("|"-separated; position i of one list pairs with position i of the other)
KAFKA_TOPIC_LIST = spark.conf.get("kdiPipeline.KAFKA_TOPIC_LIST").split("|")
PRIMARY_KEY_LIST = spark.conf.get("kdiPipeline.PRIMARY_KEY_LIST").split("|")

In [0]:
import dlt
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.context import SparkContext

def generate_silver(
  kafka_topic, kafka_broker_address, kafka_consumer_name_self,
  cdc_staging_table, bronze_table, upsert_staging_view, silver_table, primary_keys
):
  """Declare the DLT datasets that take one Kafka topic from raw CDC to silver.

  Four datasets are registered with `dlt`:
    1. `cdc_staging_table`   - streaming read of the raw Delta files.
    2. `bronze_table`        - the `value` column parsed with a schema inferred
                               from a sample payload file, flattened to the
                               fields of `payload`.
    3. `upsert_staging_view` - picks `before` for deletes and `after` otherwise.
    4. `silver_table`        - target maintained by `dlt.apply_changes`.

  Args:
    kafka_topic: Topic name; used in the raw Delta path and as the file name of
      the sample payload (`<topic>.json`).
    kafka_broker_address: Broker address; part of the raw Delta path.
    kafka_consumer_name_self: Consumer name; part of the raw Delta path (`kdi=`).
    cdc_staging_table: Name for the raw CDC table.
    bronze_table: Name for the parsed bronze table.
    upsert_staging_view: Name for the staging view fed to `apply_changes`.
    silver_table: Name for the `apply_changes` target table.
    primary_keys: List of key column names passed to `apply_changes(keys=...)`.

  Raises:
    Whatever `open` / `json.load` raise when the sample payload file is missing
    or is not valid JSON.
  """
  # - - - - - - - - - - - -
  # Table scoped variables
  # - - - - - - - - - - - -
  cdc_raw_delta_path = f"/mnt/cdckafka/{kafka_broker_address}/{kafka_topic}/kdi={kafka_consumer_name_self}"

  # - - - - - - - - - - - - - - - - - - - - - - - - - -
  # Read JSON Payload -> Dictionary -> List -> Schema
  # - - - - - - - - - - - - - - - - - - - - - - - - - -
  with open(f"/dbfs/mnt/cdckafka/kafka_payloads/{kafka_topic}.json", 'r') as f:
    sample_payload = json.load(f)

  # Re-serialise the parsed sample into a single JSON string and let Spark
  # infer the schema from a one-element RDD containing it.
  sample_rdd = spark.sparkContext.parallelize([json.dumps(sample_payload)])
  schema = spark.read.json(sample_rdd).schema

  # - - - - - - - - - - - -
  # Ingest raw CDC data
  # - - - - - - - - - - - -
  # `dlt.table` is used here (rather than the `dlt.create_table` alias) so that
  # both table declarations in this function use the same decorator.
  @dlt.table(
    name=cdc_staging_table,
    comment="The raw Debezium logs from Kafka fed via KDI Container."
  )
  def cdc_raw():
    return (
      spark.readStream.format("delta").load(cdc_raw_delta_path)
    )

  # - - - - - - - - - - - - - - - - - - - -
  # Exploded JSON with inferred Schema
  # - - - - - - - - - - - - - - - - - - - -
  @dlt.table(
    name=bronze_table,
    comment="Exploded JSON with Schema inference."
  )
  def create_bronze_table():
    # NOTE(review): from_unixtime returns a formatted string truncated to whole
    # seconds, so SourceModifiedDate (used below as the apply_changes sequence)
    # cannot order two changes to the same key that fall within one second.
    # Consider sequencing on the raw millisecond value - confirm the impact on
    # existing pipeline state before changing it.
    return (
      dlt.read_stream(cdc_staging_table)
        .select("*", from_json(col("value"), schema).alias("data"))
        .select("*", "data.*")
        .withColumn("SourceModifiedDate", from_unixtime((col("payload.ts_ms") / 1000)))
        .select("SourceModifiedDate", "payload.*")
        .filter("op is not null")
    )

  # - - - - - - - - -
  # Staging View
  # - - - - - - - - -
  @dlt.view(
    name=upsert_staging_view,
    comment="Forming a staging view ready for UPSERT."
  )
  def create_upsert_stage_view():
    # A delete carries its row image in `before`; every other op uses `after`.
    return (
      dlt.read_stream(bronze_table)
        .withColumn("result", when(col("op") == "d", col("before"))
                             .otherwise(col("after")))
        .select("SourceModifiedDate", "op", "result.*")
    )

  # - - - - - - - - -
  # SCD 1 to Silver
  # - - - - - - - - -
  dlt.create_target_table(silver_table)

  dlt.apply_changes(
    target = silver_table,
    source = upsert_staging_view,
    keys = primary_keys,
    sequence_by = col("SourceModifiedDate"),
    apply_as_deletes = expr("op = 'd'"),
    # Bookkeeping columns are dropped from the silver table.
    except_column_list = ["op", "SourceModifiedDate"]
  )

In [0]:
# Loop over for all tables
# Topics and primary keys are paired by position. Check up front so that a
# primary-key list that is too short fails with a clear message before any
# dataset has been registered, instead of an IndexError part-way through.
if len(PRIMARY_KEY_LIST) < len(KAFKA_TOPIC_LIST):
  raise ValueError(
    f"kdiPipeline.PRIMARY_KEY_LIST has {len(PRIMARY_KEY_LIST)} entries but "
    f"kdiPipeline.KAFKA_TOPIC_LIST has {len(KAFKA_TOPIC_LIST)}; "
    "every topic needs a primary key at the same position."
  )

# zip stops at the end of the topic list, so surplus primary keys are ignored.
for KAFKA_TOPIC, PRIMARY_KEY in zip(KAFKA_TOPIC_LIST, PRIMARY_KEY_LIST):
  # - - - - - - - - - - - -
  # Table scoped variables
  # - - - - - - - - - - - -
  # Dots in the topic name are replaced so it can be used in dataset names.
  TABLE_NAME = KAFKA_TOPIC.replace(".", "_")
  CDC_STAGING_TABLE = f"CDC_{TABLE_NAME}"
  BRONZE_TABLE = f"BRONZE_{TABLE_NAME}"
  UPSERT_STAGING_VIEW = f"UPSERT_{TABLE_NAME}"
  SILVER_TABLE = f"SILVER_{TABLE_NAME}"

  # - - - - - - - - - - - - - -
  # Call DLT Pipeline per table
  # - - - - - - - - - - - - - -
  generate_silver(
    kafka_topic=KAFKA_TOPIC,
    kafka_broker_address=KAFKA_BROKER_ADDRESS,
    kafka_consumer_name_self=KAFKA_CONSUMER_NAME_SELF,
    cdc_staging_table=CDC_STAGING_TABLE,
    bronze_table=BRONZE_TABLE,
    upsert_staging_view=UPSERT_STAGING_VIEW,
    silver_table=SILVER_TABLE,
    # apply_changes expects a list of keys; each table is assumed to have a
    # single-column primary key for now.
    primary_keys=[PRIMARY_KEY]
  )