In [None]:
from pyspark.sql.functions import col, to_timestamp, current_timestamp
from delta.tables import DeltaTable
import json
from datetime import datetime

# --- CONFIGURATION ---
# Folder the raw JSON files are read from.
BRONZE_PATH = "Files/Bronze/Landing"
# Delta table that receives the cleaned, de-duplicated records.
SILVER_TABLE_NAME = "silver_news"

# Bronze -> Silver load.
#
# Reads the raw JSON under BRONZE_PATH, normalises it, and either creates
# SILVER_TABLE_NAME (first run) or merges rows with previously unseen URLs
# into it (later runs). The outcome is reported to the calling pipeline as a
# JSON string passed to mssparkutils.notebook.exit().
#
# The exit payload is built inside try/except and the exit call is made exactly
# once afterwards. Keeping the exit call out of the try body means that whatever
# mechanism the runtime uses to stop the notebook (NOTE(review): possibly an
# exception - confirm for this runtime) cannot be intercepted by the broad
# handler and turn a successful run into a "failed" payload.
try:
    # 1. READ RAW DATA
    df_raw = spark.read.json(BRONZE_PATH)

    # 2. CLEANING
    # Parse "date" into a timestamp, stamp each row with the processing time,
    # and keep only the columns that belong in the silver table.
    df_clean = (
        df_raw
        .withColumn("date", to_timestamp(col("date")))
        .withColumn("processed_time", current_timestamp())
        .select(
            col("title"),
            col("url"),
            col("snippet"),  # the raw field is "snippet", not "body"
            col("source"),
            col("date"),
            col("competitor_tag"),
            col("processed_time"),
        )
    )

    # Drop null URLs: "url" is the merge key, so rows without one are unusable.
    df_clean = df_clean.filter(col("url").isNotNull())

    # 3. DEDUPLICATION
    # Collapse duplicates *within* the incoming batch first. The merge below
    # only compares the batch against the target table; without this step a
    # URL that appears several times in the landing files would be inserted
    # several times (and written several times on the initial load).
    # Which of the duplicate rows survives is not specified.
    df_clean = df_clean.dropDuplicates(["url"])

    if spark.catalog.tableExists(SILVER_TABLE_NAME):
        # Incremental merge: insert only rows whose URL is not in the table yet;
        # rows that already exist are left untouched.
        delta_table = DeltaTable.forName(spark, SILVER_TABLE_NAME)

        (
            delta_table.alias("target")
            .merge(df_clean.alias("source"), "target.url = source.url")
            .whenNotMatchedInsertAll()
            .execute()
        )

        message = "Merge Complete - Incremental Load"
    else:
        # First-time creation of the silver table.
        df_clean.write.format("delta").saveAsTable(SILVER_TABLE_NAME)
        message = "Table Created - Initial Load"

    # Prepare pipeline output. The count is the number of cleaned, unique-URL
    # rows in this batch, not the number of rows the merge actually inserted.
    record_count = df_clean.count()
    result = {
        "status": "success",
        "message": message,
        "records_processed": record_count,
        "table_name": SILVER_TABLE_NAME,
        "timestamp": datetime.now().isoformat(),
    }

except Exception as e:
    # Deliberate best-effort reporting: the failure is handed back to the
    # pipeline as a JSON payload instead of surfacing as a notebook error, so
    # the caller must inspect "status" to detect it.
    result = {
        "status": "failed",
        "error": str(e),
        "timestamp": datetime.now().isoformat(),
    }

# Exit with JSON for the pipeline (single exit point for both outcomes).
mssparkutils.notebook.exit(json.dumps(result))

StatementMeta(, cfc06aad-a3f6-4fae-9237-ec36ca71eefe, 28, Finished, Available, Finished)

Reading raw files...
Table 'silver_news' exists. Performing Incremental Merge...
Merge Complete.
