In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col,when
from pyspark.sql.types import StringType

In [0]:
# Notebook parameter: which source table to process. The value is read from a
# Databricks text widget so a caller can override it per run.
# NOTE(review): the default "FactPnLCommercial" is not one of the table names
# the path-selection cell recognises ('Z1FI3MP01', 'Z1FI3MP02') - confirm the
# caller always overrides it.
TABLE_NAME_WIDGET = "table_name"
dbutils.widgets.text(TABLE_NAME_WIDGET, "FactPnLCommercial")
table_name = dbutils.widgets.get(TABLE_NAME_WIDGET)

In [0]:
# Dimension tables whose surrogate keys are attached to the fact rows.
keys_name_list = [
    "DimBrand",
    "DimClassOfTrade",
    "DimCompany",
    "DimCustomer",
    "DimCustomerGroup5",
    "DimDistributionChannel",
    "DimDivision",
    "DimMaterial",
    "DimProductHierarchy",
    "DimProfitCenterData",
]

# Paths of those dimensions as listed under the silver Dim folder, with the
# "dbfs:" scheme removed and the last character (the trailing "/") dropped.
# The order follows the directory listing, not keys_name_list.
keys_path_list = []
for dim_entry in dbutils.fs.ls("/mnt/dw01silver/Dim"):
    if dim_entry.name.replace("/", "") in keys_name_list:
        keys_path_list.append(dim_entry.path.replace("dbfs:", "")[:-1])

# Surrogate-key column name per dimension, e.g. "DimBrand" -> "BrandKey".
keys_list = [dim_name.replace("Dim", "") + "Key" for dim_name in keys_name_list]

# Resolve the source (bronze parquet) and destination (silver Delta) paths for
# the requested table. Both supported tables write to the same destination.
_FACT_PATHS_BY_TABLE = {
    "Z1FI3MP01": (
        "/mnt/dw01bronze/Fact/pre_tx_FactPnLCommercial",
        "/mnt/dw01silver/Fact/FactProfitAndLoss",
    ),
    "Z1FI3MP02": (
        "/mnt/dw01bronze/Fact/pre_tx_FactPnLCombine",
        "/mnt/dw01silver/Fact/FactProfitAndLoss",
    ),
}

# Fail fast on an unrecognised table name instead of continuing with the
# placeholder path "error", which only surfaces later as a path-not-found
# failure when the source is read.
if table_name not in _FACT_PATHS_BY_TABLE:
    raise ValueError(
        f"Unsupported table_name {table_name!r}; "
        f"expected one of {sorted(_FACT_PATHS_BY_TABLE)}"
    )

src_path, dest_path = _FACT_PATHS_BY_TABLE[table_name]

In [0]:
# Echo the resolved paths so the run log shows what is read and written.
print(f'src_path: {src_path}', f'dest_path: {dest_path}', sep="\n")

/mnt/dw01bronze/Fact/pre_tx_FactPnLCombine
/mnt/dw01silver/Fact/FactProfitAndLoss


