In [0]:
# %fs ls /mnt/dev/Energy_DE

In [0]:
from pyspark.sql.functions import col, countDistinct,regexp_extract

In [0]:
# Define the file paths
file_paths = [
    "dbfs:/mnt/dev/Energy_DE/fact_averagecosts_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/fact_transactions_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/hier_clnd_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/hier_hldy_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/hier_invloc_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/hier_invstatus_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/hier_possite_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/hier_pricestate_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/hier_prod_dlm.gz",
    "dbfs:/mnt/dev/Energy_DE/hier_rtlloc_dlm.gz"
]

# Function to load a gzipped file
def load_gzipped_file(path):
    return spark.read.option("header", "true").option("delimiter", "|").csv(path)

# Load each file into a DataFrame
fact_averagecosts_df = load_gzipped_file(file_paths[0])
fact_transactions_df = load_gzipped_file(file_paths[1])
hier_clnd_df = load_gzipped_file(file_paths[2])
hier_hldy_df = load_gzipped_file(file_paths[3])
hier_invloc_df = load_gzipped_file(file_paths[4])
hier_invstatus_df = load_gzipped_file(file_paths[5])
hier_possite_df = load_gzipped_file(file_paths[6])
hier_pricestate_df = load_gzipped_file(file_paths[7])
hier_prod_df = load_gzipped_file(file_paths[8])
hier_rtlloc_df = load_gzipped_file(file_paths[9])


In [0]:
fact_averagecosts_df.printSchema()
fact_transactions_df.printSchema()
hier_clnd_df.printSchema()
hier_hldy_df.printSchema()
hier_invloc_df.printSchema()
hier_invstatus_df.printSchema()
hier_possite_df.printSchema()
hier_pricestate_df.printSchema()
hier_prod_df.printSchema()
hier_rtlloc_df.printSchema()

root
 |-- fscldt_id: string (nullable = true)
 |-- sku_id: string (nullable = true)
 |-- average_unit_standardcost: string (nullable = true)
 |-- average_unit_landedcost: string (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- line_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- pos_site_id: string (nullable = true)
 |-- sku_id: string (nullable = true)
 |-- fscldt_id: string (nullable = true)
 |-- price_substate_id: string (nullable = true)
 |-- sales_units: string (nullable = true)
 |-- sales_dollars: string (nullable = true)
 |-- discount_dollars: string (nullable = true)
 |-- original_order_id: string (nullable = true)
 |-- original_line_id: string (nullable = true)

root
 |-- fscldt_id: string (nullable = true)
 |-- fscldt_label: string (nullable = true)
 |-- fsclwk_id: string (nullable = true)
 |-- fsclwk_label: string (nullable = true)
 |-- fsclmth_id: string (nullable = true)
 |-- fsclmth_label: string 

In [0]:

dataframes = {
    "fact_averagecosts_df": "sku_id",
    "fact_transactions_df": "order_id",
    "hier_clnd_df": "fscldt_id",
    "hier_hldy_df": "hldy_id",
    "hier_invloc_df": "loc",
    "hier_invstatus_df": "code_id",
    "hier_possite_df": "site_id",
    "hier_pricestate_df": "substate_id",
    "hier_prod_df": "sku_id",
    "hier_rtlloc_df": "str"
}

for df_name, primary_key in dataframes.items():
    df = globals()[df_name]
    
    # Non-null check for primary key
    null_count = df.filter(col(primary_key).isNull()).count()
    print(f"Null values in '{primary_key}' column of {df_name}: {null_count}")  # Should be 0

    # Uniqueness check for primary key
    distinct_count = df.select(countDistinct(primary_key)).collect()[0][0]
    total_rows = df.count()
    print(f"Is '{primary_key}' unique in {df_name}? {distinct_count == total_rows}")  # Should be True


Null values in 'sku_id' column of fact_averagecosts_df: 0
Is 'sku_id' unique in fact_averagecosts_df? False
Null values in 'order_id' column of fact_transactions_df: 0
Is 'order_id' unique in fact_transactions_df? False
Null values in 'fscldt_id' column of hier_clnd_df: 0
Is 'fscldt_id' unique in hier_clnd_df? True
Null values in 'hldy_id' column of hier_hldy_df: 0
Is 'hldy_id' unique in hier_hldy_df? True
Null values in 'loc' column of hier_invloc_df: 0
Is 'loc' unique in hier_invloc_df? True
Null values in 'code_id' column of hier_invstatus_df: 0
Is 'code_id' unique in hier_invstatus_df? True
Null values in 'site_id' column of hier_possite_df: 0
Is 'site_id' unique in hier_possite_df? True
Null values in 'substate_id' column of hier_pricestate_df: 0
Is 'substate_id' unique in hier_pricestate_df? True
Null values in 'sku_id' column of hier_prod_df: 0
Is 'sku_id' unique in hier_prod_df? True
Null values in 'str' column of hier_rtlloc_df: 0
Is 'str' unique in hier_rtlloc_df? True


In [0]:
# Removing the string part from the column code_id
hier_invstatus_df = hier_invstatus_df.withColumn("code_id", regexp_extract("code_id", r'\d+', 0).cast("int"))

In [0]:
invalid_fk_count1 = fact_transactions_df.join(hier_prod_df, fact_transactions_df["sku_id"] == hier_prod_df["sku_id"], "left_anti").count()
print(f"Invalid foreign keys in 'sku_id': {invalid_fk_count1}")  # Should be 0

invalid_fk_count2 = fact_transactions_df.join(hier_clnd_df, fact_transactions_df["fscldt_id"] == hier_clnd_df["fscldt_id"], "left_anti").count()
print(f"Invalid foreign keys in 'fscldt_id': {invalid_fk_count2}") 

invalid_fk_count3 = fact_transactions_df.join(hier_pricestate_df, fact_transactions_df["price_substate_id"] == hier_pricestate_df["substate_id"], "left_anti").count()
print(f"Invalid foreign keys in 'substate_id': {invalid_fk_count3}") 

invalid_fk_count4 = fact_transactions_df.join(hier_invstatus_df, fact_transactions_df["line_id"] == hier_invstatus_df["code_id"], "left_anti").count()
print(f"Invalid foreign keys in 'code_id': {invalid_fk_count4}") 

invalid_fk_count5 = fact_transactions_df.join(fact_averagecosts_df, fact_transactions_df["sku_id"] == fact_averagecosts_df["sku_id"], "left_anti").count()
print(f"Invalid foreign keys in 'sku_id': {invalid_fk_count5}") 

Invalid foreign keys in 'sku_id': 0
Invalid foreign keys in 'fscldt_id': 2206380
Invalid foreign keys in 'substate_id': 0
Invalid foreign keys in 'code_id': 13886
Invalid foreign keys in 'sku_id': 0


In [0]:
# hier_prod_df
# Create separate tables for each level
hier_prod_sku_id_df = hier_prod_df.select("sku_id", "sku_label").distinct()
hier_clnd_df = hier_clnd_df.select("fscldt_id", "fscldt_label").distinct()

hier_pricestate = hier_pricestate_df.select("substate_id", "substate_label").distinct()
hier_invstatus = hier_invstatus_df.select("code_id", "code_label").distinct()

# Save to staging schema
hier_prod_sku_id_df.write.mode("overwrite").saveAsTable("Energy.hier_prod_sku_id_df")
hier_clnd_df.write.mode("overwrite").saveAsTable("Energy.hier_clnd_df")
hier_pricestate.write.mode("overwrite").saveAsTable("Energy.hier_pricestate_df")
hier_invstatus.write.mode("overwrite").saveAsTable("Energy.hier_invstatus")

In [0]:
from pyspark.sql.functions import sum
# Create a refined table called mview_weekly_sales which totals sales_units,
# sales_dollars, and discount_dollars by pos_site_id, sku_id, fsclwk_id,
# price_substate_id and type.
# Aggregate sales data
mview_weekly_sales_df = fact_transactions_df.groupBy(
    "pos_site_id",
    "sku_id",
    "fscldt_id",
    "price_substate_id",
    "type"
).agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars")
)

