# <span style="color:#ff5f27"> 📝 Imports </span>

In [None]:
import os
import datetime
import requests
import json
import pandas as pd

# <span style="color:#ff5f27"> ⚙️ Functions </span>

In [None]:
def convert_date_to_unix(x):
    """
    Return the unix time, in whole milliseconds, for a date-time value.

    `x` is stringified and parsed with the fixed format
    '%Y-%m-%d %H:%M:%S'; any other textual form raises ValueError.
    The parsed value carries no timezone, so `timestamp()` interprets
    it in the local timezone of the machine running the code.
    """
    parsed = datetime.datetime.strptime(str(x), '%Y-%m-%d %H:%M:%S')
    # seconds (float) -> milliseconds, truncated to an integer
    return int(parsed.timestamp() * 1000)

In [None]:
def get_weather_data(city_name: str,
                     coordinates: list,
                     start_date: str = None,
                     end_date: str = None,
                     forecast: bool = False,
                     timeout: float = 120.0):
    """
    Fetch hourly weather data for one city and return it as a pandas DataFrame.

    Args:
        city_name: Label written to the 'city_name' column and to the metadata.
        coordinates: Two-element sequence unpacked as (latitude, longitude).
        start_date: First day of the requested range (e.g. "2023-02-10").
            Required when forecast=False.
        end_date: Last day of the requested range. Required when forecast=False.
        forecast: If True, query the forecast endpoint; when no date range is
            given the API picks its own default range (7 days of forecast).
            If False, query the historical observations (archive) endpoint.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A tuple (res_df, some_metadata):
        - res_df: one row per hour with columns 'city_name', 'base_time',
          'forecast_hr', 'temperature', 'precipitation', 'relative_humidity',
          'weather_code', 'wind_speed', 'wind_direction', 'unix_time'.
          'forecast_hr' is the row index for forecasts and 0 for observations.
        - some_metadata: dict with 'latitude', 'longitude', 'timezone',
          'hourly_units' copied from the response, plus 'city_name'.

    Raises:
        ValueError: if forecast=False and a date bound is missing.
        requests.HTTPError: if the API answers with an error status.
    """
    # Fail early with a clear message instead of an opaque KeyError after
    # the API has rejected the request.
    if not forecast and (start_date is None or end_date is None):
        raise ValueError(
            "'start_date' and 'end_date' are required when forecast=False."
        )

    latitude, longitude = coordinates

    # requests leaves out parameters whose value is None, so unset date
    # bounds are simply not sent.
    params = {
        'latitude': latitude,
        'longitude': longitude,
        'hourly': ['temperature_2m', 'relativehumidity_2m', 'precipitation',
                   'weathercode', 'windspeed_10m', 'winddirection_10m'],
        'start_date': start_date,
        'end_date': end_date
    }

    if forecast:
        # forecast endpoint
        base_url = 'https://api.open-meteo.com/v1/forecast'
    else:
        # historical observations endpoint
        base_url = 'https://archive-api.open-meteo.com/v1/archive?'

    # Without a timeout a stalled connection would block forever.
    response = requests.get(base_url, params=params, timeout=timeout)
    # Surface API errors as HTTPError rather than a KeyError further down.
    response.raise_for_status()
    response_json = response.json()

    some_metadata = {key: response_json[key] for key in ('latitude', 'longitude',
                                                         'timezone', 'hourly_units')}
    some_metadata["city_name"] = city_name

    res_df = pd.DataFrame(response_json["hourly"])

    # rename columns
    res_df = res_df.rename(columns={
        "time": "base_time",
        "temperature_2m": "temperature",
        "relativehumidity_2m": "relative_humidity",
        "weathercode": "weather_code",
        "windspeed_10m": "wind_speed",
        "winddirection_10m": "wind_direction"
    })

    res_df["city_name"] = city_name

    # For forecasts the row position is used as the forecast hour offset;
    # observations always get 0.
    res_df["forecast_hr"] = res_df.index if forecast else 0

    # convert dates in 'base_time' column
    res_df["base_time"] = pd.to_datetime(res_df["base_time"])

    # create 'unix' column (milliseconds)
    res_df["unix_time"] = res_df["base_time"].apply(convert_date_to_unix)

    # Final column order; all columns are added before this selection so
    # no assignment happens on a column-subset of another frame.
    res_df = res_df[["city_name", "base_time", "forecast_hr", "temperature", "precipitation",
                     "relative_humidity", "weather_code", "wind_speed", "wind_direction",
                     "unix_time"]]

    return res_df, some_metadata

