In [0]:
def func(df, batchId):
    """Upsert one micro-batch of customer records into the silver ``cust_info`` Delta table.

    Meant to be passed to ``DataStreamWriter.foreachBatch``. The batch is
    reduced to the newest row per ``cst_id``, cleaned with Spark SQL, and then
    either written as a new Delta table or merged into the existing one.

    Relies on the notebook-global ``spark`` session.

    Args:
        df: Micro-batch DataFrame. Must contain ``cst_id``, ``cst_key``,
            ``cst_firstname``, ``cst_lastname``, ``cst_marital_status``,
            ``cst_gndr``, ``cst_create_date`` and ``updated_at``.
        batchId: Micro-batch identifier supplied by the streaming engine (unused).
    """
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    from delta.tables import DeltaTable

    # Keep only the newest row per cst_id: the merge below needs at most one
    # source row per key. Rows without a key are discarded.
    newest_first = Window.partitionBy("cst_id").orderBy(col("updated_at").desc())
    df = (
        df.filter(col('cst_id').isNotNull())
        .withColumn('dedup', row_number().over(newest_first))
        .filter(col('dedup') == 1)
        .drop('dedup')
    )

    # Create a temporary view for SQL transformation
    # NOTE(review): Databricks recommends df.sparkSession inside foreachBatch;
    # confirm the global `spark` can see this view on the target runtime.
    df.createOrReplaceTempView('cust_info')

    # Transform data. `updated_at` is carried through on purpose: the merge
    # condition further down compares it between target and source.
    df = spark.sql("""
        SELECT
            cst_id,
            cst_key,
            TRIM(cst_firstname) AS cst_firstname,
            TRIM(cst_lastname) AS cst_lastname,
            CASE
                WHEN UPPER(TRIM(cst_marital_status)) = 'M' THEN 'Married'
                WHEN UPPER(TRIM(cst_marital_status)) = 'S' THEN 'Single'
                ELSE 'N/A'
            END AS cst_marital_status,
            CASE
                WHEN UPPER(TRIM(cst_gndr)) = 'M' THEN 'Male'
                WHEN UPPER(TRIM(cst_gndr)) = 'F' THEN 'Female'
                ELSE 'N/A'
            END AS cst_gender,
            cst_create_date,
            updated_at,
            current_date() AS modified_date
        FROM cust_info
    """)

    # Define Delta path
    delta_path = 'abfss://silver@storagetemp.dfs.core.windows.net/cust_info/data/'

    # Write or merge data into Delta table
    if not DeltaTable.isDeltaTable(spark, delta_path):
        # First-time write
        df.write.format('delta').mode('overwrite').save(delta_path)
    else:
        # Perform upsert (merge): update a matched row only when the incoming
        # record is strictly newer, insert rows whose key is not present yet.
        trg = DeltaTable.forPath(spark, delta_path)

        (
            trg.alias('trg')
            .merge(
                df.alias('src'),
                'trg.cst_id = src.cst_id'
            )
            .whenMatchedUpdateAll(condition='trg.updated_at < src.updated_at')
            .whenNotMatchedInsertAll()
            .execute()
        )


# cloudFiles (Databricks Auto Loader) source over the bronze cust_info folder.
# The inferred schema is tracked under the silver checkpoint directory.
df1 = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'parquet')
    .option('cloudFiles.schemaLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/cust_info/checkpoint')
    .option('cloudFiles.schemaEvolutionMode', 'rescue')
    .load('abfss://bronze@storagetemp.dfs.core.windows.net/cust_info/')
)


# Run a single trigger and hand every micro-batch to `func`, which performs
# the actual Delta write/merge.
# NOTE(review): the 'path' option is presumably unused with foreachBatch - confirm.
(
    df1.writeStream.foreachBatch(func)
    .outputMode('append')
    .option('checkpointLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/cust_info/checkpoint')
    .option('path', 'abfss://silver@storagetemp.dfs.core.windows.net/cust_info/data/')
    .trigger(once=True)
    .start()
)

