In [2]:
from pathlib import Path
from confluent_kafka import Consumer, KafkaError, TopicPartition
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer
import time
import random
import os
import sys

# Load environment variables from .env file
from dotenv import load_dotenv
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

# Access environment variables
project_name = os.getenv('COMPOSE_PROJECT_NAME')
kafka_host = os.getenv('KAFKA_HOST')
topic_name = os.getenv('KAFKA_TOPIC_NAME')
replication_factor = int(os.getenv('KAFKA_REPLICATION'))
num_partitions = int(os.getenv('KAFKA_PARTITION'))
schema_registry_host = os.getenv('SCHEMA_REG_HOST')

# Protobuf schema import (assuming it's in a 'protobuf' folder)
sys.path.append('./protobuf')
import protobuf_schema_pb2 

# Kafka Configuration
bootstrap_servers = f'{kafka_host}:9092' 
schema_registry_url = f'http://{schema_registry_host}:8081' 
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url})

# Consumer Configuration
protobuf_deserializer = ProtobufDeserializer(protobuf_schema_pb2.EmployeeSalary, 
                                             {'schema.registry.url': schema_registry_url,
                                              'use.deprecated.format': False})
consumer_conf = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'salary-analytics-group', 
    'auto.offset.reset': 'earliest'
}

# Kafka Consumer and Analytics
consumer = Consumer(consumer_conf)

# Subscribe with on_assign callback
consumer.subscribe([topic_name])

total_salaries = 0
num_employees = 0

while True:
    msg = consumer.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    salary_data = protobuf_deserializer(msg.value(), None)

    total_salaries += salary_data.salary
    num_employees += 1

    print(f"Employee ID: {salary_data.employee_id}")
    print(f"Employee Name: {salary_data.employee_name}")
    print(f"Salary: ${salary_data.salary}")
    print(f"Department: {salary_data.department}")
    print("----------------------")

    # Simple Analytics 
    if num_employees > 0:
        average_salary = total_salaries / num_employees
        print(f"Average Salary: ${average_salary:.2f}")

Employee ID: e79a317c-5219-43d2-9540-6ebc7e28665e
Employee Name: Evan Mclean
Salary: $61761.65
Department: Marketing
----------------------
Average Salary: $61761.65
Employee ID: 4275d112-f5b8-402c-88a9-73ba6ca92e1c
Employee Name: Tammy Johnson
Salary: $76072.17
Department: IT
----------------------
Average Salary: $68916.91
Employee ID: 33622fab-3081-4c63-8600-32ef0cfbd0f1
Employee Name: Taylor Obrien
Salary: $54893.16
Department: Finance
----------------------
Average Salary: $64242.33
Employee ID: 8e2a7ffa-e5de-4a15-a74f-0650e93e851b
Employee Name: Carmen Hernandez
Salary: $74757.58
Department: HR
----------------------
Average Salary: $66871.14
Employee ID: 93813f08-a953-49b6-a3bb-4767fed5f78d
Employee Name: Trevor Arnold
Salary: $52694.72
Department: IT
----------------------
Average Salary: $64035.86
Employee ID: cf568cfc-c3e5-49e1-96b2-502e72835b20
Employee Name: Matthew Chavez
Salary: $71798.78
Department: IT
----------------------
Average Salary: $65329.68
Employee ID: ab72cb2

KeyboardInterrupt: 