In [1]:
#TODO: add shell commands

In [2]:
#imports
import openmeteo_requests
import requests_cache
import pandas as pd
from retry_requests import retry
from kafka import KafkaProducer
import json

In [5]:
#set parameters
# Kafka broker that the weather rows are published to.
kafka_server = 'localhost:9092'

# Open-Meteo archive endpoint and the query sent to it: a single
# latitude/longitude point, the hourly variables to fetch, and the date range.
url = "https://archive-api.open-meteo.com/v1/archive"
params = dict(
    latitude=52.52,
    longitude=13.41,
    hourly=["temperature_2m", "relative_humidity_2m", "rain", "wind_speed_10m", "snowfall"],
    start_date="2021-09-05",
    end_date="2021-09-06",
)

In [6]:
#set up api
# HTTP session that caches responses in '.cache'; expire_after=-1 keeps
# cached entries indefinitely, so re-running the notebook reuses them.
cache_session = requests_cache.CachedSession(
    '.cache',
    expire_after=-1,
)
# Wrap the cached session so failed requests are retried (5 attempts,
# backoff factor 0.2).
retry_session = retry(
    cache_session,
    retries=5,
    backoff_factor=0.2,
)
openmeteo = openmeteo_requests.Client(session=retry_session)

#get api data and parse into a DataFrame
responses = openmeteo.weather_api(url, params=params)
# `params` requests a single latitude/longitude point, so only the first
# response is used.
response = responses[0]
hourly = response.Hourly()

# One row per interval in [Time, TimeEnd), stepping by the API's interval.
hourly_data = {"time": pd.date_range(
	start = pd.to_datetime(hourly.Time(), unit = "s"),
	end = pd.to_datetime(hourly.TimeEnd(), unit = "s"),
	freq = pd.Timedelta(seconds = hourly.Interval()),
	inclusive = "left"
)}
# Variables come back in the order they were requested in params["hourly"],
# so derive each column name from that list instead of hard-coding one index
# per variable (keeps the columns in sync if the requested list changes).
for variable_index, variable_name in enumerate(params["hourly"]):
	hourly_data[variable_name] = hourly.Variables(variable_index).ValuesAsNumpy()

hourly_dataframe = pd.DataFrame(data = hourly_data)
# convert timestamps into the string format that is sent to Kafka
hourly_dataframe["time"] = hourly_dataframe["time"].dt.strftime('%d/%m/%y %H:%M:%S.%f')
print(hourly_dataframe)


                        time  temperature_2m  relative_humidity_2m  rain  \
0   05/09/21 00:00:00.000000       13.208500             94.588463   0.0   
1   05/09/21 01:00:00.000000       12.308500             95.178391   0.0   
2   05/09/21 02:00:00.000000       12.858500             92.416321   0.0   
3   05/09/21 03:00:00.000000       13.258500             86.527817   0.0   
4   05/09/21 04:00:00.000000       13.108500             81.748451   0.0   
5   05/09/21 05:00:00.000000       12.808500             77.169556   0.0   
6   05/09/21 06:00:00.000000       13.208500             74.673042   0.0   
7   05/09/21 07:00:00.000000       14.308500             71.906540   0.0   
8   05/09/21 08:00:00.000000       15.708500             70.033318   0.0   
9   05/09/21 09:00:00.000000       17.308500             68.057510   0.0   
10  05/09/21 10:00:00.000000       18.658501             65.260674   0.0   
11  05/09/21 11:00:00.000000       19.658501             62.952484   0.0   
12  05/09/21

In [4]:
#init producer
def _serialize_json(value):
    """Encode a message value as UTF-8 JSON bytes for Kafka."""
    return json.dumps(value).encode('utf-8')


producer = KafkaProducer(
    bootstrap_servers=kafka_server,
    value_serializer=_serialize_json,
)

In [5]:
#produce df to kafka row by row
# Each DataFrame row becomes one JSON message on the 'weather' topic.
# to_dict(orient="records") yields one plain dict per row (native Python
# values), without building an intermediate Series per row as iterrows() does.
for record in hourly_dataframe.to_dict(orient="records"):
    # send() only enqueues the record in the producer's buffer.
    producer.send('weather', record)
# Flush once after queueing everything, instead of blocking on a flush after
# every single message, so the producer can batch the sends.
producer.flush()

In [6]:
# Shut down the Kafka producer now that all rows have been sent and flushed.
producer.close()