In [0]:
def func(df, batchId):
    """Upsert one micro-batch of ERP customer records into the silver ``cust_az12`` Delta table.

    Meant to be passed to ``DataStreamWriter.foreachBatch``. The batch is
    cleaned with Spark SQL, reduced to the newest row per normalized ``cid``,
    and then either written as a new Delta table or merged into the existing one.

    Relies on the notebook-global ``spark`` session.

    Args:
        df: Micro-batch DataFrame. Must contain ``cid``, ``bdate``, ``gen``
            and ``updated_at``.
        batchId: Micro-batch identifier supplied by the streaming engine (unused).
    """
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    from delta.tables import DeltaTable

    # Create a temporary view for SQL transformation; rows without a key are
    # discarded up front.
    # NOTE(review): Databricks recommends df.sparkSession inside foreachBatch;
    # confirm the global `spark` can see this view on the target runtime.
    df.filter(col('cid').isNotNull()).createOrReplaceTempView('cust_az12')

    # Transform data. `updated_at` is carried through on purpose: both the
    # dedup and the merge condition further down need it.
    df = spark.sql("""
        SELECT
            CASE
                WHEN cid LIKE 'NAS%' THEN SUBSTRING(cid, 4, LENGTH(cid)) -- Remove 'NAS' prefix if present
                ELSE cid
            END AS cid,
            CASE
                WHEN bdate > current_date() THEN NULL
                ELSE bdate
            END AS bdate, -- Set future birthdates to NULL
            CASE
                WHEN UPPER(TRIM(gen)) IN ('F', 'FEMALE') THEN 'Female'
                WHEN UPPER(TRIM(gen)) IN ('M', 'MALE') THEN 'Male'
                ELSE 'n/a'
            END AS gen,
            updated_at,
            current_date() AS modified_date
        FROM cust_az12
    """)

    # Deduplicate on the *normalized* cid, keeping the latest record. Doing it
    # after the prefix removal guarantees that 'NAS<x>' and '<x>' cannot both
    # reach the merge as two source rows for the same target key.
    newest_first = Window.partitionBy("cid").orderBy(col("updated_at").desc())
    df = (
        df.withColumn('dedup', row_number().over(newest_first))
        .filter(col('dedup') == 1)
        .drop('dedup')
    )

    # Define Delta path
    delta_path = 'abfss://silver@storagetemp.dfs.core.windows.net/cust_az12/data/'

    # Write or merge data into Delta table
    if not DeltaTable.isDeltaTable(spark, delta_path):
        # First-time write
        df.write.format('delta').mode('overwrite').save(delta_path)
    else:
        # Perform upsert (merge): update a matched row only when the incoming
        # record is strictly newer, insert rows whose key is not present yet.
        trg = DeltaTable.forPath(spark, delta_path)

        (
            trg.alias('trg')
            .merge(
                df.alias('src'),
                'trg.cid = src.cid'
            )
            .whenMatchedUpdateAll(condition='trg.updated_at < src.updated_at')
            .whenNotMatchedInsertAll()
            .execute()
        )


# cloudFiles (Databricks Auto Loader) source over the bronze CUST_AZ12 folder.
# The inferred schema is tracked under the silver checkpoint directory.
cust_az12 = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'parquet')
    .option('cloudFiles.schemaLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/cust_az12/checkpoint')
    .option('cloudFiles.schemaEvolutionMode', 'rescue')
    .load('abfss://bronze@storagetemp.dfs.core.windows.net/CUST_AZ12/')
)


# Run a single trigger and hand every micro-batch to `func`, which performs
# the actual Delta write/merge.
# NOTE(review): the 'path' option is presumably unused with foreachBatch - confirm.
(
    cust_az12.writeStream.foreachBatch(func)
    .outputMode('append')
    .option('checkpointLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/cust_az12/checkpoint')
    .option('path', 'abfss://silver@storagetemp.dfs.core.windows.net/cust_az12/data/')
    .trigger(once=True)
    .start()
)



