In [25]:
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, lit, coalesce
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType
from pyspark.sql.utils import AnalysisException

# Reuse the notebook's active Spark session (or start one) under this job's app name.
spark = (
    SparkSession.builder
    .appName("SCD_DIM_TBL")
    .getOrCreate()
)

In [26]:
# ADLS Gen2 container roots: input (silver) and output (gold) layers.
silver_dataset_folder_path, gold_dataset_folder_path = (
    "abfss://silver@adlssalesproject2448pp2.dfs.core.windows.net",
    "abfss://gold@adlssalesproject2448pp2.dfs.core.windows.net",
)

In [27]:
# Per-table load configuration, keyed by the silver subfolder prefix:
#   table -> (comma-separated primary-key columns, slowly-changing-dimension type)
# Keeping both settings on one line per table prevents the two lookups from drifting apart.
_table_config = {
    'PRODUCT': ('NDC_CD', 'SCD2'),
    'PROVIDER': ('PROVIDER_ID', 'SCD2'),
    'DIAGNOSIS': ('DIAGNOSIS_CODE,ICD_VERSION_TYPE', 'SCD1'),
    'PROCEDURE': ('PROCEDURE_CODE,PRC_VERS_TYP_ID', 'SCD1'),
    'PLAN': ('PAYER_PLAN_ID', 'SCD1'),
    'RX PATIENT ACTIVITY': ('PATIENT_ID', 'SCD1'),
    'PATIENT MPD': ('PATIENT_ID,MPD_YEAR', 'SCD1'),
    'PATIENT COMMERCIAL': ('PATIENT_ID,ACTIVITY_YEAR', 'SCD1'),
    'PATIENT DEMOGRAPHICS': ('PATIENT_ID', 'SCD1'),
}

# Derived lookups consumed by the load loop (insertion order preserved).
pk_dict = {table: keys for table, (keys, _) in _table_config.items()}
scd_dict = {table: scd_type for table, (_, scd_type) in _table_config.items()}

In [28]:
# Get subfolders in the silver dataset; each subfolder is loaded into one gold dimension table.
silver_subfolder_list = mssparkutils.fs.ls(silver_dataset_folder_path)

# Bookkeeping columns owned by the SCD2 process; never compared for change detection.
SCD2_META_COLS = ["Effective_date", "ValidTo", "ActiveFlag", "Version"]


def _quote(name):
    """Backtick-quote a column name so Spark parses it as a single identifier."""
    return "`" + name.replace("`", "``") + "`"


def _gold_path(safe_name, scd_type):
    """Gold Delta location for a table.

    Used for BOTH reading and writing, so each run sees the previous run's output
    (previously the read path lacked the `_scd1`/`_scd2` suffix and never matched).
    """
    return f"{gold_dataset_folder_path}/{safe_name}_{scd_type}"


def _empty_gold_df(silver_df, scd_type):
    """Empty gold frame with the schema that `scd_type` writes.

    Builds a new StructType rather than calling ``silver_df.schema.add(...)``:
    ``StructType.add`` appends to the StructType in place, which would alter the
    schema object obtained from ``silver_df``.
    """
    if scd_type == "scd2":
        extra_fields = [
            StructField("Effective_date", DateType()),
            StructField("ValidTo", DateType()),
            StructField("ActiveFlag", StringType()),
            StructField("Version", IntegerType()),
        ]
    else:
        # SCD1 output is the silver columns plus a string Effective_date.
        extra_fields = [StructField("Effective_date", StringType())]
    return spark.createDataFrame([], StructType(list(silver_df.schema.fields) + extra_fields))


def _load_gold(path, silver_df, scd_type):
    """Read the existing gold Delta table at `path`, or an empty one on the first run."""
    try:
        return spark.read.format("delta").load(path)
    except AnalysisException:
        # Expected when the location does not exist yet / is not a Delta table.
        # Other errors (auth, network, ...) are no longer swallowed.
        return _empty_gold_df(silver_df, scd_type)


def _apply_scd1(silver_df, gold_df, pk_cols):
    """SCD Type 1: the silver snapshot replaces gold; keep each existing key's Effective_date."""
    return (
        silver_df.alias("src")
        .join(gold_df.alias("dest"), pk_cols, "left")
        .selectExpr(
            "src.*",
            "coalesce(dest.Effective_date, cast(current_date() as string)) as Effective_date",
        )
    )


