### Import modules and assign pipeline parameters to variables

In [0]:
# Import modules
import dlt
from pyspark.sql.functions import col, when, coalesce, lit, concat

# Assign pipeline parameters to variables.
# Every value is read from the pipeline's Spark configuration, in the order
# catalog, schema, volume, cleansed_table. cleansed_table is the staging
# dataset that the downstream tables read as LIVE.<cleansed_table>
# (presumably the stage_complaints view — confirm in the pipeline settings).
catalog, schema, volume, cleansed_table = (
    spark.conf.get(key) for key in ("catalog", "schema", "volume", "cleansed_table")
)

# Unity Catalog volume directory that holds the raw CSV files.
volume_path = f"/Volumes/{catalog}/{schema}/{volume}/"

### Define a view to validate and cleanse the raw data

In [0]:
@dlt.view(
    comment="A view for stage consumer complaints"
)
@dlt.expect_or_drop("valid_complaint_id", "Complaint_ID IS NOT NULL AND Complaint_ID RLIKE '^[0-9]+$' AND length(Complaint_ID) = 7")
@dlt.expect_or_drop("valid_date_received", "Date_received >= Date_submitted")
@dlt.expect("valid_is_timely", "Is_timely IN ('Yes', 'No')")
def stage_complaints():
    """Read the raw complaint CSV files from the volume and cleanse them for staging.

    Data-quality rules (declared by the decorators above and evaluated on the
    DataFrame this function returns):
      * ``valid_complaint_id`` and ``valid_date_received`` drop failing rows.
      * ``valid_is_timely`` uses ``dlt.expect`` (no drop). Rows whose
        ``Is_timely`` was defaulted to ``'Unknown'`` below are kept, but they
        count as violations of this rule.

    Cleansing steps:
      * Rename the ID, channel and date columns to underscore names.
      * Replace a missing or ``'I do not know'`` sub-product with
        ``'Unknown for <Product>'``.
      * Default missing issue, sub-issue, response and timeliness values to
        ``'Unknown'``.
      * Drop the raw columns that the cleansed columns replace, so each
        attribute appears once, under its cleansed name.

    Returns:
        pyspark.sql.DataFrame: the cleansed complaint records.
    """
    # NOTE(review): with inferSchema=True, "Complaint ID" may be typed as an
    # integer. Any leading zeros would then be lost before the 7-digit length
    # check above. Confirm whether IDs can start with 0.
    df = spark.read.csv(volume_path, header=True, inferSchema=True)

    df = (
        df.withColumnRenamed("Complaint ID", "Complaint_ID")
        .withColumnRenamed("Submitted via", "Submitted_via")
        .withColumnRenamed("Date submitted", "Date_submitted")
        .withColumnRenamed("Date received", "Date_received")
        .withColumn(
            "Sub_product",
            when(
                col("Sub-product").isNull() | (col("Sub-product") == "I do not know"),
                concat(lit("Unknown for "), col("Product")),
            ).otherwise(col("Sub-product")),
        )
        .withColumn("Issue", coalesce(col("Issue"), lit("Unknown")))
        .withColumn("Sub_issue", coalesce(col("Sub-issue"), lit("Unknown")))
        .withColumn("Company_public_response", coalesce(col("Company public response"), lit("Unknown")))
        .withColumn("Company_response_to_consumer", coalesce(col("Company response to consumer"), lit("Unknown")))
        .withColumn("Is_timely", coalesce(col("Timely response?"), lit("Unknown")))
    )

    # withColumn (unlike withColumnRenamed) keeps the source column. Drop the
    # raw originals that the cleansed columns above supersede. "Issue" is
    # overwritten in place, so it is not listed here.
    return df.drop(
        "Sub-product",
        "Sub-issue",
        "Company public response",
        "Company response to consumer",
        "Timely response?",
    )

### Define a materialized view for the date dimension

In [0]:
@dlt.table(
    comment="A materialized view for date dimension"
)
def dim_date():
    """Build the date dimension from the staging dataset.

    Both ``Date_submitted`` and ``Date_received`` are projected into the same
    Date / Year / Quarter / Month / Day shape. The two projections are then
    combined with ``UNION``, which removes duplicate rows, so a date found in
    both columns yields a single row. ``Month`` is the abbreviated month name
    from ``DATE_FORMAT(..., 'MMM')``, e.g. Jan.
    """
    staging = f"LIVE.{cleansed_table}"
    date_dimension_sql = f"""
        SELECT 
            Date_submitted AS Date, 
            YEAR(Date_submitted) AS Year,
            QUARTER(Date_submitted) AS Quarter,
            DATE_FORMAT(Date_submitted, 'MMM') AS Month, -- get month name in short form e.g. Jan
            DAY(Date_submitted) AS Day
        FROM {staging}
        UNION
        SELECT 
            Date_received AS Date,
            YEAR(Date_received) AS Year,
            QUARTER(Date_received) AS Quarter,
            DATE_FORMAT(Date_received, 'MMM') AS Month,
            DAY(Date_received) AS Day
        FROM {staging}
        """
    return spark.sql(date_dimension_sql)

### Define a materialized view for the product dimension

In [0]:
@dlt.table(
    comment="A materialized view for product dimension"
)
def dim_product():
    """Build the product dimension: one row per distinct (Product, Sub_product) pair.

    Rows are read from the staging dataset named by ``cleansed_table``.
    """
    staging = f"LIVE.{cleansed_table}"
    product_dimension_sql = f"""
        SELECT DISTINCT 
            Product, 
            Sub_product
        FROM {staging}
        """
    return spark.sql(product_dimension_sql)

### Define a materialized view for the fact table

In [0]:
@dlt.table(
    comment="A materialized view for the fact table (dropping state and product columns)"
)
def fact_complaints():
    """Build the complaints fact table from the staging dataset.

    Keeps the complaint ID, channel, both dates, sub-product, issue fields,
    company responses and the timeliness flag. ``Product`` (and any state
    column) is not selected.

    NOTE(review): ``dim_product`` is a distinct (Product, Sub_product) pair,
    but this table keeps only ``Sub_product``. A join on ``Sub_product``
    alone is unambiguous only if each sub-product belongs to one product.
    The ``'Unknown for <Product>'`` values built in ``stage_complaints`` are
    product-specific by construction. Other values may not be, so confirm
    this against the data.
    """
    staging = f"LIVE.{cleansed_table}"
    fact_sql = f"""
        SELECT
            Complaint_ID,
            Submitted_via,
            Date_submitted,
            Date_received,
            Sub_product,
            Issue,
            Sub_issue,
            Company_public_response,
            Company_response_to_consumer,
            Is_timely
        FROM {staging}
        """
    return spark.sql(fact_sql)