In [0]:
def func(df, batchId):
    """Upsert one micro-batch of customer location records into the silver ``loc_a101`` Delta table.

    Meant to be passed to ``DataStreamWriter.foreachBatch``. The batch is
    cleaned with Spark SQL, reduced to the newest row per normalized ``cid``,
    and then either written as a new Delta table or merged into the existing one.

    Relies on the notebook-global ``spark`` session.

    Args:
        df: Micro-batch DataFrame. Must contain ``cid``, ``cntry`` and
            ``updated_at``.
        batchId: Micro-batch identifier supplied by the streaming engine (unused).
    """
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    from delta.tables import DeltaTable

    # Create a temporary view for SQL transformation; rows without a key are
    # discarded up front.
    # NOTE(review): Databricks recommends df.sparkSession inside foreachBatch;
    # confirm the global `spark` can see this view on the target runtime.
    df.filter(col('cid').isNotNull()).createOrReplaceTempView('loc_a101')

    # Transform data. `updated_at` is carried through on purpose: both the
    # dedup and the merge condition further down need it.
    df = spark.sql("""
        SELECT
            REPLACE(cid, '-', '') AS cid,
            CASE
                WHEN TRIM(cntry) = 'DE' THEN 'Germany'
                WHEN TRIM(cntry) IN ('US', 'USA') THEN 'United States'
                WHEN TRIM(cntry) = '' OR cntry IS NULL THEN 'n/a'
                ELSE TRIM(cntry)
            END AS cntry,
            updated_at,
            current_date() AS modified_date
        FROM loc_a101
    """)

    # Deduplicate on the *normalized* cid, keeping the latest record. Doing it
    # after the dash removal guarantees that 'AB-1' and 'AB1' cannot both
    # reach the merge as two source rows for the same target key.
    newest_first = Window.partitionBy("cid").orderBy(col("updated_at").desc())
    df = (
        df.withColumn('dedup', row_number().over(newest_first))
        .filter(col('dedup') == 1)
        .drop('dedup')
    )

    # Define Delta path
    delta_path = 'abfss://silver@storagetemp.dfs.core.windows.net/loc_a101/data/'

    # Write or merge data into Delta table
    if not DeltaTable.isDeltaTable(spark, delta_path):
        # First-time write
        df.write.format('delta').mode('overwrite').save(delta_path)
    else:
        # Perform upsert (merge): update a matched row only when the incoming
        # record is strictly newer, insert rows whose key is not present yet.
        trg = DeltaTable.forPath(spark, delta_path)

        (
            trg.alias('trg')
            .merge(
                df.alias('src'),
                'trg.cid = src.cid'
            )
            .whenMatchedUpdateAll(condition='trg.updated_at < src.updated_at')
            .whenNotMatchedInsertAll()
            .execute()
        )


# cloudFiles (Databricks Auto Loader) source over the bronze LOC_A101 folder.
# The inferred schema is tracked under the silver checkpoint directory.
loc_a101 = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'parquet')
    .option('cloudFiles.schemaLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/loc_a101/checkpoint')
    .option('cloudFiles.schemaEvolutionMode', 'rescue')
    .load('abfss://bronze@storagetemp.dfs.core.windows.net/LOC_A101/')
)


# Run a single trigger and hand every micro-batch to `func`, which performs
# the actual Delta write/merge.
# NOTE(review): the 'path' option is presumably unused with foreachBatch - confirm.
(
    loc_a101.writeStream.foreachBatch(func)
    .outputMode('append')
    .option('checkpointLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/loc_a101/checkpoint')
    .option('path', 'abfss://silver@storagetemp.dfs.core.windows.net/loc_a101/data/')
    .trigger(once=True)
    .start()
)



