In [0]:
%run "/Workspace/Users/malhotrasushant32@gmail.com/Basic Checks"

In [0]:
# Load configuration for the raw input files, followed by the actual load.
# df_list is filled by read_files() with the names of the DataFrames it registers;
# resetting it here makes re-running this cell start from an empty list.
# NOTE(review): read_files is defined in the NEXT cell of this notebook, so on a fresh
# kernel ("Run All") this cell raises NameError unless the %run'd notebook also defines
# it — move the definition above this cell (or confirm where it comes from).
df_list=[]
# Directory (mount point) that holds the pipe-delimited input files.
mount_path = "/mnt/gsynergyproject/rawdata"
# File names, relative to mount_path, to load.
file_list = ["fact.averagecosts.dlm",
"fact.transactions.dlm",
"hier.clnd.dlm",
"hier.hldy.dlm",
"hier.invloc.dlm",
"hier.invstatus.dlm",
"hier.possite.dlm",
"hier.pricestate.dlm",
"hier.prod.dlm",
"hier.rtlloc.dlm"]
read_files(mount_path, file_list)

In [0]:
def read_files(mount_path, file_list):
    """Load delimited files into Spark DataFrames registered as notebook globals.

    Each file ``{mount_path}/{file_name}`` is read as a CSV with a header row,
    ``|`` as the delimiter and schema inference enabled. The resulting DataFrame
    is stored in ``globals()`` under a name derived from the file name
    (``fact.transactions.dlm`` -> ``fact_transactions_df``), and that name is
    recorded in the module-level ``df_list``.

    Args:
        mount_path: Directory (mount point) that contains the files.
        file_list: Iterable of file names relative to ``mount_path``.

    Returns:
        None. Results are exposed through ``globals()`` and ``df_list``.
    """
    extension = ".dlm"
    for file_name in file_list:
        # Only the trailing extension becomes the "_df" suffix. A blanket
        # replace("dlm", "df") would also rewrite "dlm" inside the stem.
        if file_name.endswith(extension):
            new_name = file_name[: -len(extension)].replace(".", "_") + "_df"
        else:
            # No ".dlm" extension: just make the name a valid identifier-like key.
            new_name = file_name.replace(".", "_")

        file_path = f"{mount_path}/{file_name}"

        df = (
            spark.read.option("header", "true")
            .option("delimiter", "|")
            .option("inferSchema", "true")
            .csv(file_path)
        )

        # Downstream cells look the DataFrames up by name via globals().
        globals()[new_name] = df

        # Loading the same file again refreshes the DataFrame but must not
        # register its name twice, or every check would run twice for it.
        if new_name not in df_list:
            df_list.append(new_name)

In [0]:
# Show the names under which the loaded DataFrames were recorded in df_list.
print(df_list)

['fact_averagecosts_df', 'fact_transactions_df', 'hier_clnd_df', 'hier_hldy_df', 'hier_invloc_df', 'hier_invstatus_df', 'hier_possite_df', 'hier_pricestate_df', 'hier_prod_df', 'hier_rtlloc_df']


