In [1]:
from pyspark.sql import SparkSession
import logging

# Build (or reuse) the Spark session for the pipeline. The Kafka source
# connector and the PostgreSQL JDBC driver are requested via
# spark.jars.packages, and a default streaming checkpoint location is set.
spark = (
    SparkSession.builder
    .appName("KafkaFullPipeline")
    .master("spark://spark-master:7077")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.postgresql:postgresql:42.7.8",
    )
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint")
    .getOrCreate()
)

# Subscribe to the Kafka topic from the earliest offsets and project the
# record key/value (cast to strings) together with the record timestamp.
df_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "streaming-topic")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr(
        "CAST(key AS STRING) as message_key",
        "CAST(value AS STRING) as message_value",
        "timestamp",
    )
)

def write_batch_to_postgres(batch_df, batch_id):
    """Append one streaming micro-batch to the PostgreSQL ``stream_table``.

    Intended to be used as a ``foreachBatch`` sink callback. The batch is
    persisted while it is counted and written so that it is evaluated only
    once instead of once per action, and the record count is computed a
    single time and reused in the success message.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; used only in log output.

    Returns:
        None. Failures are caught and reported on stdout, not raised.
    """
    # NOTE(review): errors are printed and swallowed, so the streaming query
    # keeps running after a failed write. Structured Streaming will then
    # treat the micro-batch as done, which means the failed rows are not
    # retried -- confirm this best-effort behaviour is intended.
    try:
        # Cache the batch: count() and the JDBC write are separate actions
        # and would otherwise each recompute the batch from the source.
        batch_df.persist()
        try:
            record_count = batch_df.count()
            if record_count == 0:
                print(f"⚠️ Batch {batch_id}: No records to write")
                return

            batch_df.write \
                .mode("append") \
                .format("jdbc") \
                .option("url", "jdbc:postgresql://postgres:5432/kafka_sink") \
                .option("dbtable", "stream_table") \
                .option("user", "postgres") \
                .option("password", "postgres") \
                .option("driver", "org.postgresql.Driver") \
                .option("batchsize", "1000") \
                .save()
            print(f"✅ Batch {batch_id}: {record_count} records written successfully")
        finally:
            # Always release the cached batch, even if the write failed.
            batch_df.unpersist()
    except Exception as e:
        print(f"❌ Error writing batch {batch_id}: {str(e)}")

# Start the streaming query: every 5 seconds the new rows are handed to
# write_batch_to_postgres as a micro-batch, in append output mode.
query = (
    df_stream.writeStream
    .foreachBatch(write_batch_to_postgres)
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)

# Block the driver until the streaming query terminates.
query.awaitTermination()