In [0]:
# Databricks notebook parameters - these will be passed from ADF
dbutils.widgets.text("user_id", "")
dbutils.widgets.text("parent_job_id", "")
entity_type = "subcategory"  # Default to subcategory for this notebook

# Get parameters (text widgets hand their value back as a string)
user_id = dbutils.widgets.get("user_id")
parent_job_id = dbutils.widgets.get("parent_job_id")

# Validate required parameters.
# Both values are used as path segments further down, so a value that is
# empty *or* only whitespace is rejected here instead of silently producing
# a blank folder name. Valid values are passed through unmodified.
if not user_id.strip() or not parent_job_id.strip():
    raise ValueError("user_id and parent_job_id are required parameters")

import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def get_optimal_partition_col(df, candidates):
    """
    Pick the candidate column whose distinct-value count is closest to a
    row-count-derived partition target.

    The target is ``df.count() // 20000`` clamped to the range [4, 50].
    A candidate is considered only if it exists in ``df.columns`` and has
    more than 1 and at most 100 distinct values. When several candidates are
    equally close to the target, the one listed first in ``candidates`` wins.

    Note: this calls ``df.count()`` once and a distinct count for every
    candidate column present in ``df``.

    Args:
        df: DataFrame to inspect.
        candidates: Iterable of column names, in order of preference.

    Returns:
        The name of the selected column, or None if no candidate qualifies.
    """
    total_rows = df.count()
    # One partition per ~20k rows, never fewer than 4 or more than 50.
    desired_partitions = max(4, min(50, total_rows // 20000))

    winner = None
    smallest_gap = float('inf')
    for name in candidates:
        if name in df.columns:
            distinct_values = df.select(name).distinct().count()

            # Constant columns and high-cardinality columns are not usable.
            if distinct_values <= 1 or distinct_values > 100:
                continue

            gap = abs(distinct_values - desired_partitions)
            # Strict comparison keeps the earliest candidate on ties.
            if gap < smallest_gap:
                winner, smallest_gap = name, gap

    return winner




# COMMAND ----------
from pyspark.sql.functions import col, current_timestamp, lit, lower, trim, regexp_replace, to_timestamp, when

# Storage layout, derived from the notebook parameters.
# Source and destination containers live in the same storage account.
ADLS_ACCOUNT_NAME = "shanleestorage"  # Your storage account name


def _abfss_url(container, relative_path):
    """Return the abfss:// URL of *relative_path* inside *container*."""
    return f"abfss://{container}@{ADLS_ACCOUNT_NAME}.dfs.core.windows.net/{relative_path}"


# Bronze layer (source): one folder per user / parent job
RAW_CONTAINER = "shanlee-raw-data"
RAW_DATA_PATH = f"{user_id}/{parent_job_id}"  # Dynamic path based on user/batch
RAW_FULL_PATH = _abfss_url(RAW_CONTAINER, RAW_DATA_PATH)

# Silver layer (destination): additionally keyed by the entity type
SILVER_CONTAINER = "shanlee-cleaned-data"
SILVER_PATH = f"temp_spark/{user_id}/{parent_job_id}/{entity_type}"
SILVER_FULL_PATH = _abfss_url(SILVER_CONTAINER, SILVER_PATH)

# Echo the resolved locations so they show up in the run output
print(f"Reading from: {RAW_FULL_PATH}")
print(f"Writing to: {SILVER_FULL_PATH}")

# Authentication (same as before)
SECRET_SCOPE = "AdlsAccessKey"
SECRET_KEY = "AdlsAccessKey"

# Only the secret lookup is guarded, so the error message below ("Could not
# retrieve secret") is accurate; a failure in spark.conf.set surfaces as its
# own exception.
try:
    access_key_value = dbutils.secrets.get(scope=SECRET_SCOPE, key=SECRET_KEY)
except Exception as e:
    print(f"FATAL ERROR: Could not retrieve secret. Check scope/key names. Error: {e}")
    # Raise rather than call dbutils.notebook.exit(): exit() finishes the
    # notebook run as *successful*, which would hide this fatal error from
    # the calling pipeline. Raising matches the parameter validation at the
    # top of the notebook, which also fails the run with an exception.
    raise RuntimeError("Authentication Failed") from e

# Register the storage account key so abfss:// paths on this account resolve
spark.conf.set(
    f"fs.azure.account.key.{ADLS_ACCOUNT_NAME}.dfs.core.windows.net",
    access_key_value,
)

print("Authentication successful: Spark configured to access ADLS Gen2.")

# Read raw data from Bronze layer
df_raw = (
    spark.read.format("json")
    .option("multiline", "true")
    .load(RAW_FULL_PATH)
)
# Flatten the "subcategory" struct into top-level columns
subcategory_table = df_raw.select("subcategory.*")

# Global encoding cleanup: strip every character outside printable ASCII
# (0x20-0x7E) from all string columns.
# Done as ONE select instead of a withColumn per column: each withColumn adds
# a projection to the plan, and the dtype lookup no longer has to be rebuilt
# on every iteration. Column order and names are unchanged.
subcategory_table = subcategory_table.select([
    F.regexp_replace(F.col(name), '[^\\x20-\\x7E]', '').alias(name)
    if dtype == 'string'
    else F.col(name)
    for name, dtype in subcategory_table.dtypes
])

# Normalise the free-text columns: lower-case, then trim surrounding spaces
subcategory_table = (
    subcategory_table
    .withColumn("description", F.trim(F.lower(F.col("description"))))
    .withColumn("name", F.trim(F.lower(F.col("name"))))
)

# Filter out rows containing 'invalid' in any column (case-insensitive,
# compared on the string form of each value). A null value makes the
# predicate null, so such rows are dropped here too; the dropna() below
# would remove them anyway.
for column in subcategory_table.columns:
    subcategory_table = subcategory_table.filter(
        ~F.lower(F.col(column).cast("string")).contains("invalid")
    )

# Filter future-dated timestamps: cap to current timestamp if future
timestamp_columns = ['create_time', 'updated_at', 'delete_time']
existing_columns = set(subcategory_table.columns)
for column in timestamp_columns:
    if column in existing_columns:
        subcategory_table = subcategory_table.withColumn(
            column,
            # Values that cannot be parsed become null here
            F.to_timestamp(F.col(column))
        ).withColumn(
            column,
            F.when(F.col(column) > F.current_timestamp(), F.current_timestamp())
             .otherwise(F.col(column))
        )

# Remove all rows with any null values (AFTER type casting to handle coercion nulls)
# NOTE(review): this also discards rows whose delete_time (or any other
# column) is legitimately null - confirm that is the intended rule.
subcategory_table = subcategory_table.dropna()

# Remove duplicates based on id, keeping any one row
subcategory_table = subcategory_table.dropDuplicates(["id"])

# COMMAND ----------
# --- 5. LOAD TO SILVER LAYER: Write Cleaned Data as Delta Lake ---

# Use Delta format for reliability, transactions, and schema enforcement.
# mode("overwrite") replaces whatever is at the target path;
# overwriteSchema lets the table schema change along with the data.
writer = subcategory_table.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true")

writer.save(SILVER_FULL_PATH)

# Count what was actually written instead of calling subcategory_table.count():
# that would re-run the whole read/clean lineage a second time, and because
# the lineage uses current_timestamp() and dropDuplicates() the recomputed
# result is not guaranteed to match the rows that were saved.
written_count = spark.read.format("delta").load(SILVER_FULL_PATH).count()

print(f"Processed {written_count} cleaned records")
print("Next Steps: Gold layer processing will handle aggregations and joins.")