Предлагаем вам два способа разворачивания кафки на вашем устройстве с докером и без. Если вы используете Windows, то рекомендуем воспользоваться докером
- Необходимо скачать apache kafka с официального сайта
- Рекомендую на диске С создать папку kafka_logs с папками для записи логов
- Откроем zookeeper.properties изменим параметр записи логов по созданному выше пути
- Аналогично откроем откроем server.properties изменим zookeeper.connection.timeout.ms=60000 и пропишем путь к лог файлу
- Скачаем Java Важно, чтобы в пути, по которому вы скачиваете Java не было пробелов, чтоюы избежать проблемы запуска в дальнейшем
- В вашем компьютере по пути: Система -> Дополнительные параметры системы -> Дополнительно -> Переменные среды Необходимо добавить в PATH путь к Java, а также переменную JAVA_HOME с путём к JAVA
- Создайте проект и убедитесь, что Java доступна
- Start Zookeeper: C:/kafka_2.12-3.2.0/bin/windows/zookeeper-server-start.bat C:/kafka_2.12-3.2.0/config/zookeeper.properties
- Start kafka: Start Kafka-server: C:/kafka_2.12-3.2.0/bin/windows/kafka-server-start.bat C:/kafka_2.12-3.2.0/config/server.properties
- Не забудьте изменить путь под вашу версию кафки. Готово!
- Скачайте докер и WSL2 (В Docker desktop будет ссылка)
- Рекомендую ознакомиться с документацией
- В корень проекта, в котором вы будете работать поместить конфигурационный файл
- docker-compose.yml делается похоже на ручной запуск из первого пункта. Вот моё готовое решение:
- В папке проекта напишите docker compose up -d
- В docker должны появиться созданные контейнеры
также можно проверить через docker ps
Перед началом работы ознакомьтесь с документацией
- Необходимо создать проект в pyCharm
- Вам понадобитмя библиотека для работы с кафкой pip install kafka-python
- Из библиотеки рекомендую использовать следующие методы from kafka import KafkaProducer, KafkaConsumer
- Пример функции отправки сообщения в кафку
def sendKafka():
my_producer = KafkaProducer(
bootstrap_servers=['localhost:29092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
message = "1"
while message != "0":
message = input("Type your message: ")
my_producer.send("testnum", value=message)
getKafka()
- Пример функции получения сообщения из кафки
def getKafka():
consumer = KafkaConsumer('testnum',
bootstrap_servers=['localhost:29092'],
group_id='test',
auto_offset_reset='earliest')
for msg in consumer:
res_str = msg.value.decode("utf-8")
print("Text:", res_str)
//В интернете можно найти более примитивные примеры