In [0]:
%run ../utils/common_utils

In [0]:
# Diagnostic: show this notebook's workspace path as reported by the Databricks
# notebook context (handy when checking where relative paths such as the %run
# above resolve from).
notebook_ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
print(notebook_ctx.notebookPath().get())

In [0]:
# Load the shared pipeline settings, then authenticate this session to ADLS so
# abfss:// paths can be read directly.
# NOTE(review): get_pipeline_config / authenticate_adls are not defined in this
# notebook; presumably they come from the `%run ../utils/common_utils` cell.
config = get_pipeline_config()

adls_account = config['storage_account']
adls_secret_scope = config['secret_scope']
authenticate_adls(adls_account, adls_secret_scope)

In [0]:
# Widget that lets ADF pass one specific file name to process, or leave the
# default "ALL" to process every CSV at the top level of the input container.
dbutils.widgets.text("input_file_name", "ALL")
# Strip surrounding whitespace so values like " ALL" from a pipeline parameter still match.
file_param = dbutils.widgets.get("input_file_name").strip()

input_container = config['container_input']
storage_account = config['storage_account']

# Base path for the input container (trailing slash: file names are appended directly).
base_input_path = f"abfss://{input_container}@{storage_account}.dfs.core.windows.net/"

# Decide which files to process.
if not file_param:
    # An empty name would make the ingest step read the whole container as one CSV
    # and write to the bare "silver/" folder, so refuse it explicitly.
    raise ValueError(
        "Widget 'input_file_name' is empty; pass a file name or 'ALL'."
    )

if file_param.upper() == "ALL":
    # dbutils.fs.ls is not recursive; directories are excluded by the suffix check.
    # Match the extension case-insensitively (e.g. DATA.CSV) and sort so the
    # processing order is deterministic between runs.
    files_to_process = sorted(
        entry.name
        for entry in dbutils.fs.ls(base_input_path)
        if entry.name.lower().endswith('.csv')
    )
else:
    files_to_process = [file_param]

print(f"Files identified for processing: {files_to_process}")

In [0]:
from pyspark.sql.functions import current_timestamp, input_file_name
import re

# Characters replaced with "_" in column headers. These are the characters Delta
# Lake rejects in column names when column mapping is not enabled.
_INVALID_COLUMN_CHARS = re.compile(r'[ ,;{}()\n\t=]')


def _clean_column_name(name: str) -> str:
    """Return a Delta-safe column name.

    Replaces each of `` ,;{}()\\n\\t=`` with ``_`` and removes double quotes.
    """
    return _INVALID_COLUMN_CHARS.sub('_', name).replace('"', '')


def _table_name_for(file_name: str) -> str:
    """Derive the silver table/folder name from a CSV file name.

    Strips a trailing ``.csv`` (any case), replaces spaces with ``_`` and lowercases.
    """
    suffix = ".csv"
    stem = file_name[:-len(suffix)] if file_name.lower().endswith(suffix) else file_name
    return stem.replace(" ", "_").lower()


# file name -> error message, for every file that could not be ingested.
failed_files = {}

for file in files_to_process:
    try:
        print(f"--- Starting processing for: {file} ---")

        # 1. Build paths
        current_input_path = f"{base_input_path}{file}"
        table_name = _table_name_for(file)
        if not table_name:
            # Guard: an empty name would target the bare "silver/" folder with mode("overwrite").
            raise ValueError(f"Cannot derive a table name from file name {file!r}")
        silver_path = get_path(config['container_output'], storage_account, f"silver/{table_name}")

        # 2. Read with SEMICOLON delimiter
        df_raw = (
            spark.read.format("csv")
            .option("header", "true")
            .option("sep", ";")
            .option("inferSchema", "true")
            .load(current_input_path)
        )

        # 3. Clean column names in one positional projection (instead of N renames):
        #    invalid characters -> "_", double quotes removed.
        df_clean = df_raw.toDF(*[_clean_column_name(name) for name in df_raw.columns])

        # 4. Add audit metadata
        # NOTE(review): input_file_name() is reported as unsupported on Unity Catalog
        # compute; Databricks recommends the `_metadata.file_path` column there — confirm
        # which cluster mode this job runs on.
        df_silver = (
            df_clean
            .withColumn("ingestion_timestamp", current_timestamp())
            .withColumn("source_filename", input_file_name())
        )

        # 5. Write as Delta (full overwrite of this file's silver folder, schema included)
        (
            df_silver.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .save(silver_path)
        )

        print(f"✅ Successfully processed {file} into Silver table: {table_name}")

    except Exception as e:
        # Keep going so one bad file does not block the rest, but remember the failure.
        print(f"❌ Failed to process {file}. Error: {str(e)}")
        failed_files[file] = str(e)

# Surface failures to the caller (e.g. the ADF notebook activity) instead of
# reporting success after errors were only printed.
if failed_files:
    raise RuntimeError(
        f"{len(failed_files)} of {len(files_to_process)} file(s) failed to process: "
        f"{sorted(failed_files)}"
    )

print("--- All files processed ---")