# Framework Ingestion Notebook Example

This notebook demonstrates the framework contract for ingestion tasks.
It reads from a source Delta table, transforms the data, and writes to a target Delta table.

**Framework Contract:**
- Accepts `source_id` and `control_table` as inputs via widgets
- Looks up the catalog, schema, and table names in the control table metadata for the given `source_id`
- In production, the framework sets both widgets when it runs this notebook as a task
- In this example, both widgets have default values, so the notebook runs end-to-end without manual input


In [None]:
import logging
import json
from pyspark.sql import functions as F

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Framework widgets - In production, these are set by the framework
# For this example, widgets have default values so it can run end-to-end
dbutils.widgets.text("source_id", "delta_table_ingestion", "Source ID")
dbutils.widgets.text("control_table", "main.examples.etl_control", "Control Table")

# Get widget values
source_id = dbutils.widgets.get("source_id")
control_table = dbutils.widgets.get("control_table")

if not source_id:
    raise ValueError("source_id widget is required")
if not control_table:
    raise ValueError("control_table widget is required")

logger.info(f"Reading metadata for source_id: {source_id} from control table: {control_table}")

# Read metadata from control table
try:
    # .first() returns a single Row, or None when no active entry matches
    metadata_row = spark.table(control_table).filter(
        (F.col("source_id") == source_id) & (F.col("is_active") == True)
    ).select("source_config", "target_config").first()
    
    if metadata_row is None:
        raise ValueError(f"No active metadata found for source_id '{source_id}' in control table '{control_table}'")
    
    # Parse JSON configs
    source_config_str = metadata_row['source_config']
    target_config_str = metadata_row['target_config']
    
    source_config = json.loads(source_config_str) if isinstance(source_config_str, str) else source_config_str
    target_config = json.loads(target_config_str) if isinstance(target_config_str, str) else target_config_str
    
    # Extract catalog, schema, and table names
    catalog = source_config.get('catalog') or target_config.get('catalog')
    schema = source_config.get('schema') or target_config.get('schema')
    source_table = source_config.get('table')
    target_table = target_config.get('table')
    write_mode = target_config.get('write_mode', 'overwrite')
    
    # Validate required fields
    if not catalog:
        raise ValueError(f"Missing 'catalog' in source_config or target_config for source_id '{source_id}'")
    if not schema:
        raise ValueError(f"Missing 'schema' in source_config or target_config for source_id '{source_id}'")
    if not source_table:
        raise ValueError(f"Missing 'table' in source_config for source_id '{source_id}'")
    if not target_table:
        raise ValueError(f"Missing 'table' in target_config for source_id '{source_id}'")
    
    logger.info("Successfully read metadata from control table")
    logger.info(f"Catalog: {catalog}, Schema: {schema}")
    logger.info(f"Source table: {source_table}")
    logger.info(f"Target table: {target_table}")
    logger.info(f"Write mode: {write_mode}")
    
except Exception as e:
    logger.error(f"Failed to read metadata from control table: {str(e)}")
    raise


## Parse and Validate Configurations


In [None]:
# Configuration already validated when reading from control table
logger.info("Configuration validated successfully")
logger.info(f"Source: {catalog}.{schema}.{source_table}")
logger.info(f"Target: {catalog}.{schema}.{target_table}")


## Prepare Source Data

If the source table doesn't exist, create sample data for demonstration.
In production, this would read directly from the configured source table.
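
One way to separate "the table is missing" from other read failures is an explicit existence check before reading. The sketch below assumes a PySpark version whose `spark.catalog.tableExists` accepts a fully qualified `catalog.schema.table` name (3.4 or later, as far as the PySpark documentation indicates); `read_source_or_none` is a hypothetical helper, not part of the framework.

```python
def read_source_or_none(spark, table_name):
    """Return the table as a DataFrame, or None if the table does not exist.

    Hypothetical helper: `spark` is the active SparkSession and `table_name`
    is a fully qualified `catalog.schema.table` string.
    """
    if not spark.catalog.tableExists(table_name):
        return None
    # Other failures (permissions, unreadable files) still raise from here
    return spark.table(table_name)
```

With this helper, `df = read_source_or_none(spark, source_table_full)` followed by an `if df is None:` branch could stand in for the broad `except Exception` fallback, so that only a genuinely missing table triggers sample-data creation.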


In [None]:
from pyspark.sql.types import StructType, StructField, StringType

source_table_full = f"{catalog}.{schema}.{source_table}"

logger.info(f"Reading from source table: {source_table_full}")

# Try to read from the source table; if that fails, create sample data
df = None
record_count = 0

try:
    df = spark.table(source_table_full)
    record_count = df.count()
    logger.info(f"Successfully read {record_count} records from source table")
    df.show(5, truncate=False)
except Exception as e:
    # Any read failure lands here, not only a missing table, so log the actual error
    logger.warning(f"Could not read source table {source_table_full}: {e}. Creating sample data for demonstration.")
    
    # Create sample source data
    sample_data = [
        ("CUST001", "John", "Doe", "john.doe@example.com", "2024-01-15", "active"),
        ("CUST002", "Jane", "Smith", "jane.smith@example.com", "2024-01-16", "active"),
        ("CUST003", "Bob", "Johnson", "bob.johnson@example.com", "2024-01-17", "inactive"),
        ("CUST004", "Alice", "Williams", "alice.williams@example.com", "2024-01-18", "active"),
        ("CUST005", "Charlie", "Brown", "charlie.brown@example.com", "2024-01-19", "active")
    ]
    
    # Named sample_schema so it does not overwrite `schema`, the schema name read from the control table
    sample_schema = StructType([
        StructField("customer_id", StringType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("registration_date", StringType(), True),
        StructField("status", StringType(), True)
    ])
    
    df = spark.createDataFrame(sample_data, sample_schema)
    
    # Create the source table for demonstration purposes
    df.write.format("delta").mode("overwrite").saveAsTable(source_table_full)
    record_count = df.count()
    logger.info(f"Created sample source table with {record_count} records")
    df.show(5, truncate=False)


## Transform Data


In [None]:
# Transform data: add metadata columns and apply business logic
df_transformed = df.withColumn("ingestion_timestamp", F.current_timestamp()) \
                   .withColumn("source_id", F.lit(source_id)) \
                   .withColumn("full_name", F.concat(F.col("first_name"), F.lit(" "), F.col("last_name"))) \
                   .filter(F.col("status") == "active")  # Example: filter only active customers

record_count_transformed = df_transformed.count()
logger.info(f"Transformed data: {record_count_transformed} records (filtered from {record_count} source records)")
logger.info("Sample transformed data:")
df_transformed.select("customer_id", "full_name", "email", "status", "source_id", "ingestion_timestamp").show(5, truncate=False)


## Write to Target

Write transformed data to the target Delta table.
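
Because `write_mode` is configurable, the row count to expect in the target after the write depends on the mode. A minimal sketch of that rule, assuming only `overwrite` and `append` are in use; `expected_target_count` and `previous_count` are hypothetical names, not part of the framework.

```python
def expected_target_count(write_mode, previous_count, written_count):
    """Row count the target table should hold after a successful write."""
    if write_mode == "overwrite":
        # Overwrite replaces the existing table contents
        return written_count
    if write_mode == "append":
        # Append adds to whatever was already in the table
        return previous_count + written_count
    raise ValueError(f"No count rule defined for write_mode '{write_mode}'")


print(expected_target_count("overwrite", 10, 4))  # 4
print(expected_target_count("append", 10, 4))     # 14
```

To use a check like this, `previous_count` would have to be captured before the write (0 if the target table does not exist yet) and the result compared with the count read back afterwards.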


In [None]:
target_table_full = f"{catalog}.{schema}.{target_table}"

logger.info(f"Writing to target table: {target_table_full}")
logger.info(f"Write mode: {write_mode}")
logger.info(f"Records to write: {record_count_transformed}")

# Write to target Delta table
try:
    df_transformed.write \
        .format("delta") \
        .mode(write_mode) \
        .option("mergeSchema", "true") \
        .saveAsTable(target_table_full)
    
    logger.info(f"✅ Successfully wrote {record_count_transformed} records to {target_table_full}")
    
    # Verify the write
    written_df = spark.table(target_table_full)
    written_count = written_df.count()
    logger.info(f"✅ Verified: {written_count} records in target table")
    written_df.select("customer_id", "full_name", "email", "status", "source_id", "ingestion_timestamp").show(5, truncate=False)
    
except Exception as e:
    logger.error(f"Failed to write to target table: {str(e)}")
    raise


## Summary

✅ Metadata read from control table using source_id  
✅ Configuration parsed and validated  
✅ Source data read from Delta table (or created for demo)  
✅ Data transformed and enriched  
✅ Data written to target Delta table  

**Framework Contract:**  
This notebook demonstrates the expected contract:
- Accepts `source_id` and `control_table` as inputs via widgets
- Reads catalog, schema, and table names from control table metadata
- Validates configuration
- Reads from source Delta table
- Transforms data (adds metadata columns, applies business logic)
- Writes to target Delta table

**Widgets Used:**
- `source_id`: Unique identifier for this ingestion task (used to query control table)
- `control_table`: Name of the control table containing job metadata

**Metadata Structure:**
The notebook reads two columns from the control table:
- `source_config`: JSON string with `catalog`, `schema`, `table` (source table location)
- `target_config`: JSON string with `catalog`, `schema`, `table`, `write_mode` (target table location and write mode)
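
For illustration, a control-table row matching this structure might carry values like the ones below. The table names `customers_raw` and `customers_clean` are made up for the example, and `parse_config` is a hypothetical helper that mirrors the notebook's handling of configs arriving either as JSON strings or as already-parsed dicts.

```python
import json

# Hypothetical column values for one control-table row
source_config_str = '{"catalog": "main", "schema": "examples", "table": "customers_raw"}'
target_config_str = (
    '{"catalog": "main", "schema": "examples", '
    '"table": "customers_clean", "write_mode": "append"}'
)


def parse_config(value):
    """Accept a JSON string or a dict and always return a dict."""
    config = json.loads(value) if isinstance(value, str) else value
    if not isinstance(config, dict):
        raise ValueError(f"Expected a JSON object, got {type(config).__name__}")
    return config


source_config = parse_config(source_config_str)
target_config = parse_config(target_config_str)

print(source_config["table"])                        # customers_raw
print(target_config.get("write_mode", "overwrite"))  # append
```

The extra `isinstance(config, dict)` check is an addition for the sketch: it turns a null or non-object column value into a clear `ValueError` instead of a later `AttributeError` on `.get`.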

**Configuration:**
- Write mode comes from `target_config.write_mode` (defaults to `overwrite` if not specified)
- Source and target share one catalog and schema, taken from `source_config` and falling back to `target_config` when `source_config` omits them
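
The two rules above can be sketched as a small pure-Python function. `resolve_settings` and `ALLOWED_WRITE_MODES` are hypothetical names, and rejecting unknown write modes is an added safeguard that the notebook itself does not perform. Required-field validation is left out for brevity.

```python
# Save modes documented for Spark's DataFrameWriter.mode()
ALLOWED_WRITE_MODES = {"append", "overwrite", "error", "errorifexists", "ignore"}


def resolve_settings(source_config, target_config):
    """Apply the fallback and default rules to two parsed config dicts."""
    # Catalog and schema come from source_config, else from target_config
    catalog = source_config.get("catalog") or target_config.get("catalog")
    schema_name = source_config.get("schema") or target_config.get("schema")
    # Write mode defaults to overwrite when target_config omits it
    write_mode = target_config.get("write_mode", "overwrite")
    if write_mode not in ALLOWED_WRITE_MODES:
        raise ValueError(f"Unsupported write_mode '{write_mode}'")
    return {
        "source": f"{catalog}.{schema_name}.{source_config.get('table')}",
        "target": f"{catalog}.{schema_name}.{target_config.get('table')}",
        "write_mode": write_mode,
    }


settings = resolve_settings(
    {"catalog": "main", "schema": "examples", "table": "customers_raw"},
    {"table": "customers_clean"},
)
print(settings["target"])      # main.examples.customers_clean
print(settings["write_mode"])  # overwrite
```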

The framework will set `source_id` and `control_table` widgets when calling this notebook as a task.
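
From the caller's side, this amounts to passing both values as notebook arguments. A rough sketch, assuming the caller is itself a Databricks notebook using `dbutils.notebook.run`; the notebook path and timeout are placeholders, and `build_widget_arguments` is a hypothetical helper rather than a framework function.

```python
def build_widget_arguments(source_id, control_table):
    """Build the widget values the ingestion notebook expects."""
    if not source_id or not control_table:
        raise ValueError("source_id and control_table are both required")
    # Widget values are passed as strings
    return {"source_id": str(source_id), "control_table": str(control_table)}


arguments = build_widget_arguments("delta_table_ingestion", "main.examples.etl_control")

# Inside Databricks (the path and the timeout in seconds are placeholders):
# dbutils.notebook.run("/path/to/ingestion_notebook", 600, arguments)
```

When the notebook runs as a job task instead, the same key/value pairs would be supplied through the task's parameters.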
