## Домашнє завдання до теми «Apache Kafka»

### 1. Створення топіків в Kafka

In [23]:
from kafka.admin import KafkaAdminClient, NewTopic
from configs import kafka_config
from kafka import KafkaProducer, KafkaConsumer
import json
import uuid
import time
import random

In [24]:
# Створення клієнта Kafka
admin_client = KafkaAdminClient(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password']
)

# Визначення нового топіку
topic_names = [
    f'{kafka_config['name']}_building_sensors',
    f'{kafka_config['name']}_temperature_alerts',
    f'{kafka_config['name']}_humidity_alerts',
]

num_partitions = 2
replication_factor = 1
new_topics = [ NewTopic(name=n, num_partitions=num_partitions, replication_factor=replication_factor) for n in topic_names ]

# Створення нових топіків
try:
    admin_client.create_topics(new_topics=new_topics, validate_only=False)
    print(f"Topics are created successfully.")
    [print(topic) for topic in admin_client.list_topics() if kafka_config['name'] in topic]
except Exception as e:
    print(f"An error occurred: {e}")

# Закриття зв'язку з клієнтом
admin_client.close()

An error occurred: [Error 36] TopicAlreadyExistsError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='maksymp_building_sensors', num_partitions=2, replication_factor=1, replica_assignment=[], configs=[]), (topic='maksymp_temperature_alerts', num_partitions=2, replication_factor=1, replica_assignment=[], configs=[]), (topic='maksymp_humidity_alerts', num_partitions=2, replication_factor=1, replica_assignment=[], configs=[])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='maksymp_building_sensors', error_code=36, error_message="Topic 'maksymp_building_sensors' already exists."), (topic='maksymp_temperature_alerts', error_code=36, error_message="Topic 'maksymp_temperature_alerts' already exists."), (topic='maksymp_humidity_alerts', error_code=36, error_message="Topic 'maksymp_humidity_alerts' already exists.")])'.


### 2. Відправка даних до топіків

In [None]:
# Створення Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Назва топіку
topic_name = f'{kafka_config['name']}_building_sensors'

sensor_id = str(uuid.uuid4())

try:
    for i in range(3000):
    # Відправлення повідомлення в топік

        data = {
            "sensor_id": sensor_id,                 # Ідентифікатор датчика
            "timestamp": time.time(),               # Часова мітка
            "temperature": random.randint(25, 45),  # Випадкове значення температури
            "humidity": random.randint(15, 85),     # Випадкове значення вологості
        }
        producer.send(topic_name, key=str(uuid.uuid4()), value=data)
        producer.flush()  # Очікування, поки всі повідомлення будуть відправлені
        print(f"SND >> {topic_name}: {data}")
        time.sleep(2)

except Exception as e:
    print(f"An error occurred: {e}")
except KeyboardInterrupt as e:
    print(f'{e}. Exiting...')
finally:
    producer.close()  # Закриття producer

SND >> maksymp_building_sensors: {'sensor_id': '3e4aa547-010f-4a98-8b70-5a1b52460eb3', 'timestamp': 1737807413.386982, 'temperature': 42, 'humidity': 72}
SND >> maksymp_building_sensors: {'sensor_id': '3e4aa547-010f-4a98-8b70-5a1b52460eb3', 'timestamp': 1737807415.712957, 'temperature': 42, 'humidity': 42}
SND >> maksymp_building_sensors: {'sensor_id': '3e4aa547-010f-4a98-8b70-5a1b52460eb3', 'timestamp': 1737807417.774233, 'temperature': 41, 'humidity': 72}
. Exiting...


### 3. Обробка даних

In [17]:
# Створення Kafka Consumer
consumer = KafkaConsumer(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    key_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest',  # Зчитування повідомлень з початку
    enable_auto_commit=True,       # Автоматичне підтвердження зчитаних повідомлень
    group_id=f'{kafka_config["name"]}_group_1'   # Ідентифікатор групи споживачів
)

