In [1]:
! pip install confluent-kafka avro-python3 requests protobuf



In [4]:
import os
import struct
import requests
import tempfile
import importlib.util
from confluent_kafka import Consumer
from google.protobuf.message import DecodeError
from google.protobuf.internal.decoder import _DecodeVarint32

# Configuration
KAFKA_BROKER = "redpanda:9092"
SCHEMA_REGISTRY_URL = "http://redpanda:8081"
TOPIC_NAME = "search_requests_with_schema"
GROUP_ID = "search-consumer-4"

# Kafka Consumer Config
consumer = Consumer({
    "bootstrap.servers": KAFKA_BROKER,
    "group.id": GROUP_ID,
    "auto.offset.reset": "earliest",
})

consumer.subscribe([TOPIC_NAME])


def fetch_schema_by_id(schema_id):
    """Fetches the Protobuf schema from the Schema Registry by schema ID."""
    schema_url = f"{SCHEMA_REGISTRY_URL}/schemas/ids/{schema_id}"
    
    response = requests.get(schema_url)
    if response.status_code == 200:
        return response.json()["schema"]
    else:
        raise Exception(f"Failed to fetch schema ID {schema_id}: {response.text}")


def compile_proto_from_schema(proto_schema):
    """Compiles the Protobuf schema dynamically and loads the message class."""
    with tempfile.TemporaryDirectory() as temp_dir:
        proto_file_path = os.path.join(temp_dir, "dynamic.proto")
        
        # Write the schema to a .proto file
        with open(proto_file_path, "w") as f:
            f.write(proto_schema)

        # Compile the .proto file
        os.system(f"protoc --python_out={temp_dir} --proto_path={temp_dir} {proto_file_path}")

        # Import the compiled Python module dynamically
        generated_file = os.path.join(temp_dir, "dynamic_pb2.py")
        spec = importlib.util.spec_from_file_location("dynamic_pb2", generated_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Return the dynamically loaded Protobuf class (assuming first message is the one we need)
        message_class_name = list(module.DESCRIPTOR.message_types_by_name.keys())[0]
        return getattr(module, message_class_name)


# Start consuming messages
print("\n🚀 Waiting for messages...\n")
schema_cache = {}

while True:
    msg = consumer.poll(1.0)  # Poll for messages

    if msg is None:
        continue
    if msg.error():
        print(f"❌ Consumer error: {msg.error()}")
        continue


    # Ensure message is long enough
    if len(msg.value()) < 5:
        print(f"⚠️ Received message too short: {msg.value()}")
        continue

    if len(msg.value()) < 5:
    print(f"⚠️ Received message too short: {msg.value()}")
    continue

    # Extract Schema ID (skip first byte for magic byte)
    schema_id = struct.unpack(">I", msg.value()[1:5])[0]  # Read bytes 1-4 (Schema ID)
    
    # Decode the message indexes (varint array)
    message_index_start = 5
    message_indexes = []
    offset = message_index_start
    
    while offset < len(msg.value()):
        index, index_size = _DecodeVarint32(msg.value(), offset)
        message_indexes.append(index)
        offset += index_size
        if index_size == 0:  # Stop if decoding fails (prevents infinite loops)
            break
    
    # Extract the Protobuf payload after the last message index
    payload = msg.value()[offset:]
    
    print(f"📌 Extracted Schema ID: {schema_id}, Message Indexes: {message_indexes}")

    # Fetch schema if not cached
    if schema_id not in schema_cache:
        try:
            proto_schema = fetch_schema_by_id(schema_id)
            schema_cache[schema_id] = compile_proto_from_schema(proto_schema)
            print(f"🔄 Loaded schema ID {schema_id}")
        except Exception as e:
            print(f"⚠️ Failed to load schema ID {schema_id}: {e}")
            continue

    # Parse the Protobuf message
    DynamicMessageClass = schema_cache[schema_id]
    dynamic_message = DynamicMessageClass()

    # Deserialize Protobuf message (assuming first message type is the one used)
    if message_indexes:
        try:
            dynamic_message = DynamicMessageClass()
            dynamic_message.ParseFromString(payload)
            print(f"🔹 Received: {dynamic_message}")
        except DecodeError:
            print("❌ Failed to decode Protobuf message.")
    else:
        print(f"⚠️ No valid message indexes found.")



IndentationError: expected an indented block after 'if' statement on line 79 (2719455921.py, line 80)

%4|1740161126.379|MAXPOLL|rdkafka#consumer-2| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 271ms (adjust max.poll.interval.ms for long-running message processing): leaving group
