# Bronze Layer Data Ingestion
This notebook ingests raw source data for Contoso Corp into the bronze layer.

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name, lit
from datetime import datetime
import logging

# Obtain the Spark session used by every cell below. getOrCreate() hands back
# the currently active session if there is one; otherwise it builds a new one
# with the given application name.
spark = (
    SparkSession.builder
    .appName("Contoso Bronze Ingestion")
    .getOrCreate()
)

# Module logger at INFO level. Note: logging.basicConfig() does nothing if the
# root logger already has handlers attached.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# Configuration: where raw files are read from and where bronze tables go.
BRONZE_LAKEHOUSE = "ctso-bronze-lakehouse"
SOURCE_PATH = "abfss://raw@contosostorage.dfs.core.windows.net/"
# NOTE(review): in Microsoft Fabric, a relative "Tables/" path normally resolves
# against the notebook's default lakehouse. Nesting the lakehouse name beneath it
# may produce folders rather than registered tables — confirm the intended layout.
BRONZE_PATH = "Tables/" + BRONZE_LAKEHOUSE + "/"

# One record per raw source:
#   name   - bronze table folder name, also stamped into `_record_source`
#   format - Spark reader format
#   path   - sub-folder of SOURCE_PATH to load
data_sources = [
    dict(name="customers", format="csv", path="customers/"),
    dict(name="orders", format="parquet", path="orders/"),
    dict(name="products", format="json", path="products/"),
]

In [None]:
# Ingestion function
def ingest_to_bronze(source_info):
    """Ingest one raw source into the bronze layer, tagging rows with metadata.

    Loads ``SOURCE_PATH + source_info['path']`` with the Spark reader named by
    ``source_info['format']``. It then adds three lineage columns
    (``_ingestion_timestamp``, ``_source_file``, ``_record_source``) and appends
    the result in Delta format to ``BRONZE_PATH + source_info['name']``.

    Args:
        source_info: Mapping with ``name``, ``format`` and ``path`` keys, shaped
            like the entries of the ``data_sources`` configuration list.

    Returns:
        True if the data was appended. False if any step inside the ingestion
        failed; in that case the error is logged with its traceback and no
        exception propagates, so the caller can move on to the next source.
    """
    name = source_info['name']
    try:
        logger.info(f"Starting ingestion for {name}")

        # Read source data. "header" and "inferSchema" are CSV reader options;
        # presumably the parquet/json readers simply ignore them — confirm if a
        # new format is added.
        df = (
            spark.read
            .format(source_info['format'])
            .option("header", "true")
            .option("inferSchema", "true")
            .load(SOURCE_PATH + source_info['path'])
        )

        # Add metadata columns.
        # NOTE(review): newer Spark/Unity Catalog runtimes recommend the hidden
        # `_metadata.file_path` column over input_file_name() — verify for this runtime.
        df = (
            df.withColumn("_ingestion_timestamp", current_timestamp())
            .withColumn("_source_file", input_file_name())
            .withColumn("_record_source", lit(name))
        )

        # Count BEFORE appending. If the count ran after a successful write and
        # then failed, this function would report failure for data that was
        # already committed, and a retry would append it a second time.
        # Count and write are separate Spark jobs, so the logged number reflects
        # the source as seen by the count.
        record_count = df.count()

        # Write to bronze layer.
        output_path = BRONZE_PATH + name
        df.write.mode("append").format("delta").save(output_path)

        logger.info(f"✓ Successfully ingested {record_count} records for {name}")
        return True

    except Exception as e:
        # Per-source boundary: record the failure with its traceback and let the
        # driver loop continue with the remaining sources.
        logger.exception(f"✗ Failed to ingest {name}: {str(e)}")
        return False

In [None]:
# Run ingestion for every configured source. Each source is attempted even if
# an earlier one failed, because ingest_to_bronze reports failure via its
# return value rather than raising.
results = []
for source in data_sources:
    success = ingest_to_bronze(source)
    results.append({"source": source['name'], "success": success})

# Summary: overall count, plus the names of any failed sources so operators do
# not have to search the logs for them.
successful = sum(1 for r in results if r['success'])
failed = [r['source'] for r in results if not r['success']]
print(f"\nIngestion Complete: {successful}/{len(data_sources)} sources processed successfully")
if failed:
    print(f"Failed sources: {', '.join(failed)}")
# NOTE(review): this cell completes normally even when sources fail. Raise here
# if a pipeline orchestrator should mark the run as failed.