In [1]:
!pip install pyspark



In [None]:
from confluent_kafka import Consumer, KafkaError
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json

# Kafka topic and consumer settings
topic = 'test-topic'
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'quotes-consumer',
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(consumer_conf)

# Spark session used to build the DataFrame and compute the statistics
spark = (
    SparkSession.builder
    .appName("QuotesConsumer")
    .getOrCreate()
)

# In-memory buffer of consumed rows: (id, quote, author, quote_length) tuples
data_list = []

# Function to process each Kafka message
def process_message(message):
    """Parse one JSON-encoded quote and buffer it in ``data_list``.

    On success a ``(id, quote, author, len(quote))`` tuple is appended to the
    module-level ``data_list``. A message that cannot be used is reported on
    stdout and skipped; nothing is raised, so one bad message never stops the
    consumer loop.

    Args:
        message: The decoded message text, expected to be a JSON object with
            ``id``, ``quote`` and ``author`` keys.
    """
    try:
        data = json.loads(message)
        quote = data['quote']
        # Build the whole row first so a bad message never leaves a partial
        # entry in the buffer.
        record = (data['id'], quote, data['author'], len(quote))
    except (json.JSONDecodeError, KeyError, TypeError):
        # TypeError covers valid JSON that is not an object (e.g. `123`,
        # `[1, 2]`, `null`) and a quote that has no length (e.g. `null`).
        print("Error processing message or missing fields")
        return
    data_list.append(record)

# Helper: summarize the buffered quotes with PySpark
def _report_statistics():
    """Print a PySpark summary of the rows collected in ``data_list``.

    Shows the collected rows, descriptive statistics for ``id`` and
    ``quote_length``, the number of distinct authors, and the average and
    standard deviation of the quote length. When nothing was collected, a
    short notice is printed instead.
    """
    if not data_list:
        print("No data to analyze.")
        return

    # Load data into a PySpark DataFrame
    df = spark.createDataFrame(data_list, ["id", "quote", "author", "quote_length"])
    df.show(truncate=False)

    # Descriptive Statistics for Numeric Fields
    print("\nDescriptive Statistics (Numeric):")
    df.describe("id", "quote_length").show()

    # Count of Unique Authors
    unique_authors = df.select("author").distinct().count()
    print(f"\nNumber of unique authors: {unique_authors}")

    # Aggregate Metrics (Average and Standard Deviation of Quote Length)
    print("\nAggregate Metrics:")
    df.agg(
        F.avg("quote_length").alias("avg_quote_length"),
        F.stddev("quote_length").alias("stddev_quote_length")
    ).show()


# Function to consume messages from Kafka and calculate statistics
def consume_messages():
    """Consume messages from ``topic`` until interrupted, then report statistics.

    Subscribes the module-level ``consumer`` to ``topic`` and polls it in an
    endless loop with a one-second timeout. Each payload is decoded as UTF-8,
    echoed to stdout and handed to ``process_message``. Kafka error events,
    messages without a payload and payloads that are not valid UTF-8 are
    reported and skipped so that they cannot abort the loop.

    A ``KeyboardInterrupt`` (Ctrl-C) ends the loop and triggers the statistics
    report. The consumer is always closed on the way out.
    """
    print("Listening for messages...")
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the timeout; poll again.
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f"End of partition reached: {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
                else:
                    print(f"Error: {msg.error()}")
                continue

            payload = msg.value()
            if payload is None:
                # A record may carry no value at all; there is nothing to decode.
                print("Received message with no payload; skipping.")
                continue
            try:
                # Decode once and reuse the text for both logging and parsing.
                text = payload.decode('utf-8')
            except UnicodeDecodeError as exc:
                print(f"Skipping message that is not valid UTF-8: {exc}")
                continue

            print(f"Received message: {text}")
            process_message(text)

    except KeyboardInterrupt:
        print("\nConsumer interrupted. Shutting down...")
        _report_statistics()

    finally:
        consumer.close()
        print("Consumer closed.")

# Start the consumer: it polls until interrupted (KeyboardInterrupt / Ctrl-C),
# then prints the statistics for the collected quotes and closes the consumer.
consume_messages()

Listening for messages...
Received message: Hello Kafka!
Error processing message or missing fields
Received message: 
Error processing message or missing fields
Received message: Hellow
Error processing message or missing fields
Received message: addoow
Error processing message or missing fields
Received message: yoooooooo
Error processing message or missing fields
Received message: Test Message 0
Error processing message or missing fields
Received message: Test Message 1
Error processing message or missing fields
Received message: Test Message 2
Error processing message or missing fields
Received message: Test Message 3
Error processing message or missing fields
Received message: Test Message 4
Error processing message or missing fields
Received message: Test Message 0
Error processing message or missing fields
Received message: Test Message 1
Error processing message or missing fields
Received message: Test Message 2
Error processing message or missing fields
Received message: Test 