In [17]:
%load_ext dotenv
%dotenv

import json
import socket
import pandas as pd
import numpy as np
from quixstreams import Application
from confluent_kafka import Producer

from etl.config import CurrentWeather, DatabaseEngine

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv


In [10]:
# Connection handle for the project database. Later cells pass it to
# DataFrame.to_sql and release it with .dispose().
database = (
    DatabaseEngine()
    .get_engine()
)

In [11]:
# Client object from etl.config used to fetch the current weather reading.
weather_handler = CurrentWeather()
# Response payload; the next cell treats it as a dict that carries nested
# 'current' and 'current_units' entries.
metadata = weather_handler.get_current_weather()

In [12]:
# Flatten the response: lift the nested 'current' readings to the top level
# of `metadata` and split the 'current_units' mapping out into `units`.
#
# The guard keeps this cell re-runnable. After the first execution the nested
# keys have already been popped, so an unconditional pop would raise
# KeyError: 'current' on a second run. On a re-run the cell is a no-op and
# `current` / `units` keep the values from the first run.
if 'current' in metadata:
    current = metadata.pop('current')
    units = metadata.pop('current_units')
    metadata.update(current)

# Display the flattened record.
metadata

{'latitude': 42.6875,
 'longitude': 23.3125,
 'generationtime_ms': 0.07295608520507812,
 'utc_offset_seconds': 7200,
 'timezone': 'Europe/Sofia',
 'timezone_abbreviation': 'EET',
 'elevation': 548.0,
 'time': '2024-11-13T05:15',
 'interval': 900,
 'temperature_2m': 4.1,
 'relative_humidity_2m': 88,
 'apparent_temperature': 1.8,
 'is_day': 0,
 'precipitation': 0.0,
 'rain': 0.0,
 'showers': 0.0,
 'snowfall': 0.0,
 'weather_code': 3,
 'cloud_cover': 100,
 'pressure_msl': 1027.9,
 'surface_pressure': 961.2,
 'wind_speed_10m': 3.1,
 'wind_direction_10m': 54,
 'wind_gusts_10m': 8.3}

In [13]:
# Store the units mapping as a one-row table named 'units'; an existing
# table with that name is replaced and the DataFrame index is not written.
units_df = pd.DataFrame(data=[units])
units_df.to_sql(
    name='units',
    con=database,
    if_exists='replace',
    index=False,
)

# Release the database handle now that the write is done.
database.dispose()

In [21]:
# Quix Streams application pointed at the local Kafka broker.
app = Application(
    broker_address='127.0.0.1:9092',
    loglevel='DEBUG'
)

# Declare the topic once and reuse the object below, so the topic name is
# defined in a single place instead of being repeated as a string literal.
temperature_topic = app.topic('current-weather', value_deserializer='json')

# Publish the flattened weather record as a JSON string, keyed by city.
# The producer is used as a context manager; the captured log output shows it
# being flushed when the block exits.
with app.get_producer() as producer:
    producer.produce(
        topic=temperature_topic.name,
        key='Sofia',
        value=json.dumps(metadata)
    )

[2024-11-13 05:51:33,103] [INFO] [quixstreams] : Topics required for this application: "current-weather"
[2024-11-13 05:51:33,210] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
[2024-11-13 05:51:33,253] [INFO] [quixstreams] : Kafka topics validation complete
%4|1731469893.254|TERMINATE|Currency-Multiplier.local#producer-8| [thrd:app]: Producer terminating with 1 message (561 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
[2024-11-13 05:51:33,261] [DEBUG] [quixstreams] : Flushing kafka producer
[2024-11-13 05:51:33,270] [DEBUG] [quixstreams] : Kafka producer flushed
