In [0]:
import sys
import os
sys.path.append(os.path.abspath('../utility'))

from pyspark.sql.functions import col, lit, when
from delta.tables import DeltaTable
from datetime import datetime, date
from log_process import init_run, finalize_run

In [0]:

def clean_products(
    df,
    reject_base_filepath="abfss://testcontainer@stvptest001.dfs.core.windows.net/rejects",
    reject_base_filename="products",
):
    """Split product rows into typed, valid rows and rejected rows.

    A row is rejected when ``Price``, ``StockQuantity`` or ``DateAdded`` holds
    a non-null value whose cast to the expected type comes back NULL. Rejected
    rows (including the ``valid_datatype`` marker column) are written as CSV
    with a header to ``{reject_base_filepath}/{reject_base_filename}_{timestamp}.csv``;
    the write happens on every call, even when nothing was rejected.

    Args:
        df: DataFrame containing at least ``Price``, ``StockQuantity`` and
            ``DateAdded``.
        reject_base_filepath: Folder that receives the reject output. Defaults
            to the location that used to be hard-coded here.
        reject_base_filename: File-name prefix of the reject output.

    Returns:
        DataFrame with the accepted rows only, the three columns cast to
        ``decimal(10,2)``, ``integer`` and ``date``, and the marker column
        dropped.
    """
    # Single source of truth for the expected types: the same mapping drives
    # both the validity check and the final casts, so the two cannot drift.
    expected_types = {
        "Price": "decimal(10,2)",
        "StockQuantity": "integer",
        "DateAdded": "date",
    }

    # NOTE(review): the check relies on cast() returning NULL for malformed
    # input; with spark.sql.ansi.enabled=true a bad cast raises instead --
    # confirm the setting of the cluster this runs on.
    cast_failed = None
    for column_name, type_name in expected_types.items():
        column_failed = (
            col(column_name).isNotNull()
            & col(column_name).cast(type_name).isNull()
        )
        cast_failed = column_failed if cast_failed is None else (cast_failed | column_failed)

    df_check_datatype = df.withColumn(
        "valid_datatype",
        when(cast_failed, "N").otherwise("Y"),
    )

    df_invalid_datatype = df_check_datatype.filter(col("valid_datatype") == "N")

    # Second-resolution timestamp keeps the reject output of each run apart.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    reject_file = f"{reject_base_filepath}/{reject_base_filename}_{timestamp}.csv"

    # coalesce(1) collapses the rejects into a single partition before writing.
    df_invalid_datatype.coalesce(1).write.csv(reject_file, header=True)

    df_valid = df_check_datatype.filter(col("valid_datatype") == "Y")
    for column_name, type_name in expected_types.items():
        df_valid = df_valid.withColumn(column_name, col(column_name).cast(type_name))

    return df_valid.drop("valid_datatype")

In [0]:
%skip
# Disabled cell (marked with %skip): a single Delta merge into
# silver.dim_products keyed on ProductID. Matched rows are overwritten in place
# when any of the listed attributes differs; unmatched ProductIDs are inserted
# as active, valid rows.
# NOTE(review): v_run_id, v_valid_from_dttm and v_max_valid_to_dttm are not
# defined in any cell visible here, and df_cleaned_products is only assigned in
# a later cell, so this cell cannot run as-is if it is re-enabled.
# NOTE(review): the `!=` predicates in the update condition evaluate to NULL
# when either side is NULL, so a change to or from NULL would not trigger the
# update.
delta_table_dim_products = DeltaTable.forName(spark, "silver.dim_products")

