In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.avro.functions import from_avro
import requests
import json

# Function to get schema from Schema Registry
def get_avro_schema_from_registry(schema_registry_url, subject_name, timeout=10):
    """Fetch the latest registered schema for a subject from a Confluent Schema Registry.

    Args:
        schema_registry_url: Base URL of the registry, e.g. "http://schema-registry:8081".
            A trailing slash is tolerated.
        subject_name: Registry subject, e.g. "user-topic-value". It is URL-quoted
            so subjects containing reserved characters still build a valid path.
        timeout: Seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block indefinitely on an unresponsive registry,
            freezing the notebook cell.

    Returns:
        The schema string from the registry response, parsed with ``json.loads``
        (a dict for a record schema such as the one printed below).

    Raises:
        requests.HTTPError: if the registry responds with a 4xx/5xx status.
        requests.Timeout: if the registry does not respond within ``timeout``.
    """
    from urllib.parse import quote

    base_url = schema_registry_url.rstrip("/")
    url = f"{base_url}/subjects/{quote(subject_name, safe='')}/versions/latest"
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    # The registry returns the Avro schema itself as a JSON-encoded string
    # under the "schema" key, so it needs a second decode.
    return json.loads(response.json()["schema"])

# Configuration: location of the Schema Registry and the subject that holds
# the value schema for the Kafka topic consumed further down.
SCHEMA_REGISTRY_URL = "http://schema-registry:8081"
SUBJECT_NAME = "user-topic-value"

# Look up the latest value schema for the subject
schema_json = get_avro_schema_from_registry(
    schema_registry_url=SCHEMA_REGISTRY_URL,
    subject_name=SUBJECT_NAME,
)
print("Retrieved schema: " + str(schema_json))

Retrieved schema: {'type': 'record', 'name': 'User', 'fields': [{'name': 'id', 'type': 'int'}, {'name': 'name', 'type': 'string'}, {'name': 'email', 'type': 'string'}, {'name': 'age', 'type': 'int'}]}


In [2]:
# Create Spark session with Kafka + Avro support and the PostgreSQL JDBC driver.
# Artifact coordinates must match the cluster's Scala (2.12) and Spark (3.5.0) versions.
# The PostgreSQL driver backs the JDBC sink (driver "org.postgresql.Driver") used by
# the streaming writer; without requesting it here, that sink only works if the jar
# happens to be pre-installed on the cluster.
# NOTE: spark.jars.packages is applied only when the session is first created; if a
# session already exists in this kernel, getOrCreate() reuses it and this setting has
# no effect — restart the kernel after editing this list.
SPARK_PACKAGES = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    "org.apache.spark:spark-avro_2.12:3.5.0",
    "org.postgresql:postgresql:42.7.3",
]

spark = (
    SparkSession.builder
    .appName("KafkaAvroToPostgreSQL")
    .master("spark://spark-master:7077")
    .config("spark.jars.packages", ",".join(SPARK_PACKAGES))
    .getOrCreate()
)

print("Spark session created successfully!")
# Host-side URL; assumes the driver UI port 4040 is published to the host — TODO confirm
print("Spark UI available at: http://localhost:4040")

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-87dee02c-2e65-460b-a411-905beb2aced9;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found org.apache.spa

Spark session created successfully!
Spark UI available at: http://localhost:4040


In [3]:
# Read from Kafka: subscribe to "user-topic". startingOffsets="latest" only
# applies when the query has no checkpointed offsets yet.
df_kafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "user-topic")
    .option("startingOffsets", "latest")
    .load()
)

# Confluent wire format: a magic byte plus a 4-byte schema ID precede the Avro
# body, so the payload starts at (1-based) byte 6 and is 5 bytes shorter.
# NOTE(review): the embedded schema ID is discarded and every record is decoded
# with the single "latest" schema fetched from the registry above; records
# written with an incompatible older schema version may fail or mis-decode.
strip_confluent_header = expr("substring(value, 6, length(value)-5)")
avro_schema_str = json.dumps(schema_json)

df_avro = (
    df_kafka
    .withColumn("avro_payload", strip_confluent_header)
    .select(
        col("key").cast("string").alias("message_key"),
        from_avro("avro_payload", avro_schema_str).alias("user_data"),
        col("topic").alias("topic"),
        col("partition").alias("partition_id"),
        col("offset").alias("offset_value"),
        col("timestamp").alias("timestamp_value"),
    )
    # Flatten the decoded struct so each schema field becomes a top-level column
    .select(
        "message_key",
        "user_data.*",
        "topic",
        "partition_id",
        "offset_value",
        "timestamp_value",
    )
)

print("Kafka stream configured with Avro deserialization")

Kafka stream configured with Avro deserialization


In [4]:
# Function to write each batch to PostgreSQL
def write_to_postgres(batch_df, batch_id):
    """Append one Structured Streaming micro-batch to the PostgreSQL ``kafka_messages`` table.

    Used as the ``foreachBatch`` callback, which Spark calls with the micro-batch
    DataFrame and its batch id.

    Args:
        batch_df: DataFrame holding the rows of this micro-batch.
        batch_id: Micro-batch identifier supplied by Spark; used only for logging.

    Connection settings can be overridden with the environment variables
    POSTGRES_SINK_URL, POSTGRES_SINK_USER and POSTGRES_SINK_PASSWORD. Without
    them, the original development defaults are used, so existing setups keep
    working.
    """
    import os

    # Without caching, every action on batch_df recomputes the micro-batch from
    # its source. Persist once so the count and the JDBC write share a single
    # evaluation (previously count() ran twice in addition to the write).
    batch_df.persist()
    try:
        record_count = batch_df.count()
        if record_count == 0:
            return  # empty trigger: nothing to append

        (
            batch_df.write
            .format("jdbc")
            .option("url", os.environ.get("POSTGRES_SINK_URL", "jdbc:postgresql://postgres:5432/sparkdb"))
            .option("dbtable", "kafka_messages")
            .option("user", os.environ.get("POSTGRES_SINK_USER", "sparkuser"))
            # FIXME: remove the hard-coded fallback password outside local development
            .option("password", os.environ.get("POSTGRES_SINK_PASSWORD", "sparkpass"))
            .option("driver", "org.postgresql.Driver")
            .mode("append")
            .save()
        )
        print(f"Batch {batch_id}: Wrote {record_count} Avro records to PostgreSQL")
    finally:
        # Release the cached micro-batch even when the write raises.
        batch_df.unpersist()

# Start the streaming query: each micro-batch of df_avro is handed to
# write_to_postgres, with progress tracked in the checkpoint directory.
query = (
    df_avro.writeStream
    .foreachBatch(write_to_postgres)
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kafka-avro-postgres-checkpoint")
    .start()
)

print(
    "Streaming started! Send Avro messages to see them processed...",
    "To stop: run query.stop() in the next cell",
    sep="\n",
)

25/09/02 16:20:29 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Streaming started! Send Avro messages to see them processed...
To stop: run query.stop() in the next cell


25/09/02 16:20:32 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


In [None]:
# To stop the stream when done
# query.stop()