In [0]:
import re
from pyspark.sql.functions import current_timestamp, col

# --- CONFIGURATION ---
# Catalog that owns every object this notebook creates or reads.
CATALOG = "project_fraud_detection"
# Schema (inside CATALOG) that holds the bronze target table.
SCHEMA = "bronze_layer"
# Unqualified name of the bronze table; also used as the checkpoint sub-directory.
TABLE_NAME = "bronze_data"
# Volume path the raw input files are streamed from.
SOURCE_FOLDER = f"/Volumes/{CATALOG}/source_data/raw/"

def setup_database():
    """Create the catalog objects this notebook depends on.

    Runs idempotent ``CREATE ... IF NOT EXISTS`` statements for the catalog,
    the bronze schema, the ``source_data`` schema, and the ``checkpoints`` and
    ``raw`` volumes inside ``source_data``, then prints a confirmation line.

    Uses the ``spark`` session and the ``CATALOG`` / ``SCHEMA`` constants
    defined at notebook level.
    """
    statements = (
        f"CREATE CATALOG IF NOT EXISTS {CATALOG}",
        f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}",
        f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.source_data",
        f"CREATE VOLUME IF NOT EXISTS {CATALOG}.source_data.checkpoints",
        # The ingestion reads from /Volumes/<catalog>/source_data/raw/, so the
        # `raw` volume has to exist too; IF NOT EXISTS leaves an existing
        # volume (and its files) untouched.
        f"CREATE VOLUME IF NOT EXISTS {CATALOG}.source_data.raw",
    )
    # Order matters: catalog before schemas, schemas before volumes.
    for statement in statements:
        spark.sql(statement)
    print("Structure validated.")

def clean_column_names(df):
    """Return ``df`` with every column renamed to a safe, unique identifier.

    Every character outside ``[a-zA-Z0-9]`` is replaced with an underscore,
    e.g. ``'Unnamed: 0'`` -> ``'Unnamed__0'`` and ``'First Name'`` ->
    ``'First_Name'``. When several columns sanitise to the same name (such as
    ``'a b'`` and ``'a_b'``), the later ones get a numeric suffix (``'a_b_1'``,
    ``'a_b_2'``, ...) so the result never contains duplicate column names.

    Args:
        df: DataFrame-like object exposing ``columns`` and ``toDF(*names)``.

    Returns:
        The result of ``df.toDF(...)`` called with the cleaned names, in the
        original column order.
    """
    invalid_chars = re.compile(r"[^a-zA-Z0-9]")
    taken = set()
    clean_columns = []
    for column in df.columns:
        base = invalid_chars.sub("_", column)
        candidate = base
        suffix = 0
        # Names are compared case-insensitively because Spark resolves column
        # names case-insensitively by default, so 'A_B' and 'a_b' would still
        # clash downstream.
        while candidate.lower() in taken:
            suffix += 1
            candidate = f"{base}_{suffix}"
        taken.add(candidate.lower())
        clean_columns.append(candidate)

    # .toDF() assigns the new list of names positionally to the DataFrame.
    return df.toDF(*clean_columns)

def create_and_load_bronze_table(source_path, table_name):
    """Ingest CSV files from ``source_path`` into a bronze Delta table.

    Reads the files as an Auto Loader (``cloudFiles``) stream, sanitises the
    column names with ``clean_column_names``, adds ``source_file`` (taken from
    ``_metadata.file_path``) and ``ingestion_time`` columns, and appends the
    result to ``{CATALOG}.{SCHEMA}.{table_name}``. The call blocks until the
    streaming query terminates.

    Args:
        source_path: Directory containing the CSV files to ingest.
        table_name: Unqualified target table name. It also names the
            directory under the ``checkpoints`` volume where the Auto Loader
            schema and the stream checkpoint are stored.
    """
    print(f"Reading data from: {source_path}")

    base_checkpoint_path = f"/Volumes/{CATALOG}/source_data/checkpoints/{table_name}"
    target_table = f"{CATALOG}.{SCHEMA}.{table_name}"

    # 1. Read CSV
    df_raw = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        # Auto Loader infers CSV columns as strings unless
        # `cloudFiles.inferColumnTypes` is enabled; the plain CSV
        # `inferSchema` option is not the Auto Loader switch for this.
        # NOTE(review): a schema already saved under the schema location, or
        # a table created with all-string columns, may need resetting before
        # typed columns show up -- confirm for existing deployments.
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaLocation", f"{base_checkpoint_path}/schema")
        .load(source_path)
    )

    # 2. Clean column names before writing, so headers with spaces or
    #    punctuation do not fail the Delta write.
    df_clean = clean_column_names(df_raw)

    # 3. Add metadata: originating file path and ingestion timestamp.
    df_bronze = df_clean.select(
        "*",
        col("_metadata.file_path").alias("source_file"),
    ).withColumn("ingestion_time", current_timestamp())

    # 4. Write to the Delta table. `availableNow` processes the files that
    #    are currently available and then stops the query.
    query = (
        df_bronze.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{base_checkpoint_path}/checkpoint")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .table(target_table)
    )

    # Blocks until the query stops; a failed query raises here, so the
    # success message below is only printed after a clean finish.
    query.awaitTermination()
    print(f"Success! Data saved to: {target_table}")

# --- EXECUTION ---
# Make sure the catalog objects exist, then run the bronze ingestion.
setup_database()
create_and_load_bronze_table(source_path=SOURCE_FOLDER, table_name=TABLE_NAME)

Structure validated.
Reading data from: /Volumes/project_fraud_detection/source_data/raw/
Success! Data saved to: project_fraud_detection.bronze_layer.bronze_data
