### Tutorial Exercise: Local Confluent Kafka Producer and Consumer
This notebook is part of the tutorial ["How to handle diverse data in event streaming for carbon accounting"](https://simon.richebaecher.org) in Simon Richebaecher's blog series on cloud services. Please follow the instructions in that post, as the following activities make up step 2 of the underlying use case. For more information on how to set up a Python client application for Confluent, visit [the Confluent developer space](https://developer.confluent.io/courses/kafka-python/intro/).

In [4]:
# Import necessary streaming modules from the confluent_kafka library
from confluent_kafka import Producer, Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

# Import configuration parameters from the local 'config' and 'sr_config' files
from config import config, sr_config

# Import the 'random', 'datetime', 'pytz', and 'time' modules for assistive use
import random
import datetime as dt
import pytz
import time

#### Building a Producer which is Avro schema compliant 
Our initial goal is to produce events to a topic we set up in Confluent Cloud. The Schema Registry that comes with our Confluent environment stores schemas and serves them to clients for serialization and deserialization. We have opted for the Avro schema format and will therefore create an Avro-compliant producer for our simulated tracking data.
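Which subject a schema is registered under depends on the serializer's subject name strategy. With the default TopicNameStrategy (exposed in confluent_kafka as `topic_subject_name_strategy`), the value schema is stored under `<topic>-value`, so for this tutorial's topic `tracking_events` it should appear as `tracking_events-value` in Confluent Cloud. The helper below is a hypothetical, dependency-free illustration of that naming rule, not the library function itself.

```python
def topic_subject_name(topic: str, is_key: bool = False) -> str:
    """Hypothetical helper mirroring the default TopicNameStrategy:
    the subject is '<topic>-key' for keys and '<topic>-value' for values."""
    suffix = "key" if is_key else "value"
    return f"{topic}-{suffix}"


print(topic_subject_name("tracking_events"))               # tracking_events-value
print(topic_subject_name("tracking_events", is_key=True))  # tracking_events-key
```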

First we write classes to define the structure of the tracking events and their associated measurements. The TrackingEvent class has the attributes id, customer_id, date_time and measurement_list, while the Measurement class has the attributes vehicle_id, product_id, journey_km and total_km. Both classes provide a structured way to store and manage tracking event data.

In [5]:
# Define a class named TrackingEvent to represent tracking events
class TrackingEvent(object):
    def __init__(self, id, customer_id, date_time, measurement_list):
        # Constructor for TrackingEvent with attributes:
        # - id: Identifier for the tracking event
        # - customer_id: Identifier for the associated customer
        # - date_time: Date and time when the tracking event occurred
        # - measurement_list: List containing measurement objects related to the event
        self.id = id
        self.customer_id = customer_id
        self.date_time = date_time
        self.measurement_list = measurement_list

# Define a class named Measurement to represent measurements within tracking events
class Measurement(object):
    def __init__(self, vehicle_id, product_id, journey_km, total_km):
        # Constructor for Measurement with attributes:
        # - vehicle_id: Identifier for the vehicle associated with the measurement
        # - product_id: Identifier for the product, i.e. vehicle type associated with the measurement
        # - journey_km: Kilometers covered in the current journey
        # - total_km: Total kilometers covered by the vehicle
        self.vehicle_id = vehicle_id
        self.product_id = product_id
        self.journey_km = journey_km
        self.total_km = total_km


To keep the data structures above consistent with what the topic accepts, the following Avro schema describes them. It will be registered in the Schema Registry and applied during event production.

In [6]:
schema_str = """{
  "doc": "Schema for tracking data",
  "fields": [
    {
      "doc": "The int type is a 32-bit signed integer.",
      "name": "id",
      "type": "int"
    },
    {
      "doc": "The int type is a 32-bit signed integer.",
      "name": "customer_id",
      "type": "int"
    },
    {
      "doc": "Unix timestamp: number of seconds since the Unix epoch, 1 January 1970 00:00:00 UTC",
      "name": "date_time",
      "type": "int"
    },
    {
      "doc": "List of measurements for all products/vehicles used by a customer",
      "name": "measurement_list",
      "type": {
        "type": "array",  
        "items": {
            "type":"record",
            "name":"measurement",
            "fields":[
                {
                  "name":"vehicle_id", "type":"int"
                },
                {
                  "name":"product_id", "type":"int"
                },
                {
                  "name":"journey_km", "type":"float"
                },
                {
                  "name":"total_km", "type":"float"
                }
            ]}
        }
    }
  ],
  "name": "tracking_event",
  "type": "record",
  "namespace": "com.mycorp.mynamespace"
}"""
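Two properties of the types chosen above are worth checking before producing data. An Avro `int` is a 32-bit signed integer, so Unix timestamps in seconds fit only until early 2038, and an Avro `float` is a 32-bit IEEE 754 value, so km values rounded to two decimals in Python lose that exact rounding once encoded. The sketch below uses only the standard library to illustrate both limits; `as_float32` is a hypothetical helper. Switching to `long` (optionally with the `timestamp-millis` logical type) or `double` would be the usual way around them.

```python
import struct
from datetime import datetime, timezone

# Largest value an Avro "int" (32-bit signed) can hold
INT32_MAX = 2**31 - 1

# Interpreted as Unix seconds, this is the last representable instant
last_instant = datetime.fromtimestamp(INT32_MAX, tz=timezone.utc)
print(last_instant.isoformat())  # 2038-01-19T03:14:07+00:00


def as_float32(value: float) -> float:
    """Round-trip a Python float through a 4-byte IEEE 754 value (Avro "float")."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


total_km = 49999.99
stored = as_float32(total_km)
print(stored)                         # 49999.98828125
print(abs(stored - total_km) < 0.01)  # True
```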

The following function takes a TrackingEvent object (together with a serialization context) and converts it into a dictionary. It copies the attributes of the TrackingEvent and its Measurement objects into the dictionary, turning measurement_list into a list of dictionaries with a list comprehension. The Avro serializer calls this function for every event before encoding it.

In [7]:
def trackingevent_to_dict(trackingevent,ctx):
    """
    Convert a TrackingEvent object to a dictionary representation.

    Args:
        trackingevent (TrackingEvent): TrackingEvent object to be converted.
        ctx (SerializationContext): Context for serialization.

    Returns:
        dict: Dictionary representation of the TrackingEvent object.
    """

    return {
        "id":trackingevent.id,
        "customer_id":trackingevent.customer_id, 
        "date_time":trackingevent.date_time,

        # Convert the measurement_list using a list comprehension
        "measurement_list":[
            {
                "vehicle_id":e.vehicle_id, 
                "product_id":e.product_id,
                "journey_km":e.journey_km,
                "total_km":e.total_km
            } for e in trackingevent.measurement_list  # Iterate through each Measurement in the list
        ]
    }


We define a function that generates simulated tracking event data. It creates a list of TrackingEvent objects whose attributes are drawn at random from the ranges listed in the function's comments. The n_events argument controls how many events are generated.

In [8]:
def data_generator(n_events=1):
    """ 
    Generate simulated tracking event data.

    Args:
        n_events (int): Number of tracking events to generate. Default is 1.

    Returns:
        list: A list of generated TrackingEvent objects.
    """

    #Current settings:
    # event id with 7 digits in range of 1,000,000 to 10,000,000
    # customer id with 6 digits in range of 100,000 to 1,000,000
    # timestamp as current unix time
    # vehicle id with 5 digits in range 10,000 to 100,000
    # product id with 1 digit fixed for 5,6,7 (3 bike types)
    # journey in km in range of 0.0 to 50.00 km 
    # total travelled km in range 0.0 to 50,000.00 km 

    iter_count = range(1,n_events+1)  # Create a range of values from 1 to n_events

    local_timezone = pytz.timezone('Europe/Berlin') # Timezone-aware "now"; .timestamp() still returns Unix seconds independent of the zone
    simulation_data = [
        TrackingEvent(
            random.randrange(1000000,10000000),  # event id (7 digits, as documented above)
            random.randrange(100000,1000000),    # customer id (6 digits)
            int(dt.datetime.now(local_timezone).timestamp()),  # Unix timestamp in seconds
            [ Measurement(
                random.randrange(10000,100000),  # vehicle id (5 digits)
                random.randrange(5,8),           # product id 5, 6 or 7
                round(random.uniform(0,50),2),   # journey km
                round(random.uniform(0,50000),2))  # total km
            ]) for _ in iter_count
        ]

    return simulation_data
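For testing, it can help to make the simulation reproducible. The sketch below is a hypothetical variant (`generate_event_dicts` is not part of the tutorial) that uses its own seeded `random.Random` instance and returns dictionaries in the same shape that trackingevent_to_dict produces. It uses `time.time()`, which yields the same Unix seconds as `dt.datetime.now(local_timezone).timestamp()`, so the Berlin timezone has no effect on the stored value.

```python
import random
import time


def generate_event_dicts(n_events=1, seed=None):
    """Hypothetical, seedable variant of data_generator returning plain dicts."""
    rng = random.Random(seed)  # independent generator; the global random state is untouched
    now = int(time.time())     # Unix seconds, identical regardless of timezone
    return [
        {
            "id": rng.randrange(1_000_000, 10_000_000),       # 7 digits
            "customer_id": rng.randrange(100_000, 1_000_000),  # 6 digits
            "date_time": now,
            "measurement_list": [{
                "vehicle_id": rng.randrange(10_000, 100_000),  # 5 digits
                "product_id": rng.randrange(5, 8),             # 5, 6 or 7
                "journey_km": round(rng.uniform(0, 50), 2),
                "total_km": round(rng.uniform(0, 50_000), 2),
            }],
        }
        for _ in range(n_events)
    ]


a = generate_event_dicts(3, seed=42)
b = generate_event_dicts(3, seed=42)
print([e["id"] for e in a] == [e["id"] for e in b])  # True: same seed, same ids
```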


This callback function is typically used when producing messages to a Kafka topic to handle the delivery status of the produced messages:

In [9]:
def delivery_report(err, event): 
    """Callback function to handle delivery reports for produced messages.

    Args:
        err (KafkaError): None if the message was delivered, otherwise the error that caused the delivery to fail.
        event (Message): A message object representing the produced message.
    """

    if err is not None:
        print(f'Delivery failed for tracking event {event.key().decode("utf8")}: {err}')
    else:
        print(f'Tracking event {event.key().decode("utf8")} produced to {event.topic()}')
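When many events are produced, tallying outcomes can be more useful than printing one line per message. The sketch below wraps a callback with the same `(err, msg)` signature in a closure. `make_delivery_counter` and `StubMessage` are hypothetical names; the stub only imitates the `key()` method of a real message so the callback can be exercised without a broker. In the notebook, the returned callback could be passed as `on_delivery=callback` to `producer.produce(...)`.

```python
class StubMessage:
    """Hypothetical stand-in for a delivered message; it only offers key()."""

    def __init__(self, key):
        self._key = key

    def key(self):
        return self._key


def make_delivery_counter():
    """Build an (err, msg) delivery callback that tallies outcomes."""
    stats = {"delivered": 0, "failed_keys": []}

    def on_delivery(err, msg):
        if err is not None:
            # Remember which event keys could not be delivered
            stats["failed_keys"].append(msg.key().decode("utf8"))
        else:
            stats["delivered"] += 1

    return on_delivery, stats


callback, stats = make_delivery_counter()
callback(None, StubMessage(b"1234567"))
callback("simulated error", StubMessage(b"7654321"))
print(stats)  # {'delivered': 1, 'failed_keys': ['7654321']}
```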

The following function encapsulates the process of initializing and configuring an Avro serializer for producing messages. It takes the Kafka topic, the Avro schema string and the Schema Registry configuration as arguments and returns an initialized AvroSerializer that uses trackingevent_to_dict to turn each TrackingEvent into a dictionary before encoding it.

In [10]:
def initialize_avro_serializer(topic, schema_str, schema_registry_config):
    """Initialize and configure an Avro serializer for producing messages.

    Args:
        topic (str): The Kafka topic to which messages will be produced.
        schema_str (str): Avro schema definition as a JSON-formatted string.
        schema_registry_config (dict): Configuration for the Schema Registry client.

    Returns:
        AvroSerializer: An initialized Avro serializer instance.
    """

    # Initialize the Schema Registry client
    schema_registry_client = SchemaRegistryClient(schema_registry_config)

    # Create an Avro serializer with the specified schema and schema_registry_client
    avro_serializer = AvroSerializer(schema_registry_client, schema_str, trackingevent_to_dict)

    return avro_serializer
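The serializer handles the binary encoding internally. Per the Avro specification, `int` and `long` values are written with zig-zag encoding followed by a variable-length base-128 integer, so values close to zero take few bytes. The standalone sketch below (`zigzag_varint` is a hypothetical helper, not part of confluent_kafka) reproduces that rule for illustration only.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a signed integer the way Avro encodes int/long values:
    zig-zag mapping followed by a little-endian base-128 varint."""
    zz = (n << 1) ^ (n >> 63)  # maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...
    out = bytearray()
    while True:
        byte = zz & 0x7F
        zz >>= 7
        if zz:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


print(zigzag_varint(1).hex())   # 02
print(zigzag_varint(-1).hex())  # 01
print(zigzag_varint(64).hex())  # 8001
```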

We define a function that combines the previous helper functions into one process: generating simulated tracking events, serializing them with the Avro serializer and producing them to a Kafka topic with the Kafka producer. Finally, it flushes the producer to ensure all messages are sent.

In [11]:
def generate_and_produce_events(topic, schema_str, config, sr_config):
    """
    Generate simulated tracking events and produce them to a Kafka topic.

    Args:
        topic (str): The Kafka topic to which messages will be produced.
        schema_str (str): Avro schema definition as a JSON-formatted string.
        config (dict): Configuration for the Kafka producer.
        sr_config (dict): Configuration for the Schema Registry client.

    Returns:
        None
    """

    simulation_data = data_generator()

    avro_serializer = initialize_avro_serializer(topic, schema_str, sr_config)

    producer = Producer(config)

    for event in simulation_data:

        # Produce the serialized event to the Kafka topic
        producer.produce(
            topic=topic,
            key=str(event.id),  # Using event.id as the message key
            value=avro_serializer(
                event,
                SerializationContext(topic, MessageField.VALUE)),
            on_delivery=delivery_report,
        )

    # Block until all queued messages are delivered; this also triggers delivery_report
    producer.flush()

Call the function to generate and produce events:

In [12]:
topic = 'tracking_events' # chosen topic (name) of tutorial. Needs to be created beforehand in Confluent Cloud.
generate_and_produce_events(topic, schema_str, config, sr_config)


We can run the following script to start an infinite loop that continuously generates and produces events with the generate_and_produce_events function. The loop runs until the user interrupts it (typically by pressing Ctrl+C). Change the delay in seconds to simulate a different event frequency.

In [13]:
if __name__ == "__main__":
    try:
        while True:
            generate_and_produce_events(topic, schema_str, config, sr_config)
            time.sleep(2)  # Wait for 2 seconds
    except KeyboardInterrupt:
        print("Script stopped by user.")

Script stopped by user.
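For a demo that should stop on its own, the loop can also be bounded. `run_periodically` below is a hypothetical helper, not part of the tutorial: it calls any zero-argument function every `interval_s` seconds, either indefinitely (`max_runs=None`) or a fixed number of times, and still stops cleanly on Ctrl+C. In the notebook it could be used as `run_periodically(lambda: generate_and_produce_events(topic, schema_str, config, sr_config), interval_s=2, max_runs=10)`.

```python
import time


def run_periodically(action, interval_s=2.0, max_runs=None):
    """Call action() every interval_s seconds, forever (max_runs=None) or
    max_runs times; returns how many calls were made."""
    runs = 0
    try:
        while max_runs is None or runs < max_runs:
            action()
            runs += 1
            if max_runs is None or runs < max_runs:
                time.sleep(interval_s)  # no sleep after the final run
    except KeyboardInterrupt:
        print("Script stopped by user.")
    return runs


calls = []
print(run_periodically(lambda: calls.append(1), interval_s=0, max_runs=3))  # 3
```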


#### Building a Consumer which is Avro schema compliant 
Once events have arrived in the topic in Confluent Cloud, we simulate consuming them with a Python consumer client. For deserialization we use the Avro schema created earlier, which we access through the Schema Registry. You could also adapt the following code to consume data from other topics, but be sure to write the corresponding schema and object definitions (with dictionary representations) as shown above.

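This function adds the consumer settings to the Kafka configuration (note that it modifies the shared config dictionary in place). The group ID lets Kafka track the offsets this consumer group has committed. The offset reset setting 'earliest' only applies when the group has no committed offset yet: the consumer then starts from the beginning of the topic. Afterwards it resumes from its last committed position, so it reads only events it has not consumed yet.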

In [14]:
def set_consumer_configs():
    """
    Set the Kafka consumer configuration options for group ID and offset reset.
    """

    config['group.id'] = 'trackingevent_group'
    config['auto.offset.reset'] = 'earliest'
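Because set_consumer_configs writes into the same config dictionary that the producer uses, any producer created afterwards receives consumer-only properties such as group.id, which librdkafka will usually ignore with a logged warning. A side-effect-free alternative is to build a separate dictionary. The sketch below assumes a hypothetical helper `make_consumer_config` and uses a placeholder bootstrap server; in the notebook it would be used as `consumer = Consumer(make_consumer_config(config))`.

```python
def make_consumer_config(base_config, group_id="trackingevent_group"):
    """Return a new dict with consumer settings added; base_config is not
    modified, so it can still be passed to a Producer."""
    return {
        **base_config,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
    }


base = {"bootstrap.servers": "localhost:9092"}  # placeholder value
consumer_config = make_consumer_config(base)
print("group.id" in base, consumer_config["group.id"])  # False trackingevent_group
```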

Next, we reverse the conversion done for the producer: the function below builds a TrackingEvent object from the values of the deserialized dictionary, turning the data back into its original object form. Note that measurement_list stays a list of dictionaries rather than a list of Measurement objects.

In [15]:
def dict_to_trackingevent(obj, ctx):
    """
    Convert a dictionary to a TrackingEvent object using provided context.

    Args:
        obj (dict): Dictionary containing TrackingEvent attributes.
        ctx (SerializationContext): Context for deserialization.

    Returns:
        TrackingEvent: Deserialized TrackingEvent object.
    """
    return TrackingEvent(
        obj['id'],
        obj['customer_id'],
        obj['date_time'],
        obj['measurement_list'])
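The dictionary-to-object conversion above keeps the measurements as plain dictionaries. If full objects are needed on the consumer side, the nested records can be rebuilt as well. The sketch below swaps in standard-library dataclasses for the notebook's plain classes (`TrackingEventRecord`, `MeasurementRecord` and `dict_to_tracking_event_record` are hypothetical names chosen so they do not clash with the notebook) and checks that an event survives a round trip through `dataclasses.asdict`. Dataclasses also generate a readable `__repr__`, so printed measurement lists stay legible.

```python
from dataclasses import dataclass, asdict
from typing import List


@dataclass
class MeasurementRecord:
    vehicle_id: int
    product_id: int
    journey_km: float
    total_km: float


@dataclass
class TrackingEventRecord:
    id: int
    customer_id: int
    date_time: int
    measurement_list: List[MeasurementRecord]


def dict_to_tracking_event_record(obj, ctx):
    """Rebuild the nested measurement objects as well as the outer event."""
    return TrackingEventRecord(
        obj["id"],
        obj["customer_id"],
        obj["date_time"],
        [MeasurementRecord(**m) for m in obj["measurement_list"]],
    )


original = TrackingEventRecord(1234567, 123456, 1692390000,
                               [MeasurementRecord(12345, 5, 12.5, 4200.75)])
restored = dict_to_tracking_event_record(asdict(original), None)  # asdict recurses into the list
print(restored == original)  # True
```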

The following function encapsulates the process of initializing and configuring an Avro deserializer and Kafka consumer for a specific topic. It takes the Kafka topic, the Avro schema string and the Schema Registry configuration as arguments and returns a tuple of the subscribed Kafka consumer and the Avro deserializer, ready for consuming messages from the specified topic.

In [16]:
def initialize_avro_consumer(topic, schema_str, sr_config):
    """Initialize and configure an Avro deserializer and Kafka consumer for a topic.

    Args:
        topic (str): The Kafka topic to subscribe to.
        schema_str (str): Avro schema definition as a JSON-formatted string.
        sr_config (dict): Configuration for the Schema Registry client.

    Returns:
        tuple: (Consumer, AvroDeserializer), i.e. the subscribed Kafka consumer and the Avro deserializer.
    """

    # Initialize the Schema Registry client
    schema_registry_client = SchemaRegistryClient(sr_config)

    # Create an Avro deserializer with the specified schema and schema_registry_client
    avro_deserializer = AvroDeserializer(schema_registry_client, schema_str, from_dict=dict_to_trackingevent)

    # Set the consumer configurations
    set_consumer_configs()

    # Create a Kafka consumer instance and subscribe to the topic
    consumer = Consumer(config)
    consumer.subscribe([topic])

    return consumer, avro_deserializer
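The deserializer does not need to be told which schema version the producer used. Confluent's serializers prefix each value with a magic byte `0` and the 4-byte big-endian ID of the registered schema; the deserializer looks that ID up in the Schema Registry and uses the `schema_str` passed here as the reader schema. The hypothetical helper below only inspects that framing for illustration; the schema ID and body bytes in the example are made up.

```python
import struct


def parse_confluent_header(payload: bytes):
    """Split a Confluent-framed message value into (schema_id, avro_body).

    Layout: 1 magic byte (0), a 4-byte big-endian schema ID, then the
    Avro binary encoding of the record."""
    if len(payload) < 5 or payload[0] != 0:
        raise ValueError("not a Confluent wire-format payload")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]


# Made-up framed value: schema ID 100001 followed by two body bytes
framed = b"\x00" + struct.pack(">I", 100001) + b"\x02\x04"
print(parse_confluent_header(framed))  # (100001, b'\x02\x04')
```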


The process of initializing an Avro consumer, subscribing to a Kafka topic, and consuming tracking events using an Avro deserializer is combined in the following function. It runs in an infinite loop, printing out information about the consumed events. The loop exits gracefully on a KeyboardInterrupt (typically triggered by pressing Ctrl+C) and ensures that the consumer is properly closed.

In [None]:
def consume_tracking_events(topic, schema_str, sr_config):
    """Consume tracking events from a Kafka topic using an Avro deserializer.

    Args:
        topic (str): The Kafka topic to subscribe to.
        schema_str (str): Avro schema definition as a JSON-formatted string.
        sr_config (dict): Configuration for the Schema Registry client.

    Returns:
        None
    """
    # Initialize the Avro consumer and deserializer
    consumer, avro_deserializer = initialize_avro_consumer(topic, schema_str, sr_config)

    try:
        while True:
            event = consumer.poll(1.0)
            if event is None:
                continue

            # Report consumer errors instead of trying to deserialize them
            if event.error():
                print(f'Consumer error: {event.error()}')
                continue

            # Deserialize the event using the Avro deserializer
            t_event = avro_deserializer(event.value(), SerializationContext(topic, MessageField.VALUE))

            if t_event is not None:
                print(f'Latest tracking event for customer id {t_event.customer_id} came with the measurement list {t_event.measurement_list}')

    except KeyboardInterrupt:
        pass  # Allow the script to gracefully exit when the user interrupts

    finally:
        consumer.close()  # Close the consumer when done



Call the function to start consuming tracking events:

In [None]:
topic = 'tracking_events'

consume_tracking_events(topic, schema_str, sr_config)