In [0]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

In [0]:
%run ../01_init/03_utilities

In [0]:
# Notebook parameters: the target catalog and the data source name. The data source
# name is used below both as the S3 folder and as the table-name suffix.
for widget_name, widget_default, widget_label in (
    ("catalog", "fmcg", "Field#1"),
    ("data_source", "orders", "Field#2"),
):
    dbutils.widgets.text(widget_name, widget_default, widget_label)

catalog, data_source = (dbutils.widgets.get(name) for name in ("catalog", "data_source"))

In [0]:
# S3 layout for this data source. Paths are kept WITHOUT a trailing slash: the cells
# that build child paths (the CSV glob, the processed-file destination) append "/..."
# themselves, so a trailing slash here would produce "//" in those paths.
base_path = f"s3://xxxxx/{data_source}"
landing_path = f"{base_path}/landing"
processed_path = f"{base_path}/processed"

print("Base Path: ", base_path)
print("Landing Path: ", landing_path)
print("Processed Path: ", processed_path)

# Fully-qualified table names. bronze_schema / silver_schema / gold_schema are not
# defined in this notebook; presumably they come from the %run utilities cell.
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
silver_table = f"{catalog}.{silver_schema}.{data_source}"
gold_table = f"{catalog}.{gold_schema}.sb_fact_{data_source}"

In [0]:
# Read every CSV currently in the landing folder. Each row is stamped with the read time
# and with the source file's name and size, taken from Spark's hidden `_metadata` column.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = (
    csv_reader.csv(f"{landing_path}/*.csv")
    .withColumn("read_timestamp", F.current_timestamp())
    .select("*", "_metadata.file_name", "_metadata.file_size")
)

In [0]:
# Append the raw batch to the permanent bronze table, with Change Data Feed enabled.
(
    df.write
    .format("delta")
    .mode("append")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(bronze_table)
)

In [0]:
# Append the same batch to the bronze staging table. The silver step reads this table,
# and the last cells of the notebook drop it.
# The property key must be spelled "delta.enableChangeDataFeed"; a misspelled key
# (e.g. "detla.") does not enable Change Data Feed.
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("append")
    .saveAsTable(f"{catalog}.{bronze_schema}.staging_{data_source}")
)

In [0]:
# Select the landing files to archive. Only files whose rows actually reached the bronze
# staging table are archived. Listing the landing folder blindly would also sweep up files
# that arrived after the CSV read, moving them to processed/ without ever ingesting them.
ingested_file_names = {
    row.file_name
    for row in (
        spark.table(f"{catalog}.{bronze_schema}.staging_{data_source}")
        .select("file_name")
        .distinct()
        .collect()
    )
}
files = [f for f in dbutils.fs.ls(landing_path) if f.name in ingested_file_names]

In [0]:
# Archive each selected file by moving it from landing/ to processed/ (recurse=True).
for file_info in files:
    destination = f"{processed_path}/{file_info.name}"
    dbutils.fs.mv(file_info.path, destination, True)

In [0]:

# The silver transformation starts from this run's bronze staging batch.
staging_bronze_table = f"{catalog}.{bronze_schema}.staging_{data_source}"
df_orders = spark.table(staging_bronze_table)

In [0]:
# Cleanse the raw orders in one pipeline:
#   * drop rows with no order quantity;
#   * replace non-numeric customer ids with the "999999" placeholder (as a string);
#   * strip a leading "<letters>, " prefix (e.g. a weekday such as "Tuesday, ") from the
#     order date, then parse it with the first format in ORDER_DATE_FORMATS that succeeds
#     (unparseable dates become null);
#   * de-duplicate on the listed columns, then cast product_id to string for the product join.
ORDER_DATE_FORMATS = ["yyyy/MM/dd", "dd-MM-yyyy", "dd/MM/yyyy", "MMMM dd, yyyy"]

customer_id_is_numeric = F.col("customer_id").rlike("^[0-9]+$")

df_orders = (
    df_orders
    .filter(F.col("order_qty").isNotNull())
    .withColumn(
        "customer_id",
        F.when(customer_id_is_numeric, F.col("customer_id")).otherwise("999999").cast("string"),
    )
    .withColumn(
        "order_placement_date",
        F.regexp_replace(F.col("order_placement_date"), r"^[A-Za-z]+,\s*", ""),
    )
    .withColumn(
        "order_placement_date",
        F.coalesce(*[F.try_to_date("order_placement_date", fmt) for fmt in ORDER_DATE_FORMATS]),
    )
    .dropDuplicates(["order_id", "order_placement_date", "customer_id", "product_id", "order_qty"])
    .withColumn("product_id", F.col("product_id").cast("string"))
)

In [0]:
# Attach product_code from the silver products table. The name is built from the notebook
# parameters (not hard-coded to fmcg.silver), so a non-default catalog reads its own
# products. Inner join: orders whose product_id has no match in products are dropped.
df_products = spark.table(f"{catalog}.{silver_schema}.products")

df_joined = (
    df_orders
    .join(df_products, on="product_id", how="inner")
    .select(df_orders["*"], df_products["product_code"])
)

