In [1]:
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from kafka import KafkaConsumer
from collections import defaultdict

In [2]:
# Load environment variables from a .env file before the Kafka configuration
# cell reads them with os.getenv().
# NOTE(review): the absolute path ties the notebook to one filesystem layout — consider making it configurable.
dotenv_path = Path('/resources/.env')
# Kept as the cell's last expression, so the notebook echoes load_dotenv's return value.
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
# Kafka configuration, read from the process environment.
# os.environ[...] is used rather than os.getenv(...) so that an unset variable
# fails right here with a KeyError naming it, instead of an opaque
# "NoneType + str" TypeError raised by the string concatenations.
kafka_host = os.environ["KAFKA_HOST"] + ":9092"  # broker address; the port is hard-coded
kafka_topic = os.environ["KAFKA_TOPIC_NAME"]
# Derived from the topic name read above instead of a second environment lookup.
kafka_topic_partition = kafka_topic + "-1"

In [4]:
# Kafka consumer group id; passed as `group_id` when the consumer is created.
consumer_group = "consumer-group-1"

In [5]:
# Rolling Aggregation State
def _new_bucket():
    """Return a fresh, zeroed statistics record for one aggregation key."""
    return {"count": 0, "sum": 0, "avg": 0}


# Maps each aggregation key to its own running count / sum / average record;
# unseen keys get a new zeroed record on first access.
aggregation_state = defaultdict(_new_bucket)

In [6]:
# Kafka consumer subscribed to the main topic as a member of `consumer_group`.
# NOTE(review): the aggregation state lives only in this kernel's memory, while
# auto-commit presumably persists consumed offsets for the group; after a kernel
# restart, already-committed messages would then not be aggregated again —
# confirm this is intended.
consumer = KafkaConsumer(
    kafka_topic,  # Topic to consume messages from
    group_id=consumer_group,
    bootstrap_servers=kafka_host,
    enable_auto_commit=True,
    auto_offset_reset="earliest",  # Start from the earliest offset if no committed offset exists
    value_deserializer=lambda msg: json.loads(msg.decode("utf-8")),  # Decode UTF-8 bytes, then parse as JSON
)

In [7]:
# Function to update rolling aggregation
def update_aggregation(key, value):
    """Fold one message into the running statistics kept for ``key``.

    The new sum and count are computed before anything is written, so a
    message whose amount cannot be added leaves ``aggregation_state``
    exactly as it was (no half-updated bucket, no empty bucket for a new key).

    Args:
        key: Grouping key selecting the bucket in ``aggregation_state``.
        value: Mapping for one message. Its ``annual_income`` entry is the
            amount added to the bucket; a missing entry counts as 0.

    Raises:
        TypeError: If the amount cannot be added to the running sum
            (for example an explicit null in the message).
    """
    amount = value.get("annual_income", 0)  # Replace with the field you want to aggregate

    # .get() avoids creating a bucket as a side effect of merely looking it up.
    bucket = aggregation_state.get(key)
    if bucket is None:
        bucket = {"count": 0, "sum": 0, "avg": 0}

    # Compute first: this is the only step that can fail on a bad amount.
    new_sum = bucket["sum"] + amount
    new_count = bucket["count"] + 1

    # Commit: update the bucket in place and make sure it is registered.
    bucket["count"] = new_count
    bucket["sum"] = new_sum
    bucket["avg"] = new_sum / new_count
    aggregation_state[key] = bucket

# Announce which topic is about to be consumed; this line only prints —
# the blocking consume loop itself lives in the following cell.
print(f"Starting Kafka Consumer for topic: {kafka_topic}")

Starting Kafka Consumer for topic: test-topic


In [8]:
# Consume messages one at a time, folding each into the rolling aggregation
# and printing the full state after every message.
try:
    for message in consumer:
        # Payload as produced by the consumer's value_deserializer (parsed JSON)
        value = message.value

        # Group by the message's "gender" field.
        # NOTE(review): assumes every payload is a JSON object; a message without
        # "gender" is aggregated under the key None — confirm that is acceptable.
        key = value.get("gender")  # Aggregation key (e.g., gender)

        # Update rolling aggregation
        update_aggregation(key, value)

        # Print the whole aggregation state (all keys), not just the bucket that changed
        print(f"Rolling Gender Annual Income Aggregation: {json.dumps(aggregation_state, indent=2)}")

except KeyboardInterrupt:
    # The loop has no exit condition of its own; interrupting the kernel is handled here
    print("Consumer interrupted by user. Exiting...")
finally:
    # Runs on interrupt and on any other exception, so the consumer is always closed
    consumer.close()
    print("Consumer closed.")

Rolling Gender Annual Income Aggregation: {
  "Female": {
    "count": 1,
    "sum": 71824,
    "avg": 71824.0
  }
}
Rolling Gender Annual Income Aggregation: {
  "Female": {
    "count": 2,
    "sum": 222759,
    "avg": 111379.5
  }
}
Rolling Gender Annual Income Aggregation: {
  "Female": {
    "count": 2,
    "sum": 222759,
    "avg": 111379.5
  },
  "Male": {
    "count": 1,
    "sum": 180995,
    "avg": 180995.0
  }
}
Rolling Gender Annual Income Aggregation: {
  "Female": {
    "count": 2,
    "sum": 222759,
    "avg": 111379.5
  },
  "Male": {
    "count": 2,
    "sum": 263903,
    "avg": 131951.5
  }
}
Rolling Gender Annual Income Aggregation: {
  "Female": {
    "count": 3,
    "sum": 377350,
    "avg": 125783.33333333333
  },
  "Male": {
    "count": 2,
    "sum": 263903,
    "avg": 131951.5
  }
}
Rolling Gender Annual Income Aggregation: {
  "Female": {
    "count": 3,
    "sum": 377350,
    "avg": 125783.33333333333
  },
  "Male": {
    "count": 3,
    "sum": 407951,
    "a