# Delta Streaming Pipeline: Refactored and Py4J-Safe

In [None]:

import logging
from datetime import datetime
from typing import List
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("DeltaIngest")


In [None]:

class StreamConfig:
    """Configuration for stream ingestion.

    Every setting can be overridden with a keyword argument; calling
    ``StreamConfig()`` with no arguments yields the default values.

    Attributes:
        catalog: Catalog part of the target table name.
        schema: Schema (database) part of the target table name.
        table_name: Table part of the target table name.
        source_path: Location the stream reads input files from.
        checkpoint_path: Location used for the stream's checkpoint.
        merge_keys: Column names used to match rows when merging.
    """

    def __init__(
        self,
        *,
        catalog: str = "hive_metastore",
        schema: str = "bronze",
        table_name: str = "entity_table",
        source_path: str = "s3a://your-bucket/entity-data/",
        checkpoint_path: str = "s3a://your-bucket/checkpoints/entity",
        merge_keys: "List[str] | None" = None,
    ):
        self.catalog = catalog
        self.schema = schema
        self.table_name = table_name
        self.source_path = source_path
        self.checkpoint_path = checkpoint_path
        # Copy into a fresh list so instances never share (or alias the
        # caller's) mutable key list.
        self.merge_keys = ["entity_id"] if merge_keys is None else list(merge_keys)

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}("
            f"catalog={self.catalog!r}, schema={self.schema!r}, "
            f"table_name={self.table_name!r}, source_path={self.source_path!r}, "
            f"checkpoint_path={self.checkpoint_path!r}, merge_keys={self.merge_keys!r})"
        )


In [None]:

def setup_spark() -> SparkSession:
    """Build (or reuse) a SparkSession with the Delta Lake settings applied.

    Returns:
        The session returned by ``getOrCreate()``.

    Raises:
        Exception: Any error raised while building the session is logged
            and re-raised unchanged.
    """
    # Delta Lake needs both the SQL extension and the catalog implementation.
    delta_settings = {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    }
    try:
        builder = SparkSession.builder.appName("DeltaLakeIngest")
        for option, value in delta_settings.items():
            builder = builder.config(option, value)
        return builder.getOrCreate()
    except Exception as e:
        logger.error(f"Failed to initialize SparkSession: {str(e)}")
        raise


In [None]:

class DataValidation:
    """Checks that the merge-key columns of a DataFrame are populated."""

    def __init__(self, merge_keys: List[str]):
        self.merge_keys = merge_keys

    def validate(self, df: DataFrame) -> bool:
        """Return True when no merge-key column holds a null or empty string.

        Keys are checked in order and the check stops at the first key that
        has offending rows. Any exception raised while checking is logged
        and reported as a failed validation (False) rather than propagated.
        """
        try:
            for key in self.merge_keys:
                key_column = df[key]
                is_missing = key_column.isNull() | (key_column == '')
                null_count = df.filter(is_missing).count()
                if null_count > 0:
                    logger.warning(f"Validation failed: Primary key '{key}' has {null_count} null/empty values")
                    return False
        except Exception as e:
            logger.error(f"Data validation error: {str(e)}")
            return False
        return True


In [None]:

