In [1]:
from pyspark.sql import SparkSession
import pyspark
from delta import *
from pyspark.sql.types import *
from delta.tables import *
from pyspark.sql.functions import *


# Versions below must match the local installation; change them if needed.
scala_version = '2.12'  # Scala binary version used in the artifact suffixes
spark_version = '3.5.0' # Spark release; also selects the Kafka connector version
delta_version = '3.1.0' # Delta Lake release; must be one compatible with spark_version

# Maven coordinates resolved at session start-up. Every Scala artifact takes
# its suffix from `scala_version` so the Kafka connector and Delta Lake can
# never end up on different Scala binary versions when the value is changed.
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0',
    f'io.delta:delta-spark_{scala_version}:{delta_version}',
]

# Local session with the Delta SQL extension and Delta catalog enabled.
spark = (
    SparkSession.builder
    .master("local")
    .appName("kafka-example")
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
spark



In [2]:
# Read back the Maven coordinates stored in the session's
# `spark.jars.packages` setting and show them.
installed_packages = spark.conf.get("spark.jars.packages")
print(f"Installed packages: {installed_packages}")

Installed packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,org.apache.kafka:kafka-clients:3.2.0,io.delta:delta-spark_2.12:3.1.0


In [3]:
# TODO: Replace with real list of data
data = ['Hello', 'World']

# One row per message in a single string column named `value`.
# The schema is stated explicitly instead of being inferred from the rows:
# inference raises on an empty dataset, whereas an explicit schema lets an
# empty `data` list produce an empty DataFrame. Non-string entries are
# rejected here at creation time.
df = spark.createDataFrame([(v,) for v in data], schema="value STRING")

In [4]:
# Kafka topic the sample rows are published to.
topic_name = 'dbserver1.cdc.demo'

# Batch-write every row of `df` to that topic through the broker at kafka:9092.
(
    df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", topic_name)
    .save()
)

In [None]:
# Streaming source: subscribe to the Kafka topic, starting from the earliest
# available offsets, and cast the key/value columns from bytes to strings.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "dbserver1.cdc.demo")
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("key", col("key").cast("string"))
    .withColumn("value", col("value").cast("string"))
)

# Streaming sink: append the records to the Delta table at ./data/events,
# keeping the query's checkpoint data under ./data/_checkpoints/.
query = (
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "./data/_checkpoints/")
    .start("./data/events")
)

# Blocks this cell until the streaming query terminates.
query.awaitTermination()

In [None]:
# Location of the Delta table to stream from.
delta_table_path = "./data/events"

# Open the Delta table as a streaming source.
df = spark.readStream.format("delta").load(delta_table_path)

# Any transformations or aggregations on `df` would go here.

# Send the streamed rows to the console sink in append mode.
query = (
    df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

# Blocks this cell until the query terminates; interrupt the cell to regain control.
query.awaitTermination()
