In [None]:
import json
import logging
from confluent_kafka import Consumer, KafkaException, KafkaError

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger("ConfluentKafkaConsumer")

consumer_config = {
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9093,kafka-3:9094', 
    'group.id': 'flight-consumer-group',
    'auto.offset.reset': 'earliest', 
    'enable.auto.commit': False,  
    'session.timeout.ms': 10000,  
    'max.poll.records': 500, 
}

consumer = Consumer(consumer_config)

topic = "flight-producer"

def process_message(message):
    try:
        flight_data = json.loads(message.value().decode('utf-8'))
        for flight in flight_data.get("flights", []):
            dep = flight["departure_city"]
            arr = flight["arrival_city"]
            logger.info(f"Received flight from {dep} to {arr}")
    except Exception as e:
        logger.error(f"Error processing message: {e}")

def main():
    logger.info("Starting Kafka consumer...")

    # Subscribe to the Kafka topic
    consumer.subscribe([topic])

    try:
        while True:
            # Poll messages from Kafka
            msg = consumer.poll(timeout=1.0)

            if msg is None:
                # No message available within the timeout
                continue
            if msg.error():
                # Handle Kafka errors
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    logger.info(f"End of partition reached: {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
                else:
                    raise KafkaException(msg.error())
            else:
                # Process the message
                logger.info(f"Received message: {msg.value().decode('utf-8')}")
                process_message(msg)

    except KeyboardInterrupt:
        logger.info("Consumer interrupted.")

    finally:
        # Close the consumer
        consumer.close()

if __name__ == "__main__":
    main()
