In [0]:
%run ../EcommerceProject/raw_ingestion

# Enrich Customers
## Deduplicate Records, Flag Missing/Invalid PII, Derive New Columns For Analytics, SCD2

In [0]:
logger.info("Transforming/Enriching Customers")

# Enrich raw customers: validate phone, normalise the name, derive business type
# and address completeness, then keep exactly one row per customer_id.
customers_enriched = (
    spark.read.table("ecommerceproject.default.raw_customers")
    # A phone is valid only when present, not the "#ERROR!" marker, and it still
    # has at least 10 digits after every non-digit character is stripped.
    .withColumn(
        "is_phone_valid",
        when(col("phone").isNull(), lit(False))
        .when(col("phone").rlike("#ERROR!"), lit(False))
        .when(length(regexp_replace(col("phone"), r"\D", "")) < 10, lit(False))
        .otherwise(lit(True)),
    )
    # Keep only letters and spaces, trim, collapse runs of spaces, then Title Case.
    .withColumn(
        "customer_name",
        initcap(
            regexp_replace(
                trim(regexp_replace("customer_name", "[^A-Za-z ]", "")),
                " +",
                " ",
            )
        ),
    )
    # "Consumer" segment -> B2C; every other value (including NULL) -> B2B.
    .withColumn(
        "business_type",
        when(trim(initcap(col("segment"))) == "Consumer", "B2C").otherwise("B2B"),
    )
    # Evaluated before `address` is overwritten below, so it checks the raw parts.
    .withColumn(
        "has_complete_address",
        col("address").isNotNull()
        & col("city").isNotNull()
        & col("state").isNotNull()
        & col("postal_code").isNotNull(),
    )
    # concat_ws skips NULL parts, so partial addresses still join cleanly.
    .withColumn("address", concat_ws(", ", "address", "city", "state", "postal_code"))
    .select(
        "customer_id", "customer_name", "segment", "country", "address", "region",
        "is_phone_valid", "has_complete_address", "business_type",
    )
    # One row per business key. The downstream SCD2 merge fails when several
    # source rows match the same target row, and the initial load would create
    # several "current" rows. NOTE(review): when duplicates disagree, which row
    # survives is arbitrary; add an ordering rule if the raw data has one.
    .dropDuplicates(["customer_id"])
)
             
   

# Customers Dimension

In [0]:

logger.info("Creating Customers dimension") 

# SCD Type 2 upsert of the enriched customers into dim_customers, keyed on
# customer_id. add_hash is defined outside this notebook; it presumably adds the
# `record_hash` column that the merge compares on (TODO confirm).
customers_enriched = (
    customers_enriched
    # Use the lambda's own argument so the hash covers exactly the enriched columns.
    .transform(lambda df: add_hash(df, df.columns))
    .withColumn("effective_start_date", current_date())
    .withColumn("effective_end_date", lit(None).cast("date"))
    .withColumn("is_current", lit(True))
)
target_table = "ecommerceproject.enriched.dim_customers"
if table_exists(target_table):
    target = DeltaTable.forName(spark, target_table)
    try:
        # A changed customer needs two actions in one merge: expire the current
        # target row AND insert the new version. One source row can only trigger
        # one action, so changed rows are staged a second time with a NULL merge
        # key. A NULL key never matches, which routes that copy to the insert branch.
        changed = (
            customers_enriched.alias("s")
            .join(
                target.toDF().filter(col("is_current")).alias("t"),
                (col("s.customer_id") == col("t.customer_id"))
                & (col("s.record_hash") != col("t.record_hash")),
            )
            .select("s.*")
        )
        key_type = customers_enriched.schema["customer_id"].dataType
        staged = (
            customers_enriched.withColumn("merge_key", col("customer_id"))
            .unionByName(changed.withColumn("merge_key", lit(None).cast(key_type)))
        )
        (
            target.alias("t")
            .merge(
                staged.alias("s"),
                """
                t.customer_id = s.merge_key
                AND t.is_current = true
                """
            )
            # Close out the current version when its attributes changed.
            .whenMatchedUpdate(
                condition="t.record_hash <> s.record_hash",
                set={
                    "effective_end_date": "current_date()",
                    "is_current": "false"
                }
            )
            # Brand-new customers plus new versions of changed ones. Columns are
            # listed explicitly so the staging-only `merge_key` is not inserted.
            .whenNotMatchedInsert(
                values={c: f"s.{c}" for c in customers_enriched.columns}
            )
            .execute()
        )
    except Exception:
        logger.error("SCD merge into dim_customers failed", exc_info=True)
        raise
