# **Parameters**

In [None]:
# Parameters (injected by pipeline)
# table_name names the source table handled by this run; the value below is
# only the fallback used when the notebook is run by hand.
table_name = "Customer"  # default for local testing

# **Imports & Configuration**

In [None]:
from pyspark.sql.functions import current_timestamp, col, trim, lower, initcap

# Bronze OneLake path (get from Bronze lakehouse).
# NOTE(review): the bracketed workspace / lakehouse IDs look like placeholders
# that must be replaced with real IDs before running - confirm.
bronze_base = "abfss://[WORKSPACE_ID]@onelake.dfs.fabric.microsoft.com/[BRONZE_LAKEHOUSE_ID]/Files"

# Echo the run parameters so the notebook output shows what is being processed.
# (Emoji restored: the previous literals were UTF-8 bytes mis-decoded as text.)
print(f"📥 Processing table: {table_name}")
print(f"🔗 Bronze path: {bronze_base}/{table_name}")

# **Read Bronze Data**

In [None]:
# Read Parquet from Bronze: one folder per table under the Bronze base path.
bronze_path = f"{bronze_base}/{table_name}"

try:
    df = spark.read.parquet(bronze_path)
    # count() and show() are actions, so read problems surface in this cell
    # instead of in a later transformation or write step.
    print(f"✅ Bronze data loaded: {df.count()} rows")
    df.show(5)
except Exception as e:
    # Report the failure in the notebook output, then re-raise so the error
    # is not swallowed and the run stops here.
    print(f"❌ Error reading Bronze: {e}")
    raise

# **Apply Transformations**

In [None]:
# Basic cleaning - applied to every table: stamp each row with the load time.
df_clean = df.withColumn("load_timestamp", current_timestamp())

# Table-specific transformations
if table_name == "Customer":
    # Trim whitespace, then normalise casing: names to proper case,
    # e-mail addresses to lower case.
    df_clean = df_clean \
        .withColumn("FirstName", initcap(trim(col("FirstName")))) \
        .withColumn("LastName", initcap(trim(col("LastName")))) \
        .withColumn("EmailAddress", lower(trim(col("EmailAddress"))))
    print("✅ Names standardized (proper case), emails lowercase")

elif table_name == "Product":
    # Future: Add product-specific transformations
    pass

elif table_name == "SalesOrderHeader":
    # Future: Add date validation
    pass

# Deduplication (based on first column as business key).
# load_timestamp was appended above, so columns[0] is still the first column
# of the Bronze data.
# NOTE(review): this assumes the first column is the business key, and it does
# not control which of several duplicate rows is kept - confirm both are
# acceptable for every table.
business_key = df_clean.columns[0]
df_clean = df_clean.dropDuplicates([business_key])

print(f"✅ Transformations applied: {df_clean.count()} rows after cleaning")

# **Write to Silver (Delta)**

In [None]:
# Silver table name: lower-cased source table name with a "_silver" suffix.
silver_table_name = f"{table_name.lower()}_silver"

try:
    # Write as Delta table, replacing any previous contents and schema.
    df_clean.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable(silver_table_name)

    print(f"✅ Silver table created: {silver_table_name}")
    # Count the rows of the table that was just written rather than calling
    # df_clean.count(), which would re-run the whole Bronze read and
    # deduplication a second time.
    print(f"📊 Final row count: {spark.table(silver_table_name).count()}")

    # Verify table exists
    spark.sql(f"DESCRIBE EXTENDED {silver_table_name}").show(truncate=False)

except Exception as e:
    # Report the failure in the notebook output, then re-raise so the error
    # is not swallowed.
    print(f"❌ Error writing to Silver: {e}")
    raise