In [None]:
# stream_json_to_kafka.py

import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StringType, IntegerType, BooleanType

# Step 1: Make a JDK visible to PySpark.
# An already-configured JAVA_HOME is respected; the Homebrew OpenJDK 17 path is
# only used as a fallback when none is set.
os.environ.setdefault(
    "JAVA_HOME", "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"
)
# Put $JAVA_HOME/bin first on PATH. Existing copies of that entry are removed so
# re-running this cell does not keep growing PATH, and empty entries are dropped
# because an empty PATH element means "current directory" to POSIX path lookup.
_java_bin = os.path.join(os.environ["JAVA_HOME"], "bin")
_other_path_entries = [
    entry
    for entry in os.environ.get("PATH", "").split(os.pathsep)
    if entry and entry != _java_bin
]
os.environ["PATH"] = os.pathsep.join([_java_bin, *_other_path_entries])

# Step 2: Initialize SparkSession with Kafka support.
# The Kafka connector is a Scala artifact: its version must equal the installed
# Spark version and its _2.xx suffix must match that Spark's Scala build, so the
# coordinates are derived from the installed PySpark instead of being pinned.
_spark_version = pyspark.__version__
# NOTE(review): assumes the default PyPI builds (PySpark 3.x -> Scala 2.12,
# PySpark 4.x -> Scala 2.13); adjust if running a custom Scala 2.13 build of 3.x.
_scala_version = "2.13" if int(_spark_version.split(".")[0]) >= 4 else "2.12"
_kafka_packages = [
    f"org.apache.spark:spark-sql-kafka-0-10_{_scala_version}:{_spark_version}",
    f"org.apache.spark:spark-token-provider-kafka-0-10_{_scala_version}:{_spark_version}",
]
# getOrCreate() returns an already-running session unchanged, so a changed
# spark.jars.packages only takes effect in a fresh kernel/process.
spark = (
    SparkSession.builder
    .appName("Stream JSON to Kafka")
    .config("spark.jars.packages", ",".join(_kafka_packages))
    .getOrCreate()
)

print("Spark Version:", spark.version)

# Step 3: Explicit schema for the incoming JSON records. Every field is added
# with StructType.add's default nullability; keep this in sync with the files
# dropped into the source directory.
schema = (
    StructType()
    .add("id", IntegerType())
    .add("name", StringType())
    .add("score", IntegerType())
    .add("passed", BooleanType())
)

# Step 4: Stream JSON files from a local directory (relative to the working
# directory). The schema defined above is supplied explicitly, and each trigger
# is limited to one new file via maxFilesPerTrigger.
source_path = "data/"
df = (
    spark.readStream
    .format("json")
    .option("maxFilesPerTrigger", 1)
    .schema(schema)
    .load(source_path)
)

# Step 5: Stamp each row with an ingestion time, then serialize all columns of
# the row into one JSON string column named "value" for the Kafka sink.
transformed_df = df.withColumns({"ingestion_time": current_timestamp()})
json_df = transformed_df.selectExpr("to_json(struct(*)) AS value")

# Step 6: Write to Kafka.
# Each row's "value" column is written to `topic`; Spark records this query's
# streaming progress under `checkpoint_location`.
topic = "first-kafka-topic"
bootstrap_servers = "localhost:9092"
checkpoint_location = "/tmp/kafka-checkpoint"

query = None  # stays None if start() itself fails
try:
    query = (
        json_df.writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("append")
        .start()
    )
    # Blocks until the query stops or fails; interrupt the kernel/process to end it.
    query.awaitTermination()
except KeyboardInterrupt:
    # Interrupting is the expected way to end this long-running stream, not an error.
    print("Interrupted; stopping the streaming query.")
except Exception as e:
    print("Error:", e)
    print(f"Ensure Kafka is running at {bootstrap_servers} and topic exists.")
    # Re-raise so a failed run is not reported as a success.
    raise
finally:
    # KeyboardInterrupt is not an Exception subclass, so cleanup lives here to
    # run on every exit path: success, failure, or interrupt.
    if query is not None and query.isActive:
        query.stop()
    spark.stop()


Using existing JAVA_HOME: /opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home
Detected Spark version for environment: 4.0.0
Attempting to use Kafka SQL connector version: 4.0.0 (for Scala 2.13) with Spark 4.0.0
REMOVING spark-token-provider-kafka package for this attempt.
Attempting to initialize SparkSession with packages: org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0,org.lz4:lz4-java:1.8.0,org.xerial.snappy:snappy-java:1.1.10.5


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/02 13:49:42 WARN Utils: Your hostname, Avis-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.56 instead (on interface en0)
25/06/02 13:49:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/avikaushik/code/s3_sample_project/env/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/avikaushik/.ivy2.5.2/cache
The jars for the packages stored in: /Users/avikaushik/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
org.lz4#lz4-java added as a dependency
org.xerial.snappy#snappy-java added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ec2d7575-93db-4002-9863-a99cc2c5f101;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.0 in central
	found org

Loaded Spark version (runtime): 4.0.0
Using Scala version for Kafka connector package: 2.13
Attempting to use Kafka SQL package version: org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0
Token Provider package: NOT INCLUDED in this attempt.
Other packages included: org.lz4:lz4-java:1.8.0,org.xerial.snappy:snappy-java:1.1.10.5
Using source directory: /Users/avikaushik/code/s3_sample_project/data
Attempting to write to Kafka topic 'first-kafka-topic' on servers 'localhost:9092'
Using query checkpoint location: '/tmp/kafka-checkpoint-local-fixed-v4'
Streaming query 'Unnamed Streaming Query' (id: 0b85ed99-fa16-4a54-92d5-a581fe6852f5) started successfully.
Waiting for termination... (Interrupt kernel to stop)


25/06/02 13:50:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
