In [4]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.types import *
import datetime

StatementMeta(, 57db7538-4fb3-49f0-83ee-5538bd5f2ec7, 6, Finished, Available, Finished)

In [5]:
# Global configuration
# Timestamp of this run, truncated to the whole minute (naive local time).
_run_started = datetime.datetime.today()
UPDATED = _run_started.replace(second=0, microsecond=0)

StatementMeta(, 57db7538-4fb3-49f0-83ee-5538bd5f2ec7, 7, Finished, Available, Finished)

In [6]:
# Customer dimension: gold table location and the silver table that feeds it
CUSTOMER_GOLD_TABLE_PATH = "Tables/dim_customer"
CUSTOMER_SILVER_TABLE_NAME = "customers"

# Gold schema for the customer dimension; every column is declared nullable
customer_gold_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("customer_id", StringType()),
        ("customer_name", StringType()),
        ("updated_at", TimestampType()),
    ]
])


# Product dimension: gold table location and the silver table that feeds it
PRODUCT_GOLD_TABLE_PATH = "Tables/dim_product"
PRODUCT_SILVER_TABLE_NAME = "products"

# Gold schema for the product dimension; every column is declared nullable
product_gold_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("product_id", StringType()),
        ("product_name", StringType()),
        ("cost", LongType()),
        ("original_sale_price", LongType()),
        ("discount", LongType()),
        ("current_price", LongType()),
        ("taxes", DoubleType()),
        ("updated_at", TimestampType()),
    ]
])

# Location dimension: gold table location and the silver table that feeds it
LOCATION_GOLD_TABLE_PATH = "Tables/dim_location"
LOCATION_SILVER_TABLE_NAME = "locations"

# Gold schema for the location dimension; every column is declared nullable
location_gold_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("location_id", StringType()),
        ("name", StringType()),
        ("county", StringType()),
        ("state_code", StringType()),
        ("state", StringType()),
        ("type", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("updated_at", TimestampType()),
    ]
])


def create_blank_df(spark, schema):
    """Return a zero-row DataFrame whose columns follow ``schema``."""
    # An empty RDD supplies no rows, so only the schema shapes the result.
    no_rows = spark.sparkContext.emptyRDD()
    return spark.createDataFrame(no_rows, schema)


