In [1]:
import csv
import logging
import datetime
import os
import sys
import pytz
import pandas as pd
from time import sleep
from typing import Dict
import yfinance as yf
from confluent_kafka import Consumer, KafkaError
from urllib.parse import urlparse
import json 
# Resolve the parent of the current working directory and put it on sys.path
# (once) so that the `src` package imported below can be found.
src_path = os.path.abspath('../')
if src_path not in sys.path:
    sys.path.append(src_path)
from src.utilities.utils import create_s3_keys_gas, check_s3_key_exists, generate_random_string

In [3]:
def consume_data(duration_seconds=60):
    """Collect gas-price messages from Kafka for a bounded time window.

    Subscribes to ``Config.PRODUCE_TOPIC_GAS_PRICE`` and polls until the
    window elapses, a non-EOF consumer error is reported, or the user
    interrupts the kernel. Each message payload is decoded as UTF-8 JSON.

    Args:
        duration_seconds: Length of the polling window in seconds. Defaults
            to 60, the value that was previously hard-coded.

    Returns:
        pandas.DataFrame: One row per successfully decoded message, with the
        columns ``date``, ``open_price``, ``close_price`` and ``key_id``.
        The frame is empty (but has those columns) when nothing was read.
    """
    consumer_config = {
        'bootstrap.servers': Config.BOOTSTRAP_SERVERS_CONS,
        'group.id': Config.GROUP_ID_GAS,
        'auto.offset.reset': 'earliest'
    }

    consumer = Consumer(consumer_config)
    messages = []

    try:
        # Subscribe inside the try so the consumer is closed even if this fails.
        consumer.subscribe([Config.PRODUCE_TOPIC_GAS_PRICE])
        deadline = datetime.datetime.now() + datetime.timedelta(seconds=duration_seconds)

        while datetime.datetime.now() < deadline:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Poll timed out without a message; keep waiting.
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Reaching the end of a partition is not a failure.
                    continue
                logger.error(f"Consumer error: {msg.error()}")
                break

            payload = msg.value()
            if payload is None:
                # A message may carry no value; there is nothing to decode.
                logger.warning("Skipping message with empty payload")
                continue

            try:
                messages.append(json.loads(payload.decode('utf-8')))
            except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                # One bad record must not discard the rest of the batch.
                logger.warning(f"Skipping malformed message: {exc}")

    except KeyboardInterrupt:
        print("Consumer interrupted.")
    finally:
        consumer.close()

    return pd.DataFrame(messages, columns=['date', 'open_price', 'close_price', 'key_id'])

In [4]:
def transform(data):
    """Type the raw price records and add calendar columns.

    The input frame is left untouched; a new frame is returned.

    Args:
        data: DataFrame with the columns ``date``, ``open_price``,
            ``close_price`` and ``key_id``.

    Returns:
        pandas.DataFrame: The input without ``key_id``, where ``date`` is
        parsed with ``pd.to_datetime``, the two price columns are
        ``float32``, and the columns ``day``, ``month``, ``year``, ``hour``
        and ``minute`` (taken from ``date``) are appended in that order.
        ``year`` is ``int16``; the other calendar columns are ``int8``.
    """
    timestamps = pd.to_datetime(data['date'])

    # drop() and assign() both return new frames, so the caller's DataFrame
    # is never modified. assign() keeps existing columns in place and appends
    # new ones in keyword order.
    return data.drop(columns=['key_id']).assign(
        date=timestamps,
        open_price=data['open_price'].astype('float32'),
        close_price=data['close_price'].astype('float32'),
        day=timestamps.dt.day.astype('int8'),
        month=timestamps.dt.month.astype('int8'),
        year=timestamps.dt.year.astype('int16'),
        hour=timestamps.dt.hour.astype('int8'),
        minute=timestamps.dt.minute.astype('int8'),
    )


In [5]:
# Pull a batch of messages from Kafka into a DataFrame. This call blocks while
# the consumer polls the topic for its time window.
data = consume_data()
# Convert the column types and add the day/month/year/hour/minute columns.
transformed_data = transform(data)