In [1]:
from pyspark.sql import functions as F

# 1. Load the three customer-interaction Delta tables from the Lakehouse Tables folder
def _read_delta(path):
    """Return the Delta table stored at `path` as a Spark DataFrame."""
    return spark.read.format("delta").load(path)


chat_sdf = _read_delta("Tables/Silver/Customer/chat")
email_sdf = _read_delta("Tables/Silver/Customer/email")
tickets_sdf = _read_delta("Tables/Silver/Customer/tickets")

# 2. Normalize each source to a common schema
# Column names (and order) every normalized source must expose so the three
# frames can be combined with unionByName.
COMMON_COLUMNS = [
    "Customer_ID",
    "Name",
    "Email",
    "Interaction_Type",
    "Issue_Category",
    "Interaction_Date",
    "Customer_Name",
    "Complaint_Description",
]


def _hashed_customer_id(email_col):
    """Build a surrogate customer ID from an e-mail column.

    The result is the literal "CUST" followed by the first three characters
    (upper-cased) of the SHA-1 hex digest of the lower-cased e-mail. A NULL
    e-mail yields a NULL ID because every function in the chain propagates NULL.

    NOTE(review): three hex characters give only 4096 distinct IDs, so two
    different e-mail addresses can receive the same Customer_ID. Widening the
    prefix would change already-issued IDs, so it is left as-is here -- confirm
    whether the collision risk is acceptable.
    """
    digest_prefix = F.sha1(F.lower(F.col(email_col))).substr(1, 3)
    return F.concat(F.lit("CUST"), F.upper(digest_prefix))


def _to_common_schema(source_sdf, **column_exprs):
    """Project `source_sdf` onto COMMON_COLUMNS with one select.

    `column_exprs` maps each name in COMMON_COLUMNS to the Column expression
    that produces it. A single select is used instead of a chain of withColumn
    calls so that only one projection is added to the query plan.

    Raises:
        ValueError: if an expected column is missing or an unknown one is given.
    """
    missing = [name for name in COMMON_COLUMNS if name not in column_exprs]
    unknown = [name for name in column_exprs if name not in COMMON_COLUMNS]
    if missing or unknown:
        raise ValueError(
            f"Common-schema mismatch: missing={missing}, unknown={unknown}"
        )
    return source_sdf.select(
        *[column_exprs[name].alias(name) for name in COMMON_COLUMNS]
    )


# Chat: no category in the source, so Issue_Category is a typed NULL placeholder.
chat_norm = _to_common_schema(
    chat_sdf,
    Customer_ID=_hashed_customer_id("sender_email"),
    Name=F.col("sender_name"),
    Email=F.col("sender_email"),
    Interaction_Type=F.lit("Chat"),
    Issue_Category=F.lit(None).cast("string"),
    Interaction_Date=F.to_date("timestamp_utc"),
    Customer_Name=F.col("customer_org"),
    Complaint_Description=F.col("message_text"),
)

# Email: same shape as chat, different source column names.
email_norm = _to_common_schema(
    email_sdf,
    Customer_ID=_hashed_customer_id("from_email"),
    Name=F.col("from_name"),
    Email=F.col("from_email"),
    Interaction_Type=F.lit("Email"),
    Issue_Category=F.lit(None).cast("string"),
    Interaction_Date=F.to_date("received_utc"),
    Customer_Name=F.col("customer_org"),
    Complaint_Description=F.col("body_text"),
)

# Tickets: keep the ticket's own customer_id when present and fall back to the
# hashed e-mail ID otherwise; the category comes straight from the source.
tickets_norm = _to_common_schema(
    tickets_sdf,
    Customer_ID=F.when(F.col("customer_id").isNotNull(), F.col("customer_id"))
                 .otherwise(_hashed_customer_id("contact_email")),
    Name=F.col("contact_name"),
    Email=F.col("contact_email"),
    Interaction_Type=F.lit("Ticket"),
    Issue_Category=F.col("category"),
    Interaction_Date=F.to_date("opened_utc"),
    Customer_Name=F.col("account_name"),
    Complaint_Description=F.col("description"),
)

# 3. Stack the three normalized frames into one DataFrame (columns matched by name)
final_sdf = chat_norm.unionByName(email_norm)
final_sdf = final_sdf.unionByName(tickets_norm)

# 4. Optional: classify Issue_Category for Chat and Email using simple rules
def classify_issue(text):
    """Map free-text complaint content to a coarse issue category.

    The text is lower-cased and searched for keyword substrings. Rules are
    tried in order (Billing, then Technical, then Feedback) and the first rule
    with any matching keyword wins. Empty or missing text, or text with no
    matching keyword, yields "Other".
    """
    lowered = text.lower() if text else ""
    keyword_rules = (
        ("Billing", ("invoice", "refund", "billing", "payment")),
        ("Technical", ("login", "connect", "server", "performance", "error", "feature not working")),
        ("Feedback", ("request", "feedback", "feature request", "interface")),
    )
    for label, keywords in keyword_rules:
        for keyword in keywords:
            if keyword in lowered:
                return label
    return "Other"

# Wrap the Python classifier as a Spark UDF that returns a string
from pyspark.sql.types import StringType
classify_udf = F.udf(classify_issue, returnType=StringType())