In [0]:
def generate_merge_script(src_path, dest_path):
    """Build the Delta MERGE (upsert) statement for the table at ``dest_path``.

    The statement is derived from the target table's schema:

    * merge keys: every string-typed column except ``CreatedBy``,
      ``ModifiedBy`` and ``Source``;
    * updated columns: every other column except the managed columns
      (``ID``, ``IsActive``, ``CreatedBy``, ``CreatedDate``, ``ModifiedBy``,
      ``ModifiedDate``);
    * inserted columns: every column except the managed columns.

    ``IsActive`` and the Created*/Modified* columns are set by the statement
    itself (``1``, ``'spark'``, ``CURRENT_TIMESTAMP``); ``ID`` is never written.

    Side effect: the parquet data at ``src_path`` is loaded and registered as
    the temp view ``src``, the name the statement reads from. A caller that
    wants the MERGE to read a different ``src`` view must register it *after*
    calling this function, because this call replaces any existing ``src``.

    Args:
        src_path: Path of the parquet source registered as view ``src``.
        dest_path: Path of the Delta table that is the MERGE target.

    Returns:
        The MERGE statement as a string.

    Raises:
        ValueError: If the target has no string column usable as a merge key
            (the ``ON`` clause would be empty).
    """
    # Columns the statement manages itself; never copied from the source.
    managed_columns = (
        "ID", "IsActive", "CreatedBy", "CreatedDate", "ModifiedBy", "ModifiedDate",
    )
    # String columns that are attributes, not part of the match key.
    non_key_string_columns = ("CreatedBy", "ModifiedBy", "Source")

    df = spark.read.format("delta").load(dest_path)

    # Merge keys: the string-typed columns of the target.
    string_columns = [
        field.name
        for field in df.schema.fields
        if isinstance(field.dataType, StringType)
        and field.name not in non_key_string_columns
    ]
    if not string_columns:
        raise ValueError(
            f"No string key columns found in {dest_path!r}; "
            "cannot build the MERGE ON condition"
        )

    # NOTE(review): plain `=` never matches when a key is NULL on either side,
    # so such rows would always take the INSERT branch. If any key column is
    # nullable, consider the null-safe operator `<=>` - confirm with the data.
    merge_condition = "\n   AND ".join(
        [f"tar.{name} = src.{name}" for name in string_columns]
    )

    update_columns = [
        name
        for name in df.columns
        if name not in string_columns and name not in managed_columns
    ]
    update_set = ",\n       ".join(
        [f"tar.{name} = src.{name}" for name in update_columns]
    )
    # Emit the separating comma only when there is something to separate;
    # otherwise the statement would start with "UPDATE SET ," (invalid SQL).
    update_prefix = f"{update_set},\n        " if update_columns else ""

    data_columns = [name for name in df.columns if name not in managed_columns]
    insert_columns = ", ".join(data_columns)
    insert_values = ", ".join([f"src.{name}" for name in data_columns])

    # Register the raw source as the `src` view referenced by the statement.
    df_temp = spark.read.format('parquet').load(src_path)
    df_temp.createOrReplaceTempView("src")

    # Build the final MERGE SQL query
    merge_sql = f"""
    MERGE INTO delta.`{dest_path}` AS tar
    USING src AS src
    ON {merge_condition}
    WHEN MATCHED THEN
      UPDATE SET
        {update_prefix}tar.IsActive = 1,
        tar.ModifiedBy = 'spark',
        tar.ModifiedDate = CURRENT_TIMESTAMP
    WHEN NOT MATCHED THEN
      INSERT ({insert_columns}
        ,IsActive
        ,CreatedBy
        ,CreatedDate
        ,ModifiedBy
        ,ModifiedDate
        )
      VALUES ({insert_values}
        ,1
        ,'spark'
        ,CURRENT_TIMESTAMP
        ,'spark'
        ,CURRENT_TIMESTAMP
        );
    """

    return merge_sql


# Create Surrogate Key

In [0]:
# Attach each dimension's surrogate key to the fact rows: left-join the
# dimension on its business-code column and use -1 where the key is null
# (no matching dimension row).
df_fact = spark.read.format("parquet").load(src_path)
for dim_path in keys_path_list:
    dim_name = dim_path.split("/")[-1].replace("Dim", "")
    if dim_name == "Material":
        # Material joins on MaterialNumber rather than a "<Name>Code" column.
        name_code, name_key = "MaterialNumber", "MaterialKey"
    else:
        name_code = dim_name + "Code"
        name_key = name_code.replace("Code", "Key")

    print(f"Adding: {name_key}")
    df_dim = spark.read.format("delta").load(dim_path)
    key_or_default = (
        when(df_dim[name_key].isNull(), -1)
        .otherwise(df_dim[name_key])
        .alias(name_key)
    )
    # Keep every existing fact column and append the resolved key column.
    df_fact = df_fact.join(
        df_dim, df_fact[name_code] == df_dim[name_code], "left"
    ).select(df_fact["*"], key_or_default)