def process_dimension(spark, gold_table_path, silver_table_name, gold_schema,
                      select_exprs, key, additional_data, table_name):
    """
    Incrementally loads one dimension from its silver table into its gold Delta table.

    Steps:
      - Creates the gold table (empty, with gold_schema) only when no Delta
        table exists at gold_table_path.
      - Reads the watermark: max(updated_at) over the gold rows, ignoring the
        placeholder rows listed in additional_data.
      - Reads silver rows whose updated_at is later than the watermark and
        projects them with select_exprs.
      - Unions those rows with the placeholder rows.
      - Upserts the union into the gold Delta table, matching on `key`.

    Parameters:
      spark: SparkSession.
      gold_table_path: Path to the gold table.
      silver_table_name: Silver table name in the metastore.
      gold_schema: Schema of the gold table.
      select_exprs: List of column expressions (or column names) to select from silver table.
      key: The key column used for the merge.
      additional_data: List of tuples containing additional row data (must match gold_schema).
      table_name: The name used when creating the table if it doesn't exist.

    Returns:
      None.

    Raises:
      Any exception raised by Spark/Delta while reading or merging. Merge
      failures are logged and re-raised so the notebook run is marked failed
      instead of silently finishing with nothing loaded.
    """
    default_watermark = datetime.datetime(1900, 1, 1)
    placeholder_rows = list(additional_data or [])

    # Check for the table explicitly. Catching every exception and then writing
    # an empty DataFrame with mode("overwrite") could wipe an existing gold
    # table after an unrelated failure (for example a transient read error).
    if DeltaTable.isDeltaTable(spark, gold_table_path):
        gold_delta = DeltaTable.forPath(spark, gold_table_path)
    else:
        print(f"[{table_name}] Gold table not found; initializing watermark.")
        blank_df = create_blank_df(spark, gold_schema)
        blank_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
        gold_delta = DeltaTable.forPath(spark, gold_table_path)

    # The placeholder rows carry the caller's run timestamp and are re-merged
    # on every run, so they must not take part in the watermark; otherwise the
    # watermark follows the run clock rather than the newest silver row loaded.
    gold_rows = gold_delta.toDF()
    if placeholder_rows:
        key_index = gold_schema.fieldNames().index(key)
        placeholder_keys = [row[key_index] for row in placeholder_rows]
        gold_rows = gold_rows.filter(
            F.col(key).isNull() | ~F.col(key).isin(placeholder_keys)
        )
    watermark_row = gold_rows.agg(F.max("updated_at").alias("max_updated_at")).collect()[0]
    watermark = watermark_row["max_updated_at"]
    if watermark is None:
        # Empty table (or placeholders only): max() is NULL, and comparing
        # against NULL would filter out every silver row.
        watermark = default_watermark
    print(f"[{table_name}] Current watermark: {watermark}")

    df_silver = spark.table(silver_table_name)
    df_new = df_silver.filter(F.col("updated_at") > F.lit(watermark))
    df_new_sel = df_new.select(*select_exprs)

    # Create additional rows DataFrame
    df_additional = spark.createDataFrame(placeholder_rows, gold_schema)

    df_union = df_new_sel.unionByName(df_additional)
    new_count = df_union.count()
    print(f"[{table_name}] New records to merge (including additional rows): {new_count}")

    if new_count == 0:
        print(f"[{table_name}] No new records to merge.")
        return

    merge_condition = f"tgt.{key} = src.{key}"
    try:
        gold_delta.alias("tgt").merge(
            df_union.alias("src"),
            merge_condition
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    except Exception as merge_error:
        print(f"[{table_name}] Error during merge: {merge_error}")
        raise
    print(f"[{table_name}] Merge complete.")


# Load settings for each dimension. Every entry is passed to process_dimension
# as keyword arguments, in the order listed. The additional_data rows are the
# "Undefined" / "Invalid" placeholder members for that dimension.
dimension_loads = [
    dict(
        gold_table_path=CUSTOMER_GOLD_TABLE_PATH,
        silver_table_name=CUSTOMER_SILVER_TABLE_NAME,
        gold_schema=customer_gold_schema,
        select_exprs=["customer_id", "customer_name", "updated_at"],
        key="customer_id",
        additional_data=[
            ("C0", "Undefined", UPDATED),
            ("C-1", "Invalid", UPDATED),
        ],
        table_name="dim_customer",
    ),
    dict(
        gold_table_path=PRODUCT_GOLD_TABLE_PATH,
        silver_table_name=PRODUCT_SILVER_TABLE_NAME,
        gold_schema=product_gold_schema,
        select_exprs=[
            "product_id", "product_name", "cost", "original_sale_price",
            "discount", "current_price", "taxes", "updated_at",
        ],
        key="product_id",
        additional_data=[
            ("P0", "Undefined", 0, 0, 0, 0, 0.0, UPDATED),
            ("P-1", "Invalid", 0, 0, 0, 0, 0.0, UPDATED),
        ],
        table_name="dim_product",
    ),
    dict(
        gold_table_path=LOCATION_GOLD_TABLE_PATH,
        silver_table_name=LOCATION_SILVER_TABLE_NAME,
        gold_schema=location_gold_schema,
        select_exprs=[
            "location_id", "name", "county", "state_code", "state", "type",
            "latitude", "longitude", "updated_at",
        ],
        key="location_id",
        additional_data=[
            ("L0", "Undefined", "Undefined", "Undefined", "Undefined", "Undefined", 0.0, 0.0, UPDATED),
            ("L-1", "Invalid", "Invalid", "Invalid", "Invalid", "Invalid", 0.0, 0.0, UPDATED),
        ],
        table_name="dim_location",
    ),
]

for dimension_load in dimension_loads:
    process_dimension(spark, **dimension_load)

StatementMeta(, 57db7538-4fb3-49f0-83ee-5538bd5f2ec7, 8, Finished, Available, Finished)

[dim_customer] Gold table not found; initializing watermark. Exception: `Tables/dim_customer` is not a Delta table.
[dim_customer] New records to merge (including additional rows): 803
[dim_customer] Merge complete.
[dim_product] Gold table not found; initializing watermark. Exception: `Tables/dim_product` is not a Delta table.
[dim_product] New records to merge (including additional rows): 103
[dim_product] Merge complete.
[dim_location] Gold table not found; initializing watermark. Exception: `Tables/dim_location` is not a Delta table.
[dim_location] New records to merge (including additional rows): 76
[dim_location] Merge complete.


In [11]:
# Define start and end dates
start_date = "2010-01-01"
end_date = "2030-12-31"

# Fiscal year start month (e.g., 7 for July)
_startOfFiscalYear = 7
if not 1 <= _startOfFiscalYear <= 12:
    raise ValueError(
        f"_startOfFiscalYear must be a month number between 1 and 12, got {_startOfFiscalYear}"
    )

# One row per calendar day between start_date and end_date (both inclusive).
date_spine = spark.range(1).select(
    F.explode(
        F.sequence(F.to_date(F.lit(start_date)), F.to_date(F.lit(end_date)))
    ).alias("Date")
)

# Fiscal year is labelled by the calendar year in which it ends. When the
# fiscal year starts in January it coincides with the calendar year, so no
# shift may be applied in that case.
if _startOfFiscalYear > 1:
    fiscal_year_sql = (
        f"case when month(Date) >= {_startOfFiscalYear} then year(Date)+1 else year(Date) end"
    )
else:
    fiscal_year_sql = "year(Date)"

# Offsets are "current period minus row period", positive for past dates.
# They are evaluated when this cell runs, so the table must be rebuilt
# whenever the offsets need to reflect a new current date.
current_day = F.current_date()
years_back = F.year(current_day) - F.year("Date")

# Month and quarter offsets count whole calendar periods, like Year Offset.
# Truncating months_between() instead would give dates inside one calendar
# month different offsets depending on the day of the month.
months_back = years_back * 12 + (F.month(current_day) - F.month("Date"))
quarters_back = years_back * 4 + (F.quarter(current_day) - F.quarter("Date"))

# NOTE(review): "Week of Year" uses weekofyear() (ISO weeks, starting Monday)
# while "Start of Week" / "End of Week" are derived from dayofweek(), where
# 1 = Sunday. Confirm that the mixed week conventions are intended.
date_columns = [
    ("Year", F.year("Date")),
    ("Start of Year", F.expr("make_date(year(Date), 1, 1)")),
    ("End of Year", F.expr("make_date(year(Date), 12, 31)")),
    ("Month", F.month("Date")),
    ("Start of Month", F.expr("make_date(year(Date), month(Date), 1)")),
    ("End of Month", F.last_day("Date")),
    ("Days in Month", F.dayofmonth(F.last_day("Date"))),
    ("Year Month Number", F.expr("cast(date_format(Date, 'yyyyMM') as int)")),
    ("Year Month Name", F.date_format("Date", "yyyy-MMM")),
    ("Day", F.dayofmonth("Date")),
    ("Day Name", F.date_format("Date", "EEEE")),
    ("Day Name Short", F.date_format("Date", "EEE")),
    ("Day of Week", F.dayofweek("Date")),
    ("Day of Year", F.dayofyear("Date")),
    ("Month Name", F.date_format("Date", "MMMM")),
    ("Month Name Short", F.date_format("Date", "MMM")),
    ("Quarter", F.quarter("Date")),
    ("Quarter Name", F.concat(F.lit("Q"), F.quarter("Date"))),
    ("Year Quarter Number",
     F.expr("cast(date_format(Date, 'yyyy') || date_format(Date, 'Q') as int)")),
    ("Year Quarter Name", F.concat(F.year("Date"), F.lit(" Q"), F.quarter("Date"))),
    ("Start of Quarter", F.expr("make_date(year(Date), ((quarter(Date)-1)*3)+1, 1)")),
    ("End of Quarter",
     F.expr("date_sub(add_months(make_date(year(Date), ((quarter(Date)-1)*3)+1, 1), 3), 1)")),
    ("Week of Year", F.weekofyear("Date")),
    ("Start of Week", F.expr("date_sub(Date, dayofweek(Date)-1)")),
    ("End of Week", F.expr("date_add(date_sub(Date, dayofweek(Date)-1), 6)")),
    ("Fiscal Year", F.expr(fiscal_year_sql)),
    ("Fiscal Month", ((F.month("Date") - _startOfFiscalYear + 12) % 12) + 1),
    ("Fiscal Quarter",
     F.expr(f"ceil((((month(Date) - {_startOfFiscalYear} + 12) % 12) + 1)/3.0)")),
    ("Day Offset", F.datediff(current_day, "Date")),
    ("Month Offset", months_back),
    ("Quarter Offset", quarters_back),
    ("Year Offset", years_back),
    ("Year-month", F.date_format("Date", "yyyy-MM")),
]

# Every attribute depends only on "Date", so a single projection builds the
# whole dimension instead of stacking one withColumn projection per column.
dim_date = date_spine.select(
    "Date",
    *[column_expr.alias(column_name) for column_name, column_expr in date_columns]
)

# Write the dim_date DataFrame as a Delta table with column mapping enabled
# (the column names above contain spaces).
dim_date.write.format("delta") \
    .option("delta.columnMapping.mode", "name") \
    .option("delta.minReaderVersion", "2") \
    .option("delta.minWriterVersion", "5") \
    .mode("overwrite") \
    .saveAsTable("dim_date")

StatementMeta(, 57db7538-4fb3-49f0-83ee-5538bd5f2ec7, 13, Finished, Available, Finished)

In [12]:
FACT_ORDER_GOLD_TABLE_PATH = "Tables/fact_order"        # Gold table location for fact_order
FACT_ORDER_SILVER_TABLE_NAME = "sales"                   # Silver table name for sales data

# Watermark used when the gold table holds no rows yet, so the first load
# takes every silver record.
FACT_ORDER_DEFAULT_WATERMARK = datetime.datetime(1900, 1, 1)

# Define the schema for the fact_order table.
# Note: An extra "updated_at" field is added for incremental loading.
fact_order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("location_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("quantity", LongType(), True),
    StructField("price", LongType(), True),
    StructField("updated_at", TimestampType(), True)
])

