# Kafka Producer

## Interacting with Kafka Producer from the Shell

The Producer is the Kafka abstraction for publishing data _to_ a Kafka topic.

Thanks to Kafka's Pub/Sub messaging model, a Producer is completely detached from any Consumer process: there is no need to connect a Producer (sender) directly to a Consumer (receiver).

However, for a Producer to publish a message, we need to specify at least two things:
1. The **topic** to which the messages will be published.
2. The **entry point of the cluster** over the network.

The goal is to work with Kafka programmatically (from Python/C/etc.) to interface it with other applications.
However, initially, we will interact with the Kafka cluster in _interactive mode_ from the shell.

Apache Kafka provides a set of bash scripts to interact with and operate the cluster for basic operations and testing, such as:
- Topic creation, configuration, and inspection.
- Shell-based message producer.
- Shell-based message consumer.
- Shell-based performance testing.
- ...

Let's first connect to the Kafka cluster from the shell by logging into the Kafka-Broker container:

```bash
$ docker exec -it <your_kafka-broker_container_name> bash
```

e.g.:

```bash
$ docker exec -it kafka-kafka-broker-1 bash
```

The Kafka installation is located in the folder whose path is stored in the environment variable `$KAFKA_HOME`.
Inside that folder, you can find the binaries (the "executables") to use Kafka directly.

To navigate to the `bin` folder, use the following command:
```bash
cd $KAFKA_HOME/bin
```

From there, you can use the basic Kafka binaries from the shell to issue commands such as:

- List all available topics:

```bash
./kafka-topics.sh --list --bootstrap-server kafka-broker:9092
```

- Create a new topic:

```bash
./kafka-topics.sh --create --topic my_awesome_topic --bootstrap-server kafka-broker:9092
```

- Describe a specific topic:

```bash
./kafka-topics.sh --describe --topic my_awesome_topic --bootstrap-server kafka-broker:9092
```

We can also use the shell to send messages to the newly created topic. To start the Kafka console producer, use the following command:

```bash
./kafka-console-producer.sh --topic my_awesome_topic --bootstrap-server kafka-broker:9092
```

At this point, each line you type at the `kafka-console-producer` prompt is sent as a message to the topic you just created.

So far, no consumer is available to process or display those messages. However, the messages are still written to the topic, where they are appended to the log(s) of its partition(s).

Let's create a console consumer and subscribe to the topic.

To do this, connect to the same Docker container running the Kafka broker and navigate to the folder containing the Kafka binaries.

From there, you can:

- Consume the messages from our previously created topic:
  
```bash
# --from-beginning is optional: omit it to read only messages published after the consumer starts
./kafka-console-consumer.sh --topic my_awesome_topic --bootstrap-server kafka-broker:9092 --from-beginning
```

**Note**: We can instruct the Consumer to go back and read data from the beginning, but the Producer can only append new data to the log.
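The asymmetry in the note above can be pictured with a small in-memory model. This is only a sketch under simplifying assumptions: `TopicLog` is a hypothetical class invented for illustration, not part of Kafka or any client library, and it models a single partition with no persistence.

```python
class TopicLog:
    """Toy in-memory stand-in for one partition's log (not a Kafka API)."""

    def __init__(self):
        self._records = []

    def append(self, value):
        """Producer side: records can only be added at the end of the log."""
        self._records.append(value)
        return len(self._records) - 1  # offset assigned to the new record

    def read_from(self, offset):
        """Consumer side: read every record starting at a chosen offset."""
        return self._records[offset:]


log = TopicLog()
for text in (b"message 1", b"message 2", b"message 3"):
    log.append(text)

print(log.read_from(2))  # a consumer that starts late sees only the newest record
print(log.read_from(0))  # rough equivalent of --from-beginning
```

The producer-side method has no way to rewrite earlier records, while the consumer-side method can start from any offset it likes.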

## Programmatically produce messages to Kafka with Python

There are various Python modules available to interact with Kafka programmatically, including:
- `kafka-python`
- `confluent-kafka`
- `pykafka`

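For the basic operations shown here, the differences between these modules are relatively minor (the main one being that `confluent-kafka` wraps the C library `librdkafka`, while `kafka-python` is written in pure Python). As always, take your time to review the documentation of all alternatives before starting a project.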

For now, we'll use `kafka-python` to handle topics and producers.

In [None]:
! pip install kafka-python # confluent-kafka

To instantiate a Kafka producer using `kafka-python`, you can use the following code:

```python
import msgpack  # third-party package, only needed for the value_serializer below
from kafka import KafkaProducer

# Create a Kafka producer instance
producer = KafkaProducer(
    bootstrap_servers=['62.30.10.23:9092'],  # List of brokers
    security_protocol="SSL",                 # Security protocol (if any) 
    ssl_cafile="./ca.pem",                   # Certificate details (if any)
    ssl_certfile="./service.cert",           # ...
    ssl_keyfile="./service.key",             # ...
    value_serializer=msgpack.dumps           # Message value serialization function 
                                             # (e.g., interpreting the message as a specific format)
)
```
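The `value_serializer` argument shown above accepts any callable that turns a message value into bytes. As a minimal sketch, assuming the messages are JSON-compatible Python objects, such a callable could look like the following. The names `json_serializer` and `json_deserializer` are hypothetical helpers introduced here for illustration only.

```python
import json


def json_serializer(value):
    """Turn a JSON-compatible Python object into UTF-8 encoded bytes."""
    return json.dumps(value).encode("utf-8")


def json_deserializer(raw):
    """Inverse operation, as a consumer would apply it to the received bytes."""
    return json.loads(raw.decode("utf-8"))


payload = {"name": "Alice", "amount": 12.5}
raw = json_serializer(payload)
print(raw)                               # bytes, ready to be sent as a message value
print(json_deserializer(raw) == payload)  # the round trip preserves the content
```

A function like this could then be passed as `value_serializer=json_serializer` when creating the producer, so that `send()` can be called with dictionaries instead of raw bytes.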

We'll work with the vanilla version of the producer, which does not require any certificates or specific serialization in this example.

Here's an example of a simple producer instantiated by pointing it to the Kafka brokers:

In [None]:
# define the list of brokers in the cluster
KAFKA_BOOTSTRAP_SERVERS = ['kafka-broker:9092']

In [None]:
from kafka import KafkaProducer

# Create a Kafka producer instance
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

Let's publish a message to the topic we previously created _without specifying any key_.

In [None]:
# publish a message to a topic without a key
producer.send(topic='my_awesome_topic', 
              value=b'message 1')

The output `<kafka.producer.future...>` is a future object: it tells us that the record has been created and queued, and will be sent at some point.

However, when the previous cell returned, the message had not necessarily been sent yet.

The `KafkaProducer.send()` method is _asynchronous_: it only places the message in an internal buffer. The actual transmission to the broker happens later, in the background, based on tunable parameters such as the maximum buffering time (`linger_ms`) and the batch size in bytes (`batch_size`).

This behavior is beneficial because packing multiple messages into small batches improves data transfer efficiency. While sending a single message may not show noticeable performance differences, imagine sending billions of messages per day, which translates to tens of thousands of messages per second or more. At that scale, optimizing data transfer and minimizing communication overhead between producers and brokers becomes crucial.

It's important to be aware that messages won't necessarily be sent immediately. If a program calls `exit()` right after a `producer.send()` call, it's possible that no messages are actually sent, because the program terminates before the buffered messages are transmitted.
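To make the buffering behavior more concrete, here is a toy model of producer-side batching. It is a sketch under loud assumptions: `ToyBatcher` is a hypothetical class, not kafka-python code, and it only checks its thresholds when `send()` is called, whereas a real producer transmits from a background thread.

```python
import time


class ToyBatcher:
    """Toy model of producer-side batching (hypothetical, not kafka-python code)."""

    def __init__(self, batch_size_bytes=32, linger_seconds=0.5):
        self.batch_size_bytes = batch_size_bytes
        self.linger_seconds = linger_seconds
        self.buffer = []          # records waiting to be transmitted
        self.sent_batches = []    # batches that "reached the broker"
        self._first_enqueued_at = None

    def send(self, value):
        """Enqueue a record; transmit only once a threshold is reached."""
        if not self.buffer:
            self._first_enqueued_at = time.monotonic()
        self.buffer.append(value)
        buffered_bytes = sum(len(v) for v in self.buffer)
        waited = time.monotonic() - self._first_enqueued_at
        if buffered_bytes >= self.batch_size_bytes or waited >= self.linger_seconds:
            self.flush()

    def flush(self):
        """Transmit whatever is buffered, regardless of the thresholds."""
        if self.buffer:
            self.sent_batches.append(list(self.buffer))
            self.buffer.clear()


batcher = ToyBatcher(batch_size_bytes=32, linger_seconds=60)
batcher.send(b"message 1")
batcher.send(b"message 2")
print(len(batcher.sent_batches))  # nothing transmitted yet: 18 bytes < 32
batcher.flush()
print(len(batcher.sent_batches))  # one batch holding both records
```

If the program had exited before the explicit `flush()` call, both records would have been lost while still sitting in the buffer.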

To make sure buffered messages are actually delivered before moving on, you can call the producer's `flush()` method. It blocks until all outstanding messages have been sent.

For more information on the tunable parameters and additional functionalities, you can refer to the KafkaProducer API documentation: [link to KafkaProducer API](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html).
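Besides `flush()`, the future returned by `send()` can be used to wait for one specific record. The sketch below assumes an object that behaves like kafka-python's `KafkaProducer`, where `send()` returns a future whose `get(timeout=...)` blocks until the broker acknowledges the record. The helper name `send_and_wait` and the 10-second default timeout are assumptions made for illustration.

```python
def send_and_wait(producer, topic, value, timeout_seconds=10):
    """Send one record and block until the broker acknowledges it.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future, and future.get(timeout=...) blocks for the result.
    """
    future = producer.send(topic, value)
    metadata = future.get(timeout=timeout_seconds)  # raises on failure or timeout
    return metadata.topic, metadata.partition, metadata.offset


# Possible usage with the producer created earlier (requires a reachable broker):
# send_and_wait(producer, 'my_awesome_topic', b'a new message')
```

Waiting on every record gives up the benefits of batching, so this pattern is best reserved for cases where each message must be confirmed before the program continues.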


In [None]:
# publish a message and flush the queue right away
producer.send('my_awesome_topic', b'a new message')
producer.flush()

It's important to realize that producers and consumers are completely decoupled in Kafka: a consumer keeps working even if a producer "dies".

Messages that were already sent are stored on the brokers, so consumers can keep reading them from the topic regardless of the state of the producer, for as long as the messages are retained. This decoupling is a key architectural feature that enables scalability, fault tolerance, and flexibility in building distributed systems where producers and consumers operate independently and asynchronously.

While a producer is unavailable (for example, because of network issues or producer errors), no new messages reach the topic, so consumers may simply see a delay in new data. Once the producer resumes normal operation, or a new producer starts publishing to the same topic, consumers continue from where they left off.

In [None]:
# stop the producer
producer.close()

In [None]:
# create a new Kafka producer and send a new message to the same topic
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
producer.send('my_awesome_topic', b'a message from a newly revived producer')

## Messages with Key

In Kafka, messages are structured as `<key, value>` pairs.

So far, we have produced messages only with a given `value`, but it's also possible to include a `key` for each message. The `key` serves as an optional identifier for the message and can be used for various purposes, such as routing messages to specific partitions within a topic.

When a producer sends a message with a key, the key determines the partition to which the message is assigned. By default, the producer applies a _hashing algorithm_ to the key, which spreads different keys across partitions while ensuring that messages with the same key always land in the same partition.
Since each partition is an ordered log, this preserves the relative order of messages that share a key.

Adding a key to messages can be useful in scenarios where you want to ensure ordering or group related messages together based on a common key. It enables consumers to process messages in a consistent and predictable manner.
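The idea of key-based partitioning can be sketched in a few lines. This is an illustration only: `choose_partition` is a hypothetical function, and CRC32 is a stand-in hash chosen because it is in the standard library. Real Kafka clients use their own hash function, so the partition numbers produced here will not match those of an actual cluster.

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a key (bytes) to a partition index with a deterministic hash.

    CRC32 is a stand-in chosen for illustration; real Kafka clients use
    their own hash function, so actual partition numbers will differ.
    """
    return zlib.crc32(key) % num_partitions


for key in (b"user-1", b"user-2", b"user-1"):
    print(key, "->", choose_partition(key, num_partitions=2))
```

Both occurrences of `b"user-1"` map to the same partition, which is what keeps messages with the same key in order relative to each other.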

To include a key when producing messages, you can modify the previous code example as follows:

In [None]:
# send messages to Kafka using a specific key
producer.send(topic='my_awesome_topic', 
              key=b'some_key', 
              value=b'a message with key')
producer.flush()

## Create a topic from kafka-python

Using the kafka-python library, you can administer a Kafka cluster and create new topics with specific configuration parameters, such as the replication factor.

In [None]:
# import the necessary packages to administer the Kafka cluster
from kafka.admin import KafkaAdminClient, NewTopic

# connect to the cluster to run admin functions
kafka_admin = KafkaAdminClient(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
)

It is possible to retrieve the list of topics, which is equivalent to running the following command with the Kafka binaries from the shell:

```bash
./kafka-topics.sh --list --bootstrap-server kafka-broker:9092
```

In [None]:
# list the available topics
kafka_admin.list_topics()

Topics in Kafka are partitioned entities. Each topic is divided into multiple partitions, and within each partition, events are added to the end of a log, resulting in an ordered list of records.

When you publish a new message to a partitioned topic, the message is appended to the end of the log held by the leader of the partition to which the message is assigned.

Each partition has one leader and zero or more follower replicas. If replication is enabled for the topic, the message will be replicated and distributed to the follower replicas of the partition.

The leader replica handles read and write operations for the partition by default, while the follower replicas copy the log from the leader to ensure fault tolerance and high availability. If the leader fails, one of the follower replicas is elected as the new leader and continues processing messages for that partition.
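The leader and follower roles described above can be pictured with a toy model. It is a heavily simplified sketch: `ToyPartition` is a hypothetical class, replication is shown as an immediate copy, and the new leader is simply the first remaining replica, whereas a real cluster replicates asynchronously and applies its own election rules.

```python
class ToyPartition:
    """Toy model of one replicated partition (hypothetical, not a Kafka API)."""

    def __init__(self, broker_ids):
        # One copy of the log per replica, keyed by broker id
        self.logs = {broker_id: [] for broker_id in broker_ids}
        self.leader = broker_ids[0]

    def append(self, record):
        """Write to the leader, then copy the record to every follower."""
        self.logs[self.leader].append(record)
        for broker_id, log in self.logs.items():
            if broker_id != self.leader:
                log.append(record)

    def fail_leader(self):
        """Drop the current leader and promote one of the remaining replicas."""
        del self.logs[self.leader]
        self.leader = next(iter(self.logs))  # raises StopIteration if no replica is left


partition = ToyPartition(broker_ids=[1, 2, 3])
partition.append(b"message 1")
partition.fail_leader()          # broker 1 goes away
partition.append(b"message 2")   # writes continue on the new leader
print(partition.leader, partition.logs[partition.leader])
```

Because the followers already hold a copy of the log, the record written before the failure is still available after the leader changes.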

The use of partitions provides several benefits in Kafka including scalability and parallelism.

It's important to note that partition assignment and replication are handled by the Kafka client library and the brokers, and are transparent to the producer and consumer application code. The application only needs to specify the topic, and optionally a key, for each message.


In [None]:
# create a new topic with the following parameters:
#    number of partitions = 2
#    replication factor   = 1 (i.e. no replication)
a_new_topic = NewTopic(name='a_partitioned_topic', 
                       num_partitions=2, 
                       replication_factor=1)

kafka_admin.create_topics(new_topics=[a_new_topic])

In [None]:
# list the available topics
kafka_admin.list_topics()

---

## Publish messages for the Spark Structured Streaming example

Kafka can be used as a source of incoming messages for both Spark Streaming and Structured Streaming.

We'll use the PySpark Structured Streaming API to implement the example previously seen in the Spark hands-on sessions. The cell below publishes the messages that the Spark job will consume.

In [None]:
import json
import time
import random

first_names = ('John', 'Andy', 'Joe', 'Alice')
last_names = ('Johnson', 'Smith', 'Jones', 'Millers')

# while 1: # Uncomment this to send a continuous stream of messages
for i in range(20):
    msg = {
        'name': random.choice(first_names),                           # Select a random first name
        'surname': random.choice(last_names),                         # Select a random last name
        'amount': float('{:.2f}'.format(random.random() * 1000)),     # Generate a random amount
        'delta_t': float('{:.2f}'.format(random.random() * 10)),      # Generate a random delta_t
        'flag': random.choices([0, 1], weights=[0.8, 0.2])[0]         # Randomly choose a flag value
    }
    producer.send('a_partitioned_topic',
                  json.dumps(msg).encode('utf-8')  # Convert the message to JSON and encode as UTF-8
                  )
    producer.flush()  # Flush the producer buffer
    time.sleep(0.25)  # Sleep for a short duration before sending the next message

Let's create a new topic to store the `results` of the Spark processing of the Kafka stream.

In [None]:
# define a new topic with 2 partitions and no replication
a_new_topic = NewTopic(name='results', 
                       num_partitions=2, 
                       replication_factor=1)

# create the new topic
kafka_admin.create_topics(new_topics=[a_new_topic])

# check the list of available topics
kafka_admin.list_topics()