In [None]:
# Bootstrap cell: obtain the active Snowpark session, then a Spark session
# served through Snowpark Connect. Later cells rely on the `spark` name
# defined here.
from snowflake import snowpark_connect
from snowflake.snowpark.context import get_active_session
session = get_active_session()
print(session)

# PySpark API used by the cells below. The star imports are what bring col,
# upper, trim, when, first, coalesce, to_date, etc. into scope.
# Gotcha: `pyspark.sql.functions` also exports names such as sum/min/max/round,
# so after this point those names refer to the Spark column functions rather
# than the Python builtins.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Spark session handle used by every later cell to read and write tables.
spark = snowpark_connect.server.init_spark_session()

# NOTE(review): presumably blocks until the Snowpark Connect resources are
# ready before any query is issued — confirm against the library docs.
from snowflake.snowpark_connect.resources_initializer import wait_for_resource_initialization
wait_for_resource_initialization()

In [None]:
# Build SILVER.dim_channel from BRONZE.RAW_CHANNEL:
#   1. normalise the channel code, 2. collapse duplicates to one row per code,
#   3. standardise the 24x7 flag to 'Y' / 'N'.
raw_channel = spark.read.table("MAYBANK_DEMO.BRONZE.RAW_CHANNEL")
raw_channel.show()

# 1. Normalise the key so case / whitespace variants of a code fall into the same group.
df_raw_channel = raw_channel.withColumn("CHANNEL_CODE_CLEAN", upper(trim(col("CHANNEL_CODE"))))

# 2. Deduplicate: Group by Clean Code, take first non-null attributes.
#    first() defaults to ignorenulls=False, which would keep a NULL whenever a
#    NULL happens to be the first value seen in the group; ignorenulls=True is
#    required to actually get the first non-null value.
#    Note: without an explicit ordering, *which* non-null value is "first"
#    is not deterministic when duplicates disagree.
df_raw_channel = df_raw_channel.groupBy("CHANNEL_CODE_CLEAN").agg(
    first("CHANNEL_NAME", ignorenulls=True).alias("CHANNEL_NAME"),
    first("CHANNEL_TYPE", ignorenulls=True).alias("CHANNEL_TYPE"),
    first("DEVICE_TYPE", ignorenulls=True).alias("DEVICE_TYPE"),
    first("LOCATION_TYPE", ignorenulls=True).alias("LOCATION_TYPE"),
    first("IS_24X7_FLAG", ignorenulls=True).alias("RAW_FLAG")
)

# 3. Standardize Flag ('Yes'/'1' -> 'Y').
#    The raw value is trimmed as well as upper-cased so that padded values such
#    as ' yes ' are recognised. Anything else, including NULL, becomes 'N'.
df_raw_channel = df_raw_channel.withColumn("IS_24X7_FLAG",
    when(upper(trim(col("RAW_FLAG"))).isin("Y", "YES", "TRUE", "1"), "Y").otherwise("N")
).drop("RAW_FLAG").withColumnRenamed("CHANNEL_CODE_CLEAN", "CHANNEL_CODE")

df_raw_channel.show()

# Overwrite the silver dimension table with the cleaned result.
df_raw_channel.write.mode("overwrite").saveAsTable("MAYBANK_DEMO.SILVER.dim_channel")

In [None]:
# Build SILVER.dim_behavior_event from BRONZE.RAW_BEHAVIOR_EVENT.
raw_behavior_event = spark.read.table("MAYBANK_DEMO.BRONZE.RAW_BEHAVIOR_EVENT")

# Preview of the raw input, before any cleaning.
raw_behavior_event.show()

# Cleaning 1: Remove the stray "Â®" sequence from campaign names and trim whitespace.
# Spark DataFrames are immutable: withColumn returns a NEW DataFrame, so the
# result has to be assigned or the cleaning is silently lost.
df_raw_behavior_event = raw_behavior_event.withColumn(
    "CAMPAIGN_NAME", trim(regexp_replace(col("CAMPAIGN_NAME"), "Â®", ""))
)

