In [9]:
from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.serialization import Serializer, Deserializer
from confluent_kafka.serialization import StringDeserializer
import random, time
import collections

In [2]:
# Cluster and topic settings shared by the cells below.
bootstrap_servers = ','.join([
    'kafka-1:19092',
    'kafka-2:19093',
    'kafka-3:19094',
])
input_topic = 'wordcount-input'
output_topic = 'wordcount-output'

# Candidate payloads; one is picked at random for the demo message.
sample_strings = [
    "Hello, world!",
    "Kafka is awesome!",
    "Sample message",
    "Confluent Kafka",
]

# Only the broker list is set; every other producer option is left at its default.
producer_conf = {'bootstrap.servers': bootstrap_servers}
producer = Producer(producer_conf)

# Queue one demo message. Key and value are plain UTF-8 bytes (no serializer
# object is involved); the key is the current wall-clock time in milliseconds.
key = str(int(time.time() * 1000))
value = random.choice(sample_strings)
producer.produce(topic=input_topic, key=key.encode('utf-8'), value=value.encode('utf-8'))

In [22]:
# Queue every sample string as its own message.
# Keys and values are UTF-8 encoded directly. Each message gets a fresh
# millisecond-timestamp key built here, rather than reusing the global `key`:
# the consumer cell rebinds `key` to bytes (or None), and calling `.encode` on
# that would fail if this cell is re-run afterwards.
for value in sample_strings:
    key = str(int(time.time() * 1000))
    producer.produce(
        topic=input_topic,
        key=key.encode('utf-8'),
        value=value.encode('utf-8'),
    )

In [3]:
# Wait for queued messages to be sent before moving on; flush() returns the
# number of messages still left in the producer's queue (displayed below).
undelivered = producer.flush()
undelivered

0

In [10]:
# Consumer settings: join 'wordcount-group' and, when the group has no
# committed offset yet, start reading from the earliest available message.
consumer_conf = {}
consumer_conf['bootstrap.servers'] = bootstrap_servers
consumer_conf['group.id'] = 'wordcount-group'
consumer_conf['auto.offset.reset'] = 'earliest'

consumer = Consumer(consumer_conf)
consumer.subscribe([input_topic])

# Running per-word totals across every consumed message; unseen words start at 0.
word_counts = collections.defaultdict(int)

In [6]:
def produce_word_count_records(line):
    """Count how many times each whitespace-separated word appears in ``line``.

    Tokens come from ``str.split()`` (any run of whitespace separates words).
    Matching is exact, so case and punctuation matter: "Kafka" and "Kafka!" are
    different words.

    Args:
        line: Text to tokenize. ``None`` or an empty string yields no words.
            The consumer loop may hand over ``None`` for a message without a value.

    Returns:
        A plain ``dict`` mapping each word to its count, in first-occurrence order.
    """
    if not line:
        # Nothing to count; this also avoids AttributeError on None.split().
        return {}
    return dict(collections.Counter(line.split()))

def produce_output_records(key, value):
    """Turn a word -> count mapping into a list of ``(word, count)`` tuples.

    Args:
        key: Accepted but currently unused.
        value: Mapping of word to count (anything with ``.items()``).

    Returns:
        A list of ``(word, count)`` tuples in the mapping's iteration order.
    """
    return list(value.items())

In [7]:
try:
    while True:
        # Wait up to 1 s per poll; the short timeout lets a kernel interrupt
        # (KeyboardInterrupt) stop the loop promptly.
        msg = consumer.poll(1.0)

        if msg is None:
            # Timed out with nothing to read.
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event, not an error.
                continue
            print(f"Consumer error: {msg.error()}")
            continue

        key = msg.key()  # raw bytes, or None if the message has no key
        value = msg.value()  # raw bytes, or None if the message has no value

        if value is None:
            # A message without a payload has no words to count; skip it rather
            # than passing None on to the word-count helper.
            print("Received message with no value; skipping")
            continue

        # errors='replace' keeps a single non-UTF-8 payload from killing the loop.
        decoded_value = value.decode('utf-8', errors='replace')
        print(f"Received message: Value={decoded_value}")

        # Count this message's words and fold them into the running totals.
        word_count_records = produce_word_count_records(decoded_value)
        for word, count in word_count_records.items():
            word_counts[word] += count

        # TODO(review): these (word, count) records are built but never published
        # to output_topic; send them with a producer if downstream readers need them.
        output_records = produce_output_records(key, word_count_records)

        print("Word Counts:", dict(word_counts))

except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop consuming.
    pass
finally:
    # Always release the consumer (leaves the group; commits offsets when
    # auto-commit is enabled, which is the default).
    consumer.close()

Received message: Value=Kafka is awesome!
Word Counts: {'Kafka': 1, 'is': 1, 'awesome!': 1}