# 1. Determine the current watermark from the fact_order (Gold) table.
#    The table is created only when no Delta table exists at the gold path.
#    Catching every exception here and then writing an empty DataFrame with
#    mode("overwrite") could wipe existing data after an unrelated failure.
if DeltaTable.isDeltaTable(spark, FACT_ORDER_GOLD_TABLE_PATH):
    fact_order_delta = DeltaTable.forPath(spark, FACT_ORDER_GOLD_TABLE_PATH)
else:
    print("[Fact Order] Table not found; initializing watermark.")
    blank_df = create_blank_df(spark, fact_order_schema)
    blank_df.write.format("delta").mode("overwrite").saveAsTable("fact_order")
    fact_order_delta = DeltaTable.forPath(spark, FACT_ORDER_GOLD_TABLE_PATH)

watermark_row = fact_order_delta.toDF().agg(F.max("updated_at").alias("max_updated_at")).collect()[0]
watermark = watermark_row["max_updated_at"]
if watermark is None:
    # Empty table: max() is NULL, and comparing against NULL would filter
    # out every silver row.
    watermark = FACT_ORDER_DEFAULT_WATERMARK
print(f"[Fact Order] Current watermark: {watermark}")

# 2. Read only new records from the Silver sales table using the watermark.
df_silver = spark.table(FACT_ORDER_SILVER_TABLE_NAME)
# Assume the silver table has an "updated_date" column for incremental processing.
df_new = df_silver.filter(F.col("updated_date") > F.lit(watermark))

