# Практическая работа Kafka
## Цель
* Освоить написание продьюсера
* Узнать, как различные параметры настройки влияют на работу с Kafka

Для начала немного подготовки.
Не забудьте выбрать ядро, выполните следующий шаг, с помощью которого мы установим и импортируем необходимые для работы библиотеки

In [43]:
!pip3 install kafka-python

import json
import random
from datetime import datetime
from time import sleep
from kafka import KafkaProducer



Ниже объект с конфигурацией консьюемера. В интерфейсе вы уже создали 3 топика, дальше мы напишем несколько продьюсеров, которые будут писать в эти топики.
Не забудьте выполнить ячейку с конфигурацией

In [44]:
base_config = {
    # Список хостов и портов Kafka
    "bootstrap_servers": ["localhost:9093", "localhost:9095", "localhost:9097"],
    # Тайм-аут подключения к брокеру (в миллисекундах)
    "request_timeout_ms": 20000,
    # Уровень подтверждений от брокера:
    # '0' — не требуется подтверждений,
    # '1' — подтверждение от одного брокера,
    # 'all' — подтверждение от всех реплик.
    "acks": "all",
    # Кодировка ключа и значения сообщений
    "key_serializer": lambda key: json.dumps(key).encode('utf-8'),
    "value_serializer": lambda value: json.dumps(value).encode('utf-8')
}

А в следующей ячейке напишем метод отправки сообщения продьюсером

In [51]:
def send_message(producer, topic, message, headers=None, partition=None):
    """
    Отправляет сообщение в указанный топик.
    :param producer: Созданный нами продьюсерd
    :param topic: Название Kafka топика
    :param key: Ключ сообщения
    :param headers: Заголовки сообщения
    :param value: Значение сообщения
    """
    try:
        future = producer.send(topic, key=message["key"], value=message, partition=partition, headers=headers)
        # Опционально: ожидание завершения отправки
        record_metadata = future.get(timeout=10)
        print(f"Сообщение отправлено в топик {record_metadata.topic}, "
              f"раздел {record_metadata.partition}, смещение {record_metadata.offset}, id={message['key']}")
    except Exception as e:
        print(f"Ошибка при отправке сообщения: {e}")

А тут опишем декодирование заголовков, чтобы они корректно отправились

In [46]:
def convert_headers(headers):
    return [(key, str(value).encode('utf-8')) for key, value in headers.items()]


Сейчас надо не забыть создать топики, как описано в [инструкции](./kafka-ui.md). Я создал топик с 3 партициями и названием new-topic. Впишите ваше название в переменную TEST_TOPIC в ячейке ниже.  
Ниже описано сообщение, которое мы отправим. Сделаем это 10 раз (для этого нужен цикл) и выставим задержку в 2 секунды между сообщениями

In [47]:
from kafka import KafkaProducer

# Инициализация Kafka Producer
producer = KafkaProducer(**base_config)

На какое-то время предлагаю переместиться и запустить наш первый консьюмер. Инструкции [тут](./kafka-consumer.ipynb)

In [48]:
TEST_TOPIC = 'new-topic' # Вместо new-topic поставьте название своего топика
message_example = {
    "text": "MY_FIRST_MESSAGE",
    "number_value": 14.223
}

for i in range(10):
    message_example["key"] = str(i)
    send_message(producer, TEST_TOPIC, message_example)
    sleep(2)

Сообщение отправлено в топик new-topic, раздел 2, смещение 54
Сообщение отправлено в топик new-topic, раздел 1, смещение 99
Сообщение отправлено в топик new-topic, раздел 0, смещение 98
Сообщение отправлено в топик new-topic, раздел 2, смещение 55
Сообщение отправлено в топик new-topic, раздел 1, смещение 100
Сообщение отправлено в топик new-topic, раздел 0, смещение 99
Сообщение отправлено в топик new-topic, раздел 1, смещение 101
Сообщение отправлено в топик new-topic, раздел 0, смещение 100
Сообщение отправлено в топик new-topic, раздел 0, смещение 101
Сообщение отправлено в топик new-topic, раздел 1, смещение 102


А теперь попробуем поиграться настройками linger.ms и max.request_size

In [49]:
linger_config = {
    # Время ожидания перед отправкой
    "linger_ms": 5000,
    # Максимальный размер отправляемого сообщения (в байтах)
    "max_request_size": 1024
}

new_config = base_config | linger_config

# Инициализация Kafka Producer
producer = KafkaProducer(** new_config)

# Отправляем сообщения тем же способом
for i in range(30):
    message_example["key"] = str(i)
    producer.send(TEST_TOPIC, message_example, message_example["key"])
    # send_message(producer, TEST_TOPIC, message_example)
    sleep(0.5)

Вечный продсьюер. Запускаем след-ие ячейки. Если захотите перестать отправлять сообщения, то можете остановить выполнение

In [55]:
i = 0

In [56]:
ONE_PARTITION_TOPIC = 'one-partition'
print(base_config)
infinite_producer = KafkaProducer(**base_config)

id = 0
while True:
    message_example["key"] = str(i)
    send_message(infinite_producer, ONE_PARTITION_TOPIC, message_example)
    i += 1
    sleep(0.5)

{'bootstrap_servers': ['localhost:9093', 'localhost:9095', 'localhost:9097'], 'request_timeout_ms': 20000, 'acks': 'all', 'key_serializer': <function <lambda> at 0x12180c9d0>, 'value_serializer': <function <lambda> at 0x121a83a60>}
Сообщение отправлено в топик one-partition, раздел 0, смещение 99, id=0
Сообщение отправлено в топик one-partition, раздел 0, смещение 100, id=1
Сообщение отправлено в топик one-partition, раздел 0, смещение 101, id=2
Сообщение отправлено в топик one-partition, раздел 0, смещение 102, id=3
Сообщение отправлено в топик one-partition, раздел 0, смещение 103, id=4
Сообщение отправлено в топик one-partition, раздел 0, смещение 104, id=5
Сообщение отправлено в топик one-partition, раздел 0, смещение 105, id=6
Сообщение отправлено в топик one-partition, раздел 0, смещение 106, id=7
Сообщение отправлено в топик one-partition, раздел 0, смещение 107, id=8
Сообщение отправлено в топик one-partition, раздел 0, смещение 108, id=9
Сообщение отправлено в топик one-partit

KeyboardInterrupt: 