# Kafka Producer

---

<img src="https://learning.oreilly.com/library/view/kafka-the-definitive/9781492043072/assets/ktdg_0301.png" alt="producer" width="500"/>
---

## Preparation
The first step is to install the required libraries: in our case `kafka-python` (and `pandas` to support visualization).

In [None]:
!pip install kafka-python

Declare the connection variables (username, broker addresses, and Schema Registry URL):

In [None]:
username = "pujo"
server_ip = "34.87.150.250"
bootstrap_servers = f"{server_ip}:9092,{server_ip}:9093,{server_ip}:9094"
schema_registry_url = f"http://{server_ip}:8081"

---

## Async and Sync 

Now we can create a Kafka producer and send a few messages:


In [None]:
from kafka import KafkaProducer

topic_name = f"{username}-topic1"

producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

for _ in range(10):
    producer.send(topic_name, b'some_message_bytes')

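`producer.send()` is asynchronous: it adds the record to an internal buffer and immediately returns a future. Call `producer.flush()` to block until every buffered record has been sent.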

In [None]:
for _ in range(100):
    producer.send(topic_name, b'some_message_bytes')

producer.flush()

To make a send synchronous, block on the returned future with `get()`:

In [None]:
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
# producer = KafkaProducer(bootstrap_servers=bootstrap_servers, max_request_size=1) # failed producer scenario

future = producer.send(topic_name, b'synchronous message')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)

    # Successful result returns assigned partition and offset
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

except KafkaError as e:
    # Decide what to do if the produce request failed...
    print(f"send data failed: {e}")
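When many records are sent asynchronously, one option is to keep the futures that `send()` returns and inspect them after `flush()`, instead of blocking on each send. This is a minimal sketch: the helper names `send_all` and `collect_results` are hypothetical, and the code only relies on `producer.send()`, `producer.flush()`, and `future.get(timeout=...)` from kafka-python.

```python
def send_all(producer, topic, payloads):
    """Send every payload asynchronously and return the futures in order."""
    futures = [producer.send(topic, payload) for payload in payloads]
    # Block until all buffered records have been sent (or have failed).
    producer.flush()
    return futures


def collect_results(futures, timeout=10):
    """Split futures into (metadata, errors) lists.

    future.get() returns the record metadata on success and re-raises the
    send error on failure (a KafkaError subclass in kafka-python).
    """
    succeeded, failed = [], []
    for future in futures:
        try:
            succeeded.append(future.get(timeout=timeout))
        except Exception as exc:  # broad on purpose: this is a demo helper
            failed.append(exc)
    return succeeded, failed


# Usage (needs a running broker):
# futures = send_all(producer, topic_name, [b"a", b"b", b"c"])
# ok, errors = collect_results(futures)
# print(f"{len(ok)} delivered, {len(errors)} failed")
```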


To handle the outcome of an asynchronous send without blocking, attach a callback and an errback to the returned future:

In [None]:
import logging

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
# producer = KafkaProducer(bootstrap_servers=bootstrap_servers, max_request_size=1) # failed producer scenario

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    print("handle error")
    print(excp)
    log.error('I am an errback', exc_info=excp)
    # handle exception

producer \
    .send(topic_name, b'message async') \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)
producer.flush()
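To aggregate outcomes rather than print them, the callback and errback can be bound methods of a small object. This sketch assumes that, as in kafka-python, callbacks may run on the producer's background sender thread, so the counters are protected by a lock; the `DeliveryTracker` class is hypothetical.

```python
import logging
import threading

log = logging.getLogger(__name__)


class DeliveryTracker:
    """Hypothetical helper that counts outcomes of asynchronous sends.

    Pass the bound methods to add_callback/add_errback. A lock guards the
    counters because the callbacks may fire on another thread.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.delivered = 0
        self.failed = 0

    def on_success(self, record_metadata):
        with self._lock:
            self.delivered += 1

    def on_error(self, exc):
        log.error("send failed", exc_info=exc)
        with self._lock:
            self.failed += 1


# Usage (needs a running broker):
# tracker = DeliveryTracker()
# for _ in range(10):
#     producer.send(topic_name, b"tracked message") \
#         .add_callback(tracker.on_success) \
#         .add_errback(tracker.on_error)
# producer.flush()
# print(tracker.delivered, tracker.failed)
```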

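The producer can also retry failed sends automatically. Here `retries=5` allows up to five retries of retriable errors before the errback is invoked: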

In [None]:
import logging

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=bootstrap_servers, retries=5)
# producer = KafkaProducer(bootstrap_servers=bootstrap_servers, retries=5, max_request_size=1) # failed producer scenario

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    print("handle error")
    print(excp)
    log.error('I am an errback', exc_info=excp)
    # handle exception

producer \
    .send(topic_name, b'message async') \
    .add_callback(on_send_success) \
    .add_errback(on_send_error)
producer.flush()
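`retries` only helps with retriable errors (for example, a partition leader that is temporarily unavailable); an oversized record such as the `max_request_size=1` scenario fails no matter how many retries are allowed. Retries can also reorder records: kafka-python's documentation warns that enabling retries without setting `max_in_flight_requests_per_connection` to 1 may change ordering. The sketch below builds keyword arguments for an ordering-preserving retrying producer; the helper name and the values are illustrative assumptions, not recommendations.

```python
def retrying_producer_kwargs(bootstrap_servers, retries=5, backoff_ms=200):
    """Build KafkaProducer keyword arguments for retrying sends in order."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "retries": retries,
        # Wait between retry attempts so a recovering broker is not hammered.
        "retry_backoff_ms": backoff_ms,
        # One in-flight request per broker connection: a retried batch cannot
        # be overtaken by a later batch, so per-partition order is preserved.
        "max_in_flight_requests_per_connection": 1,
        # With acks=0 the client generally never learns about failures,
        # so retries would have nothing to react to.
        "acks": "all",
    }


# producer = KafkaProducer(**retrying_producer_kwargs(bootstrap_servers))
```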

---

## Configuring the Producer

All `KafkaProducer` options are listed in the kafka-python API documentation: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

In [None]:
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers, 
    retries=5,
    client_id=f"{topic_name}-producer-1",
    acks="all"
)

- **client.id** (`client_id` in kafka-python): a logical identifier for the client and the application it is used in. It can be any string, and the brokers use it to identify messages sent from the client. It is used in logging and metrics, and for quotas. Choosing a good client name makes troubleshooting much easier: it is the difference between “We are seeing a high rate of authentication failures from IP 104.27.155.134” and “Looks like the Order Validation service is failing to authenticate, can you ask Laura to take a look?”


- **acks** controls how many partition replicas must receive the record before the producer can consider the write successful.
    - **If acks=0**, the producer does not wait for a reply from the broker before assuming the message was sent successfully.
    - **If acks=1**, the producer receives a success response from the broker as soon as the leader replica has received the message.
    - **If acks='all'**, the producer receives a success response from the broker once all in-sync replicas have received the message.
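A small sketch of choosing `acks` from a durability requirement. The level names and the `producer_kwargs` helper are hypothetical; the values `0`, `1`, and `'all'` are the ones kafka-python accepts for `acks`. With `acks='all'`, the number of replicas involved depends on the current in-sync replica set, and the broker/topic setting `min.insync.replicas` controls how small that set may get before writes are rejected.

```python
# Hypothetical mapping from a durability requirement to kafka-python's `acks`.
ACKS_BY_DURABILITY = {
    "fire_and_forget": 0,  # no broker response; records can be lost silently
    "leader": 1,           # lost if the leader fails before followers copy it
    "all_in_sync": "all",  # every in-sync replica has the record
}


def producer_kwargs(bootstrap_servers, durability="all_in_sync"):
    """Return KafkaProducer kwargs for the requested durability level."""
    try:
        acks = ACKS_BY_DURABILITY[durability]
    except KeyError:
        raise ValueError(f"unknown durability level: {durability!r}") from None
    return {"bootstrap_servers": bootstrap_servers, "acks": acks}


# producer = KafkaProducer(**producer_kwargs(bootstrap_servers, "leader"))
```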

### Message Delivery Time

The producer has multiple configuration parameters that interact to control one of the behaviors developers care about most: how long it takes until a call to `send()` succeeds or fails. This is the time we are willing to wait for Kafka to respond successfully, before we give up and admit defeat.

Since Apache Kafka 2.1, the time spent sending a `ProducerRecord` is divided into two intervals that are handled separately:

- The time until an async call to `send()` returns. During this interval the thread that called `send()` is blocked (bounded by `max.block.ms` in the Java client).

- The time from when an async call to `send()` returns successfully until the callback is triggered (with success or failure). This is also the time from when the `ProducerRecord` is placed in a batch for sending until Kafka responds with success or a non-retriable failure, or until the time allocated for sending runs out (bounded by `delivery.timeout.ms` in the Java client).

<img src="https://learning.oreilly.com/library/view/kafka-the-definitive/9781492043072/assets/newtimeout.png" alt="producer" width="800"/>
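The two intervals can be reasoned about with simple arithmetic. This sketch uses the Java client's settings (`max.block.ms`, default 60000, and `delivery.timeout.ms`, default 120000) and mirrors the Java client's rule that an explicitly set `delivery.timeout.ms` must be at least `linger.ms + request.timeout.ms`. The helper names are hypothetical, and kafka-python may not expose all of these settings.

```python
def check_delivery_timeout(delivery_timeout_ms, linger_ms, request_timeout_ms):
    """True if delivery.timeout.ms leaves room for one linger period plus
    one full request/response round trip (the Java client's rule)."""
    return delivery_timeout_ms >= linger_ms + request_timeout_ms


def worst_case_ms(max_block_ms, delivery_timeout_ms):
    """Rough upper bound from calling send() until success or failure is
    known: time blocked inside send() plus time until the callback fires."""
    return max_block_ms + delivery_timeout_ms


# With the Java defaults, a failure is reported after at most about 3 minutes.
print(worst_case_ms(60_000, 120_000) / 60_000, "minutes")
```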



---

## Serializers

Kafka stores keys and values as raw bytes, so without serializers `KafkaProducer` expects `bytes` for both `key` and `value`:

In [None]:
producer.send(topic_name, key=b'key byte', value=b'message byte')
producer.flush()

To send other types, pass `key_serializer` and `value_serializer` functions that convert them to bytes.

For example, a **string serializer**:

In [None]:
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=str.encode,
    key_serializer=str.encode
)

producer.send(f"{topic_name}_str", key='key_str', value='value str')
producer.flush()
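`str.encode` fails when handed `None`, and as far as kafka-python 2.0.x's `send()` goes, the key and value are passed to the serializers as given, including `None` for a keyless record or a tombstone (`value=None`). A None-safe serializer avoids that; `str_serializer` is a hypothetical helper name.

```python
def str_serializer(value, encoding="utf-8"):
    """Encode a str to bytes, passing None through unchanged."""
    if value is None:
        return None
    return value.encode(encoding)


# producer = KafkaProducer(
#     bootstrap_servers=bootstrap_servers,
#     key_serializer=str_serializer,
#     value_serializer=str_serializer,
# )
# producer.send(f"{topic_name}_str", value="no key here")  # key stays None
```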

For example, a **JSON serializer**:

In [None]:
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode('ascii'),
    key_serializer=lambda v: json.dumps(v).encode('ascii')
)

producer.send(
    f"{topic_name}_json",
    key={"id":1},
    value={"name":"👨 Francesco", "pizza":"Margherita 🍕"}
)

producer.send(
    f"{topic_name}_json",
    key={"id":2},
    value={"name":"👩 Adele", "pizza":"Hawaii 🍕+🍍+🥓"}
)

producer.send(
    f"{topic_name}_json",
    key={"id":3},
    value={"name":"👦 Mark", "pizza":"Chocolate 🍕+🍫"}
)

producer.flush()
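Because the serialized key bytes are what get hashed to pick a partition, two dicts with the same content but different insertion order would produce different bytes with plain `json.dumps`, and could land on different partitions. A sketch of canonical JSON serializers (hypothetical helper names) that sort keys and drop optional whitespace; the value serializer also uses `ensure_ascii=False` so emoji are stored as UTF-8 rather than `\uXXXX` escapes.

```python
import json


def json_key_serializer(key):
    """Serialize a key to canonical JSON bytes so equal keys hash equally."""
    if key is None:
        return None
    return json.dumps(key, sort_keys=True, separators=(",", ":")).encode("utf-8")


def json_value_serializer(value):
    """Serialize a value to UTF-8 JSON, keeping None as a real tombstone."""
    if value is None:
        return None  # rather than the JSON text "null"
    return json.dumps(value, ensure_ascii=False).encode("utf-8")


# producer = KafkaProducer(
#     bootstrap_servers=bootstrap_servers,
#     key_serializer=json_key_serializer,
#     value_serializer=json_value_serializer,
# )
```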

---

## If possible, use the confluent-kafka-python library ([or the Confluent client for your language](https://docs.confluent.io/platform/current/clients/index.html))

Install the libraries (see the [documentation](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html)):

In [None]:
!pip install fastavro
!pip install pyrsistent
!pip install jsonschema
!pip install protobuf
!pip install requests
!pip install pycodestyle
!pip install "avro-python3==1.9.2"
!pip install confluent-kafka==1.7.0

Initialize the producer.

All librdkafka configuration properties are listed here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

In [None]:
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": bootstrap_servers,
    "client.id": f"{username}-training-1"
})

topic_name = f"{username}-topic2"

`produce()` is asynchronous by default: it only enqueues the message in the producer's local queue and returns immediately.

In [None]:
producer.produce(topic_name, key="key", value="value")

To handle delivery errors, pass a delivery callback. Callbacks are only invoked from `poll()` or `flush()`:

In [None]:
producer = Producer({
    "bootstrap.servers": bootstrap_servers,
    "client.id": f"{username}-training-1"
})

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

producer.produce(topic_name, key="key", value="value", callback=acked)

# Wait up to 1 second for events. Callbacks will be invoked during
# this method call if the message is acknowledged.
producer.poll(1)
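When producing many messages, confluent-kafka's `produce()` raises `BufferError` if the local queue is full. A common pattern is to call `poll()` regularly so delivery callbacks run and queue space is freed, and to poll and retry on `BufferError`. This is a minimal sketch; `produce_all` is a hypothetical helper and the 1-second and 10-second timeouts are arbitrary.

```python
def produce_all(producer, topic, records, on_delivery=None):
    """Produce (key, value) pairs, backing off while the local queue is full.

    Returns the number of messages still queued after the final flush.
    """
    kwargs = {"callback": on_delivery} if on_delivery is not None else {}
    for key, value in records:
        while True:
            try:
                producer.produce(topic, key=key, value=value, **kwargs)
                break
            except BufferError:
                # Queue full: serve delivery reports for up to 1 s so space
                # can be freed, then retry this record.
                producer.poll(1)
        # Non-blocking poll so delivery callbacks run as we go.
        producer.poll(0)
    return producer.flush(10)


# remaining = produce_all(producer, topic_name, [("k1", "v1"), ("k2", "v2")], acked)
```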

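For a synchronous write, call `flush()` right after `produce()`. It blocks until the message is delivered or fails; flushing after every message limits throughput, so use it sparingly: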

In [None]:
producer.produce(topic_name, key="key sync", value="value sync")
producer.flush()

Typically, `flush()` should be called prior to shutting down the producer to ensure all outstanding/queued/in-flight messages are delivered.
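In confluent-kafka, `flush(timeout)` returns the number of messages still in the queue when it returns, so a shutdown path can check whether anything was left undelivered. A minimal sketch; `close_producer` is a hypothetical helper name.

```python
import sys


def close_producer(producer, timeout=10.0):
    """Flush before shutdown and report anything left undelivered."""
    remaining = producer.flush(timeout)  # 0 means everything was delivered
    if remaining:
        print(f"{remaining} message(s) were not delivered before shutdown",
              file=sys.stderr)
    return remaining


# close_producer(producer)
```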

### Serializer

#### JSON

In [None]:

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer

topic_name_json = f"{topic_name}_json"

schema_str = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "description": "A Confluent Kafka Python User",
  "type": "object",
  "properties": {
    "name": {
      "description": "User's name",
      "type": "string"
    },
    "favorite_number": {
      "description": "User's favorite number",
      "type": "number",
      "exclusiveMinimum": 0
    },
    "favorite_color": {
      "description": "User's favorite color",
      "type": "string"
    }
  },
  "required": [ "name", "favorite_number", "favorite_color" ]
}
"""

def delivery_report(err, msg):
    if err is not None:
        print("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


schema_registry_conf = {'url': schema_registry_url}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

json_serializer = JSONSerializer(schema_str, schema_registry_client)

producer_conf = {
    'bootstrap.servers': bootstrap_servers,
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': json_serializer,
}

producer = SerializingProducer(producer_conf)

user2 = dict(name="test",
             favorite_color="red",
             favorite_number=1)

producer.produce(topic=topic_name_json, key="json msg", value=user2, on_delivery=delivery_report)

# flush message for demo
producer.flush()
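Passing a plain `dict` works, but `JSONSerializer` also accepts a `to_dict` hook that converts an application object into a dict before it is validated against the schema. This sketch assumes the confluent-kafka 1.7 signature `JSONSerializer(schema_str, schema_registry_client, to_dict=None, conf=None)` with the hook called as `to_dict(obj, ctx)`; the `User` dataclass and `user_to_dict` function are hypothetical.

```python
from dataclasses import asdict, dataclass


@dataclass
class User:
    """Hypothetical domain type matching the "User" JSON schema above."""
    name: str
    favorite_number: float
    favorite_color: str


def user_to_dict(user, ctx):
    """to_dict hook: turn a User into a plain dict for schema validation.

    ctx carries serialization context (such as the topic); it is unused here.
    """
    return asdict(user)


# json_serializer = JSONSerializer(schema_str, schema_registry_client,
#                                  to_dict=user_to_dict)
# producer.produce(topic=topic_name_json, key="json msg",
#                  value=User("test", 1, "red"), on_delivery=delivery_report)
```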

#### Avro

In [None]:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

topic_name_avro = f"{topic_name}_avro"

value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Value"}
key = {"name": "Key"}

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


avroProducer = AvroProducer({
    'bootstrap.servers': bootstrap_servers,
    'on_delivery': delivery_report,
    'schema.registry.url': schema_registry_url
}, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic=topic_name_avro, value=value, key=key)
avroProducer.flush()

## Confluent Schema Registry

Confluent Schema Registry provides a serving layer for your metadata. It offers a RESTful interface for storing and retrieving Avro®, JSON Schema, and Protobuf schemas. It keeps a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting. It also provides serializers that plug into Apache Kafka® clients and handle schema storage and retrieval for Kafka messages sent in any of the supported formats.

<img src="https://learning.oreilly.com/library/view/kafka-the-definitive/9781492043072/assets/ktdg_0302.png" alt="producer" width="800"/>
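The registry-aware serializers register (or look up) the schema and prefix each message with a small header so consumers know which schema to fetch. Per Confluent's documented wire format, that header is one magic byte (`0`) followed by the 4-byte big-endian schema ID, then the encoded data. This hypothetical helper splits such a message; for Protobuf, extra message-index bytes follow the header, which this sketch does not parse.

```python
import struct

MAGIC_BYTE = 0


def split_confluent_payload(data: bytes):
    """Split a Schema Registry framed message into (schema_id, payload)."""
    if len(data) < 5:
        raise ValueError("message too short for the Schema Registry header")
    # ">bI": big-endian, 1-byte magic + 4-byte unsigned schema ID, no padding.
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, data[5:]


# schema_id, avro_bytes = split_confluent_payload(msg.value())  # raw consumer bytes
```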



---

## Partition

### Create topic

In [None]:
from confluent_kafka.admin import AdminClient, NewTopic


admin = AdminClient({'bootstrap.servers': bootstrap_servers})

topic_name_partitioned = f"{topic_name}_partitioned"

new_topics = [NewTopic(topic_name_partitioned, num_partitions=3, replication_factor=1)]
# Call create_topics to asynchronously create topics, a dict
# of <topic,future> is returned.
fs = admin.create_topics(new_topics)

# Wait for operation to finish.
# Timeouts are preferably controlled by passing request_timeout=15.0
# to the create_topics() call.
# All futures will finish at the same time.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

### Produce message

In [None]:
from confluent_kafka import Producer

# Use a plain Producer: the `producer` defined above is a SerializingProducer
# whose JSON value serializer would reject these plain strings.
producer = Producer({"bootstrap.servers": bootstrap_servers})

def report_partition(err, msg):
    # Print which partition each key was assigned to.
    if err is not None:
        print(f"delivery failed for {msg.key()}: {err}")
    else:
        print(f"{msg.key()} -> partition {msg.partition()}")

for key, value in [("key1", "value1"), ("key2", "value2"), ("key3", "value3"),
                   ("key4", "value4"), ("key100", "value4"), ("key101", "value4"),
                   ("key102", "value4"), ("key101", "value4"), ("key101", "value4"),
                   ("key101", "value4")]:
    producer.produce(topic_name_partitioned, key=key, value=value, callback=report_partition)
producer.flush()

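By default, a message with a key is assigned to a partition by hashing the key, so every message with the same key (such as the repeated `key101` above) lands on the same partition, as long as the number of partitions does not change.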
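The hash function depends on the client. The Java client and kafka-python hash the key bytes with murmur2, while librdkafka (and therefore confluent-kafka-python) defaults to the `consistent_random` partitioner, which uses CRC32 of the key (keyless messages go to a random partition). The same key can therefore land on different partitions depending on the client; setting `"partitioner": "murmur2_random"` in the librdkafka configuration matches the Java behavior. This sketch ports Java's `Utils.murmur2` to Python and, assuming librdkafka's CRC32 is the standard CRC-32 that `zlib.crc32` computes, shows both choices side by side.

```python
import zlib


def murmur2(data: bytes) -> int:
    """Python port of the Java client's Utils.murmur2 (unsigned 32-bit result)."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    tail = length & ~3  # start of the 0-3 leftover bytes

    # Mix the input four bytes (little-endian) at a time.
    for i in range(0, tail, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k

    # Fold in leftover bytes, mirroring Java's fall-through switch.
    extra = length - tail
    if extra == 3:
        h ^= data[tail + 2] << 16
    if extra >= 2:
        h ^= data[tail + 1] << 8
    if extra >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def java_partition(key: bytes, num_partitions: int) -> int:
    """Java default (and librdkafka "murmur2_random") choice for a key."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def librdkafka_consistent_partition(key: bytes, num_partitions: int) -> int:
    """librdkafka "consistent_random" choice for a non-empty key (assumed CRC-32)."""
    return zlib.crc32(key) % num_partitions


for k in [b"key1", b"key2", b"key101"]:
    print(k, java_partition(k, 3), librdkafka_consistent_partition(k, 3))
```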

You can also produce to a specific partition by passing `partition=`:

In [None]:
producer.produce(topic_name_partitioned, key="key1", value="value1", partition=2)
producer.produce(topic_name_partitioned, key="key2", value="value2", partition=2)
producer.produce(topic_name_partitioned, key="key3", value="value3", partition=2)
producer.produce(topic_name_partitioned, key="key4", value="value4", partition=2)
producer.flush()

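You can also pick a different built-in partitioning strategy through the librdkafka `partitioner` property (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Fully custom partitioning logic, like the example below, can be written for the Java client by implementing the `Partitioner` interface.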

So far, we have discussed the traits of the default partitioner, which is the one most commonly used. However, Kafka does not limit you to just hash partitions, and sometimes there are good reasons to partition data differently. For example, suppose that you are a B2B vendor and your biggest customer is a company that manufactures handheld devices called Bananas. Suppose that you do so much business with customer “Banana” that over 10% of your daily transactions are with this customer. If you use default hash partitioning, the Banana records will get allocated to the same partition as other accounts, resulting in one partition being much larger than the rest. This can cause servers to run out of space, processing to slow down, etc. What we really want is to give Banana its own partition and then use hash partitioning to map the rest of the accounts to partitions.

Example (Java client):
```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes,
                         Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("We expect all messages " +
                "to have customer name as key");

        if (((String) key).equals("Banana"))
            return numPartitions - 1; // Banana will always go to last partition

        // Other records will get hashed to the rest of the partitions.
        // Utils.toPositive avoids the negative result Math.abs gives for Integer.MIN_VALUE.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {}
}

```
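The same idea can be expressed with kafka-python, whose `KafkaProducer` accepts a `partitioner` callable that is called after key serialization as `partitioner(key_bytes, all_partitions, available_partitions)`. In this sketch `banana_partitioner` is a hypothetical function, and `zlib.crc32` is a stand-in hash: it is not the murmur2 hash that kafka-python's default partitioner uses, so non-Banana keys will not map to the same partitions as the Java example.

```python
import zlib


def banana_partitioner(key_bytes, all_partitions, available_partitions):
    """Send "Banana" to the last partition; hash other keys over the rest."""
    if key_bytes is None:
        raise ValueError("expected every message to have a customer name as key")
    partitions = sorted(all_partitions)
    if len(partitions) < 2:
        raise ValueError("need at least two partitions to reserve one for Banana")
    if key_bytes == b"Banana":
        return partitions[-1]
    others = partitions[:-1]
    # Stand-in hash (CRC-32), not kafka-python's murmur2.
    return others[zlib.crc32(key_bytes) % len(others)]


# producer = KafkaProducer(
#     bootstrap_servers=bootstrap_servers,
#     key_serializer=str.encode,
#     partitioner=banana_partitioner,
# )
```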