Adding: BrandKey
Adding: ClassOfTradeKey
Adding: CompanyKey
Adding: CustomerKey
Adding: CustomerGroup5Key
Adding: DistributionChannelKey
Adding: DivisionKey
Adding: MaterialKey
Adding: ProductHierarchyKey
Adding: ProfitCenterDataKey



# Run Merge (upsert the fact rows with surrogate keys)

In [0]:
# Build the MERGE statement, expose the key-enriched facts as the `src` view
# it reads from, run it, and print each row-count metric it returns.
merge_script = generate_merge_script(src_path, dest_path)
# Must come after generate_merge_script(): that call also registers a view
# named `src` (the raw source), which this replaces with the enriched facts.
df_fact.createOrReplaceTempView('src')
result = spark.sql(merge_script)
for metric_name in (
    "num_affected_rows",
    "num_updated_rows",
    "num_deleted_rows",
    "num_inserted_rows",
):
    print(result.select(metric_name).collect()[0])


Row(num_affected_rows=5270)
Row(num_updated_rows=5270)
Row(num_deleted_rows=0)
Row(num_inserted_rows=0)


In [0]:
# Print the generated MERGE statement so it can be inspected in the run output.
print(merge_script)


    MERGE INTO delta.`/mnt/dw01silver/Fact/FactProfitAndLoss` AS tar
    USING src AS src
    ON tar.ScenarioCode = src.ScenarioCode
   AND tar.Month = src.Month
   AND tar.Year = src.Year
   AND tar.PnLTypeCode = src.PnLTypeCode
   AND tar.BrandCode = src.BrandCode
   AND tar.ClassOfTradeCode = src.ClassOfTradeCode
   AND tar.CompanyCode = src.CompanyCode
   AND tar.CustomerCode = src.CustomerCode
   AND tar.CustomerGroup5Code = src.CustomerGroup5Code
   AND tar.DistributionChannelCode = src.DistributionChannelCode
   AND tar.DivisionCode = src.DivisionCode
   AND tar.MaterialNumber = src.MaterialNumber
   AND tar.ProductHierarchyCode = src.ProductHierarchyCode
   AND tar.ProfitCenterDataCode = src.ProfitCenterDataCode
   AND tar.SalesOrgCode = src.SalesOrgCode
   AND tar.ProductHierarchyCodeLevel2 = src.ProductHierarchyCodeLevel2
   AND tar.ProductHierarchyCodeLevel1 = src.ProductHierarchyCodeLevel1
   AND tar.ICPCode = src.ICPCode
   AND tar.CoProfitCenterCode = src.CoProfitCenterC

In [0]:
# %sql
# SELECT DISTINCT * FROM delta.`/mnt/dw01silver/Fact/FactProfitAndLoss`
# WHERE PnLTypeCode = 'Combine'

In [0]:
# spark.sql(f"DESCRIBE HISTORY delta.`{dest_path}`").display()

In [0]:
# src_path = "/mnt/dw01bronze/Fact/pre_tx_FactPnLCombine"
# df = spark.read.format('parquet').load(src_path)
# df.createOrReplaceTempView('your_table')

In [0]:
# %sql
# SELECT DISTINCT 
#     ScenarioCode,
#     Month,
#     Year,
#     PnLTypeCode,
#     BrandCode,
#     ClassOfTradeCode,
#     CompanyCode,
#     CustomerCode,
#     CustomerGroup5Code,
#     DistributionChannelCode,
#     DivisionCode,
#     MaterialNumber,
#     ProductHierarchyCode,
#     ProfitCenterDataCode,
#     SalesOrgCode,
#     ProductHierarchyCodeLevel2,
#     ProductHierarchyCodeLevel1,
#     ICPCode,
#     CoProfitCenterCode,
#     CurrencyKey,
#     ControllingAreaCode,
#     ProfitCenterCode,
#     ShipToCustomerCode,
#     ShipToParentCustomerCode,
#     TradingPartnerCode,
#     ProfitSupplyChainCode,
#     ProfitCategoryCode
# FROM delta.`/mnt/dw01silver/Fact/FactProfitAndLoss`
# WHERE PnLTypeCode = 'Combine'