producer = KafkaProducer(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Назва топіку
sensors_topic_name = f'{kafka_config['name']}_building_sensors'
temperature_alerts_topic_name = f'{kafka_config['name']}_temperature_alerts'
humidity_alerts_topic_name = f'{kafka_config['name']}_humidity_alerts'

# Підписка на тему
consumer.subscribe([sensors_topic_name])

print(f'Subscribed to topic "{sensors_topic_name}"')

# Обробка повідомлень з топіку
try:
    for message in consumer:
        print(f'RCV << {message.value}')
        if message.value['temperature'] > 40:
            message.value['message'] = 'High temperature alert!'
            producer.send(temperature_alerts_topic_name, message.value)
            producer.flush()
            print(f'SND >> {temperature_alerts_topic_name}: "{message.value}"')
        if message.value['humidity'] > 80:
            message.value['message'] = 'High humidity alert!'
        elif message.value['humidity'] < 20:
            message.value['message'] = 'Low humidity alert!'
        else:
            continue
        producer.send(humidity_alerts_topic_name, message.value)
        producer.flush()
        print(f'SND >> {humidity_alerts_topic_name}: "{message.value}"')

except Exception as e:
    print(f'An error occurred: {e}')
except KeyboardInterrupt as e:
    print(f'{e}. Exiting...')
finally:
    consumer.close()  # Закриття consumer
    producer.close()  # Закриття producer


Subscribed to topic "maksymp_building_sensors"
RCV << {'sensor_id': '85cea06e-9c34-45be-a242-aecd44fdf8ac', 'timestamp': 1737807399.1792421, 'temperature': 42, 'humidity': 27}
SND >> maksymp_temperature_alerts: "{'sensor_id': '85cea06e-9c34-45be-a242-aecd44fdf8ac', 'timestamp': 1737807399.1792421, 'temperature': 42, 'humidity': 27, 'message': 'High temperature alert!'}"
RCV << {'sensor_id': '85cea06e-9c34-45be-a242-aecd44fdf8ac', 'timestamp': 1737807401.6357424, 'temperature': 27, 'humidity': 43}
RCV << {'sensor_id': '85cea06e-9c34-45be-a242-aecd44fdf8ac', 'timestamp': 1737807403.2469268, 'temperature': 39, 'humidity': 41}
RCV << {'sensor_id': '3e4aa547-010f-4a98-8b70-5a1b52460eb3', 'timestamp': 1737807413.386982, 'temperature': 42, 'humidity': 72}
SND >> maksymp_temperature_alerts: "{'sensor_id': '3e4aa547-010f-4a98-8b70-5a1b52460eb3', 'timestamp': 1737807413.386982, 'temperature': 42, 'humidity': 72, 'message': 'High temperature alert!'}"
RCV << {'sensor_id': '3e4aa547-010f-4a98-8b70

### 4. Остаточні дані

In [18]:
# Створення Kafka Consumer
consumer = KafkaConsumer(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    key_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest',  # Зчитування повідомлень з початку
    enable_auto_commit=True,       # Автоматичне підтвердження зчитаних повідомлень
    group_id=f'{kafka_config["name"]}_group_1'   # Ідентифікатор групи споживачів
)

# Назва топіку
temperature_alerts_topic_name = f'{kafka_config['name']}_temperature_alerts'
humidity_alerts_topic_name = f'{kafka_config['name']}_humidity_alerts'

# Підписка на тему
consumer.subscribe([temperature_alerts_topic_name, humidity_alerts_topic_name])

print(f'Subscribed to topics "{[temperature_alerts_topic_name, humidity_alerts_topic_name]}"')

# Обробка повідомлень з топіку
try:
    for message in consumer:
        print(f'RCV << {message.value}')

except Exception as e:
    print(f'An error occurred: {e}')
except KeyboardInterrupt as e:
    print(f'{e}. Exiting...')
finally:
    consumer.close()  # Закриття consumer

Subscribed to topics "['maksymp_temperature_alerts', 'maksymp_humidity_alerts']"
RCV << {'sensor_id': '85cea06e-9c34-45be-a242-aecd44fdf8ac', 'timestamp': 1737807399.1792421, 'temperature': 42, 'humidity': 27, 'message': 'High temperature alert!'}
RCV << {'sensor_id': '3e4aa547-010f-4a98-8b70-5a1b52460eb3', 'timestamp': 1737807413.386982, 'temperature': 42, 'humidity': 72, 'message': 'High temperature alert!'}
RCV << {'sensor_id': '3e4aa547-010f-4a98-8b70-5a1b52460eb3', 'timestamp': 1737807415.712957, 'temperature': 42, 'humidity': 42, 'message': 'High temperature alert!'}
RCV << {'sensor_id': '3e4aa547-010f-4a98-8b70-5a1b52460eb3', 'timestamp': 1737807417.774233, 'temperature': 41, 'humidity': 72, 'message': 'High temperature alert!'}
RCV << {'sensor_id': '85cea06e-9c34-45be-a242-aecd44fdf8ac', 'timestamp': 1737807403.6952028, 'temperature': 29, 'humidity': 16, 'message': 'Low humidity alert!'}
. Exiting...