In [0]:
def func(df, batchId):
    """Upsert one micro-batch of product records into the silver ``prd_info`` Delta table.

    Meant to be passed to ``DataStreamWriter.foreachBatch``. The batch is
    reduced to the newest row per ``prd_id``, cleaned with Spark SQL, and then
    either written as a new Delta table or merged into the existing one.

    Relies on the notebook-global ``spark`` session.

    Args:
        df: Micro-batch DataFrame. Must contain ``prd_id``, ``prd_key``,
            ``prd_nm``, ``prd_cost``, ``prd_line``, ``prd_start_dt`` and
            ``updated_at``.
        batchId: Micro-batch identifier supplied by the streaming engine (unused).
    """
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    from delta.tables import DeltaTable

    # Keep only the newest row per prd_id: the merge below needs at most one
    # source row per key. Rows without a key are discarded.
    newest_first = Window.partitionBy("prd_id").orderBy(col("updated_at").desc())
    df = (
        df.filter(col('prd_id').isNotNull())
        .withColumn('dedup', row_number().over(newest_first))
        .filter(col('dedup') == 1)
        .drop('dedup')
    )

    # Create a temporary view for SQL transformation
    # NOTE(review): Databricks recommends df.sparkSession inside foreachBatch;
    # confirm the global `spark` can see this view on the target runtime.
    df.createOrReplaceTempView('prd_info')

    # Transform data. `updated_at` is carried through on purpose: the merge
    # condition further down compares it between target and source.
    #
    # prd_end_dt is the day before the next start date of the same prd_key.
    # The value is cast to DATE *before* subtracting so the expression is
    # valid whether prd_start_dt arrives as a date, timestamp or string.
    # NOTE(review): the LEAD window only sees rows of the current micro-batch,
    # so versions of a product that arrived in earlier batches are not
    # considered - confirm this is acceptable.
    df = spark.sql("""
        SELECT
            prd_id,
            REPLACE(SUBSTRING(prd_key, 1, 5), '-', '_') AS cat_id, -- Extract category ID
            SUBSTRING(prd_key, 7, LENGTH(prd_key)) AS prd_key,        -- Extract product key
            prd_nm,
            COALESCE(prd_cost, 0) AS prd_cost,
            CASE
                WHEN UPPER(TRIM(prd_line)) = 'M' THEN 'Mountain'
                WHEN UPPER(TRIM(prd_line)) = 'R' THEN 'Road'
                WHEN UPPER(TRIM(prd_line)) = 'S' THEN 'Other Sales'
                WHEN UPPER(TRIM(prd_line)) = 'T' THEN 'Touring'
                ELSE 'n/a'
            END AS prd_line, -- Map product line codes to descriptive values
            CAST(prd_start_dt AS DATE) AS prd_start_dt,
            DATE_SUB(
                CAST(
                    LEAD(prd_start_dt) OVER (PARTITION BY prd_key ORDER BY prd_start_dt)
                    AS DATE
                ),
                1
            ) AS prd_end_dt,
            updated_at,
            current_date() AS modified_date
        FROM prd_info
    """)

    # Define Delta path
    delta_path = 'abfss://silver@storagetemp.dfs.core.windows.net/prd_info/data/'

    # Write or merge data into Delta table
    if not DeltaTable.isDeltaTable(spark, delta_path):
        # First-time write
        df.write.format('delta').mode('overwrite').save(delta_path)
    else:
        # Perform upsert (merge): update a matched row only when the incoming
        # record is strictly newer, insert rows whose key is not present yet.
        trg = DeltaTable.forPath(spark, delta_path)

        (
            trg.alias('trg')
            .merge(
                df.alias('src'),
                'trg.prd_id = src.prd_id'
            )
            .whenMatchedUpdateAll(condition='trg.updated_at < src.updated_at')
            .whenNotMatchedInsertAll()
            .execute()
        )


# cloudFiles (Databricks Auto Loader) source over the bronze prd_info folder.
# The inferred schema is tracked under the silver checkpoint directory.
prd_info = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'parquet')
    .option('cloudFiles.schemaLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/prd_info/checkpoint')
    .option('cloudFiles.schemaEvolutionMode', 'rescue')
    .load('abfss://bronze@storagetemp.dfs.core.windows.net/prd_info/')
)


# Run a single trigger and hand every micro-batch to `func`, which performs
# the actual Delta write/merge.
# NOTE(review): the 'path' option is presumably unused with foreachBatch - confirm.
(
    prd_info.writeStream.foreachBatch(func)
    .outputMode('append')
    .option('checkpointLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/prd_info/checkpoint')
    .option('path', 'abfss://silver@storagetemp.dfs.core.windows.net/prd_info/data/')
    .trigger(once=True)
    .start()
)



