In [1]:
import json
import time
from collections import deque

import pandas as pd
from confluent_kafka import Consumer, KafkaError, TopicPartition
from confluent_kafka.admin import AdminClient
from IPython.display import display, clear_output

In [None]:
# Shared client settings; every Consumer created in this notebook is built from this dict.
consumer_config = {
    # Bootstrap address of the broker as seen from the notebook host.
    # NOTE(review): the recorded output of the batch-consume cell shows the client
    # being redirected to 'kafka:9092', which did not resolve from this host --
    # confirm the broker's advertised listeners before relying on this address.
    "bootstrap.servers": "localhost:9092",
    # Consumer group under which offsets are tracked.
    "group.id": "jupyter-consumer",
    # Where to start when the group has no usable offset: the oldest available message.
    "auto.offset.reset": "earliest",
    # Let the client commit consumed offsets automatically.
    "enable.auto.commit": True,
    # Unencrypted, unauthenticated transport.
    "security.protocol": "PLAINTEXT",
    # Name this client reports to the broker (it appears in client log lines).
    "client.id": "jupyter-client",
}

In [3]:
def test_kafka_connection():
    from confluent_kafka.admin import AdminClient

    admin_client = AdminClient({'bootstrap.servers': 'localhost:9092'})
    try:
        topics = admin_client.list_topics(timeout=10)
        print("Successfully connected to Kafka broker.")
        print("Available topics:")
        for topic in topics.topics:
            print(f" - {topic}")
        return True
    except Exception as e:
        print(f"Failed to connect to Kafka: {e}")
        return False

In [None]:
def consume_batch(topic_name='darooghe.transactions', max_messages=100, timeout=1.0,
                  idle_timeout=10.0):
    """Read up to ``max_messages`` JSON messages from a topic and return them.

    A new Consumer is created from ``consumer_config`` for each call and is
    always closed before returning.

    Args:
        topic_name: Topic to subscribe to.
        max_messages: Stop once this many messages have been decoded.
        timeout: Seconds each ``poll`` call may block.
        idle_timeout: Stop if no message has arrived for this many seconds.

    Returns:
        A list of the decoded JSON payloads, in arrival order. The list may be
        shorter than ``max_messages`` (or empty) if the idle timeout elapsed, a
        Kafka error was reported, or an unexpected exception occurred; those
        conditions are printed rather than raised.
    """
    consumer = Consumer(consumer_config)
    messages = []
    skipped = 0
    # Monotonic clock: elapsed-time measurement must not jump with wall-clock changes.
    last_activity = time.monotonic()

    try:
        consumer.subscribe([topic_name])

        while len(messages) < max_messages:
            msg = consumer.poll(timeout)

            if msg is None:
                if time.monotonic() - last_activity > idle_timeout:
                    print(f"No more messages received in the last {idle_timeout:g} seconds.")
                    break
                continue

            if msg.error():
                # End-of-partition is informational; keep polling for new data.
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(f"Error: {msg.error()}")
                break

            # Any delivered message, decodable or not, counts as activity.
            last_activity = time.monotonic()

            payload = msg.value()
            if payload is None:
                # A message without a value has nothing to decode.
                skipped += 1
                continue
            try:
                transaction = json.loads(payload.decode('utf-8'))
            except (UnicodeDecodeError, ValueError) as e:
                # One bad payload must not abort the rest of the batch.
                skipped += 1
                print(f"Skipping malformed message at offset {msg.offset()}: {e}")
                continue

            messages.append(transaction)
            if len(messages) % 10 == 0:
                print(f"Received {len(messages)} messages")

    except Exception as e:
        # Best effort: report the problem and return whatever was collected.
        print(f"Error consuming messages: {e}")
    finally:
        consumer.close()

    if skipped:
        print(f"Skipped {skipped} message(s) with an empty or malformed payload")

    return messages

