In [1]:
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
import json
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_csv
import os

## Create Kafka Topic 

In [2]:
print("Creating Kafka topic...")

# Wait two seconds before connecting
# (presumably to give the broker time to come up -- TODO confirm it is needed).
time.sleep(2)

# Single-partition, unreplicated topic definition.
topic_name = 'test'
topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)

admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'], client_id='notebook_admin')

# Creation is best-effort: an already-existing topic is reported as such, any
# other failure is printed rather than raised, and the client is always closed.
try:
    admin_client.create_topics(new_topics=[topic], validate_only=False)
    print(f"Topic '{topic_name}' created")
except TopicAlreadyExistsError:
    print(f"Topic '{topic_name}' already exists")
except Exception as err:
    print(f"Error: {err}")
finally:
    admin_client.close()

Creating Kafka topic...
Topic 'test' already exists


## Create Kafka Producer

In [3]:
# Producer that encodes each str payload as UTF-8 bytes before sending.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: v.encode('utf-8')
)

# Sample ratings as CSV lines; the downstream schema reads the four fields as
# UserId, MovieId, Rating, Timestamp.
rows = [
    "196,242,3.0,881250949",
    "186,302,3.0,891717742",
    "22,377,1.0,878887116",
    "244,51,2.0,880606923",
    "166,346,1.0,886397596",
]

try:
    # send() is asynchronous and only returns a future, so keep the futures
    # to be able to detect delivery failures afterwards.
    pending = [producer.send('test', value=line) for line in rows]
    producer.flush()

    # After flush() every future is resolved; get() re-raises the delivery
    # error (if any) instead of letting it pass silently.
    for future in pending:
        future.get(timeout=10)
finally:
    # Release the producer's resources even when sending failed.
    producer.close()

print(f"Sent {len(rows)} messages")

Sent 5 messages


## Cassandra and Spark Setup

In [4]:
print("\nInitializing Spark with Cassandra connector...")

# Session settings, applied to the builder in the order listed.
spark_conf = {
    # Comma-separated Maven coordinates of the Kafka source, the Kafka token
    # provider and the Cassandra connector.
    "spark.jars.packages": (
        "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0,"
        "org.apache.spark:spark-token-provider-kafka-0-10_2.13:4.0.0,"
        "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1"
    ),
    # Where the Cassandra connector should connect.
    "spark.cassandra.connection.host": "localhost",
    "spark.cassandra.connection.port": "9042",
    "spark.sql.streaming.forceDeleteTempCheckpointLocation": "true",
}

builder = SparkSession.builder.appName("kafka-cassandra-streaming")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

print("Spark session created!")
spark


Initializing Spark with Cassandra connector...


:: loading settings :: url = jar:file:/Users/pranavrajan/Desktop/id2221-data-intensive-traffic/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/pranavrajan/.ivy2.5.2/cache
The jars for the packages stored in: /Users/pranavrajan/.ivy2.5.2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
org.apache.spark#spark-token-provider-kafka-0-10_2.13 added as a dependency
com.datastax.spark#spark-cassandra-connector_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bfacfefd-bb62-463d-80f4-66e74d37d34b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;4.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;4.0.0 in central
	found org.apache.kafka#kafka-clients;3.9.0 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.7 in central
	found org.slf4j#slf4j-api;2.

Spark session created!


In [5]:
# Progress banner only: this cell prints a message and does not read from Kafka itself.
print("\nReading stream from Kafka...")


Reading stream from Kafka...


## Create Schema for Kafka Stream

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, LongType
from pyspark.sql.functions import col, from_csv

# DDL string describing the comma-separated payload carried in each Kafka
# record's `value`.
schema_ddl = "UserId INT, MovieId INT, Rating DOUBLE, Timestamp LONG"

