# Spark Declarative Pipeline: from_json Schema Inference & Evolution

This notebook demonstrates Databricks Spark Declarative Pipelines with `from_json` automatic schema inference and evolution.

## Features Demonstrated:
1. **Automatic Schema Inference** - `from_json` with `schemaLocationKey`
2. **Schema Evolution** - Handling new fields automatically
3. **Multiple Evolution Modes** - `addNewColumns` and `rescue` are shown below; `none` is also available but not used in this notebook
4. **Schema Hints** - Guiding type inference
5. **Bronze → Silver → Gold** pattern with evolving schemas

## References:
- [from_json Schema Inference Documentation](https://docs.databricks.com/aws/en/ldp/from-json-schema-evolution)

In [None]:
from pyspark import pipelines as dp
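# Explicit imports instead of a wildcard; note that min and max here
# intentionally shadow the Python builtins for use in aggregations.
from pyspark.sql.functions import (
    avg, col, count, countDistinct, current_timestamp, desc, from_json, max, min,
)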
from pyspark.sql.types import *

# =============================================================================
# CONFIGURATION PARAMETERS (Read from SDP pipeline configuration)
# =============================================================================
# These values are passed from the Databricks Asset Bundle configuration
# and can be accessed via spark.conf.get() with defaults for local testing
CATALOG = spark.conf.get("catalog", "pavan_naidu")
SCHEMA = spark.conf.get("schema", "json")
VOLUME = spark.conf.get("volume", "raw_data")
SOURCE_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/users_stream/"
# =============================================================================

print("✅ Spark Declarative Pipeline Configuration")
print(f"📂 Source: {SOURCE_PATH}")
print(f"🗄️  Target: {CATALOG}.{SCHEMA}")
print(f"📦 Volume: {VOLUME}")

## Bronze Layer: Auto Schema Inference with from_json

Reads JSON lines with automatic schema inference and evolution:
- **cloudFiles format**: `text` (reads raw JSON as text)
- **from_json schema**: `None` (automatic inference)
- **schemaLocationKey**: Unique key to track inferred schema
- **schemaEvolutionMode**: `addNewColumns` (automatically adds new fields as they appear)
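
To make "automatic inference" concrete, here is a minimal pure-Python sketch that derives a field-to-type mapping from a few JSON lines. It is a toy model only: the helper names `infer_type` and `infer_schema`, the sample records, and the type names are assumptions for illustration, and the actual inference rules used by `from_json` may differ.

```python
import json

def infer_type(value):
    """Map a parsed JSON value to a coarse type name (toy model only)."""
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, dict):
        # Nested objects become nested mappings, similar in spirit to a struct
        return {key: infer_type(inner) for key, inner in value.items()}
    return "STRING"  # strings, nulls, arrays, and anything else fall back to STRING

def infer_schema(json_lines):
    """Union the top-level fields seen across all lines; the first-seen type wins."""
    schema = {}
    for line in json_lines:
        for field, value in json.loads(line).items():
            schema.setdefault(field, infer_type(value))
    return schema

# Hypothetical sample records, not taken from the real source volume
sample = [
    '{"user_id": 1, "email": "a@example.com"}',
    '{"user_id": 2, "email": "b@example.com", "address": {"city": "Austin"}}',
]
schema = infer_schema(sample)
print(schema)
```

The second record contributes the `address` field, so the resulting schema covers fields that no single record carries on its own.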

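The bronze table parses each line three times, once per configuration:
1. **addNewColumns** - New fields are added to the schema automatically (output column `data`)
2. **Schema Hints** - `addNewColumns` plus hints that guide type inference (e.g., `"age INT, created_at TIMESTAMP"`; output column `data_schema_hints`)
3. **Rescue Mode** - New fields are stored separately under the custom name `rescued_fields` instead of being added to the schema (output column `data_rescued`)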
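
The difference between `addNewColumns` and `rescue` can be sketched in plain Python. This is a simplified model under stated assumptions, not the Databricks implementation: `apply_mode`, the sample record, and the `loyalty_tier` field are hypothetical, and the sketch skips how the real feature persists the schema or restarts the pipeline.

```python
def apply_mode(record, known_fields, mode, rescued_column="rescued_fields"):
    """Toy model of how each mode treats fields outside the known schema.

    Returns a tuple of (parsed_row, updated_known_fields).
    """
    known = {k: v for k, v in record.items() if k in known_fields}
    unknown = {k: v for k, v in record.items() if k not in known_fields}

    if mode == "addNewColumns":
        # The schema grows to include the new fields
        return {**known, **unknown}, known_fields | set(unknown)
    if mode == "rescue":
        # The schema stays as it was; new fields are set aside in one column
        row = dict(known)
        row[rescued_column] = unknown or None
        return row, known_fields
    raise ValueError(f"mode not modeled in this sketch: {mode}")

known = {"user_id", "email"}
record = {"user_id": 1, "email": "a@example.com", "loyalty_tier": "gold"}

added_row, added_fields = apply_mode(record, known, "addNewColumns")
rescued_row, rescued_fields = apply_mode(record, known, "rescue")
print(added_row, sorted(added_fields))
print(rescued_row, sorted(rescued_fields))
```

In this model, downstream queries see `loyalty_tier` as a regular field under `addNewColumns`, while under `rescue` they would have to look inside `rescued_fields` to find it.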

In [None]:
@dp.table(
    comment="Bronze layer: Raw user data with automatic schema inference using from_json",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def users_bronze():
    """
    Read JSON lines from cloud storage and parse with automatic schema inference using from_json.
    The schema is inferred from the first batch and automatically evolves as new fields appear.
    Data is kept as a struct column for schema evolution support.
    """
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "text")  # Read as text to get 'value' column
            .load(SOURCE_PATH)
            .select(
                col("_metadata.file_path").alias("source_file"),
                col("_metadata.file_modification_time").alias("ingestion_time"),
                from_json(
                    col("value"), 
                    None,  # NULL schema = automatic inference
                    {
                        "schemaLocationKey": "users_bronze_json",  # Unique key for this schema
                        "schemaEvolutionMode": "addNewColumns"     # Add new fields automatically
                    }
                ).alias("data"),
                from_json(
                    col("value"), 
                    None,  # NULL schema = automatic inference
                    {
                        "schemaLocationKey": "users_bronze_schema_hints",  # Unique key for this schema
                        "schemaEvolutionMode": "addNewColumns",     # Add new fields automatically
                        # Schema hints: guide inference for specific fields
                        "schemaHints": "age INT, created_at TIMESTAMP"
                    }
                ).alias("data_schema_hints"),
                from_json(
                    col("value"), 
                    None,  # NULL schema = automatic inference
                    {
                        "schemaLocationKey": "users_bronze_rescue", # Unique key for this schema
                        "schemaEvolutionMode": "rescue",  # New fields go to the rescued data column, not the schema
                        "rescuedDataColumn": "rescued_fields"  # Custom name for rescued data
                    }
                ).alias("data_rescued")
            )
    )

## Silver Layer: Extract and Validate

Extract commonly-used fields from the inferred JSON structure for optimized queries.
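
The silver table drops rows that fail either of its two expectations, `valid_age` and `valid_email`. As a rough illustration of which rows survive, the sketch below mirrors both SQL conditions in plain Python. The sample rows are made up, and the functions only approximate the SQL semantics (inclusive `BETWEEN`, and `LIKE '%@%'` as "contains an `@`").

```python
def valid_age(row):
    """Mirror of: age IS NOT NULL AND age BETWEEN 0 AND 120 (bounds inclusive)."""
    age = row.get("age")
    return age is not None and 0 <= age <= 120

def valid_email(row):
    """Mirror of: email IS NOT NULL AND email LIKE '%@%'."""
    email = row.get("email")
    return email is not None and "@" in email

# Hypothetical rows covering each failure case
rows = [
    {"user_id": 1, "age": 34, "email": "a@example.com"},    # passes both checks
    {"user_id": 2, "age": None, "email": "b@example.com"},  # null age
    {"user_id": 3, "age": 150, "email": "c@example.com"},   # age out of range
    {"user_id": 4, "age": 28, "email": "not-an-email"},     # no @ in email
    {"user_id": 5, "age": 0, "email": None},                # null email
]

# expect_or_drop semantics: a row is kept only if every expectation holds
kept = [row for row in rows if valid_age(row) and valid_email(row)]
print([row["user_id"] for row in kept])
```

Note that the email check is deliberately loose: any string containing `@` passes, so it filters out obviously malformed values rather than validating addresses.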

In [None]:
@dp.table(
    comment="Silver layer: Extract and flatten key fields with data quality checks",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
@dp.expect_or_drop("valid_age", "age IS NOT NULL AND age BETWEEN 0 AND 120")
@dp.expect_or_drop("valid_email", "email IS NOT NULL AND email LIKE '%@%'")
def users_silver():
    """
    Flatten and extract commonly-accessed fields from the bronze layer data struct.
    Only accesses fields available in the initial (Phase 1) data.
    """
    return (
        spark.readStream.table("users_bronze")
            .select(
                # Top-level fields from data struct (Phase 1)
                col("data.user_id").alias("user_id"),
                col("data.username").alias("username"),
                col("data.email").alias("email"),
                col("data.name").alias("name"),
                col("data.age").cast("int").alias("age"),
                col("data.phone").alias("phone"),
                
                # Nested address fields (Phase 1)
                col("data.address.street").alias("street"),
                col("data.address.city").alias("city"),
                col("data.address.state").alias("state"),
                col("data.address.country").alias("country"),
                col("data.address.postal_code").alias("postal_code"),
                col("data.address.coordinates.latitude").cast("double").alias("latitude"),
                col("data.address.coordinates.longitude").cast("double").alias("longitude"),
                
                # Nested profile fields (Phase 1)
                col("data.profile.bio").alias("bio"),
                col("data.profile.occupation").alias("occupation"),
                col("data.profile.company").alias("company"),
                
                # Optional referral fields (Phase 1)
                col("data.referral_code").alias("referral_code"),
                col("data.referred_by").alias("referred_by"),
                
                # Timestamps (Phase 1)
                col("data.created_at").cast("timestamp").alias("created_at"),
                col("data.last_login").cast("timestamp").alias("last_login"),
                
                # Metadata
                col("source_file"),
                col("ingestion_time"),
                current_timestamp().alias("processed_time")
            )
    )

## Gold Layer: Aggregations and Analytics

Create business-ready aggregated tables for analytics.
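
To show what the occupation-level aggregation computes, here is a small pure-Python equivalent run on made-up rows. It is a sketch for intuition only: `stats_by_occupation` and the sample data are hypothetical, and it covers just the count, average age, and distinct-country metrics, skipping nulls in the average and distinct count as Spark's `avg` and `countDistinct` do.

```python
from collections import defaultdict

def stats_by_occupation(rows):
    """Group rows by occupation and compute a few summary metrics per group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["occupation"]].append(row)

    result = []
    for occupation, members in groups.items():
        ages = [m["age"] for m in members if m["age"] is not None]
        countries = {m["country"] for m in members if m["country"] is not None}
        result.append({
            "occupation": occupation,
            "user_count": len(members),                         # like count("*")
            "avg_age": sum(ages) / len(ages) if ages else None,  # nulls skipped
            "unique_countries": len(countries),                 # nulls skipped
        })
    # Largest groups first, matching orderBy(desc("user_count"))
    return sorted(result, key=lambda r: r["user_count"], reverse=True)

# Hypothetical silver-layer rows
silver_rows = [
    {"occupation": "engineer", "age": 30, "country": "US"},
    {"occupation": "engineer", "age": 40, "country": "CA"},
    {"occupation": "teacher", "age": 50, "country": "US"},
]
stats = stats_by_occupation(silver_rows)
for entry in stats:
    print(entry)
```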

In [None]:
@dp.table(
    comment="Gold layer: User statistics by occupation",
    table_properties={
        "quality": "gold"
    }
)
def user_stats_by_occupation():
    """
    Aggregate user statistics by occupation.
    Uses only Phase 1 fields available in the initial data.
    """
    return (
        spark.read.table("users_silver")
            .groupBy("occupation")
            .agg(
                count("*").alias("user_count"),
                avg("age").alias("avg_age"),
                countDistinct("country").alias("unique_countries"),
                countDistinct("company").alias("unique_companies"),
                min("created_at").alias("first_user_created"),
                max("created_at").alias("last_user_created")
            )
            .orderBy(desc("user_count"))
    )

In [None]:
@dp.table(
    comment="Gold layer: Geographic user distribution"
)
def user_geographic_summary():
    """
    Aggregate users by geographic location.
    """
    return (
        spark.read.table("users_silver")
            .groupBy("country", "state")
            .agg(
                count("*").alias("user_count"),
                avg("age").alias("avg_age"),
                countDistinct("city").alias("unique_cities")
            )
            .orderBy(desc("user_count"))
    )