In [0]:
# Expected column -> data type name for every loaded DataFrame, keyed by the
# DataFrame's global name. The per-DataFrame dict is handed to check_data_types().
expected_schemas =  {
    "fact_averagecosts_df": {
        # Listed as this table's primary key in primary_keys.
        # NOTE(review): the uniqueness check output in this notebook reports duplicate
        # fscldt_id values here, so the real key may be composite (e.g. with sku_id) — confirm.
        "fscldt_id": "int",
        "sku_id": "string",
        "average_unit_standardcost": "double",
        "average_unit_landedcost": "double",
    },
    "fact_transactions_df": {
        # Listed as this table's primary key in primary_keys.
        # NOTE(review): rows also carry line_id, so order_id alone may repeat per order line — confirm.
        "order_id": "bigint",
        "line_id": "int",
        "type": "string",
        "dt": "timestamp",
        "pos_site_id": "string",
        "sku_id": "string",
        "fscldt_id": "int",
        "price_substate_id": "string",
        "sales_units": "int",
        "sales_dollars": "double",
        "discount_dollars": "double",
        "original_order_id": "bigint",
        "original_line_id": "int",
    },
    "hier_clnd_df": {
        "fscldt_id": "int",
        "fscldt_label": "string",
        "fsclwk_id": "int",
        "fsclwk_label": "string",
        "fsclmth_id": "int",
        "fsclmth_label": "string",
        "fsclqrtr_id": "int",
        "fsclqrtr_label": "string",
        "fsclyr_id": "int",
        "fsclyr_label": "int",
        "ssn_id": "string",
        "ssn_label": "string",
        "ly_fscldt_id": "int",
        "lly_fscldt_id": "int",
        "fscldow": "int",
        "fscldom": "int",
        "fscldoq": "int",
        "fscldoy": "int",
        "fsclwoy": "int",
        "fsclmoy": "int",
        "fsclqoy": "int",
        "date": "date",
    },
    "hier_hldy_df": {"hldy_id": "string", "hldy_label": "string"},
    "hier_invloc_df": {
        "loc": "int",
        "loc_label": "string",
        "loctype": "string",
        "loctype_label": "string",
    },
    "hier_invstatus_df": {
        "code_id": "string",
        "code_label": "string",
        "bckt_id": "string",
        "bckt_label": "string",
        "ownrshp_id": "string",
        "ownrshp_label": "string",
    },
    "hier_possite_df": {
        "site_id": "string",
        "site_label": "string",
        "subchnl_id": "string",
        "subchnl_label": "string",
        "chnl_id": "string",
        "chnl_label": "string",
    },
    "hier_pricestate_df": {
        "substate_id": "string",
        "substate_label": "string",
        "state_id": "string",
        "state_label": "string",
    },
    "hier_prod_df": {
        "sku_id": "string",
        "sku_label": "string",
        "stylclr_id": "string",
        "stylclr_label": "string",
        "styl_id": "string",
        "styl_label": "string",
        "subcat_id": "int",
        "subcat_label": "string",
        "cat_id": "int",
        "cat_label": "string",
        "dept_id": "int",
        "dept_label": "string",
        "issvc": "int",
        "isasmbly": "int",
        "isnfs": "int",
    },
    "hier_rtlloc_df": {
        "str": "int",
        "str_label": "string",
        "dstr": "int",
        "dstr_label": "string",
        "rgn": "int",
        "rgn_label": "string",
    },
}

# Column checked for uniqueness in each DataFrame, keyed by the DataFrame's global name.
primary_keys = {
    "fact_averagecosts_df": "fscldt_id",
    "fact_transactions_df": "order_id",
    "hier_clnd_df": "fscldt_id",
    "hier_hldy_df": "hldy_id",
    "hier_invloc_df": "loc",
    "hier_invstatus_df": "code_id",
    "hier_possite_df": "site_id",
    "hier_pricestate_df": "substate_id",
    "hier_prod_df": "sku_id",
    "hier_rtlloc_df": "str",
}

# Foreign-key relationships to verify on the raw DataFrames.
# Each entry is (fact DataFrame name, dimension DataFrame name, fact column, dimension column).
foreign_keys = [
    (
        "fact_transactions_df",
        "hier_prod_df",
        "sku_id",
        "sku_id",
    ),
]

In [0]:
# Run the null-value, primary-key-uniqueness and data-type checks on every name in df_list.
# DataFrames are resolved by name through globals(); a name with no DataFrame only warns.
for df_name in df_list:
    df = globals().get(df_name)

    if df is None:
        print(f"Warning: DataFrame '{df_name}' not found.")
        continue

    cols = df.columns  # not read again in this cell
    check_null_values(df, df_name)
    check_primary_key_uniqueness(df, df_name, primary_keys[df_name])
    check_data_types(df, df_name, expected_schemas[df_name])


Checking null values for DataFrame: fact_averagecosts_df
Null Value check for DataFrame 'fact_averagecosts_df' is completed!

Checking uniqueness for primary key 'fscldt_id' in DataFrame: fact_averagecosts_df
Primary key 'fscldt_id' has duplicate values in 'fact_averagecosts_df'.
Primary Key Uniqueness check for 'fact_averagecosts_df' is completed!

Checking data types for DataFrame: fact_averagecosts_df
Column 'fscldt_id' in 'fact_averagecosts_df' has the correct data type: int.
Column 'sku_id' in 'fact_averagecosts_df' has the correct data type: string.
Column 'average_unit_standardcost' in 'fact_averagecosts_df' has the correct data type: double.
Column 'average_unit_landedcost' in 'fact_averagecosts_df' has the correct data type: double.
Data Type check for 'fact_averagecosts_df' is completed!

Checking null values for DataFrame: fact_transactions_df
 DataFrame 'fact_transactions_df': Column 'original_order_id' has 3993143 null values
 DataFrame 'fact_transactions_df': Column 'orig