# Cleaning 2: Normalize Device Type (Raw Agent String -> Category)
# Values matching neither pattern (and NULLs) are passed through unchanged.
df_raw_behavior_event = df_raw_behavior_event.withColumn("DEVICE_TYPE",
    when(lower(col("DEVICE_TYPE")).rlike("iphone|android|mobile"), "Mobile")
    .when(lower(col("DEVICE_TYPE")).rlike("windows|mac|desktop"), "Desktop")
    .otherwise(col("DEVICE_TYPE"))
)

# Cleaning 3: Title Case Traffic Source
df_raw_behavior_event = df_raw_behavior_event.withColumn("TRAFFIC_SOURCE", initcap(col("TRAFFIC_SOURCE")))

# Keep only the dimension's columns. select also returns a new DataFrame, so
# the projection must be assigned for it to reach the written table.
df_raw_behavior_event = df_raw_behavior_event.select(
    "BEHAVIOR_EVENT_ID", "BEHAVIOR_EVENT_TYPE", "CHANNEL_CODE",
    "CONTENT_ID", "CAMPAIGN_ID", "CAMPAIGN_NAME",
    "DEVICE_TYPE", "TRAFFIC_SOURCE", "JOURNEY_STAGE"
)
df_raw_behavior_event.show()

# Overwrite the silver dimension table with the cleaned, projected result.
df_raw_behavior_event.write.mode("overwrite").saveAsTable("MAYBANK_DEMO.SILVER.dim_behavior_event")

In [None]:
# Build SILVER.fact_behavior from BRONZE.RAW_BEHAVIOR: standardise keys, parse
# the raw timestamp into a date, cast the score, and drop rows with no usable date.
raw_behavior = spark.read.table("MAYBANK_DEMO.BRONZE.RAW_BEHAVIOR")

raw_behavior.show()

df_raw_behavior = raw_behavior.select(
        col("BEHAVIOR_RECORD_ID"),

        # Cleaning 1: Handle Null Customer (Tag as Anonymous)
        coalesce(col("CUSTOMER_ID"), lit("ANONYMOUS")).alias("CUSTOMER_ID"),

        # Cleaning 2: Standardize Join Key (Fixes 'evt_page_casa' mismatch)
        upper(trim(col("BEHAVIOR_EVENT_ID"))).alias("BEHAVIOR_EVENT_ID"),

        # Same normalisation that SILVER.dim_channel applies to its key
        # (upper + trim); without it, codes that differ only in case or
        # padding would not join to the dimension.
        upper(trim(col("CHANNEL_CODE"))).alias("CHANNEL_CODE"),

        # Cleaning 3: Parse Chaotic Timestamps -> Date
        # Tries multiple patterns in order; coalesce keeps the first that parses.
        # If all fail, returns Null.
        # NOTE(review): NULL-on-failure relies on non-ANSI Spark semantics —
        # confirm the session does not raise on unparseable input.
        coalesce(
            to_date(col("RAW_TIMESTAMP"), "yyyy-MM-dd'T'HH:mm:ssXXX"), # ISO Offset
            to_date(col("RAW_TIMESTAMP"), "yyyy-MM-dd'T'HH:mm:ss'Z'"), # ISO UTC
            to_date(col("RAW_TIMESTAMP"), "yyyy-MM-dd HH:mm:ss")       # SQL Standard
        ).alias("DATE_VALUE"),

        col("SESSION_ID"),
        col("EVENT_SEQUENCE_IN_SESSION"),
        col("EVENT_VALUE"),
        col("IS_CONVERSION_FLAG"),
        col("CONVERSION_TYPE"),

        # Cleaning 4: Cast Score to Float (Handles 'N/A', 'invalid', 'Error')
        # Casting a non-numeric string to float results in Null in Spark
        # (non-ANSI mode).
        col("EXPERIENCE_SCORE").cast("float").alias("EXPERIENCE_SCORE")
    )

# Data Quality Filter: Drop rows where Date could not be parsed (Junk Data)
df_raw_behavior_clean = df_raw_behavior.filter(col("DATE_VALUE").isNotNull())

df_raw_behavior_clean.show()

# Overwrite the silver fact table with the cleaned rows.
df_raw_behavior_clean.write.mode("overwrite").saveAsTable("MAYBANK_DEMO.SILVER.fact_behavior")