In [0]:
def func(df, batchId):
    """Upsert one micro-batch of product category records into the silver ``px_cat_g1v2`` Delta table.

    Meant to be passed to ``DataStreamWriter.foreachBatch``. The batch is
    reduced to the newest row per ``id``, projected with Spark SQL, and then
    either written as a new Delta table or merged into the existing one.

    Relies on the notebook-global ``spark`` session.

    Args:
        df: Micro-batch DataFrame. Must contain ``id``, ``cat``, ``subcat``,
            ``maintenance`` and ``updated_at``.
        batchId: Micro-batch identifier supplied by the streaming engine (unused).
    """
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    from delta.tables import DeltaTable

    # Keep only the newest row per id: the merge below needs at most one
    # source row per key. Rows without a key are discarded.
    newest_first = Window.partitionBy("id").orderBy(col("updated_at").desc())
    df = (
        df.filter(col('id').isNotNull())
        .withColumn('dedup', row_number().over(newest_first))
        .filter(col('dedup') == 1)
        .drop('dedup')
    )

    # Create a temporary view for SQL transformation
    # NOTE(review): Databricks recommends df.sparkSession inside foreachBatch;
    # confirm the global `spark` can see this view on the target runtime.
    df.createOrReplaceTempView('px_cat_g1v2')

    # Transform data. `updated_at` is carried through on purpose: the merge
    # condition further down compares it between target and source.
    df = spark.sql("""
        SELECT
            id,
            cat,
            subcat,
            maintenance,
            updated_at,
            current_date() AS modified_date
        FROM px_cat_g1v2
    """)

    # Define Delta path
    delta_path = 'abfss://silver@storagetemp.dfs.core.windows.net/px_cat_g1v2/data/'

    # Write or merge data into Delta table
    if not DeltaTable.isDeltaTable(spark, delta_path):
        # First-time write
        df.write.format('delta').mode('overwrite').save(delta_path)
    else:
        # Perform upsert (merge): update a matched row only when the incoming
        # record is strictly newer, insert rows whose key is not present yet.
        trg = DeltaTable.forPath(spark, delta_path)

        (
            trg.alias('trg')
            .merge(
                df.alias('src'),
                'trg.id = src.id'
            )
            .whenMatchedUpdateAll(condition='trg.updated_at < src.updated_at')
            .whenNotMatchedInsertAll()
            .execute()
        )


# cloudFiles (Databricks Auto Loader) source over the bronze PX_CAT_G1V2 folder.
# The inferred schema is tracked under the silver checkpoint directory.
px_cat_g1v2 = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'parquet')
    .option('cloudFiles.schemaLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/px_cat_g1v2/checkpoint')
    .option('cloudFiles.schemaEvolutionMode', 'rescue')
    .load('abfss://bronze@storagetemp.dfs.core.windows.net/PX_CAT_G1V2/')
)


# Run a single trigger and hand every micro-batch to `func`, which performs
# the actual Delta write/merge.
# NOTE(review): the 'path' option is presumably unused with foreachBatch - confirm.
(
    px_cat_g1v2.writeStream.foreachBatch(func)
    .outputMode('append')
    .option('checkpointLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/px_cat_g1v2/checkpoint')
    .option('path', 'abfss://silver@storagetemp.dfs.core.windows.net/px_cat_g1v2/data/')
    .trigger(once=True)
    .start()
)