In [None]:
def consume_continuous(topic_name='darooghe.transactions', timeout=1.0, max_display=10):
    """Consume a topic until interrupted, showing the most recent messages live.

    After every decoded message the cell output is cleared and the last
    ``max_display`` transactions are re-rendered as a DataFrame. The loop ends
    on Ctrl+C / kernel interrupt or when Kafka reports a non-EOF error; the
    consumer is closed in either case.

    Args:
        topic_name: Topic to subscribe to.
        timeout: Seconds each ``poll`` call may block.
        max_display: Number of most recent transactions to keep on screen.
            Values of zero or less keep none.

    Returns:
        None.
    """
    consumer = Consumer(consumer_config)
    # A bounded deque drops the oldest entry automatically, including for
    # max_display == 0, where list slicing with [-0:] would keep everything.
    recent_txns = deque(maxlen=max(max_display, 0))
    skipped = 0

    try:
        consumer.subscribe([topic_name])
        print("Starting continuous consumption... (Press Ctrl+C to stop)")
        while True:
            msg = consumer.poll(timeout)

            if msg is None:
                continue

            if msg.error():
                # End-of-partition is informational; keep polling for new data.
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                print(f"Error: {msg.error()}")
                break

            payload = msg.value()
            try:
                if payload is None:
                    raise ValueError("message has no value")
                transaction = json.loads(payload.decode('utf-8'))
            except (UnicodeDecodeError, ValueError):
                # One bad payload must not end the stream; count it and move on.
                skipped += 1
                continue

            recent_txns.append(transaction)

            clear_output(wait=True)
            print(f"Consuming messages from {topic_name}... (Press Ctrl+C to stop)")
            if skipped:
                print(f"Skipped {skipped} message(s) with an empty or malformed payload")
            display(pd.DataFrame(list(recent_txns)))

    except KeyboardInterrupt:
        print("\nConsumption stopped by user")
    finally:
        consumer.close()

In [None]:
def analyze_transactions(transactions):
    """Print and display summary statistics for a batch of transactions.

    Args:
        transactions: Sequence of transaction records (for example the list
            returned by ``consume_batch``) from which a DataFrame is built.
            The summary looks for the columns ``amount``, ``status``,
            ``merchant_category`` and ``payment_method``; any that are absent
            are reported and skipped instead of raising ``KeyError``.
            # NOTE(review): 'amount' is assumed to be numeric -- confirm against the producer.

    Returns:
        The DataFrame built from ``transactions``, or None when
        ``transactions`` is empty.
    """
    if not transactions:
        print("No transactions to analyze")
        return None

    df = pd.DataFrame(transactions)

    print(f"\nAnalyzed {len(transactions)} transactions:")
    if 'amount' in df.columns:
        print(f"Average amount: {df['amount'].mean():.2f}")
        print(f"Total value: {df['amount'].sum():.2f}")
    else:
        print("Column 'amount' not found; skipping amount statistics")

    # (heading label, column name) for each categorical breakdown.
    breakdowns = (
        ("status", "status"),
        ("merchant category", "merchant_category"),
        ("payment method", "payment_method"),
    )
    for label, column in breakdowns:
        print(f"\nTransactions by {label}:")
        if column in df.columns:
            display(df[column].value_counts())
        else:
            print(f"Column '{column}' not found; skipping")

    print("\nSample transactions:")
    display(df.head())

    return df

In [7]:
# Probe the broker and list its topics before consuming. The consume functions
# default to the 'darooghe.transactions' topic, so check that it appears in the
# printed list. The boolean result is shown as this cell's output.
test_kafka_connection()

Successfully connected to Kafka broker.
Available topics:
 - __consumer_offsets


True

In [8]:
# Consume up to 50 messages from the default topic. Fewer (possibly none) are
# returned when consume_batch stops early on its idle timeout or on an error.
transactions = consume_batch(max_messages=50)

# Summarize the consumed transactions. analyze_transactions returns None for an
# empty batch, so `df` may be None here.
df = analyze_transactions(transactions)

%3|1743299607.712|FAIL|jupyter-client#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: kafka:9092: Failed to resolve 'kafka:9092': Temporary failure in name resolution (after 10007ms in state CONNECT)


No more messages received in the last 10 seconds.
No transactions to analyze


%3|1743299617.723|FAIL|jupyter-client#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: Failed to resolve 'kafka:9092': Temporary failure in name resolution (after 10010ms in state CONNECT)