In [0]:
# Verify each declared foreign-key relationship between a fact and a dimension DataFrame.
# Both DataFrames are resolved by name through globals(); if either is absent, warn and move on.
for fact_df_name, dim_df_name, fact_fk, dim_pk in foreign_keys:
    fact_df = globals().get(fact_df_name)
    dim_df = globals().get(dim_df_name)

    if fact_df is None or dim_df is None:
        print(f" Warning: One of the DataFrames '{fact_df_name}' or '{dim_df_name}' is missing.")
        continue

    check_foreign_key_constraint(fact_df, fact_df_name, dim_df, dim_df_name, fact_fk, dim_pk)


🔍 Checking Foreign Key Constraint between 'fact_transactions_df' (fact) and 'hier_prod_df' (dimension) on 'sku_id' → 'sku_id'
Foreign Key Constraint 'sku_id → sku_id' is satisfied.
 Foreign Key Constraint check for 'fact_transactions_df' → 'hier_prod_df' is completed!



TASK 2


In [0]:

# Staging DataFrames: explicit column projections of the raw hierarchy DataFrames.
# The stg_* names are looked up by name (via globals()) in the staging foreign-key checks.

# Calendar: only the fiscal date key, its label and the calendar date are kept.
stg_fiscal_date_df = hier_clnd_df.select(
    "fscldt_id",
    "fscldt_label",
    "date",
)

stg_holiday_df = hier_hldy_df.select(
    "hldy_id",
    "hldy_label",
)

stg_inventory_location_df = hier_invloc_df.select(
    "loc", "loc_label",
    "loctype", "loctype_label",
)

stg_inventory_status_df = hier_invstatus_df.select(
    "code_id", "code_label",
    "bckt_id", "bckt_label",
    "ownrshp_id", "ownrshp_label",
)

stg_pos_site_df = hier_possite_df.select(
    "site_id", "site_label",
    "subchnl_id", "subchnl_label",
    "chnl_id", "chnl_label",
)

stg_price_state_df = hier_pricestate_df.select(
    "substate_id", "substate_label",
    "state_id", "state_label",
)

stg_product_df = hier_prod_df.select(
    "sku_id", "sku_label",
    "stylclr_id", "stylclr_label",
    "styl_id", "styl_label",
    "subcat_id", "subcat_label",
    "cat_id", "cat_label",
    "dept_id", "dept_label",
    "issvc", "isasmbly", "isnfs",
)

stg_retail_location_df = hier_rtlloc_df.select(
    "str", "str_label",
    "dstr", "dstr_label",
    "rgn", "rgn_label",
)



In [0]:
# Foreign-key relationships to verify between the fact DataFrames and the staging dimensions.
# Each entry is (fact DataFrame name, dimension DataFrame name, fact column, dimension column).
# Every relationship is listed once: a repeated entry would re-run the same check over the
# whole fact DataFrame and print the same result twice.
foreign_keys_staging = [
    ("fact_averagecosts_df", "stg_fiscal_date_df", "fscldt_id", "fscldt_id"),
    ("fact_averagecosts_df", "stg_product_df", "sku_id", "sku_id"),
    ("fact_transactions_df", "stg_fiscal_date_df", "fscldt_id", "fscldt_id"),
    ("fact_transactions_df", "stg_product_df", "sku_id", "sku_id"),
    ("fact_transactions_df", "stg_price_state_df", "price_substate_id", "substate_id"),
    ("fact_transactions_df", "stg_pos_site_df", "pos_site_id", "site_id"),

    # NOTE(review): the entries below reuse pos_site_id / price_substate_id against keys of
    # other dimensions (pos_site_id is declared string while str and loc are declared int in
    # expected_schemas) — confirm these relationships are intended before trusting the results.
    ("fact_transactions_df", "stg_retail_location_df", "pos_site_id", "str"),
    ("fact_transactions_df", "stg_inventory_status_df", "price_substate_id", "code_id"),
    ("fact_transactions_df", "stg_inventory_location_df", "pos_site_id", "loc"),
]


# Check every staging foreign-key relationship.
# Fact and dimension DataFrames are resolved by name through globals(); when either one
# is absent the pair is reported and skipped.
for fact_df_name, dim_df_name, fact_fk, dim_pk in foreign_keys_staging:
    fact_df = globals().get(fact_df_name)
    dim_df = globals().get(dim_df_name)

    if fact_df is None or dim_df is None:
        print(f"Warning: One of the DataFrames '{fact_df_name}' or '{dim_df_name}' is missing.")
        continue

    check_foreign_key_constraint(fact_df, fact_df_name, dim_df, dim_df_name, fact_fk, dim_pk)