# Streaming source on topic 'test'. 'earliest' makes a query that has no
# checkpoint to resume from start at the oldest retained offset, so messages
# produced by earlier runs are read again.
kafka_reader = spark.readStream.format("kafka")
for opt_name, opt_value in (
    ("kafka.bootstrap.servers", "localhost:9092"),
    ("subscribe", "test"),
    ("startingOffsets", "earliest"),
):
    kafka_reader = kafka_reader.option(opt_name, opt_value)
raw = kafka_reader.load()

In [7]:
# Progress banner only: this cell prints a message and does no parsing itself.
print("\nParsing Kafka messages...")


Parsing Kafka messages...


In [8]:
# Step 1: decode the Kafka payload to text and keep the record timestamp.
kafka_text = raw.selectExpr(
    "CAST(value AS STRING) AS value_str",
    "timestamp AS kafka_timestamp",
)

# Step 2: parse the CSV text into a struct column `r` using schema_ddl.
kafka_structs = kafka_text.select(
    from_csv(col("value_str"), schema_ddl).alias("r"),
    col("kafka_timestamp"),
)

# Step 3: flatten the struct into top-level columns and drop rows whose
# UserId is null.
parsed = kafka_structs.select("r.*", "kafka_timestamp").filter(col("UserId").isNotNull())

In [9]:
# Progress banner only. A plain string is enough: there is nothing to interpolate.
print("\nWrite stream to Cassandra!")


Write stream to Cassandra!


In [10]:
def write_to_cassandra(batch_df, epoch_id):
    """Append one streaming micro-batch to the Cassandra table movie_ratings.ratings.

    Intended as a ``foreachBatch`` sink. The batch is cached for the duration
    of the call so that counting, displaying and writing it do not each
    recompute it from the source.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        epoch_id: Identifier of the micro-batch; used only in log messages.

    Returns:
        None. Progress is reported via ``print``.

    Raises:
        Whatever the Cassandra write raises; the cached batch is still
        released in that case.
    """
    # Several actions run on the same batch below; without caching, each one
    # would re-evaluate the micro-batch.
    batch_df.persist()
    try:
        # Count once, up front: the value serves both the emptiness check and
        # the final report.
        count = batch_df.count()
        if count == 0:
            print(f"Batch {epoch_id}: No data")
            return

        print(f"\n--- Batch {epoch_id} ---")
        batch_df.show()

        (batch_df.write
            .format("org.apache.spark.sql.cassandra")
            .mode("append")
            .option("keyspace", "movie_ratings")
            .option("table", "ratings")
            .save())

        print(f"Wrote {count} rows to cassandra!")
    finally:
        # Always release the cached batch, including on the early return and
        # when the write fails.
        batch_df.unpersist()

In [20]:
# Progress banner only. A plain string is enough: there is nothing to interpolate.
print("\nStreaming query!!!")


Streaming query!!!


In [12]:
# Route every micro-batch of `parsed` through write_to_cassandra in append
# mode. No checkpointLocation is configured on this writer.
stream_writer = parsed.writeStream.outputMode("append").foreachBatch(write_to_cassandra)

# start() launches the query in the background and returns its handle.
query = stream_writer.start()

25/09/29 14:15:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/73/y8z3_15n0c3fnwk5fkhj6ql40000gn/T/temporary-1bd9a033-231d-4146-ab78-9f0355d332c1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/09/29 14:15:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.



--- Batch 0 ---
+------+-------+------+---------+--------------------+
|UserId|MovieId|Rating|Timestamp|     kafka_timestamp|
+------+-------+------+---------+--------------------+
|   196|    242|   3.0|881250949|2025-09-29 13:12:...|
|   186|    302|   3.0|891717742|2025-09-29 13:12:...|
|    22|    377|   1.0|878887116|2025-09-29 13:12:...|
|   244|     51|   2.0|880606923|2025-09-29 13:12:...|
|   166|    346|   1.0|886397596|2025-09-29 13:12:...|
|   196|    242|   3.0|881250949|2025-09-29 14:14:...|
|   186|    302|   3.0|891717742|2025-09-29 14:14:...|
|    22|    377|   1.0|878887116|2025-09-29 14:14:...|
|   244|     51|   2.0|880606923|2025-09-29 14:14:...|
|   166|    346|   1.0|886397596|2025-09-29 14:14:...|
|   196|    242|   3.0|881250949|2025-09-29 14:15:...|
|   186|    302|   3.0|891717742|2025-09-29 14:15:...|
|    22|    377|   1.0|878887116|2025-09-29 14:15:...|
|   244|     51|   2.0|880606923|2025-09-29 14:15:...|
|   166|    346|   1.0|886397596|2025-09-29 14:1