# <span style="color:#ff5f27"> 🔮 Data Parsing </span>

In [None]:
# Sample call: three days of historical observations for Paris
# (forecast=False selects the archive endpoint).
weather_df, metadata = get_weather_data(
    city_name="Paris",
    coordinates=(48.85, 2.35),
    start_date="2023-02-10",
    end_date="2023-02-12",
    forecast=False,
)

In [None]:
# Bare expression: shows the observations frame fetched in the previous cell
# when run as a notebook cell.
weather_df

In [None]:
# Sample call: same city and date range, but served by the forecast endpoint
# (forecast=True). Overwrites weather_df and metadata from the previous call.
weather_df, metadata = get_weather_data(
    city_name="Paris",
    coordinates=(48.85, 2.35),
    start_date="2023-02-10",
    end_date="2023-02-12",
    forecast=True,
)

In [None]:
# Bare expression: shows the forecast frame fetched in the previous cell
# when run as a notebook cell.
weather_df

In [None]:
# Bare expression: shows the metadata dict returned alongside the last frame
# (latitude, longitude, timezone, hourly_units, city_name).
metadata

---
# <span style="color:#ff5f27"> 👩🏻‍🔬 Backfill Pipeline </span>

In [None]:
# Reference dates for the backfill, all derived from the current local date.
today = datetime.date.today()  # a datetime.date (no time component)

# ISO-formatted ('YYYY-MM-DD') strings used as date bounds in the API calls.
tomorrow = (today + datetime.timedelta(days=1)).isoformat()
day7ago = (today - datetime.timedelta(days=7)).isoformat()
day7next = (today + datetime.timedelta(days=7)).isoformat()

In [None]:
# Bare expression: preview today's date in the string form that is later
# passed to the API as a date bound.
str(today)

In [None]:
# Load the cities to process. The loops further down iterate over this object
# and index it by city name to obtain the coordinates passed to get_weather_data.
# JSON text is UTF-8 by specification, so decode it explicitly instead of
# relying on the platform's default encoding (city names may be non-ASCII).
with open('target_cities.json', encoding='utf-8') as json_file:
    target_cities = json.load(json_file)

In [None]:
# Bare expression: shows the loaded city configuration when run as a notebook cell.
target_cities

### <span style="color:#ff5f27"> 🧙🏼‍♂️ Parsing historical weather observations from January 1, 2000 until 7 days before today (that's a restriction of this particular API). </span>

In [None]:
# Historical observations for every target city, from 2000-01-01 up to
# seven days ago. Frames are collected in a list and concatenated once:
# calling pd.concat inside the loop re-copies all accumulated rows on every
# iteration and starts from an empty DataFrame, which pandas warns about.
observation_frames = []

for city_name, city_coordinates in target_cities.items():
    weather_df_temp, metadata_temp = get_weather_data(city_name, city_coordinates,
                                                      start_date="2000-01-01", end_date=day7ago)
    observation_frames.append(weather_df_temp)

# pd.concat raises on an empty list, so keep the original "empty frame" result
# when there are no cities.
observations_df = pd.concat(observation_frames) if observation_frames else pd.DataFrame()

In [None]:
# Bare expression: shows the combined observations frame when run as a notebook cell.
observations_df

In [None]:
# observations_df.to_csv("observations_df.csv", index=False)

### <span style="color:#ff5f27"> 🧙🏼‍♂️ Parsing historical weather forecasts from 7 days ago until today (processed as observations). </span>

In [None]:
# Forecast-endpoint data for the last seven days up to today, for every
# target city. Frames are collected in a list and concatenated once instead
# of calling pd.concat inside the loop (which re-copies all accumulated rows
# on every iteration and starts from an empty DataFrame).
forecast_batch_frames = []

