In [0]:
%run ./01_autoloader_config

In [0]:
def ingest_with_autoloader(src_path, tgt_path):
    """
    Start an Auto Loader (cloudFiles) stream from ``src_path`` into a Delta table.

    Steps:
      1. Detect the file format with ``detect_file_format``; skip if unsupported.
      2. Run a one-off batch read of ``src_path`` as a sanity check; skip the
         source if that read raises.
      3. Open the Auto Loader stream, normalize its column names (anything
         outside ``[a-zA-Z0-9_]`` becomes ``_``), and append to Delta at
         ``tgt_path`` with the checkpoint at ``tgt_path + "_checkpoint"``.

    ``spark``, ``log`` and ``detect_file_format`` are not defined in this cell;
    presumably they come from the notebook runtime / the ``%run`` config cell
    (TODO confirm).

    Args:
        src_path: Source directory/path to ingest from.
        tgt_path: Target Delta table path.

    Returns:
        The streaming query handle returned by ``.start()`` when the stream was
        started, otherwise ``None`` (unsupported format, failed validation, or
        any other error). Errors are logged, never raised to the caller.
    """
    # Function-scope import: no `import re` is visible in this notebook cell,
    # so do not rely on another cell having imported it.
    import re

    try:
        # Detect format first
        fmt = detect_file_format(src_path)
        log(f"Starting ingestion from {src_path}")
        log(f"Detected format: {fmt}")

        if fmt is None:
            log(f"⚠️ Unsupported format for {src_path}, skipping.")
            return None

        checkpoint_path = tgt_path + "_checkpoint"

        # Options common to all formats.
        options = {
            "cloudFiles.format": fmt,
            "cloudFiles.inferColumnTypes": "true",
            "cloudFiles.schemaEvolutionMode": "rescue",
            "cloudFiles.maxFilesPerTrigger": "1",
            # Auto Loader needs a place to persist the inferred schema when no
            # explicit schema is supplied; co-locate it with the checkpoint.
            "cloudFiles.schemaLocation": checkpoint_path,
        }

        # Batch-read the source once so an unreadable/corrupt source is
        # skipped before any streaming state (checkpoint, schema) is created.
        try:
            spark.read.format(fmt).load(src_path)
            log(f"✅ Schema validated for {src_path}")
        except Exception as e:
            log(f"❌ Skipping {src_path} due to schema error: {e}")
            return None  # Skip corrupted folder/file safely

        # Proceed with Auto Loader
        stream_df = (
            spark.readStream.format("cloudFiles")
            .options(**options)
            .load(src_path)
        )

        # Normalize column names on the DataFrame that is actually written.
        # Renaming only a throwaway batch sample leaves the stream's raw
        # names in place, which the Delta sink can reject.
        normalized = [
            re.sub(r'[^a-zA-Z0-9_]', '_', col_name.strip())
            for col_name in stream_df.columns
        ]
        # Two distinct source columns may collapse to the same name; fail
        # fast (logged by the outer handler) instead of starting a stream
        # with ambiguous columns. Compared case-insensitively on the
        # assumption that Spark resolves names case-insensitively — confirm.
        if len({name.lower() for name in normalized}) != len(normalized):
            raise ValueError(
                f"column names collide after normalization: {normalized}"
            )
        stream_df = stream_df.toDF(*normalized)

        query = (
            stream_df.writeStream
            .format("delta")
            .option("checkpointLocation", checkpoint_path)
            .outputMode("append")
            .start(tgt_path)
        )

        log(f"✅ Auto Loader started for {src_path} -> {tgt_path}")
        # Hand the query back so callers can await, monitor or stop it.
        return query

    except Exception as e:
        # Best-effort boundary: one bad source must not abort the caller.
        log(f"❌ Failed ingestion for {src_path}: {e}")
        return None