In [21]:
# Let the streaming query run for a bounded window, then shut it down.
print("\nStream for 15 seconds")

try:
    # awaitTermination(timeout) returns True only if the query ended within
    # the timeout; False means it was still running when the wait expired.
    terminated = query.awaitTermination(15)
    if terminated:
        print("\nStream completed successfully")
    else:
        print("\nStream still running after 15 seconds; stopping it")
except Exception as e:
    # A query that terminated with an error re-raises it from awaitTermination.
    print(f"\nStream failed: {e}")
finally:
    # Stop the query if it is still active so it does not keep running in
    # the background after this cell.
    if query.isActive:
        query.stop()
    print("\nStream stopped!")


Stream for 15 seconds

Stream completed successfully

Stream stopped!


In [14]:
# Progress banner only. A plain string is enough: there is nothing to interpolate.
print("\nQuery Cassandra!!")


Query Cassandra!!


In [15]:
# Batch reader bound to the Cassandra data source.
cassandra_reader = spark.read.format("org.apache.spark.sql.cassandra")

# Point the reader at movie_ratings.ratings and build the DataFrame.
cassandra_df = (
    cassandra_reader
    .option("keyspace", "movie_ratings")
    .option("table", "ratings")
    .load()
)

In [16]:
# Count the rows of the Cassandra-backed DataFrame and report the total.
total_count = cassandra_df.count()
print(f"{total_count} records\n")

5 records



In [17]:
# Print the table rows; truncate=False shows column values in full.
cassandra_df.show(truncate=False)

+------+-------+---------+------+-----------------------+
|UserId|MovieId|Timestamp|Rating|kafka_timestamp        |
+------+-------+---------+------+-----------------------+
|244   |51     |880606923|2.0   |2025-09-29 14:15:31.165|
|22    |377    |878887116|1.0   |2025-09-29 14:15:31.165|
|166   |346    |886397596|1.0   |2025-09-29 14:15:31.165|
|186   |302    |891717742|3.0   |2025-09-29 14:15:31.165|
|196   |242    |881250949|3.0   |2025-09-29 14:15:31.165|
+------+-------+---------+------+-----------------------+



In [18]:
# Print the column names, types and nullability Spark reports for the table.
cassandra_df.printSchema()

root
 |-- UserId: integer (nullable = false)
 |-- MovieId: integer (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- Rating: double (nullable = true)
 |-- kafka_timestamp: timestamp (nullable = true)



In [19]:
# Rebuild the DataFrame over movie_ratings.ratings and print every row
# without truncating column values.
cassandra_df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "movie_ratings")
    .option("table", "ratings")
    .load()
)

cassandra_df.show(truncate=False)

+------+-------+---------+------+-----------------------+
|UserId|MovieId|Timestamp|Rating|kafka_timestamp        |
+------+-------+---------+------+-----------------------+
|166   |346    |886397596|1.0   |2025-09-29 14:15:31.165|
|186   |302    |891717742|3.0   |2025-09-29 14:15:31.165|
|244   |51     |880606923|2.0   |2025-09-29 14:15:31.165|
|22    |377    |878887116|1.0   |2025-09-29 14:15:31.165|
|196   |242    |881250949|3.0   |2025-09-29 14:15:31.165|
+------+-------+---------+------+-----------------------+