🔍 Checking Foreign Key Constraint between 'fact_averagecosts_df' (fact) and 'stg_fiscal_date_df' (dimension) on 'fscldt_id' → 'fscldt_id'
323127 records in 'fact_averagecosts_df' have missing foreign key values that do not exist in 'stg_fiscal_date_df'.
 Foreign Key Constraint check for 'fact_averagecosts_df' → 'stg_fiscal_date_df' is completed!

🔍 Checking Foreign Key Constraint between 'fact_averagecosts_df' (fact) and 'stg_product_df' (dimension) on 'sku_id' → 'sku_id'
Foreign Key Constraint 'sku_id → sku_id' is satisfied.
 Foreign Key Constraint check for 'fact_averagecosts_df' → 'stg_product_df' is completed!

🔍 Checking Foreign Key Constraint between 'fact_transactions_df' (fact) and 'stg_fiscal_date_df' (dimension) on 'fscldt_id' → 'fscldt_id'
2206380 records in 'fact_transactions_df' have missing foreign key values that do not exist in 'stg_fiscal_date_df'.
 Foreign Key Constraint check for 'fact_transactions_df' → 'stg_fiscal_date_df' is completed!

🔍 Checking Foreign Key Cons

In [0]:
# Render the raw transactions DataFrame for a visual inspection of its rows.
display(fact_transactions_df)

order_id,line_id,type,dt,pos_site_id,sku_id,fscldt_id,price_substate_id,sales_units,sales_dollars,discount_dollars,original_order_id,original_line_id
164087401,2,Sale,2016-01-31T06:17:01Z,CATMAIN,2668940801,20160131,FP,1,58.95,0.0,,
164087409,4,Sale,2016-01-31T06:17:25Z,CATMAIN,2920920601,20160131,FP,1,49.95,0.0,,
164087440,2,Sale,2016-01-31T06:19:28Z,INETMAIN,0695690000,20160131,FP,2,37.9,0.0,,
164087469,2,Sale,2016-01-31T07:03:00Z,CATMAIN,0505490000,20160131,FP,1,45.0,0.0,,
164087472,2,Sale,2016-01-31T07:07:36Z,CATMAIN,2872180801,20160131,FP,1,98.95,0.0,,
164087476,1,Sale,2016-01-31T07:16:49Z,CATMAIN,4840520704,20160131,FP,2,139.9,0.0,,
164087482,1,Sale,2016-01-31T07:28:10Z,CATMAIN,0403081000,20160131,FP,1,22.95,0.0,,
164087501,2,Sale,2016-01-31T08:26:32Z,CATMAIN,0700002000,20160131,FP,1,24.95,0.0,,
164087518,1,Sale,2016-01-31T11:10:29Z,CATMAIN,2598481801,20160131,FP,1,59.95,0.0,,
164087571,1,Sale,2016-01-31T13:38:18Z,CATMAIN,2815121701,20160131,FP,1,118.95,0.0,,


TASK 3

In [0]:
# Imported under the F alias so that Spark's sum() does not replace Python's built-in
# sum() in the notebook namespace.
from pyspark.sql import functions as F

# Weekly sales aggregate: each transaction gets its fiscal week from the calendar, then
# units, dollars and discounts are totalled per site / SKU / fiscal week / price substate /
# transaction type.
# The join is a LEFT join, so transactions whose fscldt_id has no calendar row are kept
# and grouped under a NULL fsclwk_id rather than dropped.
mview_weekly_sales_df = (
    fact_transactions_df
    .join(hier_clnd_df.select("fscldt_id", "fsclwk_id"), on="fscldt_id", how="left")
    .groupBy("pos_site_id", "sku_id", "fsclwk_id", "price_substate_id", "type")
    .agg(
        F.sum("sales_units").alias("total_sales_units"),
        F.sum("sales_dollars").alias("total_sales_dollars"),
        F.sum("discount_dollars").alias("total_discount_dollars"),
    )
)



In [0]:
# Location of the weekly sales Delta table.
mview_table_path = "/mnt/gsynergyproject/processeddata/mview_weekly_sales_table"

# Start from a clean table directory. Only this table's path is removed: deleting the
# parent "processeddata" directory recursively would also wipe any other dataset stored
# next to it, although this cell only rewrites the weekly sales table.
dbutils.fs.rm(mview_table_path, True)
mview_weekly_sales_df.write.format("delta").mode("overwrite").save(mview_table_path)

