In [0]:
#Profiled the source and found contamination (values were appearing in the wrong columns) - here's version 2:

#Load bronze data
from pyspark.sql import functions as F
from pyspark.sql import Window

# Read the raw bronze-layer tickets and report how many rows arrived.
bronze_table_name = "genai_support_pipeline.bronze_support_tickets"
bronze = spark.table(bronze_table_name)
bronze_row_count = bronze.count()
print("bronze_count:", bronze_row_count)

# Normalize obvious strings: strip leading/trailing spaces from the text and
# categorical columns so the domain checks further down compare clean values.
string_cols = [
    "customer_name","customer_email","customer_gender","product_purchased",
    "ticket_type","ticket_subject","ticket_description","ticket_status",
    "ticket_priority","ticket_channel"
]

# One trimmed replacement expression per column, keyed by the column it replaces.
trimmed_exprs = {name: F.trim(F.col(name)) for name in string_cols}

if hasattr(bronze, "withColumns"):
    # Single projection for all columns. Chaining withColumn in a loop adds a
    # projection per call, which the PySpark docs warn can bloat the plan.
    df = bronze.withColumns(trimmed_exprs)
else:
    # DataFrame.withColumns is unavailable before PySpark 3.3; fall back to
    # the per-column form, which produces the same result.
    df = bronze
    for name, expr in trimmed_exprs.items():
        df = df.withColumn(name, expr)

# "Known-good" value domains for the categorical columns.
# Kept as lists: they are handed directly to Column.isin() by the salvage step.
VALID_STATUSES = [
    "Open",
    "Closed",
    "Pending Customer Response",
]
VALID_PRIORITIES = [
    "Low",
    "Medium",
    "High",
    "Critical",
]
VALID_CHANNELS = [
    "Email",
    "Phone",
    "Chat",
    "Social media",
    "Web",
]
VALID_GENDERS = [
    "Male",
    "Female",
    "Other",
]

# Salvage: move misplaced values into the right columns.
# For every categorical field, take the first source column (in priority
# order) whose value belongs to that field's valid domain; otherwise NULL.
def _first_in_domain(domain, *source_cols):
    """Return a Column holding the first of `source_cols` whose value is in `domain`.

    Each candidate is NULL unless its value is in the domain, so coalescing
    them in order picks the first match and yields NULL when none match.
    """
    candidates = [
        F.when(F.col(name).isin(domain), F.col(name))
        for name in source_cols
    ]
    return F.coalesce(*candidates)


df = (
    df.withColumn(
        "ticket_status_fixed",
        _first_in_domain(VALID_STATUSES, "ticket_status", "customer_gender"),
    )
    .withColumn(
        "ticket_priority_fixed",
        _first_in_domain(
            VALID_PRIORITIES, "ticket_priority", "customer_gender", "ticket_status"
        ),
    )
    .withColumn(
        "ticket_channel_fixed",
        _first_in_domain(
            VALID_CHANNELS, "ticket_channel", "customer_gender", "ticket_status"
        ),
    )
    .withColumn(
        "customer_gender_fixed",
        _first_in_domain(VALID_GENDERS, "customer_gender"),
    )
)

#Define "minimum viable record" and quarantine the rest
def is_blank(c):
    """Build a boolean Column that is true when column `c` is NULL or empty after trimming.

    Args:
        c: Name of the column to inspect.

    Returns:
        A Column expression that is never NULL: NULL inputs are mapped to the
        empty string before the comparison.
    """
    trimmed_or_empty = F.coalesce(F.trim(F.col(c)), F.lit(""))
    return trimmed_or_empty == ""

# A record is viable only if it carries some text (subject or description)
# and at least one identifying field (ticket id, email, or purchase date).
lacks_text = is_blank("ticket_subject") & is_blank("ticket_description")
lacks_identifier = (
    F.col("ticket_id").isNull()
    & is_blank("customer_email")
    & F.col("date_of_purchase").isNull()
)

df = df.withColumn("has_text", ~lacks_text)
df = df.withColumn("has_identifier", ~lacks_identifier)

# Tag each row with the first requirement it fails. Rows that pass both
# checks match no branch and therefore get NULL.
text_is_missing = ~F.col("has_text")
identifier_is_missing = ~F.col("has_identifier")

reject_reason_expr = (
    F.when(text_is_missing, F.lit("missing_subject_and_description"))
     .when(identifier_is_missing, F.lit("missing_identifier"))
)

df = df.withColumn("reject_reason", reject_reason_expr)

# Route rows: anything carrying a reject_reason is quarantined, the rest is clean.
is_rejected = F.col("reject_reason").isNotNull()
df_clean = df.where(~is_rejected)
df_quarantine = df.where(is_rejected)

clean_rows = df_clean.count()
print("clean:", clean_rows)
quarantine_rows = df_quarantine.count()
print("quarantine:", quarantine_rows)

# Add LLM-ready text field: labelled subject and description separated by a blank line.
def _labelled(label, column_name):
    """Prefix a column's value with `label`, treating a NULL value as an empty string."""
    return F.concat(F.lit(label), F.coalesce(F.col(column_name), F.lit("")))


llm_text = F.concat_ws(
    "\n\n",
    _labelled("Subject: ", "ticket_subject"),
    _labelled("Description: ", "ticket_description"),
)
df_clean = df_clean.withColumn("text_for_llm", llm_text)

# Dedupe: key each row by ticket_id when present, otherwise by a SHA-256 hash
# of its LLM text, then keep one row per key.
# NOTE(review): the fallback key hashes only the text, so rows without a
# ticket_id that share identical text collapse to a single row even when the
# customer differs - confirm this is intended.
df_clean = df_clean.withColumn(
    "ticket_key",
    F.when(F.col("ticket_id").isNotNull(), F.col("ticket_id").cast("string"))
     .otherwise(F.sha2(F.col("text_for_llm"), 256))
)

# Keep the row with the longest text per key. Length alone leaves ties
# unresolved (every row in a hash-fallback partition has the same text, hence
# the same length), so row_number() would pick an arbitrary survivor that can
# change between runs. The extra sort keys make the choice repeatable whenever
# the tied rows differ in email or purchase date; rows identical on all three
# keys can still be ordered arbitrarily.
w = Window.partitionBy("ticket_key").orderBy(
    F.length("text_for_llm").desc(),
    F.col("customer_email").asc_nulls_last(),
    F.col("date_of_purchase").asc_nulls_last(),
)

df_clean = (
    df_clean.withColumn("rn", F.row_number().over(w))
            .filter(F.col("rn") == 1)
            .drop("rn")
)

# Write Silver tables: clean rows first, then quarantined rows, each fully
# replacing the contents of its Delta table.
silver_outputs = [
    (df_clean, "genai_support_pipeline.silver_support_tickets_clean"),
    (df_quarantine, "genai_support_pipeline.silver_support_tickets_quarantine"),
]

for silver_frame, silver_table in silver_outputs:
    writer = silver_frame.write.format("delta").mode("overwrite")
    writer.saveAsTable(silver_table)



bronze_count: 18762
clean: 10019
quarantine: 8743
