Убедитесь, что консьюмер может получать данные об объёмах торгов бирж от продьюсера.

Реализуйте функцию, которая:
- принимает на вход данные об объёмах торгов одной биржи за один день;
- агрегирует (np.mean) данные до окна в 4 часа (то есть будет оставаться 24 / 4 = 6 значений объёмов торгов за один день для каждой биржи);
- сохраняет промежуточный результат (например, в list или pd.DataFrame) и для каждых пяти бирж делает сохранение в csv-файл с колонками [exchange, volume_1, volume_2, …, volume_6];
- если файл уже существует и в нём записаны первые пять/десять (и так далее) бирж, то на каждой пятой бирже нужно прочитать существующий файл, дописать в него новые данные и сохранить.

In [146]:
import json
import os
import pandas as pd
import numpy as np

from confluent_kafka import Consumer, KafkaError

In [147]:
def aggregate_volume(volumes, exchange, filename):
    # Определяем колонки для сохранения в DataFrame
    columns = ['exchange', 'volume_1', 'volume_2', 'volume_3', 'volume_4', 'volume_5', 'volume_6']

    # Разбиваем список объёмов торгов на куски по 6 элементов (элементов всего - 144/24=6)
    chunks = [[float(volume) for volume in volumes[i:i+24]] for i in range(0, len(volumes), 24)]

    # Считаем среднее значение для каждого куска и добавляем в DataFrame с результатами
    result = [np.mean(chunk) for chunk in chunks]
    new_entry = pd.DataFrame({'exchange': [exchange], 'volume_1': [result[0]], 'volume_2': [result[1]], 
                              'volume_3': [result[2]], 'volume_4': [result[3]], 'volume_5': [result[4]], 
                              'volume_6': [result[5]]})

    # Если файл с промежуточными результатами уже существует, то считываем его в DataFrame
    if os.path.exists(filename):
        df = pd.read_csv(filename)
    else:
        df = pd.DataFrame(columns=columns)

    # Добавляем новые данные в DataFrame
    df = pd.concat([df, new_entry], ignore_index=True)

    # Если в DataFrame накопилось информации о 5 биржах, сохраняем результаты в csv-файл
    if len(df) % 5 == 0:
        directory = os.path.dirname(filename)
        os.makedirs(directory, exist_ok=True)
        df.to_csv(filename, index=False)

    # Сохраняем DataFrame в файл
    df.to_csv(filename, index=False)

    # Возвращаем DataFrame
    return df


In [148]:
filename = f'{os.getcwd()}/data/intermediate_results.csv'
topic = 'quickstart'

config = {'bootstrap.servers': 'localhost:9092', 
          'group.id': 'test_consumer', 
          'auto.offset.reset': 'earliest'}

consumer = Consumer(config)

consumer.subscribe([topic])

while True:
    msg = consumer.poll(1.0)  # timeout = 1 секунда на получение сообщения; возвращает None, если не получили сообщения

    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            print('Достигнут конец раздела')
        else:
            print('Ошибка при чтении сообщения:', msg.error())
        
    exchange_data = json.loads(msg.value().decode('utf-8'))
    exchange_name = list(exchange_data.keys())[0]
    volume_one_day_data = exchange_data[exchange_name]
    intermediate_results = aggregate_volume(volume_one_day_data, exchange_name, filename)
    print(f'Объем торгов на бирже {exchange_name} за последний день: {volume_one_day_data}')
    print(f'Промежуточные результаты: {intermediate_results.to_dict()}')
    
consumer.close()

Объем торгов на бирже Binance за последний день: ['381371.5256153836744621', '381588.0941562320071179', '381769.4965686631749211', '382507.1391076817745779', '382586.7601734836043354', '383006.7227926359726845', '382973.1434852977743063', '384812.1331278452522991', '385612.0192465523819181', '385911.4873048836747379', '387174.5331759709658358', '387458.7696290304609097', '389615.4504010604315823', '390089.5146597857066432', '389300.225988137325007', '389407.9682120785771041', '389288.1679087161722308', '389547.6356374934782458', '390433.5396207275157182', '391970.8672182793827032', '392139.9444381998148005', '393278.1060992473405927', '394772.7764604872594321', '397226.0709018695222505', '400110.168366053280069', '401594.5433740831819306', '406498.6391607113403903', '408224.6755974639201282', '408840.6051583778653195', '413122.8043250823673299', '413623.1918653490600783', '416521.400163913125658', '420419.8828309597107277', '422565.9668509154911651', '430793.0281128709372961', '433620.

KeyboardInterrupt: 