delta_table_dim_products.alias("dim_products") \
    .merge(
        df_cleaned_products.alias("stg_products"),
        "dim_products.ProductID = stg_products.ProductID"
    ) \
    .whenMatchedUpdate(
        condition = """dim_products.ProductName != stg_products.ProductName
            OR dim_products.Category != stg_products.Category
            OR dim_products.Description != stg_products.Description
            OR dim_products.Price != stg_products.Price
            OR dim_products.StockQuantity != stg_products.StockQuantity
            OR dim_products.Supplier != stg_products.Supplier
            OR dim_products.DateAdded != stg_products.DateAdded
            OR dim_products.Status != stg_products.Status"""
        , set = {
            "ProductName": "stg_products.ProductName",
            "Category": "stg_products.Category",
            "Description": "stg_products.Description",
            "Price": "stg_products.Price",
            "StockQuantity": "stg_products.StockQuantity",
            "Supplier": "stg_products.Supplier",
            "DateAdded": "stg_products.DateAdded",
            "Status": "stg_products.Status",
            "UpdateRunID": lit(v_run_id)
        }
    ) \
    .whenNotMatchedInsert(
        values = {
            "ProductID": "stg_products.ProductID",
            "ProductName": "stg_products.ProductName",
            "Category": "stg_products.Category",
            "Description": "stg_products.Description",
            "Price": "stg_products.Price",
            "StockQuantity": "stg_products.StockQuantity",
            "Supplier": "stg_products.Supplier",
            "DateAdded": "stg_products.DateAdded",
            "Status": "stg_products.Status",
            "ValidFromDate": lit(v_valid_from_dttm),
            "ValidToDate": lit(v_max_valid_to_dttm),
            "ActiveInd": lit("Y"),
            "ValidInd": lit("Y"),
            "InsertRunID": lit(v_run_id)
        } 
    ) \
    .execute()

In [0]:
def scd_type2(df_src, trg_table_name, params):
    """Load ``df_src`` into a Delta dimension table as SCD type 2.

    Source rows are left-joined to the currently active target rows
    (``ActiveInd = 'Y'``) on ``ProductID`` and classified as:

    * ``I`` - no active target row exists for the ProductID,
    * ``U`` - an active row exists and at least one tracked attribute differs,
    * ``X`` - unchanged; the row is ignored.

    For ``U`` rows the active target version is expired (``ActiveInd = 'N'``,
    ``ValidToDate`` set to ``params["valid_from_dttm"]``). ``I`` and ``U`` rows
    are then appended as new active versions.

    Args:
        df_src: Source DataFrame with ``ProductID`` and the tracked attribute
            columns listed in ``tracked_columns`` below.
        trg_table_name: Name of the Delta target table.
        params: Mapping providing ``run_id``, ``valid_from_dttm``,
            ``max_valid_to_dttm`` and ``process_start_time``.

    Returns:
        None. The target table is modified by a merge followed by an append;
        the two writes are separate operations.
    """
    # Attributes whose change produces a new version of a product.
    tracked_columns = [
        "ProductName",
        "Category",
        "Description",
        "Price",
        "StockQuantity",
        "Supplier",
        "DateAdded",
        "Status",
    ]

    delta_table_dim_products = DeltaTable.forName(spark, trg_table_name)
    df_dim_products_active = delta_table_dim_products.toDF().filter(col("ActiveInd") == "Y")

    # Null-safe change detection: a plain `!=` evaluates to NULL when either
    # side is NULL, which made when() fall through to "X" and silently dropped
    # changes such as NULL -> value or value -> NULL.
    has_changed = None
    for column_name in tracked_columns:
        column_changed = ~(
            col(f"src.{column_name}").eqNullSafe(col(f"trg.{column_name}"))
        )
        has_changed = column_changed if has_changed is None else (has_changed | column_changed)

    df_join_src_trg = (
        df_src.alias("src").join(
            df_dim_products_active.alias("trg"),
            col("src.ProductID") == col("trg.ProductID"),
            "left_outer"
        )
        .withColumn(
            "ChangeTypeFlag",
            when(col("trg.ProductID").isNull(), "I")
            .when(has_changed, "U")
            .otherwise("X")
        )
        .select(
            "src.ProductID",
            *[f"src.{column_name}" for column_name in tracked_columns],
            "ChangeTypeFlag"
        )
    )

    # New versions to append: brand-new products and changed products.
    df_dim_products_insert = (
        df_join_src_trg.filter(col("ChangeTypeFlag").isin("I", "U"))
        .withColumn("ValidFromDate", lit(params["valid_from_dttm"]))
        .withColumn("ValidToDate", lit(params["max_valid_to_dttm"]))
        .withColumn("ActiveInd", lit("Y"))
        .withColumn("ValidInd", lit("Y"))
        .withColumn("InsertRunID", lit(params["run_id"]))
        .withColumn("InsertDttm", lit(params["process_start_time"]))
        .drop("ChangeTypeFlag")
    )

    # Values used to close the currently active version of changed products.
    df_dim_products_update = (
        df_join_src_trg.filter(col("ChangeTypeFlag").isin("U"))
        .withColumn("ValidToDate", lit(params["valid_from_dttm"]))
        .withColumn("ActiveInd", lit("N"))
        .withColumn("UpdateRunID", lit(params["run_id"]))
        .withColumn("UpdateDttm", lit(params["process_start_time"]))
        .drop("ChangeTypeFlag")
    )

    # Update/expire records using merge.
    # The ActiveInd predicate limits the merge to the current version; matching
    # on ProductID alone also rewrote ValidToDate/UpdateRunID/UpdateDttm on
    # versions that had already been expired by earlier runs.
    delta_table_dim_products.alias("trg") \
        .merge(
            df_dim_products_update.alias("upd"),
            "trg.ProductID = upd.ProductID AND trg.ActiveInd = 'Y'"
        ) \
        .whenMatchedUpdate(
            set = {
                "ValidToDate": "upd.ValidToDate",
                "ActiveInd": "upd.ActiveInd",
                "UpdateRunID": "upd.UpdateRunID",
                "UpdateDttm": "upd.UpdateDttm"
            }
        ) \
        .execute()

    # Insert new records
    df_dim_products_insert.write.mode("append").saveAsTable(trg_table_name)