In [0]:
# Upsert the joined batch into the silver orders table. On the first run the table does
# not exist yet, so it is created from the batch instead.
if spark.catalog.tableExists(silver_table):
    silver_delta = DeltaTable.forName(spark, silver_table)

    # Match on (date, order, product_code, customer). Matched rows are fully overwritten
    # and unmatched rows are inserted.
    param = """
    silver.order_placement_date = bronze.order_placement_date AND 
    silver.order_id = bronze.order_id AND 
    silver.product_code = bronze.product_code AND 
    silver.customer_id = bronze.customer_id
    """

    silver_merge = silver_delta.alias("silver").merge(df_joined.alias("bronze"), param)
    silver_merge.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
    (
        df_joined.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .option("delta.enableChangeDataFeed", "true")
        .saveAsTable(silver_table)
    )

In [0]:
# Replace the silver staging table with this run's joined batch; the gold cells read it next.
silver_staging_table = f"{catalog}.{silver_schema}.staging_{data_source}"
(
    df_joined.write
    .format("delta")
    .mode("overwrite")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(silver_staging_table)
)

In [0]:
# Project this run's silver staging batch into the gold fact shape. Columns are renamed
# to date / customer_code / sold_quantity. This is the single definition of df_gold;
# the redundant SELECT * that was immediately overwritten has been removed.
df_gold = spark.sql(
    f"""
    SELECT
        order_id,
        order_placement_date AS date,
        customer_id AS customer_code,
        product_code,
        product_id,
        order_qty AS sold_quantity
    FROM {catalog}.{silver_schema}.staging_{data_source}
    """
)

In [0]:
# Upsert the gold batch into the child fact table (gold_table). The table is created
# from the batch on the first run.
if spark.catalog.tableExists(gold_table):
    gold_delta = DeltaTable.forName(spark, gold_table)

    # Match on (order, date, customer, product_code). Matched rows are fully overwritten
    # and unmatched rows are inserted.
    param = """
        target.order_id      = source.order_id AND
        target.date          = source.date AND
        target.customer_code = source.customer_code AND
        target.product_code  = source.product_code
    """

    gold_merge = gold_delta.alias("target").merge(df_gold.alias("source"), param)
    gold_merge.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
else:
    (
        df_gold.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .option("delta.enableChangeDataFeed", "true")
        .saveAsTable(gold_table)
    )

In [0]:

# Order dates from this run's silver staging batch.
df_child = spark.sql(
    f"""
    SELECT
        order_placement_date AS date
    FROM {catalog}.{silver_schema}.staging_{data_source}
    """
)

# Distinct set of months (first day of month) touched by this batch. These months decide
# which child fact rows are re-aggregated below.
# .distinct() is essential: without it there is one row per order line. Joining the fact
# table against that list would repeat every fact row once per order in its month, and
# the monthly sum of sold_quantity would be multiplied.
incremental_month_df = (
    df_child
    .select(F.trunc("date", "MM").alias("order_month"))
    .distinct()
)

incremental_month_df.createOrReplaceTempView("incremental_months")

In [0]:
# All child fact rows (gold_table) whose month was touched by this batch.
# LEFT SEMI JOIN only filters sbf by membership in incremental_months, so fact rows are
# never duplicated even if the month list contains repeats. The table name comes from
# gold_table instead of a hard-coded sb_fact_orders.
monthly_table = spark.sql(
    f"""
    SELECT
        date,
        product_code,
        customer_code,
        sold_quantity
    FROM {gold_table} AS sbf
    LEFT SEMI JOIN
        incremental_months AS im
        ON trunc(sbf.date, 'MM') = im.order_month
    """
)

In [0]:
# Roll the affected child fact rows up to one row per (month, product, customer).
# The month's first day is exposed under the column name `date`.
df_monthly_agg = (
    monthly_table
    .groupBy(F.trunc("date", "MM").alias("date"), "product_code", "customer_code")
    .agg(F.sum("sold_quantity").alias("sold_quantity"))
)

In [0]:
# Fold the recomputed monthly totals into the parent fact_orders table. Existing
# (month, product, customer) rows are overwritten with the new total; new combinations
# are inserted. The parent table must already exist (DeltaTable.forName fails otherwise).
parent_fact_table = f"{catalog}.{gold_schema}.fact_orders"
gold_parent_delta = DeltaTable.forName(spark, parent_fact_table)

param = """
    parent_gold.date = child_gold.date AND
    parent_gold.product_code = child_gold.product_code AND
    parent_gold.customer_code = child_gold.customer_code
"""

parent_merge = gold_parent_delta.alias("parent_gold").merge(df_monthly_agg.alias("child_gold"), param)
parent_merge.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [0]:
# Drop this run's bronze staging table. The name is built from the notebook parameters
# (not hard-coded fmcg.bronze.staging_orders), so it matches the table written above for
# any catalog / data_source. IF EXISTS keeps a re-run from failing here.
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{bronze_schema}.staging_{data_source}")

In [0]:
# Drop this run's silver staging table. The name is built from the notebook parameters
# (not hard-coded fmcg.silver.staging_orders), so it matches the table written above for
# any catalog / data_source. IF EXISTS keeps a re-run from failing here.
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{silver_schema}.staging_{data_source}")