
Building Scalable Producers πŸ₯§



Producer Internals

The Kafka producer is a client library embedded within the host application, which is usually written in a language such as Java, Scala or Python.

When a Kafka producer is created by the main program it receives various configuration settings, the key among them being the list of bootstrap servers (at least one bootstrap server must be specified). The producer then contacts a bootstrap server and collects metadata: first information about all the Kafka brokers in the cluster, then information about topics, their partitions and partition leaders.

It also subscribes to changes in the metadata and keeps track of them.
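
The snippet below is a minimal sketch of inspecting that metadata from the client side; it assumes the kafka.learning.orders topic used in the full example later on this page.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

import java.util.Properties;

public class ProducerMetadataSketch {

    public static void main(String[] args) {

        Properties props = new Properties();
        //At least one bootstrap server must be specified; the producer
        //discovers the rest of the cluster from it
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            //partitionsFor() exposes the metadata the producer holds for
            //a topic: its partitions and their current leader brokers
            for (PartitionInfo info : producer.partitionsFor("kafka.learning.orders")) {
                System.out.println("Partition " + info.partition()
                        + " led by broker " + info.leader());
            }
        }
    }
}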

Publishing Messages

The client sends a ProducerRecord using the send() method.

The first thing the producer does is serialize the message with the key and value serializers set up in the producer configuration properties.

Next comes partition selection. The partition for the message is selected by the producer, not by the Kafka cluster. If the message has a key, the producer uses hash partitioning to identify the partition; otherwise it uses a round-robin method.
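
The ProducerRecord constructors below sketch the three ways a partition can end up being chosen (the topic name is the one from the full example on this page):

import org.apache.kafka.clients.producer.ProducerRecord;

//Keyed record: the default partitioner hashes the key, so the
//same key always lands on the same partition
ProducerRecord<String, String> keyed =
        new ProducerRecord<>("kafka.learning.orders", "order-42", "payload");

//Keyless record: the producer itself picks the partition
//(round robin, or sticky assignment in newer clients)
ProducerRecord<String, String> keyless =
        new ProducerRecord<>("kafka.learning.orders", "payload");

//Explicit partition: bypasses the partitioner entirely
ProducerRecord<String, String> pinned =
        new ProducerRecord<>("kafka.learning.orders", 0, "order-42", "payload");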

The producer maintains an in-memory batch for each partition. The message is added to the corresponding partition batch.

A separate producer thread pushes each batch to the corresponding partition leader broker at periodic intervals (this interval is configurable, as sketched below).
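
Both the batch size and the send interval are producer configuration parameters. As a sketch, the following lines could be added to the kafkaProps configuration in the full example later on this page (the values are illustrative, not recommendations):

//Maximum size in bytes of each per-partition batch (default: 16384)
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");

//How long the sender thread waits for a batch to fill before
//pushing it anyway (default: 0, i.e. send as soon as possible)
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, "10");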

Once the partition leader has received and processed the message, the producer receives an acknowledgement, which it passes on to the host program.

Producer Publishing Options

Synchronous Mode

In this case the client code sends the message to the local producer, then waits for the message to be actually sent to Kafka and the acknowledgement to be received. The host thread is blocked until the acknowledgement arrives, so if there are any errors in sending, the same thread can process them before moving on to the next message.

The call returns a RecordMetadata object which contains the partition the message was published to and the Kafka offset of the message in that partition. This method is simple, ensures guaranteed message delivery and allows errors to be processed in the same thread. The thread can either try republishing or skip the message, as sketched below. Given that it is synchronous, this method is slow: it waits for the network round trip to the Kafka broker, which in turn needs to write and replicate the data.

It also does not exploit the batch-publishing capability of the producer, as it publishes one message at a time.
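
A minimal sketch of the retry-in-place pattern this enables, assuming a producer and record like those in the full example below (the retry count is arbitrary):

for (int attempt = 1; attempt <= 3; attempt++) {
    try {
        RecordMetadata meta = producer.send(record).get(); //get() blocks
        System.out.println("Stored in partition " + meta.partition()
                + " at offset " + meta.offset());
        break; //Success, stop retrying
    } catch (Exception e) {
        System.out.println("Attempt " + attempt + " failed: " + e.getMessage());
        //After the last attempt the message is skipped; it could
        //instead be written to a dead-letter store
    }
}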

Asynchronous Mode with No-Check

Also called the fire-and-forget method.

The client sends the message to the local producer but does not wait for an acknowledgement. The host thread is not blocked and moves on to further processing. The local producer caches the message in batches, and a separate thread publishes them to Kafka.

This method has low latency and scales well on the client side, as the client thread does not wait for an acknowledgement. Failures are not tracked and retried, so there is a possibility of missed messages in case of network failures.

Asynchronous Mode with Callback

The client sends the message to the local producer along with a callback function to process the results. The client moves on with further processing and is not blocked.

The local producer caches the message and then uses a separate thread to publish it to Kafka. When results are returned by Kafka, the callback function is invoked with the RecordMetadata object or with any exception encountered.

It is up to the client to take action at that point. This mode has low latency as the client code is not blocked, but error handling can get complex, especially if message ordering is needed.
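
With Java 8+, the callback can also be supplied as a lambda instead of a separate class; the sketch below is equivalent to the KafkaCallBack class shown later (producer and record are assumed from the full example):

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        //Decide here: retry, log, or route to an error topic
        System.out.println("Publish failed: " + exception.getMessage());
    } else {
        System.out.println("Stored in partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
});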

Acknowledgement in Kafka

A producer client needs to know if the message has been received successfully and stored by the broker. It is also helpful to know if the message has been replicated.

The behavior of acknowledgements is controlled by the acks parameter, set in the producer configuration. It determines the number of replicas of a partition that must successfully receive the message before the send operation is declared successful.

The more replicas that are successfully updated, the better the guarantee that the message has been received and remains available even in case of broker failures. But more replicas also means more latency in completing the operation, as the partition leader needs to coordinate with more brokers.

There are three possible values for the acks parameter ( Source [1] ):

acks=0 : The producer does not wait for any acknowledgement. This gives the lowest latency, but messages can be lost without the producer ever knowing.

acks=1 : The producer waits for the partition leader alone to write the message. Messages can still be lost if the leader fails before the followers replicate them.

acks=all : The producer waits until all in-sync replicas have received the message. This is the strongest guarantee, at the cost of the highest latency.

Additional Producer Parameters

( Source [1] )

Please refer to the official documentation at https://kafka.apache.org/documentation/#producerconfigs for more information.
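
As an illustration, the lines below set a few commonly tuned producer parameters on the kafkaProps configuration from the example that follows (the values are examples, not recommendations):

//Total memory available for buffering unsent records
kafkaProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

//Number of automatic retries on transient send failures
kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "3");

//Cap on unacknowledged requests per connection;
//1 preserves message ordering when retries are enabled
kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");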

Java Producer Options Example

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.record.CompressionType;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerWithOptions {

    public static void main(String[] args) {

        //Setup Properties for Kafka Producer
        Properties kafkaProps = new Properties();

        //List of brokers to connect to
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        //Serializer class used to convert Keys to Byte Arrays
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //Serializer class used to convert Messages to Byte Arrays
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        //Set ACKS to all so that all replicas need to acknowledge
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");

        //Set compression type to GZIP
        kafkaProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
                CompressionType.GZIP.name);

        //Create a Kafka producer from configuration
        KafkaProducer<String, String> optionsProducer = new KafkaProducer<>(kafkaProps);

        //Use a Random number to generate message keys
        Random randomKey = new Random();

        /**************************************************************
         Publish Asynchronously without any checks
         **************************************************************/
        //Create the record
        ProducerRecord<String, String> asyncNoChecksRec =
                new ProducerRecord<String, String>(
                        "kafka.learning.orders",    //Topic name
                        String.valueOf(randomKey.nextInt(1000)),
                        "This is order published asynchronously with NO checks"
                );

        try {
            //No checks used
            optionsProducer.send(asyncNoChecksRec);

            System.out.println("\nSent Asynchronously, with no Checks :" + asyncNoChecksRec);

        } catch (Exception e) {
            System.out.println("Exception Publishing Asynchronously without Checks :" + e.getMessage());
        }

        /**************************************************************
         Publish Synchronously and check for results
         **************************************************************/
        //Create the record
        ProducerRecord<String, String> syncRec =
                new ProducerRecord<String, String>(
                        "kafka.learning.orders",    //Topic name
                        String.valueOf(randomKey.nextInt(1000)),
                        "This is order published synchronously"
                );

        //Send synchronously, wait for confirmation
        try {
            RecordMetadata retData =
                    optionsProducer
                            .send(syncRec)
                            .get(); //get() makes the call synchronous

            System.out.println("\nSent Synchronously :" + syncRec
                    + " Received Partition : " + retData.partition()
                    + " and Offset : " + retData.offset());

        } catch (Exception e) {
            System.out.println("Exception Publishing Synchronously:" + e.getMessage());
        }

        /**************************************************************
         Publish Asynchronously with a callback
         **************************************************************/

        String messageKey = String.valueOf(randomKey.nextInt(1000));
        //Create Message
        ProducerRecord<String, String> asyncRecCallBack =
                new ProducerRecord<String, String>(
                        "kafka.learning.orders",    //Topic name
                        messageKey,
                        "This is order published asynchronously with Callback"
                );

        //Send with Callback. Callback handler also has message key for context
        optionsProducer.send(asyncRecCallBack, new KafkaCallBack(messageKey));

        System.out.println("\nSent Asynchronously with Callback :" + asyncRecCallBack);

        optionsProducer.close();
    }
}
The KafkaCallBack class used above:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaCallBack implements Callback {

    String messageKey;

    //Set message key to identify message. Additional information
    //can also be set here to provide context
    public KafkaCallBack(String messageKey) {
        super();
        this.messageKey=messageKey;
    }
    @Override
    public void onCompletion(RecordMetadata retData, Exception e) {

        //Check if an exception occurred
        if (e != null) {
            System.out.println("Exception Publishing Asynchronously without Callback :"
                    +"Message Key = " + messageKey + " : " + e.getMessage());
        }
        else {
            System.out.println(" Callback received for Message Key " + messageKey
                    + " returned Partition : " + retData.partition()
                    + " and Offset : " + retData.offset());
        }
    }
}