In [0]:
# Initialize run
init_params = init_run(spark, "nb_dim_products2")

try:

    print(init_params)

    # Configure access to the ADLS account (clean_products writes its reject
    # file there). The account key is resolved from a secret scope at run time
    # instead of being stored in the notebook.
    # NOTE(review): the scope/key names below are placeholders -- point them at
    # the real secret. The key that used to be hard-coded here must be treated
    # as leaked and rotated.
    spark.conf.set(
        "fs.azure.account.key.stvptest001.dfs.core.windows.net",
        dbutils.secrets.get(scope="adls", key="stvptest001-account-key"),
    )

    # Read bronze table
    # NOTE(review): reads the "_bad" variant while the regular table is
    # commented out -- confirm this is intended outside of testing.
    #df_br_products = spark.read.table("bronze.products")
    df_br_products = spark.read.table("bronze.products_bad")

    # NOTE(review): not used further down in this cell; count() triggers a
    # full Spark job.
    row_count_read = df_br_products.count()

    # Apply transformations
    df_cleaned_products = clean_products(df_br_products)

    # Load data in Dim table using SCD type2
    scd_type2(df_cleaned_products, "silver.dim_products", init_params)

    # Finalize run
    finalize_run(spark, init_params["run_id"], "success")

except BaseException:
    # Deliberately catches everything (same reach as the previous bare
    # `except:`): the run is marked as failed and the original error is
    # re-raised so the notebook run itself still fails.

    # Finalize run with error
    finalize_run(spark, init_params["run_id"], "fail")

    raise

In [0]:
%skip
# Disabled cell (marked with %skip): looks up silver.dim_products and requests
# its Delta history limited to one entry, i.e. the most recent operation.
delta_table_dim_products = DeltaTable.forName(spark, "silver.dim_products")
lastOperationDF = delta_table_dim_products.history(1)

In [0]:
%skip
# Disabled cell (marked with %skip): renders lastOperationDF, which is only
# assigned in the preceding cell (itself skipped).
display(lastOperationDF)