else:
    # First load: create the Delta table from the full enriched snapshot.
    try:
        customers_enriched.write \
            .format("delta") \
            .option("mergeSchema", "true") \
            .saveAsTable(target_table)
    except Exception:
        logger.error("Failed to write dim_customers table", exc_info=True)
        raise
logger.info(f"Customers enriched and upserted: {customers_enriched.count()}") 

# Enrich Products

In [0]:
logger.info("Transforming/Enriching Products")

# Column expressions are built up front, then applied in a single chain.

# Product name: remove anything that is not a letter, digit or space, trim,
# collapse runs of spaces into one, then Title Case.
_product_name_clean = initcap(
    regexp_replace(
        trim(regexp_replace("product_name", "[^A-Za-z0-9 ]", "")),
        " +",
        " ",
    )
)

# Unit price as a two-decimal value derived from price_per_product.
_unit_price = round(col("price_per_product").cast("decimal(10,2)"), 2)

# Price bands: NULL -> Unknown, <20 -> Low, [20, 100] -> Medium,
# (100, 300] -> High, >300 -> Premium. Branch order makes each bound exclusive
# of the band before it.
_price_tier = (
    when(col("unit_price").isNull(), "Unknown")
    .when(col("unit_price") < 20, "Low")
    .when(col("unit_price") <= 100, "Medium")
    .when(col("unit_price") <= 300, "High")
    .otherwise("Premium")
)

products_enriched = (
    spark.read.table("ecommerceproject.default.raw_products")
    .withColumn("product_name", _product_name_clean)
    .withColumn("unit_price", _unit_price)
    .withColumn("price_tier", _price_tier)
    .withColumnRenamed("state", "selling_state")
)


# Products Dimension

In [0]:
logger.info("Creating Products dimension") 

# SCD Type 2 upsert of the enriched products into dim_products, keyed on
# product_id. add_hash is defined outside this notebook; it presumably adds the
# `record_hash` column that the merge compares on (TODO confirm).
# NOTE(review): raw products carry a `state` column (renamed selling_state). If a
# product_id repeats across states, the merge fails on multiple matching source
# rows; confirm whether the key should be (product_id, selling_state).
products_enriched = (
    products_enriched
    # Use the lambda's own argument so the hash covers exactly the enriched columns.
    .transform(lambda df: add_hash(df, df.columns))
    .withColumn("effective_start_date", current_date())
    .withColumn("effective_end_date", lit(None).cast("date"))
    .withColumn("is_current", lit(True))
)
target_table = "ecommerceproject.enriched.dim_products"
if table_exists(target_table):
    target = DeltaTable.forName(spark, target_table)
    try:
        # A changed product needs two actions in one merge: expire the current
        # target row AND insert the new version. One source row can only trigger
        # one action, so changed rows are staged a second time with a NULL merge
        # key. A NULL key never matches, which routes that copy to the insert branch.
        changed = (
            products_enriched.alias("s")
            .join(
                target.toDF().filter(col("is_current")).alias("t"),
                (col("s.product_id") == col("t.product_id"))
                & (col("s.record_hash") != col("t.record_hash")),
            )
            .select("s.*")
        )
        key_type = products_enriched.schema["product_id"].dataType
        staged = (
            products_enriched.withColumn("merge_key", col("product_id"))
            .unionByName(changed.withColumn("merge_key", lit(None).cast(key_type)))
        )
        (
            target.alias("t")
            .merge(
                staged.alias("s"),
                """
                t.product_id = s.merge_key
                AND t.is_current = true
                """
            )
            # Close out the current version when its attributes changed.
            .whenMatchedUpdate(
                condition="t.record_hash <> s.record_hash",
                set={
                    "effective_end_date": "current_date()",
                    "is_current": "false"
                }
            )
            # Brand-new products plus new versions of changed ones. Columns are
            # listed explicitly so the staging-only `merge_key` is not inserted.
            .whenNotMatchedInsert(
                values={c: f"s.{c}" for c in products_enriched.columns}
            )
            .execute()
        )
    except Exception:
        logger.error("SCD merge into dim_products failed", exc_info=True)
        raise
else:
    # First load: create the Delta table from the full enriched snapshot.
    try:
        products_enriched.write \
            .format("delta") \
            .option("mergeSchema", "true") \
            .saveAsTable(target_table)
    except Exception:
        logger.error("Failed to write dim_products table", exc_info=True)
        raise
logger.info(f"Product enriched and upserted: {products_enriched.count()}") 