for city_name, city_coordinates in target_cities.items():
    weather_df_temp, metadata_temp = get_weather_data(city_name, city_coordinates,
                                                      start_date=day7ago, end_date=str(today),
                                                      forecast=True)
    forecast_batch_frames.append(weather_df_temp)

# pd.concat raises on an empty list, so fall back to an empty frame.
forecast_batch_df = (pd.concat(forecast_batch_frames)
                     if forecast_batch_frames else pd.DataFrame())

# These rows are treated as observations, so the forecast offset is reset to 0.
forecast_batch_df["forecast_hr"] = 0

In [None]:
# Bare expression: shows the last-seven-days batch when run as a notebook cell.
forecast_batch_df

In [None]:
# forecast_batch_df.to_csv("forecast_batch_df.csv", index=False)

### <span style="color:#ff5f27"> 🧙🏼‍♂️ Parsing weather forecasts for the next 7 days. </span>

In [None]:
# Forecasts from tomorrow through seven days ahead, for every target city.
# Frames are collected in a list and concatenated once instead of calling
# pd.concat inside the loop (which re-copies all accumulated rows on every
# iteration and starts from an empty DataFrame).
forecast_frames = []

for city_name, city_coordinates in target_cities.items():
    weather_df_temp, metadata_temp = get_weather_data(city_name, city_coordinates,
                                                      start_date=tomorrow, end_date=day7next,
                                                      forecast=True)
    forecast_frames.append(weather_df_temp)

# pd.concat raises on an empty list, so fall back to an empty frame.
forecast_df = pd.concat(forecast_frames) if forecast_frames else pd.DataFrame()

In [None]:
# Bare expression: shows the combined forecast frame when run as a notebook cell.
forecast_df

In [None]:
# forecast_df.to_csv("forecast_df.csv", index=False)

---
# <span style="color:#ff5f27"> ⬇️ Insert all data into Feature Store </span>

In [None]:
import hopsworks


# Log in to Hopsworks, selecting the project named 'weather'.
# NOTE(review): how credentials are supplied is not visible here - presumably
# an API key prompt or environment configuration; confirm before automating.
project = hopsworks.login(project='weather')
# Handle to the project's feature store; used below to get or create the feature group.
fs = project.get_feature_store() 

In [None]:
# Fetch the 'weather_data' feature group (version 1), creating it if it does
# not exist yet. A row is identified by city, timestamp and forecast offset.
weather_fg = fs.get_or_create_feature_group(
    name='weather_data',
    description="Public Weather Data from 2000-01-01. Updates every day.",
    version=1,
    primary_key=["city_name", "unix_time", "forecast_hr"],
    # partition_key=["city_name"],
    # event_time takes a single feature name (a string), not a list.
    event_time="unix_time",
    online_enabled=True
)

In [None]:
# Submit the two historical batches without waiting for their jobs to finish.
for pending_batch in (observations_df, forecast_batch_df):
    weather_fg.insert(pending_batch, write_options={"wait_for_job": False})

# Only the final insert waits for its job.
weather_fg.insert(forecast_df, write_options={"wait_for_job": True})

---
# <span style="color:#ff5f27"> 👨🏻‍🏫 Retrieve and check data consistency </span>

In [None]:
# weather_retrieved = weather_fg.read()
# weather_retrieved = weather_retrieved.sort_values("base_time")
# weather_retrieved

In [None]:
# # Create a datetime index object
# dt_index = pd.date_range(
#     start='2000-01-01',
#     end=str(today + datetime.timedelta(8)), # to include last, "seventh" day.
#     freq='H'
# )

In [None]:
# # Compare the length of the dataframe and datetime index
# if len(dt_index) - 1 != int(len(weather_retrieved) / len(target_cities)): # compare dt_index against the rows of a single city.
#     print('Inconsistent dates in dataframe.')
# else:
#     print("Everything seems fine.")

In [None]:
# # I subtract 1 from len(dt_index) because it includes the 00:00 hour of the 8th day.
# dt_index

---