def merge_to_delta(spark: SparkSession, df: DataFrame, config: StreamConfig) -> None:
    """Upsert ``df`` into the Delta table named by ``config``.

    If the target table does not exist yet, it is created from ``df`` and
    the function returns. Otherwise rows are matched on ``config.merge_keys``:
    matched rows are updated with all source columns and unmatched rows are
    inserted.

    Args:
        spark: Session used to look up and load the target table.
        df: Source rows to write.
        config: Supplies ``catalog``, ``schema``, ``table_name`` and
            ``merge_keys``.

    Raises:
        ValueError: If the table exists but ``config.merge_keys`` is empty,
            so no merge condition can be built.
        Exception: Any other error from Spark/Delta is logged and re-raised.
    """
    try:
        full_table_name = f"{config.catalog}.{config.schema}.{config.table_name}"

        # Existence is checked through the catalog because the target is
        # addressed by name. DeltaTable.isDeltaTable() is documented as taking
        # a file path, so handing it a table name would report "not a Delta
        # table" and send every batch down the overwrite branch below.
        if not spark.catalog.tableExists(full_table_name):
            logger.info(f"Creating new Delta table: {full_table_name}")
            df.write.format("delta").mode("overwrite").saveAsTable(full_table_name)
            return

        if not config.merge_keys:
            # An empty key list would produce an empty merge condition.
            raise ValueError("config.merge_keys must name at least one column to merge on")

        delta_table = DeltaTable.forName(spark, full_table_name)
        merge_condition = " AND ".join([f"target.{k} = source.{k}" for k in config.merge_keys])

        # NOTE(review): if df can hold several rows with the same merge key,
        # Delta may reject the merge as ambiguous -- confirm whether the
        # source needs de-duplicating upstream.
        delta_table.alias("target").merge(
            source=df.alias("source"),
            condition=merge_condition
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

        logger.info(f"Merged data into Delta table: {full_table_name}")

    except Exception as e:
        logger.error(f"Error during Delta merge: {str(e)}")
        raise


In [None]:

def start_stream(spark: SparkSession, config: StreamConfig) -> None:
    """Start the Auto Loader stream and block until it terminates.

    CSV files under ``config.source_path`` are read with a fixed
    three-column string schema, stamped with an ``ingested_at`` column, and
    handed to a per-batch function that validates the merge keys and then
    merges the batch into the target Delta table.

    Args:
        spark: Session used to create the streaming reader.
        config: Supplies ``source_path``, ``checkpoint_path`` and
            ``merge_keys`` (plus the table settings used by the merge).

    Raises:
        Exception: Any error while building or running the stream is logged
            and re-raised.
    """
    try:
        logger.info("Starting Auto Loader stream...")

        # All columns are read as nullable strings; event_time is not parsed here.
        schema = StructType([
            StructField("entity_id", StringType(), True),
            StructField("name", StringType(), True),
            StructField("event_time", StringType(), True)
        ])

        raw_df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", "true")
            .schema(schema)
            .load(config.source_path)
        )

        df_with_ts = raw_df.withColumn("ingested_at", current_timestamp())

        def foreach_batch_function(batch_df, batch_id):
            """Validate one micro-batch and merge it when validation passes."""
            logger.info(f"Processing batch {batch_id} ...")
            # Prefer the session attached to the batch itself; fall back to the
            # outer session when the DataFrame does not expose one.
            batch_spark = getattr(batch_df, "sparkSession", spark)
            validator = DataValidation(config.merge_keys)
            # Validation and the merge each trigger actions on the batch, so
            # cache it once instead of recomputing it from the source files.
            batch_df.persist()
            try:
                if validator.validate(batch_df):
                    merge_to_delta(batch_spark, batch_df, config)
                else:
                    # NOTE(review): a skipped batch is presumably still recorded
                    # in the checkpoint, so its rows would not be re-delivered --
                    # confirm this is the intended handling of bad batches.
                    logger.warning(f"Skipping batch {batch_id} due to validation failure")
            finally:
                batch_df.unpersist()

        # No .format(...) on the writer: with foreachBatch the batch function
        # is the sink, and the Delta write happens inside merge_to_delta.
        query = (
            df_with_ts.writeStream
            .option("checkpointLocation", config.checkpoint_path)
            .foreachBatch(foreach_batch_function)
            .start()
        )

        query.awaitTermination()

    except Exception as e:
        logger.error(f"Error in stream processing: {str(e)}")
        raise


In [None]:

if __name__ == "__main__":
    # Script entry point: build the configuration first, then the session,
    # and run the stream.
    config, spark = StreamConfig(), setup_spark()
    start_stream(spark=spark, config=config)
