# 1. Начало работы с Kafka

Тест подключения к Kafka, проверка возможности отправлять и принимать сообщения

In [None]:
!pip install kafka-python

In [None]:
import json
import time
import random

In [None]:
import hashlib
import uuid

In [None]:
KAFKA_BOOTSTRAP_SERVERS = ['localhost:19092', 'localhost:29092', 'localhost:39092']
TOPIC_NAME = "orders"

In [None]:
from kafka import KafkaProducer, KafkaAdminClient

try:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        request_timeout_ms=5000,
        retries=3,
        acks='all',
        batch_size=16384,
        linger_ms=10
    )
    print("Подключение успешно!")

    i = 0
    while i <= 30:
        future = producer.send(
            TOPIC_NAME,
            key=str(uuid.uuid4()).encode('utf-8'),
            value=json.dumps({"message": f"order - id {i}", "processed_at":time.time()}).encode("utf-8"),
        )
        
        try:
            record_metadata = future.get(timeout=10)
            print(f"Сообщение {i} отправлено в partition {record_metadata.partition}, offset {record_metadata.offset}")
        except Exception as e:
            print(f"Ошибка отправки сообщения {i}: {e}")

        i += 1
        time.sleep(random.randint(1, 5))
        
    producer.flush()
    producer.close()
except Exception as e:
    print(f"Ошибка: {e}")


In [None]:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    auto_offset_reset="earliest",
    enable_auto_commit=True,
)

print("Успешное подключение к Kafka!")

for message in consumer:
    print(message.value.decode("utf-8"))

consumer.close()

Почему-то партиция создалась только одна, хотя значение по дефолту было 8. Хорошая возможность попробовать самостоятельно увеличить кол-во партиций. 

Сначала подключимся к админке и посмотрим, что с топиком

In [None]:
from kafka import KafkaAdminClient

KAFKA_BOOTSTRAP_SERVERS = ['localhost:19092', 'localhost:29092', 'localhost:39092']
TOPIC_NAME = "orders"

In [None]:
admin = KafkaAdminClient(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    request_timeout_ms=5000
)
topics = admin.list_topics()
    

In [None]:
print(topics)

Есть только один топик, его и нужно проверить

In [None]:
topic_metadata = admin.describe_topics(topics=['orders'])
print(topic_metadata)

Да, действительно одна партиция, значит, можно партиции создать

In [None]:
from kafka.admin import NewPartitions

topic_partitions = {
    'orders': NewPartitions(8)
}
response = admin.create_partitions(topic_partitions)

In [None]:
print(response)

In [None]:
topic_metadata = admin.describe_topics(topics=['orders'])
print(topic_metadata)

[{'error_code': 0, 'topic': 'orders', 'is_internal': False, 'partitions': [{'error_code': 0, 'partition': 1, 'leader': 2, 'leader_epoch': 0, 'replicas': [2, 3, 1], 'isr': [2, 3, 1], 'offline_replicas': []}, {'error_code': 0, 'partition': 3, 'leader': 1, 'leader_epoch': 0, 'replicas': [1, 2, 3], 'isr': [1, 2, 3], 'offline_replicas': []}, {'error_code': 0, 'partition': 2, 'leader': 3, 'leader_epoch': 0, 'replicas': [3, 1, 2], 'isr': [3, 1, 2], 'offline_replicas': []}, {'error_code': 0, 'partition': 0, 'leader': 1, 'leader_epoch': 0, 'replicas': [1, 2, 3], 'isr': [1, 2, 3], 'offline_replicas': []}, {'error_code': 0, 'partition': 5, 'leader': 1, 'leader_epoch': 0, 'replicas': [1, 2, 3], 'isr': [1, 2, 3], 'offline_replicas': []}, {'error_code': 0, 'partition': 7, 'leader': 3, 'leader_epoch': 0, 'replicas': [3, 2, 1], 'isr': [3, 2, 1], 'offline_replicas': []}, {'error_code': 0, 'partition': 6, 'leader': 2, 'leader_epoch': 0, 'replicas': [2, 3, 1], 'isr': [2, 3, 1], 'offline_replicas': []}, {'error_code': 0, 'partition': 4, 'leader': 3, 'leader_epoch': 0, 'replicas': [3, 1, 2], 'isr': [3, 1, 2], 'offline_replicas': []}], 'authorized_operations': ['READ', 'WRITE', 'CREATE', 'DELETE', 'ALTER', 'DESCRIBE', 'DESCRIBE_CONFIGS', 'ALTER_CONFIGS']}]

In [None]:
for topic_info in topic_metadata:
    if topic_info['error_code'] == 0:  # Успешный запрос
        partition_count = len(topic_info['partitions'])
        print(f"Топик '{topic_info['topic']}' имеет {partition_count} партиций")

Для запуска консьюмера из командной строки

```bash
docker exec -it kafka-1 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --property "print.key=true"
```

Для открытия системных файлов, хранимых в бинарном формате

```bash
 docker exec kafka-1 /opt/kafka/bin/kafka-dump-log.sh --files /var/lib/kafka/data/orders-0/00000000000000000000.log  --print-data-log
 ```