In [1]:
from pyspark.sql import SparkSession
from delta import *

import subprocess
import json
import os

In [2]:
from uuid import uuid4

In [3]:
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka import avro
import pyspark.sql.functions as pyspark_f
import pyspark.sql.types as pyspark_dt

In [4]:
session_id = str(uuid4())

In [5]:
spark = SparkSession.builder.appName(session_id) \
        .master("spark://spark-master:7077")\
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0,org.apache.spark:spark-avro_2.12:3.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")\
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
        .getOrCreate()
sqlContext = SparkSession(spark)
# Dont Show warning only error
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-189c56e0-1d0a-49c2-863c-d96a9dce8e4f;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
	found org.apache.spark#spark-avro_2.12;3.4.0 in central
	found org.tukaani#xz;1.9 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.sl

In [7]:
# Sample data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]

# Define schema
schema = pyspark_dt.StructType([
    pyspark_dt.StructField("Name", pyspark_dt.StringType(), True),
    pyspark_dt.StructField("Age", pyspark_dt.IntegerType(), True)
])

# Create DataFrame
sample_df = spark.createDataFrame(data, schema)

# Show DataFrame content
sample_df.toPandas()

                                                                                

Unnamed: 0,Name,Age
0,Alice,25
1,Bob,30
2,Charlie,35


In [6]:
# builder = SparkSession.builder.appName(session_id) \
#     .master("spark://spark-master:7077")\
#     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0")\
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# spark = configure_spark_with_delta_pip(builder).getOrCreate()    
# # spark.sparkContext.setLogLevel("WARN")

# Create Topic & Schema Registry

In [8]:
topic_name = 'sample_topic'
kafka_config = {
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
}

schema_registry_cofig =  {'url' : 'http://schema-registry:8085'} # URL for Schema Registry
schema_registry_subject = 'schema_registry_subject_example' 

In [9]:
# Create Kafka topic
admin_client = AdminClient(kafka_config)
new_topic = NewTopic(topic_name, num_partitions=2, replication_factor=2)

# Create the topic
admin_client.create_topics([new_topic])
print(f"Kafka topic {topic_name} created.")

Kafka topic sample_topic created.


In [10]:
producer = Producer(kafka_config)

In [None]:
# Produce messages
for i in range(10):
    key = str(i)
    value = {"name": f"User_{i}", "age": 20 + i}
    value_json = json.dumps(value)  # Convert dictionary to JSON string
    producer.produce(topic=topic_name, key=key, value=value_json)
    print(f"Produced Avro message: {value}")
producer.flush()

Produced Avro message: {'name': 'User_0', 'age': 20}
Produced Avro message: {'name': 'User_1', 'age': 21}
Produced Avro message: {'name': 'User_2', 'age': 22}
Produced Avro message: {'name': 'User_3', 'age': 23}
Produced Avro message: {'name': 'User_4', 'age': 24}
Produced Avro message: {'name': 'User_5', 'age': 25}
Produced Avro message: {'name': 'User_6', 'age': 26}
Produced Avro message: {'name': 'User_7', 'age': 27}
Produced Avro message: {'name': 'User_8', 'age': 28}
Produced Avro message: {'name': 'User_9', 'age': 29}


0

In [None]:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_config['bootstrap.servers']) \
    .option("subscribe", topic_name) \
    .option("startingOffsets", "earliest") \
    .load()

# Start the streaming query and display results
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# query.awaitTermination()

24/11/14 08:27:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-572f037d-0ec4-4dc5-8fcc-3f87f47a750c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/11/14 08:27:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/11/14 08:27:20 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------------------+------------+---------+------+--------------------+-------------+
| key|               value|       topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------------+---------+------+--------------------+-------------+
|[34]|[7B 22 6E 61 6D 6...|sample_topic|        0|     0|2024-11-14 08:25:...|            0|
|[35]|[7B 22 6E 61 6D 6...|sample_topic|        0|     1|2024-11-14 08:25:...|            0|
|[36]|[7B 22 6E 61 6D 6...|sample_topic|        0|     2|2024-11-14 08:25:...|            0|
|[37]|[7B 22 6E 61 6D 6...|sample_topic|        0|     3|2024-11-14 08:25:...|            0|
|[30]|[7B 22 6E 61 6D 6...|sample_topic|        1|     0|2024-11-14 08:25:...|            0|
|[31]|[7B 22 6E 61 6D 6...|sample_topic|        1|     1|2024-11-14 08:25:...|            0|
|[32]|[7B 22 6E 61 6D 6...|sample_topic|        1|     2|2024-11-1