In [2]:
from kafka import KafkaProducer

In [4]:
# Create a producer
producer = KafkaProducer(
    bootstrap_servers = ['localhost:9092'],
    key_serializer = str.encode,
    value_serializer = str.encode
)

In [5]:
for i in range(10):
    key = f'key-{i}'
    value = f'value-{i}'
    producer.send('my-first-topic', key=key, value=value)

print(f'Messages sent to topic my-first-topic')

Messages sent to topic my-first-topic


In [None]:
producer.flush() # Ensure all messages are sent

## Methods of sending messages in Kafka Producer

**Fire-and-Forget**: The producer sends a message and does not wait for any acknowledgment or response from the broker.

In [None]:
# Create a Kafka producer instance
producer = KafkaProducer(
    bootstrap_servers = ['localhost:9092'],
    key_serializer = str.encode,
    value_serializer = str.encode
)

# Send a message to a Kafka topic
producer.send(
    topic = "my_topic",
    key="my_key",
    value="Hello, Kafka!"
)

# Ensure all messages are sent before exiting
producer.flush()


UsageError: Line magic function `%md` not found.


**Synchronous Send**: The producer sends a message and waits for the acknowledgment from the broker before sending the next message.

In [None]:
from kafka.errors import KafkaError

# future is a Kafka Future object that will hold the result of the send operation
future = producer.send(
    topic='my-first-topic',
    key = "key-sync",
    value="Srujan"
)

# checking the future result for success or failure of the send operation
try:
    # checking the result of the send operation every 10 seconds
    record_metadata = future.get(timeout=10)
    print(f"Topic:{record_metadata.topic}, Partition:{record_metadata.partition}, Offset:{record_metadata.offset}")
except KafkaError as e:
    print(f'Error sending message: {e}')

Topic:my-first-topic, Partition:0, Offset:12


**Asynchronous Send**: The producer sends a message and immediately continues without waiting for the acknowledgment.

In [10]:
from kafka.errors import KafkaError

# success and error callback functions
def on_send_success(record_metadata):
    print(f"Topic:{record_metadata.topic}, Partition:{record_metadata.partition}, Offset:{record_metadata.offset}")

def on_send_error(err):
    print(f"Error sending message: {err}")

# Send message asynchronously with callbacks
producer.send(
    topic='my-first-topic',
    key = "key-sync",
    value="Srujan"
).add_callback(on_send_success).add_errback(on_send_error)

<kafka.producer.future.FutureRecordMetadata at 0x17178517e80>

Topic:my-first-topic, Partition:0, Offset:13