def _apply_scd2(silver_df, gold_df, pk_cols):
    """SCD Type 2: return the COMPLETE new gold table (history + current versions).

    - inactive (history) rows and unchanged active rows are carried over as-is
    - active rows whose tracked columns changed are closed (ValidTo=today, ActiveFlag='N')
    - each changed key gets a new active row with Version = old Version + 1
    - keys not present in gold at all are inserted as Version 1
    """
    data_cols = silver_df.columns
    tracked_cols = [c for c in data_cols if c not in pk_cols and c not in SCD2_META_COLS]
    pk_exprs = [col(_quote(c)) for c in pk_cols]

    is_active = col("ActiveFlag").eqNullSafe("Y")
    gold_active = gold_df.filter(is_active)
    gold_history = gold_df.filter(~is_active)

    # Keys never seen in gold (any version) -> first version.
    new_records = (
        silver_df.join(gold_df.select(*pk_exprs), pk_cols, "left_anti")
        .withColumn("Effective_date", current_date())
        .withColumn("ValidTo", lit(None).cast(DateType()))  # typed NULL, not a void column
        .withColumn("ActiveFlag", lit("Y"))
        .withColumn("Version", lit(1))
    )

    # A key changed if ANY tracked column differs. eqNullSafe treats NULL==NULL as equal and
    # NULL vs value as different (plain != yields NULL there). Building one OR'ed Column
    # avoids the old `ActiveFlag='Y' AND a!=b OR c!=d` precedence bug.
    if tracked_cols:
        changed_cond = reduce(
            lambda acc, cond: acc | cond,
            [~col(f"src.{_quote(c)}").eqNullSafe(col(f"dest.{_quote(c)}")) for c in tracked_cols],
        )
    else:
        changed_cond = lit(False)  # only key columns: nothing can change
    changed = (
        silver_df.alias("src")
        .join(gold_active.alias("dest"), pk_cols, "inner")
        .filter(changed_cond)
    )
    changed_keys = changed.select(*pk_exprs)

    # New current version for each changed key (PK columns come from the USING-join key).
    new_versions = changed.select(
        *[col(_quote(c)) if c in pk_cols else col(f"src.{_quote(c)}") for c in data_cols],
        current_date().alias("Effective_date"),
        lit(None).cast(DateType()).alias("ValidTo"),
        lit("Y").alias("ActiveFlag"),
        (col("dest.Version") + 1).alias("Version"),
    )
    # Close the previously active rows of changed keys.
    expired = (
        gold_active.join(changed_keys, pk_cols, "left_semi")
        .withColumn("ValidTo", current_date())
        .withColumn("ActiveFlag", lit("N"))
    )
    unchanged_active = gold_active.join(changed_keys, pk_cols, "left_anti")

    # unionByName: match columns by name, not position.
    return (
        gold_history
        .unionByName(unchanged_active)
        .unionByName(expired)
        .unionByName(new_versions)
        .unionByName(new_records)
    )


def _register_table(safe_name, scd_type, location):
    """Register the gold Delta location as a table in scdprojectdb (no-op if it exists)."""
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS scdprojectdb.`TBL_DIM_{safe_name}_{scd_type}`
        USING DELTA
        LOCATION '{location}'
    """)


# Hoisted out of the loop: the database only needs creating once per run.
spark.sql("CREATE DATABASE IF NOT EXISTS scdprojectdb")

for subfolder in silver_subfolder_list:
    subfolder_path = subfolder.path
    subfolder_name = subfolder.name

    # The folder prefix before the first '_' is the table name used in pk_dict/scd_dict.
    f_name = subfolder_name.split('_')[0]
    if f_name not in pk_dict or f_name not in scd_dict:
        print(f"Skipping '{subfolder_name}': no key/SCD config for table '{f_name}'")
        continue
    scd_type = scd_dict[f_name].lower()
    if scd_type not in ("scd1", "scd2"):
        print(f"Skipping '{subfolder_name}': unsupported SCD type '{scd_dict[f_name]}'")
        continue
    pk_cols = [c.strip() for c in pk_dict[f_name].split(',')]  # composite keys are comma-separated
    safe_f_name = f_name.replace(" ", "_")

    # Read every non-empty file (zero-size markers are skipped) as ONE snapshot; previously
    # each file overwrote gold in turn, so only the last file survived.
    # NOTE(review): assumes the files of a subfolder together form one full snapshot — confirm.
    data_files = [f.path for f in mssparkutils.fs.ls(subfolder_path) if f.size != 0]
    if not data_files:
        continue
    silver_df = spark.read.parquet(*data_files)

    gold_path = _gold_path(safe_f_name, scd_type)
    gold_df = _load_gold(gold_path, silver_df, scd_type)

    if scd_type == "scd1":
        final_df = _apply_scd1(silver_df, gold_df, pk_cols)
    else:
        final_df = _apply_scd2(silver_df, gold_df, pk_cols)

    # Overwrite: final_df is the whole table (SCD2 now carries forward all existing rows).
    final_df.coalesce(1).write.format("delta").mode("overwrite").save(gold_path)
    _register_table(safe_f_name, scd_type, gold_path)