# 3. Coalesce key columns to provide default IDs if null.
#    Also, move "updated_date" to "updated_at" for our merge and incremental load.
df_new = df_new.withColumn("product_id", F.coalesce(F.col("product_id"), F.lit("P0"))) \
               .withColumn("location_id", F.coalesce(F.col("location_id"), F.lit("L0"))) \
               .withColumn("customer_id", F.coalesce(F.col("customer_id"), F.lit("C0"))) \
               .withColumn("updated_at", F.col("updated_date"))

# 4. Select the necessary columns in the order defined by our schema.
df_new_sel = df_new.select("order_id", "product_id", "location_id", "customer_id",
                           "order_date", "quantity", "price", "updated_at")

# Check if there are new records to merge.
new_count = df_new_sel.count()
print(f"[Fact Order] New records to merge: {new_count}")

# 5. Merge (upsert) the new records into the fact_order table.
if new_count > 0:
    merge_condition = "tgt.order_id = src.order_id"
    try:
        fact_order_delta.alias("tgt").merge(
            df_new_sel.alias("src"),
            merge_condition
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    except Exception as merge_error:
        # Log for the cell output, then re-raise so the run is marked failed
        # instead of finishing "successfully" with nothing loaded.
        print(f"[Fact Order] Error during merge: {merge_error}")
        raise
    print("[Fact Order] Merge complete.")
else:
    print("[Fact Order] No new records to merge.")

StatementMeta(, 57db7538-4fb3-49f0-83ee-5538bd5f2ec7, 14, Finished, Available, Finished)

[Fact Order] Table not found; initializing watermark. Exception: `Tables/fact_order` is not a Delta table.
[Fact Order] New records to merge: 10889
[Fact Order] Merge complete.
