# Workshop №3

### Содержание тетрадки

1. [Функция получения данныx от API](#first)
2. [Функция создания топика для Kafka](#second)
3. [Функция для callback](#third)
4. [Запись данных в Kafka](#fourth)
    1. [Запись данных в топик без ключа](#fourthFirst)
    2. [Запись данных в топика с ключом](#fourthSecond)
5. [Чтение данных из Kafka](#fifth)
    1. [Чтение данных из топик без ключа](#fifthFirst)
    2. [Чтение данных из топика с ключом](#fifthSecond)
6. [Работа с Kafka в Avro формате](#sixth)
    1. [Запись данных в топик в формате avro без ключа](#sixthFirst)
    2. [Запись данных в топик в формате avro c ключом](#sixthSecond)
    3. [Чтение данных из топика в формате avro без ключа](#sixthThird)
    4. [Чтение данных из топика в формате avro  ключом](#sixthFourth)

Убедитесь, что у вас установлена библиотека [librdkafka](https://github.com/edenhill/librdkafka), так как она требуется для питонячей библиотеки **confluent_kafka**

Если у вас **mac**, то возможно он на найдет путей до этой библиотеки, поэтому придется устанавливать либу с указанием путей для librdkafka (версия librdkafka может отличаться)
```
'C_INCLUDE_PATH=/opt/homebrew/Cellar/librdkafka/1.9.1/include
LIBRARY_PATH=/opt/homebrew/Cellar/librdkafka/1.9.1/lib
pip install confluent_kafka[avro]'
```

In [None]:
import json
import logging as log
import time
from typing import Optional, Final

import requests
from confluent_kafka import avro
from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.avro import AvroProducer, AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

---
## Функция получения данныx от API <a id="first"></a>
---

https://exchangerate.host/#/docs - ссылка на апишку

На предыдущем воркшопе мы анализировали апишку. Функция будет забирать данные за временной интервал.

In [None]:
def get_currency_rate_info(source_currency: str, target_currency: str,
                           start_date: Optional[str] = None,
                           end_date: Optional[str] = None) -> dict:
    """
    Get currency rate information about pair

    :param source_currency: source currency
    :param target_currency: target currency
    :param start_date: start date for uploading data. Default: None
    :param end_date: end date for uploading data. Default: None
    
    :return data: information about rate of pair
    """
    params: dict = {'base': source_currency, 'symbols': target_currency}
    url: str = f'https://api.exchangerate.host/timeseries?start_date={start_date}&end_date={end_date}'

    try:
        response = requests.get(url, params=params)
        data = response.json()
    except Exception as ex:
        print(f'Unable get currency rate info. Error: {ex}')
        raise ex

    return data

Заберем данные

In [None]:
data = get_currency_rate_info('USD', 'RUB', '2022-03-01', '2022-03-10')
data['rates']

---
## Функция создания топика для Kafka<a id="second"></a>
---

Для того чтобы записать данные в Kafka, нам нужно сначала создать топик в Kafka.

In [None]:
def create_kafka_topic(broker: str, topic: str, partitions: int, relicas: int) -> None:
    """
    Create topic in Kafka

    :param borker: Broker name
    :param topic: Topic name
    :param partitions: Number of partitions
    :param relicas: Number of replicas
    
    :return 
    """
    kafka_client = AdminClient({'bootstrap.servers': broker})
    new_topic = [NewTopic(topic, num_partitions=partitions, replication_factor=relicas)]
    
    try:
        kafka_client.create_topics(new_topic)
        print('Topic created')
    except Exception as ex:
        print(f'Failed to create topic. Error {ex}')

In [None]:
create_kafka_topic('0.0.0.0:9092', 'json_topic_with_value', 3, 1)

---
## Функция для callback <a id="third"></a>
---

Поскольку отправка идет асинхронная, то лучше вызывать функцию, которая будет сообщать нам о том, доставлено сообщение или нет.

In [None]:
def delivery_report(err, msg):
    """ 
    Called once for each message produced to indicate delivery result. Triggered by poll() or flush()
    :param err: Error info
    :param msg: Message info
    """
    if err is None:
        print(f'Message delivered to topic {msg.topic()} in partition [{msg.partition()}]')
    else:
        print(f'Message delivery failed: {err}')
        raise Exception('Error occurred while sending messages')

---
## Запись данных в Kafka<a id="fourth"></a>
---

Теперь запишем данные в топик. В топик данные можно по-разному записывать. Главная базовая единица в топике - это сообщение.
В сообщение можно отправить файл, а можно только кусочек файла. Кусочек файла - предпочтительнее, так как кафка как раз для этого и создана.

#### Запись данных в топик без ключа<a id="fourthFirst"></a> 

Попробуем писать вот такими порциями:
```
Value = ['2022-03-01': {'RUB': 108.495351}]
```

In [None]:
producer = Producer({'bootstrap.servers': '0.0.0.0:9092'})

for item in data['rates'].items():
    message = json.dumps(item).encode('utf-8')
    while True:
        try:
            producer.poll(0)
            producer.produce('json_topic_with_value', value=message, callback=delivery_report)
            break
        except BufferError as err:
            print(f'Kafka error: {err}. Waiting one second.')
            producer.poll(1)
            
producer.flush()
print('Data sent to Kafka')

#### Запись данных в топик с ключом<a id="fourthSecond"></a> 

В Kafka можно писать данные с ключом, чтобы можно было понять, к чему относится сообщение. В нашем случае, ключом будет выступать дата курса. 
```
Key = '2022-03-01', Value = {'RUB': 108.495351}
```

In [None]:
create_kafka_topic('0.0.0.0:9092', 'json_topic_with_key_value', 3, 1)

In [None]:
producer = Producer({'bootstrap.servers': '0.0.0.0:9092'})

for key, value in data['rates'].items():
    message = json.dumps(value).encode('utf-8')
    while True:
        try:
            producer.poll(0)
            producer.produce('json_topic_with_key_value', key=key, value=message, callback=delivery_report)
            break
        except BufferError as err:
            print(f'Kafka error: {err}. Waiting one second.')
            producer.poll(1)
            
producer.flush()
print('Data sent to Kafka')

---
##  Чтение данных из Kafka<a id="fifth"></a>
---

### Чтение данных из топика без ключа<a id="fifthFirst"></a> 

In [None]:
consumer = Consumer({'bootstrap.servers': '0.0.0.0:9092',
                     'group.id': 'practicumgroup',
                     'auto.offset.reset': 'earliest',
                     'enable.auto.commit': False})
consumer.subscribe(['json_topic_with_value'])

start_time = time.time()
seconds = 30

while True:
    current_time = time.time()
    elapsed_time = current_time - start_time
    if elapsed_time > seconds:
        break
        
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f'Consumer error: {msg.error()}')
        continue
    print(f'Received message: {msg.value().decode("utf-8")}')

consumer.close()

### Чтение данных из топика с ключом<a id="fifthSecond"></a> 

In [None]:
consumer = Consumer({'bootstrap.servers': '0.0.0.0:9092',
                     'group.id': 'practicumgroup',
                     'auto.offset.reset': 'earliest',
                     'enable.auto.commit': False})
consumer.subscribe(['json_topic_with_key_value'])

start_time = time.time()
seconds = 30

while True:
    current_time = time.time()
    elapsed_time = current_time - start_time
    if elapsed_time > seconds:
        break
        
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f'Consumer error: {msg.error()}')
        continue
    print(f'Received key: {msg.key().decode("utf-8")}. Received message: {msg.value().decode("utf-8")}')

consumer.close()

---
## Работа с Kafka в Avro формате<a id="sixth"></a> 
---

Воспользуемся автосозданием топика

###  Запись данных в топик в формате avro без ключа<a id="sixthFirst"></a> 

In [None]:
value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
value = {'name': 'Value'}

avro_producer = AvroProducer({
    'bootstrap.servers': '0.0.0.0:9092',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://0.0.0.0:8081'
    }, default_value_schema=value_schema)

avro_producer.produce(topic='avro_topic_with_value', value=value)
avro_producer.flush()

###  Запись данных в топик в формате avro c ключом<a id="sixthSecond"></a> 

In [None]:
value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {'name': 'Value'}
key = {'name': 'Key'}

avro_producer = AvroProducer({
    'bootstrap.servers': '0.0.0.0:9092',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://0.0.0.0:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avro_producer.produce(topic='avro_topic_with_key_and_value', value=value, key=key)
avro_producer.flush()

###  Чтение данных из топика в формате avro без ключа<a id="sixthThird"></a> 

In [None]:
avro_consumer = AvroConsumer({
    'bootstrap.servers': '0.0.0.0:9092',
    'group.id': 'practicumgroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'schema.registry.url': 'http://0.0.0.0:8081'})

avro_consumer.subscribe(['avro_topic_with_value'])

start_time = time.time()
seconds = 30

while True:
    current_time = time.time()
    elapsed_time = current_time - start_time
    if elapsed_time > seconds:
        break
    
    try:
        msg = avro_consumer.poll(1.0)
    except SerializerError as se:
        print(f'Message deserialization failed for {msg}: {se}')
        break

    if msg is None:
        continue

    if msg.error():
        print(f'AvroConsumer error: {msg.error()}')
        continue

    print(f'Received message: {msg.value()}')

avro_consumer.close()

###  Чтение данных из топика в формате avro c ключом<a id="sixthFourth"></a> 

In [None]:
avro_consumer = AvroConsumer({
    'bootstrap.servers': '0.0.0.0:9092',
    'group.id': 'practicumgroup',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'schema.registry.url': 'http://0.0.0.0:8081'})

avro_consumer.subscribe(['avro_topic_with_key_and_value'])

start_time = time.time()
seconds = 30

while True:
    current_time = time.time()
    elapsed_time = current_time - start_time
    if elapsed_time > seconds:
        break
    
    try:
        msg = avro_consumer.poll(1.0)
    except SerializerError as se:
        print(f'Message deserialization failed for {msg}: {se}')
        break

    if msg is None:
        continue

    if msg.error():
        print(f'AvroConsumer error: {msg.error()}')
        continue

    print(f'Received key: {msg.key()}. Received message: {msg.value()}')

avro_consumer.close()