In [0]:
%sql
-- Preview the weekly sales Delta table by querying its storage path directly.
SELECT * FROM delta.`/mnt/gsynergyproject/processeddata/mview_weekly_sales_table`


pos_site_id,sku_id,fsclwk_id,price_substate_id,type,total_sales_units,total_sales_dollars,total_discount_dollars
CATMAIN,2785140701,201801.0,FP,Sale,16,1108.71,10.49
177,1AV5420000,201801.0,MD2,Sale,9,49.89,39.93000000000001
155,2598420801,201801.0,FP,Sale,1,59.95,0.0
INETMAIN,0310920000,201801.0,FP,Return,3,126.0,0.0
CATMAIN,6831981800,201801.0,FP,Sale,3,97.86,6.99
INETMAIN,2AL8220601,201801.0,FP,Sale,8,639.6,0.0
CATSALE,2666820701,201801.0,FP,Return,5,197.75,2.0
CATSALE,2AB1530801,201801.0,MD1,Return,3,149.97,0.0
INETMAIN,0787630000,201801.0,FP,Sale,1,12.0,0.0
171,0174410000,201801.0,FP,Sale,3,86.25,0.0


In [0]:
# Imported under the F alias so that Spark's sum() does not replace Python's built-in sum().
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Columns that identify one row of the weekly sales table.
mview_key_columns = ["pos_site_id", "sku_id", "fsclwk_id", "price_substate_id", "type"]

# Fiscal date -> fiscal week lookup used to place transactions into weeks.
calendar_weeks_df = hier_clnd_df.select("fscldt_id", "fsclwk_id")

# Step 1: transactions inside the refresh window.
latest_fact_df = fact_transactions_df.filter("dt >= date_sub(current_date(), 7)")  # Example: last 7 days

# Step 2: attach the fiscal week. LEFT join: rows without a calendar match keep a NULL week.
latest_fact_with_week_df = latest_fact_df.join(
    calendar_weeks_df,
    on="fscldt_id",
    how="left"
)

# Step 3: recompute COMPLETE totals for every fiscal week touched by the window.
# The Delta table was built from all of fact_transactions_df, so it already contains the
# window's rows. Adding window totals on top of the stored totals would count them twice,
# and once more on every re-run. Recomputing the affected weeks and overwriting them
# keeps this cell idempotent.
affected_weeks_df = latest_fact_with_week_df.select(
    F.col("fsclwk_id").alias("affected_fsclwk_id")
).distinct()

incremental_aggregates_df = (
    fact_transactions_df
    .join(calendar_weeks_df, on="fscldt_id", how="left")
    # Null-safe comparison so that an affected NULL week (no calendar match) is refreshed too.
    .join(
        affected_weeks_df,
        on=F.col("fsclwk_id").eqNullSafe(F.col("affected_fsclwk_id")),
        how="left_semi",
    )
    .groupBy(*mview_key_columns)
    .agg(
        F.sum("sales_units").alias("total_sales_units"),
        F.sum("sales_dollars").alias("total_sales_dollars"),
        F.sum("discount_dollars").alias("total_discount_dollars")
    )
)

# Step 4: Merge into existing mview_weekly_sales Delta Table
mview_table_path = "/mnt/gsynergyproject/processeddata/mview_weekly_sales_table"

mview_table = DeltaTable.forPath(spark, mview_table_path)

# Keys are compared with the null-safe operator <=>. With plain "=", a row whose key holds
# NULL never matches its stored counterpart and would be inserted again on each run.
merge_condition = " AND ".join(
    f"existing.{key} <=> updates.{key}" for key in mview_key_columns
)

mview_table.alias("existing").merge(
    incremental_aggregates_df.alias("updates"),
    merge_condition
).whenMatchedUpdate(set={
    # Replace, do not add: the update rows already hold the full totals for their week.
    "total_sales_units": "updates.total_sales_units",
    "total_sales_dollars": "updates.total_sales_dollars",
    "total_discount_dollars": "updates.total_discount_dollars"
}).whenNotMatchedInsert(values={
    "pos_site_id": "updates.pos_site_id",
    "sku_id": "updates.sku_id",
    "fsclwk_id": "updates.fsclwk_id",
    "price_substate_id": "updates.price_substate_id",
    "type": "updates.type",
    "total_sales_units": "updates.total_sales_units",
    "total_sales_dollars": "updates.total_sales_dollars",
    "total_discount_dollars": "updates.total_discount_dollars"
}).execute()