# Keep any Issue_Category that is already populated; where it is NULL,
# derive one from the complaint text instead.
_existing_category = F.col("Issue_Category")
_inferred_category = classify_udf(F.col("Complaint_Description"))
final_sdf = final_sdf.withColumn(
    "Issue_Category",
    F.coalesce(_existing_category, _inferred_category),
)

# 5. Persist the consolidated frame as a Delta table, replacing both the
#    existing data and the existing schema at the target path
consolidated_writer = final_sdf.write.format("delta").mode("overwrite")
consolidated_writer = consolidated_writer.option("overwriteSchema","true")
consolidated_writer.save("Tables/Gold/customerdata")

print("✅ Created consolidated Delta table: Tables/Gold/customerdata")




StatementMeta(, adb08e27-253f-4323-b6bd-0eee8e774102, 3, Finished, Available, Finished)

✅ Created consolidated Delta table: Tables/Gold/customerdata


In [5]:
# =========================
# Silver -> Gold (Fabric)
# =========================

from pyspark.sql import functions as F

# ---- CONFIG: adjust paths as needed ----
silver_base      = "Tables/Silver/Billing"          # expects: invoices, customers, products
gold_base        = "Tables/Gold"            # will write: Gold/billingdata (typed)

# ---- 1) Read Silver tables ----
# One Delta read per table under silver_base, unpacked in the same order as
# the table names listed below.
inv, cus, prd = (
    spark.read.format("delta").load(f"{silver_base}/{table_name}")
    for table_name in ("invoices", "customers", "products")
)

# Quick sanity (optional)
# print(inv.columns); print(cus.columns); print(prd.columns)

# ---- 2) Join Silver tables ----
# Aliases "i", "c" and "p" let later steps address columns as "i.<col>" etc.
invoices_i = inv.alias("i")
customers_c = cus.alias("c")
products_p = prd.alias("p")

# Left joins: every invoice row is kept even when no customer or product matches.
joined = invoices_i.join(
    customers_c,
    on=F.col("i.CustomerID") == F.col("c.CustomerID"),
    how="left",
).join(
    products_p,
    on=F.col("i.ProductCode") == F.col("p.ProductCode"),
    how="left",
)

# ---- 3) Derive business fields (typed Gold) ----
# billing_period is rendered as "<UsageStart> to <UsageEnd>" (yyyy-MM-dd).
# F.concat is used rather than F.concat_ws because concat_ws silently skips
# NULL inputs: a missing bound would produce a bare date with no separator,
# and two missing bounds would produce "" instead of NULL. With F.concat the
# period is NULL whenever either bound is NULL, so incomplete periods are
# visible downstream instead of looking like valid values.
billing_period_expr = F.concat(
    F.date_format(F.col("i.UsageStart"), "yyyy-MM-dd"),
    F.lit(" to "),
    F.date_format(F.col("i.UsageEnd"), "yyyy-MM-dd"),
)

# Single projection onto the snake_case Gold column names. The prefixes refer
# to the aliases given to the joined frames: "i", "c" and "p".
gold_typed_df = joined.select(
    F.col("i.InvoiceID").alias("invoice_id"),
    F.col("i.CustomerID").alias("customer_id"),
    F.col("c.Name").alias("name"),
    F.col("c.Email").alias("email"),
    billing_period_expr.alias("billing_period"),
    F.col("i.InvoiceDate").alias("invoice_date"),
    F.col("i.TotalAmount").alias("total_bill_amount"),
    F.col("i.Currency").alias("currency"),
    F.col("p.ProductName").alias("high_cost_product"),
    F.col("i.HighCostProductPrice").alias("high_cost_product_price"),
    F.col("i.PaymentStatus").alias("payment_status"),
    F.col("i.PaymentMethod").alias("payment_method"),
    F.col("i.DueDate").alias("due_date"),
    F.col("i.LateFeeApplied").alias("late_fee_applied"),
    F.col("i.DiscountApplied").alias("discount_applied"),
    F.col("c.CustomerName").alias("customer_name"),
)

# ---- 4) Write typed Gold Delta (columns carry the snake_case names chosen above) ----
gold_typed_path = f"{gold_base}/billingdata"

# Overwrite both the data and the schema of any table already at this path.
gold_writer = gold_typed_df.write.format("delta").mode("overwrite")
gold_writer.option("overwriteSchema", "true").save(gold_typed_path)

print(f"✅ GOLD (typed) written to: {gold_typed_path}")

# Read the table back from storage and preview the first five rows untruncated.
gold_preview = spark.read.format("delta").load(gold_typed_path)
gold_preview.show(5, truncate=False)



StatementMeta(, 20202d63-f37b-4d90-9486-f7390ab76fd7, 7, Finished, Available, Finished)

✅ GOLD (typed) written to: Tables/Gold/billingdata
+-------------+-----------+---------------+------------------------+------------------------+------------+-----------------+--------+-----------------------+-----------------------+--------------+--------------+----------+----------------+----------------+---------------+
|invoice_id   |customer_id|name           |email                   |billing_period          |invoice_date|total_bill_amount|currency|high_cost_product      |high_cost_product_price|payment_status|payment_method|due_date  |late_fee_applied|discount_applied|customer_name  |
+-------------+-----------+---------------+------------------------+------------------------+------------+-----------------+--------+-----------------------+-----------------------+--------------+--------------+----------+----------------+----------------+---------------+
|INV-2025-0004|CUST004    |Dustin Jones   |morrisstacey@example.com|2025-08-01 to 2025-08-31|2025-09-02  |780.00           |USD   