# Save to refined schema
mview_weekly_sales_df.write.mode("overwrite").saveAsTable("Energy.mview_weekly_sales")

write transformation logic that will incrementally calculate all the
totals in the above table for partially loaded data.

In [0]:
%python
# Assume 'load_timestamp' is a column in the fact table
new_data_df = fact_transactions_df.filter(col("dt") > "2023-10-01")

# Aggregate new data
new_aggregates_df = new_data_df.groupBy(
    "pos_site_id",
    "sku_id",
    "fscldt_id",
    "price_substate_id",
    "type"
).agg(
    sum("sales_units").alias("total_sales_units"),
    sum("sales_dollars").alias("total_sales_dollars"),
    sum("discount_dollars").alias("total_discount_dollars")
)

# Save the existing table as a Delta table if not already done
mview_weekly_sales_df.write.format("delta").mode("overwrite").save("/mnt/dev/Energy_DE/mview_weekly_sales")

# Load the Delta table
# mview_weekly_sales_delta = DeltaTable.forPath(spark, "/path/to/mview_weekly_sales")

# Merge new aggregates into existing Delta table
new_aggregates_df.createOrReplaceTempView("new_aggregates")

spark.sql("""
MERGE INTO delta.`/mnt/dev/Energy_DE/mview_weekly_sales` AS target
USING new_aggregates AS source
ON target.pos_site_id = source.pos_site_id
   AND target.sku_id = source.sku_id
   AND target.fscldt_id = source.fscldt_id
   AND target.price_substate_id = source.price_substate_id
   AND target.type = source.type
WHEN MATCHED THEN
    UPDATE SET
        total_sales_units = target.total_sales_units + source.total_sales_units,
        total_sales_dollars = target.total_sales_dollars + source.total_sales_dollars,
        total_discount_dollars = target.total_discount_dollars + source.total_discount_dollars
WHEN NOT MATCHED THEN
    INSERT (pos_site_id, sku_id, fscldt_id, price_substate_id, type, total_sales_units, total_sales_dollars, total_discount_dollars)
    VALUES (source.pos_site_id, source.sku_id, source.fscldt_id, source.price_substate_id, source.type, source.total_sales_units, source.total_sales_dollars, source.total_discount_dollars)
""")

Out[19]: DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]