
# Notebook - Consumer

## **Kafka Producer-Consumer Demo Overview**

In this demo, we'll explore a fundamental pattern in event-driven architectures: producing and consuming messages with Kafka. We simulate a simple scenario in which users (riders) request cars, and each request is produced as a message to a Kafka topic. At the same time, a consumer reads these requests and processes them.

**Key Components:**

1. **Producer:** Simulates user requests by producing random `rider-name` and `location` data to a Kafka topic.
2. **Consumer:** Listens to the Kafka topic and processes the incoming user requests by printing them.
3. **Configuration Loader:** Loads the Kafka configurations required for authentication and communication.
4. **Callbacks:** Handle specific events, such as a successful message delivery or topic partition assignment.

**Technical Highlights:**

- Asynchronous programming with `asyncio` to handle concurrent operations.
- Use of the Confluent Kafka Python library for Kafka operations.
- Integration with `.env` files for environment variable management using `dotenv`.
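The producer/consumer concurrency in this demo follows a standard `asyncio` pattern. The sketch below shows that pattern with no Kafka involved: an in-memory `asyncio.Queue` stands in for the Kafka topic (an assumption made purely for illustration), and `asyncio.gather()` runs both coroutines concurrently on one event loop. The function names are hypothetical and are not the notebook's `produce()`/`consume()`.

```python
import asyncio
import random

NAMES = ["Alice", "Bob", "Charlie"]  # same rider names as the demo


async def produce_to_queue(queue: asyncio.Queue, count: int) -> None:
    """Put `count` fake ride requests on the queue, followed by a None sentinel."""
    for _ in range(count):
        name = random.choice(NAMES)
        await queue.put(f"rider {name} requests a car")
        await asyncio.sleep(0.01)  # yield so the consumer coroutine gets a turn
    await queue.put(None)  # sentinel: no more messages


async def consume_from_queue(queue: asyncio.Queue) -> list:
    """Read messages until the sentinel arrives and return them."""
    received = []
    while True:
        message = await queue.get()
        if message is None:
            break
        print(f"consumed: {message}")
        received.append(message)
    return received


async def main(count: int = 5) -> list:
    queue = asyncio.Queue()
    # gather() runs both coroutines concurrently on the same event loop
    _, received = await asyncio.gather(
        produce_to_queue(queue, count), consume_from_queue(queue)
    )
    return received


if __name__ == "__main__":
    asyncio.run(main())
```

Unlike this sketch, the demo's producer and consumer loop forever, so they never reach a sentinel and must be stopped from outside.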

## List of Files Used

Please run this standalone Python file from your project directory: [demo_d_consumer_v2.py](demo_d_consumer_v2.py).

A reference `.env` file is available [here](.env).

## **Kafka Consumer-Producer Demo Setup and Execution**

### **1. Virtual Environment Setup**

Virtual environments allow for isolated spaces to manage dependencies. It's beneficial to use a virtual environment to avoid potential conflicts between package versions.

#### **1.1. Create & Activate a Virtual Environment**

- **Create**: Navigate to your project directory and execute:
  
  ```bash
  python -m venv venv
  ```

- **Activate**:

  - For **Linux & Mac**:
    ```bash
    source venv/bin/activate
    ```
  
  - For **Windows (PowerShell)**:
    ```powershell
    .\venv\Scripts\Activate.ps1
    ```
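To confirm that the interpreter you are running is the one inside the activated virtual environment, you can compare `sys.prefix` with `sys.base_prefix`: for environments created with `venv`, the two differ. A small check (the helper name is hypothetical; legacy `virtualenv` releases set `sys.real_prefix` instead, which this sketch does not cover):

```python
import sys


def in_virtualenv() -> bool:
    """Return True when running inside a venv-created virtual environment."""
    # venv points sys.prefix at the environment directory, while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print(f"Interpreter: {sys.executable}")
    print(f"Inside a virtual environment: {in_virtualenv()}")
```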

#### **1.2. Install Dependencies**

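With the virtual environment activated, install the necessary packages. `asyncio` is part of the Python 3 standard library, so it is not installed with `pip` (the `asyncio` package on PyPI is an outdated backport for Python 3.3 and can cause import errors):

```bash
pip install confluent_kafka python-dotenv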
```
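Note that the import names differ from the `pip` package names: `python-dotenv` is imported as `dotenv`. A quick sketch for checking which modules the demo imports are available in the active environment (the helper and the module list are assumptions based on the imports used in this notebook):

```python
import importlib.util

# Import names used by the demo (not the pip package names)
REQUIRED_MODULES = ("confluent_kafka", "dotenv", "asyncio")


def missing_modules(names=REQUIRED_MODULES) -> list:
    """Return the subset of top-level module `names` that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules()
    if missing:
        print(f"Missing modules: {', '.join(missing)}")
    else:
        print("All required modules are installed")
```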

### **2. Kafka Cluster Setup and API Key Creation**

To connect to the Kafka cluster, follow these steps:

#### **2.1. Get Cluster Description**

Retrieve details of the Kafka cluster:

```bash
confluent kafka cluster describe
```

#### **2.2. Create an API Key**

Generate an API key for your Kafka cluster:

```bash
confluent api-key create --resource {id}
```

Replace `{id}` with your cluster ID from the previous step.

#### **2.3. Set up Environment Variables**

Use the cluster endpoint from step 2.1 and the API key and secret from step 2.2 to populate a `.env` file:

```
BOOTSTRAP_SERVERS={Endpoint}
SECURITY_PROTOCOL=SASL_SSL
SASL_MECHANISMS=PLAIN
SASL_USERNAME={API Key}
SASL_PASSWORD={API Secret}
CONSUMER_GROUP_ID=my_consumer_group
```
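The notebook's `load_config()` further down passes each `os.getenv(...)` result straight through, so a missing variable silently becomes `None` and only fails later inside the Kafka client. A stricter variant is sketched here as a hypothetical `build_kafka_config()` helper. It maps each `.env` variable to the same librdkafka property names that `load_config()` uses and fails fast when a variable is unset or empty:

```python
import os
from typing import Mapping, Optional

# .env variable name -> librdkafka configuration property
ENV_TO_KAFKA = {
    "BOOTSTRAP_SERVERS": "bootstrap.servers",
    "SECURITY_PROTOCOL": "security.protocol",
    "SASL_MECHANISMS": "sasl.mechanisms",
    "SASL_USERNAME": "sasl.username",
    "SASL_PASSWORD": "sasl.password",
}


def build_kafka_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build a Kafka client config dict, raising if any variable is unset or empty."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_KAFKA if not env.get(name)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")
    return {kafka_key: env[name] for name, kafka_key in ENV_TO_KAFKA.items()}
```

Call it after `load_dotenv()`, so that the values from `.env` are already in `os.environ`.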

### **3. Running the Demo**

Once the setup is complete, run the demo script, passing the topic name as the argument:

```bash
python3 demo_d_consumer_v2.py consumer_example_v2
```
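The contents of `demo_d_consumer_v2.py` are not shown in this notebook, so the following is only a hypothetical sketch of how such an entry point could read the topic argument and start the event loop with `asyncio.run()`. The `run_demo()` coroutine is a placeholder for the notebook's `produce_consume()`:

```python
import argparse
import asyncio


def parse_args(argv=None) -> argparse.Namespace:
    """Parse the topic name from the command line (argv=None reads sys.argv)."""
    parser = argparse.ArgumentParser(description="Kafka producer-consumer demo")
    parser.add_argument("topic", help="Kafka topic to produce to and consume from")
    return parser.parse_args(argv)


async def run_demo(topic: str) -> None:
    # Placeholder: a real script would await produce_consume(topic) here
    print(f"Would produce to and consume from {topic!r}")


def main(argv=None) -> None:
    args = parse_args(argv)
    try:
        # A plain script has no running event loop, so asyncio.run() starts one
        asyncio.run(run_demo(args.topic))
    except KeyboardInterrupt:
        print("Stopped by user")
```

A script built this way would end with `if __name__ == "__main__": main()`, so that `python3 demo_d_consumer_v2.py consumer_example_v2` passes `consumer_example_v2` as the topic.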

### **4. (Optional) Jupyter Notebook Setup**

If you use Jupyter Notebook or JupyterLab, make sure the notebook runs on the Python kernel from your virtual environment. Otherwise, it will not find the packages installed above.

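#### **4.1. Install and Register `ipykernel`**

With the virtual environment activated, install `ipykernel` and register the environment as a kernel named `venv` (the name selected in step 4.2):

```bash
pip install ipykernel
python -m ipykernel install --user --name=venv --display-name "venv"
```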

#### **4.2. Set the Jupyter Kernel**

- Start Jupyter Notebook or Jupyter Lab.
- Open the desired notebook.
- Select `Kernel` -> `Change kernel` -> `venv`.

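**Note**: Jupyter already runs its own asyncio event loop, so long-running asynchronous tasks started from a notebook can behave unexpectedly (for example, they can keep running after a cell is stopped). It's advised to run the Kafka code as a standalone script.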
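One concrete difference between the two environments: a Jupyter kernel (via `ipykernel`) already has a running event loop. That is why the notebook can use a top-level `await produce_consume(topic_name)`, and why calling `asyncio.run()` inside a notebook cell raises `RuntimeError`. A plain script has no running loop and must call `asyncio.run()`. A small sketch for telling the two situations apart (the helper names are hypothetical):

```python
import asyncio


def event_loop_running() -> bool:
    """Return True if an asyncio event loop is running in the current thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running
        return False
    return True


async def check_inside_loop() -> bool:
    """Report the result from inside a coroutine, where a loop is running."""
    return event_loop_running()


if __name__ == "__main__":
    # In a plain script: False at top level, True inside asyncio.run()
    print(f"At top level: {event_loop_running()}")
    print(f"Inside asyncio.run(): {asyncio.run(check_inside_loop())}")
```

In a notebook cell, `event_loop_running()` typically returns `True` even at the top level.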


## Create Topics

This part is the same as in the Lecture 2 demos.

In [None]:
from confluent_kafka.admin import AdminClient, NewTopic
    
## Recommended way of loading secrets from .env file
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def load_config():
    """Load Kafka configuration."""
    return {
        'bootstrap.servers': os.getenv('BOOTSTRAP_SERVERS'),
        'security.protocol': os.getenv('SECURITY_PROTOCOL'),
        'sasl.mechanisms': os.getenv('SASL_MECHANISMS'),
        'sasl.username': os.getenv('SASL_USERNAME'),
        'sasl.password': os.getenv('SASL_PASSWORD')
    }

## Topic helpers
def topic_exists(admin_client, topic_name):
    """Check topic existence."""
    return topic_name in admin_client.list_topics(timeout=5).topics

def create_topic(admin_client, topic_name, partitions=1, replication_factor=1, config=None):
    """Create topic if not existing."""
    config = config or {}  # avoid a shared mutable default argument
    if not topic_exists(admin_client, topic_name):
        new_topic = [
            NewTopic(
                topic_name, 
                num_partitions=partitions, 
                replication_factor=replication_factor, 
                config=config)]
        created_topic = admin_client.create_topics(new_topic)
        for topic, f in created_topic.items():
            try:
                f.result()
                print(f"Topic {topic} created")
            except Exception as e:
                print(f"Failed to create topic {topic}: {e}")
    else:
        print(f"Topic {topic_name} already exists")

# Main execution
config = load_config()
admin_client = AdminClient(config)
topic_name = "consumer_example_v2"

# Create topic
topic_config = {'cleanup.policy': 'compact'} 
create_topic(admin_client, topic_name, partitions=3, replication_factor=3, config=topic_config)

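The topic is created with `cleanup.policy=compact`. With compaction, Kafka eventually keeps only the most recent record for each key (here, one per `rider-{name}` key). Older records can remain visible until the log cleaner runs, and the active segment is not compacted. The pure-Python simulation below is not how the broker implements compaction. It only shows the end state that compaction converges to, using made-up sample records, and it ignores tombstones (records with a null value):

```python
def compact(records):
    """Simulate the retained end state of log compaction.

    `records` is a list of (offset, key, value) tuples in offset order.
    Returns the latest record per key, still in offset order.
    """
    latest = {}
    for offset, key, value in records:
        latest[key] = (offset, key, value)  # later offsets overwrite earlier ones
    return sorted(latest.values(), key=lambda record: record[0])


# Hypothetical sample log, not real demo output
log = [
    (0, b"rider-Alice", b"rider Alice requests a car at (10.00, 20.00)"),
    (1, b"rider-Bob", b"rider Bob requests a car at (30.00, 40.00)"),
    (2, b"rider-Alice", b"rider Alice requests a car at (50.00, 60.00)"),
]
for record in compact(log):
    print(record)
```

Offsets are preserved after compaction, so a consumer reading the compacted log may see gaps in offsets.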

In [None]:
# List all topics 
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html

topic_metadata = admin_client.list_topics(timeout=5)
list(topic_metadata.topics.keys())

## Demo: Running Consumer

In [None]:
import asyncio
import random
from confluent_kafka import Consumer, Producer, OFFSET_BEGINNING

# Consumer group ID: Kafka tracks committed offsets per consumer group
CONSUMER_GROUP_ID = os.getenv('CONSUMER_GROUP_ID', 'default-group-id')

async def consume(topic_name):
    """Asynchronously consume data from the specified Kafka Topic."""
    
    # Short delay before initiating the consumer
    await asyncio.sleep(2.5)

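    # Configure consumer with Kafka settings and subscribe to the topic
    c = Consumer({
        **config,
        "group.id": CONSUMER_GROUP_ID,
        "auto.offset.reset": "earliest",
    })
    # on_assign is defined in the next cell; run that cell before calling consume()
    c.subscribe([topic_name], on_assign=on_assign)

    try:
        # Continuously poll for new messages in the topic
        while True:
            # poll() blocks the event loop while it waits, so keep the timeout
            # short to let the producer task run in between
            message = c.poll(0.1)
            if message is None:
                print("no message received by consumer")
            elif message.error() is not None:
                print(f"error from consumer {message.error()}")
            else:
                print(f"consumed message {message.key()}: {message.value()}")
            await asyncio.sleep(0.1)  # Yield control to other tasks (e.g. the producer)
    finally:
        # Leave the consumer group cleanly, even if the task is cancelled
        c.close()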


In [None]:
def on_assign(consumer, partitions):
    """Callback executed when partitions are assigned. Sets partition offset to beginning."""
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

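`auto.offset.reset` and this `on_assign` callback interact. `auto.offset.reset=earliest` only applies when the consumer group has no committed offset for a partition (or the committed offset is out of range). Setting every partition to `OFFSET_BEGINNING` in `on_assign` forces a full replay on every assignment, regardless of committed offsets. The following toy model of that decision is hypothetical (it is not part of confluent-kafka or librdkafka):

```python
from typing import Optional


def start_offset(committed: Optional[int], log_start: int, log_end: int,
                 reset_policy: str = "earliest",
                 force_beginning: bool = False) -> int:
    """Toy model of where a consumer starts reading one partition.

    committed:        group's committed offset (the next offset to read), or None
    log_start:        first offset still available in the partition
    log_end:          offset the next produced message will get
    force_beginning:  True when on_assign sets OFFSET_BEGINNING
    """
    if force_beginning:
        return log_start  # the on_assign override wins over committed offsets
    if committed is not None and log_start <= committed <= log_end:
        return committed  # resume where the group left off
    # No usable committed offset: fall back to auto.offset.reset
    return log_start if reset_policy == "earliest" else log_end
```

With `force_beginning=True`, as in this notebook, every restart replays the whole topic. Removing the `on_assign` override lets the group resume from its committed offsets instead.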
In [None]:
def delivery_report(err, msg):
    """Callback function to report the result of a produce operation."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

async def produce(topic_name, config=config):
    """Asynchronously produce random person-location data into the specified Kafka Topic."""
    
    p = Producer(config)
    names = ["Alice", "Bob", "Charlie"]
    
    try:
        # Continuously produce messages to the topic
        while True:
            name = random.choice(names)
            lat = random.uniform(-90, 90)
            long = random.uniform(-180, 180)
            message_key = f"rider-{name}".encode("utf-8")
            message_value = f"rider {name} requests a car at ({lat:.2f}, {long:.2f})".encode("utf-8")
            
            p.produce(topic_name, key=message_key, value=message_value, callback=delivery_report)
            p.poll(0.1)  # Poll to allow delivery callbacks to be executed
            await asyncio.sleep(0.1)  # Yield control to other tasks (e.g. the consumer)
    finally:
        # Send any messages still buffered (wait at most 5 seconds)
        p.flush(5)

async def produce_consume(topic_name):
    """Concurrently run producer and consumer tasks using asyncio."""
    
    t1 = asyncio.create_task(produce(topic_name))  # Task for producing messages
    t2 = asyncio.create_task(consume(topic_name))  # Task for consuming messages
    await t1  # Wait for producer task to complete (infinite loop in this case)
    await t2  # Wait for consumer task to complete (infinite loop in this case)

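Each message is keyed by `rider-{name}`, and the topic has 3 partitions. For keyed messages, the producer picks the partition from a hash of the key, so all requests from one rider land in the same partition and are consumed in order. This relies on an assumption: librdkafka, which confluent-kafka wraps, documents its default `consistent_random` partitioner as a CRC32 hash of the key, while the Java client defaults to murmur2, so the same key can map to different partitions across clients. The sketch below only approximates the CRC32 scheme with `zlib.crc32`. It illustrates the idea and should not be relied on to predict the exact partition:

```python
import random
import zlib
from collections import defaultdict
from typing import Optional

NUM_PARTITIONS = 3  # matches the topic created above


def choose_partition(key: Optional[bytes], num_partitions: int = NUM_PARTITIONS) -> int:
    """Approximate a CRC32-based key partitioner (illustrative, not librdkafka itself)."""
    if not key:
        # Empty or missing keys: spread randomly across partitions
        return random.randrange(num_partitions)
    return zlib.crc32(key) % num_partitions


assignments = defaultdict(set)
for _ in range(20):
    name = random.choice(["Alice", "Bob", "Charlie"])
    key = f"rider-{name}".encode("utf-8")
    assignments[key].add(choose_partition(key))

for key, partitions in sorted(assignments.items()):
    print(key, "->", partitions)  # each key maps to exactly one partition
```

Ordering is therefore guaranteed per rider (per partition), not across all riders.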

In [None]:
## main
topic_name = 'consumer_example_v2'
await produce_consume(topic_name)

consumed message b'rider-Charlie': b'rider Charlie requests a car at (-31.57, 153.22)'
consumed message b'rider-Charlie': b'rider Charlie requests a car at (-89.11, -129.61)'
consumed message b'rider-Charlie': b'rider Charlie requests a car at (-38.85, -4.53)'
consumed message b'rider-Charlie': b'rider Charlie requests a car at (-47.60, 90.91)'
consumed message b'rider-Charlie': b'rider Charlie requests a car at (-40.77, 18.43)'
consumed message b'rider-Bob': b'rider Bob requests a car at (56.72, -86.71)'
consumed message b'rider-Charlie': b'rider Charlie requests a car at (-53.12, -69.19)'
consumed message b'rider-Charlie': b'rider Charlie requests a car at (-9.57, -132.77)'
consumed message b'rider-Bob': b'rider Bob requests a car at (-34.03, 12.96)'
consumed message b'rider-Bob': b'rider Bob requests a car at (8.60, 178.15)'
consumed message b'rider-Bob': b'rider Bob requests a car at (-57.89, 43.77)'
consumed message b'rider-Charlie': b'rider Charlie requests a car at (26.04, -161.

Note: Stopping the execution of a cell in Jupyter (for example, by pressing the "Stop" button) interrupts the kernel, which should ideally stop the code running in that cell. However, there are a few reasons why you might still see output:

1. **Asynchronous Nature**: The producer and consumer run as tasks created with `asyncio.create_task()` on the kernel's long-lived event loop. Interrupting the cell stops the cell's own `await`, but those tasks can keep running (and printing) until they are explicitly cancelled or the kernel is restarted.

2. **Kafka Consumer Lag**: Depending on how you've configured Kafka and your consumer, there might be a lag between when messages are produced and when they are consumed. If your producer produced many messages quickly, the consumer might still be processing those even after the producer has stopped.

3. **Jupyter Kernel State**: In some situations, the Jupyter kernel might not effectively stop the execution of certain tasks or threads. This can especially be the case with certain libraries or tools that manage their own internal threads or processes.

To ensure that everything is stopped:

1. **Manual Stop**: After you've pressed the "Stop" button in Jupyter, you can manually close the producer and consumer connections to Kafka if they were opened. This will ensure that no new messages are produced or consumed.

2. **Restart the Kernel**: In the Jupyter Notebook toolbar, there's an option to restart the kernel. This will completely reset the Python process running your notebook, ensuring all tasks, threads, and processes related to it are stopped. It's a bit of a "nuclear option", but it's a surefire way to stop everything.

3. **Improve Cleanup in Code**: Consider adding cleanup code that will be executed when stopping the tasks. For instance, closing the Kafka connections gracefully, ensuring all asynchronous tasks are canceled, etc. 
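For point 3, one common pattern is to put each client's cleanup in a `finally` block and cancel the tasks explicitly, so that cancellation (for example, from a timeout or an interrupt) still closes the connections. To keep the sketch runnable anywhere, it uses a hypothetical `FakeClient` instead of real Kafka clients. With confluent-kafka, you would call `producer.flush()` and `consumer.close()` in the corresponding `finally` blocks.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a Kafka producer or consumer."""

    def __init__(self, name: str):
        self.name = name
        self.closed = False

    def close(self) -> None:
        self.closed = True
        print(f"{self.name} closed")


async def run_forever(client: FakeClient) -> None:
    try:
        while True:
            await asyncio.sleep(0.1)  # stands in for produce()/poll() work
    finally:
        # Runs on normal exit, on error, and when the task is cancelled
        client.close()


async def produce_consume_with_timeout(seconds: float) -> list:
    clients = [FakeClient("producer"), FakeClient("consumer")]
    tasks = [asyncio.create_task(run_forever(c)) for c in clients]
    try:
        await asyncio.sleep(seconds)  # in the demo this would be "until interrupted"
    finally:
        for task in tasks:
            task.cancel()
        # Wait for cancellation to finish so every finally block has run
        await asyncio.gather(*tasks, return_exceptions=True)
    return clients


if __name__ == "__main__":
    asyncio.run(produce_consume_with_timeout(0.5))
```

In a notebook, you would `await produce_consume_with_timeout(...)` directly instead of using `asyncio.run()`.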

In the long term, if you're finding that running these tasks in Jupyter is causing issues, you might want to consider other environments for long-running or complex asynchronous tasks, like a standalone Python script, especially when dealing with systems like Kafka.