In [0]:
def func(sales_details, batchsls_ord_num):
    """Upsert one micro-batch of sales records into the silver ``sales_details`` Delta table.

    Meant to be passed to ``DataStreamWriter.foreachBatch``. The batch is
    reduced to the newest row per ``sls_ord_num``, cleaned with Spark SQL, and
    then either written as a new Delta table or merged into the existing one.

    Relies on the notebook-global ``spark`` session.

    Args:
        sales_details: Micro-batch DataFrame. Must contain ``sls_ord_num``,
            ``sls_prd_key``, ``sls_cust_id``, ``sls_order_dt``, ``sls_ship_dt``,
            ``sls_due_dt``, ``sls_sales``, ``sls_quantity``, ``sls_price`` and
            ``updated_at``.
        batchsls_ord_num: Micro-batch identifier supplied by the streaming
            engine (unused).
    """
    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, row_number
    from delta.tables import DeltaTable

    # Keep only the newest row per sls_ord_num: the merge below needs at most
    # one source row per key. Rows without a key are discarded.
    # NOTE(review): if one order number can span several product lines, this
    # key collapses them into a single row - confirm sls_ord_num is unique
    # per record.
    newest_first = Window.partitionBy("sls_ord_num").orderBy(col("updated_at").desc())
    batch = (
        sales_details.filter(col('sls_ord_num').isNotNull())
        .withColumn('dedup', row_number().over(newest_first))
        .filter(col('dedup') == 1)
        .drop('dedup')
    )

    # Create a temporary view for SQL transformation
    # NOTE(review): Databricks recommends df.sparkSession inside foreachBatch;
    # confirm the global `spark` can see this view on the target runtime.
    batch.createOrReplaceTempView('sales_details')

    # Transform data. `updated_at` is carried through on purpose: the merge
    # condition further down compares it between target and source.
    #
    # * The three date columns are parsed from a yyyyMMdd value; 0 or any
    #   value that is not 8 characters long becomes NULL.
    # * sls_sales is recomputed as quantity * |price| when it is missing,
    #   non-positive, or disagrees with that product.
    # * sls_price is derived from the incoming sls_sales / sls_quantity when
    #   it is missing or non-positive (NULL when quantity is 0).
    transformed = spark.sql("""
        SELECT
            sls_ord_num,
            sls_prd_key,
            sls_cust_id,

            CASE
                WHEN sls_order_dt = 0 OR LENGTH(CAST(sls_order_dt AS STRING)) != 8 THEN NULL
                ELSE TO_DATE(CAST(sls_order_dt AS STRING), 'yyyyMMdd')
            END AS sls_order_dt,

            CASE
                WHEN sls_ship_dt = 0 OR LENGTH(CAST(sls_ship_dt AS STRING)) != 8 THEN NULL
                ELSE TO_DATE(CAST(sls_ship_dt AS STRING), 'yyyyMMdd')
            END AS sls_ship_dt,

            CASE
                WHEN sls_due_dt = 0 OR LENGTH(CAST(sls_due_dt AS STRING)) != 8 THEN NULL
                ELSE TO_DATE(CAST(sls_due_dt AS STRING), 'yyyyMMdd')
            END AS sls_due_dt,

            CASE
                WHEN sls_sales IS NULL
                    OR sls_sales <= 0
                    OR sls_sales != sls_quantity * ABS(sls_price)
                THEN sls_quantity * ABS(sls_price)
                ELSE sls_sales
            END AS sls_sales,

            sls_quantity,
            ROUND(
                CASE
                    WHEN sls_price IS NULL OR sls_price <= 0 THEN
                        CASE WHEN sls_quantity != 0 THEN sls_sales / sls_quantity ELSE NULL END
                    ELSE sls_price
                END
            , 2) AS sls_price,
            updated_at,
            current_date() AS modified_date

        FROM sales_details
    """)

    # Define Delta path
    delta_path = 'abfss://silver@storagetemp.dfs.core.windows.net/sales_details/data/'

    # Write or merge data into Delta table
    if not DeltaTable.isDeltaTable(spark, delta_path):
        # First-time write
        transformed.write.format('delta').mode('overwrite').save(delta_path)
    else:
        # Perform upsert (merge): update a matched row only when the incoming
        # record is strictly newer, insert rows whose key is not present yet.
        trg = DeltaTable.forPath(spark, delta_path)

        (
            trg.alias('trg')
            .merge(
                transformed.alias('src'),
                'trg.sls_ord_num = src.sls_ord_num'
            )
            .whenMatchedUpdateAll(condition='trg.updated_at < src.updated_at')
            .whenNotMatchedInsertAll()
            .execute()
        )


# cloudFiles (Databricks Auto Loader) source over the bronze sales_details folder.
# The inferred schema is tracked under the silver checkpoint directory.
sales_details = (
    spark.readStream.format('cloudFiles')
    .option('cloudFiles.format', 'parquet')
    .option('cloudFiles.schemaLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/sales_details/checkpoint')
    .option('cloudFiles.schemaEvolutionMode', 'rescue')
    .load('abfss://bronze@storagetemp.dfs.core.windows.net/sales_details/')
)


# Run a single trigger and hand every micro-batch to `func`, which performs
# the actual Delta write/merge.
# NOTE(review): the 'path' option is presumably unused with foreachBatch - confirm.
(
    sales_details.writeStream.foreachBatch(func)
    .outputMode('append')
    .option('checkpointLocation', 'abfss://silver@storagetemp.dfs.core.windows.net/sales_details/checkpoint')
    .option('path', 'abfss://silver@storagetemp.dfs.core.windows.net/sales_details/data/')
    .trigger(once=True)
    .start()
)