In [1]:
import time
import threading
from confluent_kafka import Consumer, KafkaException
from collections import Counter

# Shared state: process_message() updates these counters; track_word_frequency()
# reads word_count from its own thread. consume_messages() sets
# processed_messages to True in its `finally`, i.e. once it has stopped.
word_count = Counter()  # lowercased whitespace-separated token -> occurrences
total_chars = 0  # running sum of len(decoded message value)
processed_messages = False

# Kafka consumer configuration
conf = {
    # Broker to bootstrap from -- replace with your Kafka broker address.
    "bootstrap.servers": "localhost:9092",
    # Consumer group ID; committed offsets are tracked per group.
    "group.id": "file-streaming-consumer-group",
    # Only consulted when this group has no committed offset for a partition
    # (e.g. the very first run): begin at the oldest retained message.
    # Subsequent runs with the same group.id resume from the committed offset.
    "auto.offset.reset": "earliest",
}

consumer = Consumer(conf)

# Function to process each message received from Kafka
def process_message(message):
    """Fold one consumed Kafka message into the running statistics.

    Decodes the message value as UTF-8 text, adds its raw length to the
    global ``total_chars`` and counts each whitespace-separated token
    (lowercased) in the global ``word_count``, logging every step.

    Messages whose value is ``None`` are logged and skipped. Bytes that are
    not valid UTF-8 (in the value or the key) are replaced with U+FFFD
    instead of raising, so a single malformed record cannot abort the
    consumer loop that calls this function.

    Args:
        message: a consumed message exposing ``value()`` and ``key()``
            accessors that return ``bytes`` or ``None``. The key presumably
            carries the producer's cursor position -- confirm with producer.
    """
    global total_chars

    print("Received message...")

    payload = message.value()
    if payload is None:
        print("Received a message with no value. Skipping...")
        return

    # errors="replace": a strict decode raises UnicodeDecodeError on the first
    # non-UTF-8 byte, which would propagate out of the consumer loop and
    # close the consumer for good.
    line = payload.decode('utf-8', errors='replace')
    print(f"Decoded line: {line.strip()}")  # stripped for logging only

    key = message.key()
    cursor_pos = (
        key.decode('utf-8', errors='replace') if key is not None else "No Cursor Position"
    )
    print(f"Cursor position: {cursor_pos}")

    # Raw length: surrounding whitespace and the trailing newline are counted.
    total_chars += len(line)
    print(f"Updated total character count: {total_chars}")

    words = line.split()
    print(f"Words in the line: {words}")

    for word in words:
        word_count[word.lower()] += 1

    # Counter preserves insertion order, so this shows the five most recently
    # *first-seen* words and their totals.
    print(f"Current word count (last 5 words): {dict(list(word_count.items())[-5:])}")

# Function to track and display word frequency in real-time
def track_word_frequency(interval=5, top_n=5):
    """Print the most frequent words every ``interval`` seconds, forever.

    Meant to run in a background thread while process_message() keeps
    updating the global ``word_count``. Nothing is printed until at least
    one word has been counted.

    Args:
        interval: seconds to sleep between reports (default 5).
        top_n: number of most common words to show (default 5).
    """
    while True:
        # Rank a snapshot, not the live Counter: another thread keeps
        # inserting new words, and Counter.most_common(n) walks the mapping
        # in Python code (heapq.nlargest), so ranking the shared object can
        # raise "RuntimeError: dictionary changed size during iteration" and
        # end this thread. In CPython, dict(word_count) is copied by a single
        # C-level call while the GIL is held.
        snapshot = Counter(dict(word_count))
        if snapshot:
            print(f"\nTop {top_n} most frequent words:")
            for word, count in snapshot.most_common(top_n):
                print(f"{word}: {count}")
        time.sleep(interval)

# Consumer loop to continuously poll Kafka for messages
def consume_messages(idle_timeout=None):
    """Subscribe to 'file-streaming' and process messages until stopped.

    Polls the module-level ``consumer`` with a 1-second timeout and hands
    each valid message to process_message(). The loop ends when:

    * a poll comes back empty and ``processed_messages`` has already been
      set to True by other code (an external stop request);
    * ``idle_timeout`` is given, at least one message has been processed,
      and no further message has arrived for ``idle_timeout`` seconds;
    * the broker reports an error on a message (re-raised as
      ``KafkaException``); or
    * ``KeyboardInterrupt`` is raised -- which only happens when this
      function runs on the main thread, because Python delivers signals
      to the main thread.

    On every exit path ``processed_messages`` is set to True and the
    consumer is closed.

    Args:
        idle_timeout: seconds of silence, counted from the last processed
            message, after which consumption stops on its own. ``None``
            (the default) keeps polling indefinitely.

    Raises:
        KafkaException: if a polled message carries an error.
    """
    global processed_messages
    last_message_at = None  # time.monotonic() of the last processed message
    try:
        consumer.subscribe(['file-streaming'])
        print("Subscribed to topic 'file-streaming'...")

        while True:
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                idle_expired = (
                    idle_timeout is not None
                    and last_message_at is not None
                    and time.monotonic() - last_message_at >= idle_timeout
                )
                # Before this change the flag alone was checked here, but it is
                # only set in the `finally` below, so without an external
                # setter this branch could never fire.
                if processed_messages or idle_expired:
                    print("All messages processed, closing consumer...")
                    break
                print("No message received, continuing to poll...")
                continue

            if msg.error():
                print(f"Error received: {msg.error()}")
                raise KafkaException(msg.error())

            print("Processing message...")
            process_message(msg)
            last_message_at = time.monotonic()

    except KeyboardInterrupt:
        print("Consumer stopped by user.")

    finally:
        # Flag first so observers see "finished" before/while close() runs.
        processed_messages = True
        consumer.close()

# Start the Kafka consumer and track word frequency in the background
def start_consumer():
    """Launch the background workers and return immediately.

    Starts one daemon thread running track_word_frequency() and then one
    running consume_messages(). Daemon threads do not keep the interpreter
    alive, so the caller must block if it wants consumption to continue.
    """
    print("Starting the consumer...")

    # Reporter first, consumer second -- same start order as before.
    for worker in (track_word_frequency, consume_messages):
        threading.Thread(target=worker, daemon=True).start()

# Start the consumer
if __name__ == "__main__":
    print("Starting Kafka consumer...")
    print("Starting the consumer...")

    # Only the periodic report runs in a background (daemon) thread.
    # Consumption runs here, on the main thread, rather than via
    # start_consumer()'s daemon consumer thread, because:
    #   * Ctrl-C / a kernel interrupt raises KeyboardInterrupt inside
    #     consume_messages(), whose handler and `finally` close the consumer
    #     on the same thread that polls it -- previously close() was called
    #     from this thread while the consumer thread could still be in poll();
    #   * the script ends when consume_messages() returns or raises, instead
    #     of sleeping forever after a background consumer thread has died.
    reporter = threading.Thread(target=track_word_frequency)
    reporter.daemon = True  # don't keep the process alive once consuming ends
    reporter.start()

    consume_messages()


Starting Kafka consumer...
Starting the consumer...
Subscribed to topic 'file-streaming'...
No message received, continuing to poll...
No message received, continuing to poll...
No message received, continuing to poll...
Processing message...
Received message...
Decoded line: From fairest creatures we desire increase,
Cursor position: 45
Updated total character count: 45
Words in the line: ['From', 'fairest', 'creatures', 'we', 'desire', 'increase,']
Current word count (last 5 words): {'fairest': 1, 'creatures': 1, 'we': 1, 'desire': 1, 'increase,': 1}
Processing message...
Received message...
Decoded line: That thereby beauty's rose might never die,
Cursor position: 91
Updated total character count: 91
Words in the line: ['That', 'thereby', "beauty's", 'rose', 'might', 'never', 'die,']
Current word count (last 5 words): {"beauty's": 1, 'rose': 1, 'might': 1, 'never': 1, 'die,': 1}
Processing message...
Received message...
Decoded line: But as the riper should by time decease,
Cursor p