# Batch Daily Inference Pipeline
* Retrieve best model from Hopsworks
* Scrape weather forecasts for next 14 days
* Predict weather code and store predictions in Hopsworks

In [1]:
# Third-party libraries: Hopsworks client, model deserialization, numerics, dataframes
import hopsworks
import joblib
import numpy as np
import pandas as pd
# Star import of the local helper module; helpers called later without a prefix
# (e.g. get_openmeteo_connection, process_forecast_request) presumably come from here — TODO confirm
from weather_utils import *

import sys
sys.path.append('..')  # Add the parent directory (project root) to the Python path
# Star import of the project configuration; the upper-case constants used below
# (MODEL_NAME, LATITUDE, FEATURE_GROUP_FORECAST, ...) presumably come from here — TODO confirm
from config import *

# Disable annoying warnings
# NOTE(review): this silences every warning category for the whole notebook,
# including deprecation warnings raised by the libraries used below
import warnings
warnings.filterwarnings("ignore")

# 1) Retrieve best model from Hopsworks

In [2]:
# Log in to Hopsworks; the project handle gives access to the services used below
project = hopsworks.login()

# Handle to the Feature Store (used later to store the predictions)
fs = project.get_feature_store()

# Handle to the Model Registry
mr = project.get_model_registry()

# Pick the registered model that is best according to MODEL_METRIC / OPTIMIZE_DIRECTION
weather_code_model = mr.get_best_model(MODEL_NAME, MODEL_METRIC, OPTIMIZE_DIRECTION)

# Download the model artifacts; download() returns the local directory that holds them
model_dir = weather_code_model.download()

# Load the serialized model stored as <MODEL_NAME>.pkl inside that directory
model = joblib.load(f"{model_dir}/{MODEL_NAME}.pkl")

print("Model:", model_dir)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/178324
Connected. Call `.close()` to terminate connection gracefully.
Connected. Call `.close()` to terminate connection gracefully.
Downloading file ... Model: /var/folders/kf/md7wd1294hv6n5qvkc2149qr0000gn/T/a4b7bf41-d37d-4749-89f9-844b5c2419d8/weather_code_xgboost_model/1


# 2) Scrape weather forecasts for next 14 days

In [3]:
# Query parameters: daily variables for the configured location,
# no past days and a 14-day forecast horizon
params = dict(
    latitude=LATITUDE,
    longitude=LONGITUDE,
    daily=["temperature_2m_min", "precipitation_sum", "wind_gusts_10m_max"],
    timezone=TIMEZONE,
    past_days=0,
    forecast_days=14,
)

# Open a client for the Open-Meteo API
openmeteo = get_openmeteo_connection()

# Run the query; the result holds one entry per requested location,
# and only a single location is requested here, so take the first entry
responses = openmeteo.weather_api(BASELINE_URL_OPEN_METEO, params=params)
response = responses[0]

# Convert the forecast response into a Pandas dataframe
df_forecasts = process_forecast_request(response)

print(df_forecasts)

          date  temperature_min  precipitation_sum  wind_gusts_max
0   2023-11-13           1.4175           0.600000       27.000000
1   2023-11-14           0.1675           0.000000       27.719999
2   2023-11-15          -0.3325           0.100000       25.559999
3   2023-11-16          -0.4805           0.600000       23.039999
4   2023-11-17          -2.0805           0.000000       17.280001
5   2023-11-18          -2.2305           0.000000       18.719999
6   2023-11-19          -2.9455           0.000000       29.879999
7   2023-11-20          -4.5955           0.000000       37.079998
8   2023-11-21          -2.7415           1.600000       41.039997
9   2023-11-22          -1.3415           0.900000       39.239998
10  2023-11-23          -3.4415           3.099999       46.079998
11  2023-11-24          -5.3915           0.400000       47.880001
12  2023-11-25          -6.0915           0.000000       39.959999
13  2023-11-26          -3.9915           0.000000       55.79

# 3) Predict Weather Code

In [4]:
# Derive the calendar month of each forecast date as an integer feature
forecast_dates = pd.to_datetime(df_forecasts['date'])
df_forecasts['month'] = forecast_dates.dt.month

# The model inputs are all columns except the date itself
X = df_forecasts.drop('date', axis=1)
print(X.columns)

Index(['temperature_min', 'precipitation_sum', 'wind_gusts_max', 'month'], dtype='object')


In [5]:
# Predict a value for every forecast day with the downloaded model (no training happens here),
# then round each prediction to the nearest integer so it can be used as a weather code
y = np.round(model.predict(X)).astype(int)

# Store the predicted code next to the forecast features
df_forecasts['weather_code_prediction'] = y

In [6]:
# Load the weather code lookup table used to label the predictions
df_codes_mapping = pd.read_csv("../resources/weather_code_mapping.csv")

# Left join keeps every forecast row; the predicted code is matched
# against the 'weather_code' column of the lookup table
df_forecasts = df_forecasts.merge(
    df_codes_mapping,
    how='left',
    left_on='weather_code_prediction',
    right_on='weather_code',
)

# Remove the join key (it duplicates 'weather_code_prediction' on matched rows)
# and the 'weather_code_wmo' column brought in by the lookup table
df_forecasts = df_forecasts.drop(columns=['weather_code', 'weather_code_wmo'])

In [7]:
# Show the forecasts together with the predicted weather code and the merged mapping columns
print(df_forecasts)

          date  temperature_min  precipitation_sum  wind_gusts_max  month  \
0   2023-11-13           1.4175           0.600000       27.000000     11   
1   2023-11-14           0.1675           0.000000       27.719999     11   
2   2023-11-15          -0.3325           0.100000       25.559999     11   
3   2023-11-16          -0.4805           0.600000       23.039999     11   
4   2023-11-17          -2.0805           0.000000       17.280001     11   
5   2023-11-18          -2.2305           0.000000       18.719999     11   
6   2023-11-19          -2.9455           0.000000       29.879999     11   
7   2023-11-20          -4.5955           0.000000       37.079998     11   
8   2023-11-21          -2.7415           1.600000       41.039997     11   
9   2023-11-22          -1.3415           0.900000       39.239998     11   
10  2023-11-23          -3.4415           3.099999       46.079998     11   
11  2023-11-24          -5.3915           0.400000       47.880001     11   

# 4) Update forecasts in Hopsworks Feature Group

In [9]:
# Get the forecast Feature Group, creating it on first use.
# 'date' is the primary key; presumably re-inserting an existing date updates that
# row rather than duplicating it — confirm against the Hopsworks upsert semantics.
# event_time is given as the feature name string: the single-element list form is
# deprecated in hsfs, and the deprecation warning is hidden by the notebook's
# global warnings filter.
weather_code_predictions_fg = fs.get_or_create_feature_group(
    name=FEATURE_GROUP_FORECAST,
    version=1,
    primary_key=["date"],
    event_time="date",
    description="Weather code forecast")

# Insert the forecasts and block until the materialization job has finished
weather_code_predictions_fg.insert(df_forecasts, write_options={"wait_for_job" : True})

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/178324/fs/178243/fg/219529


Uploading Dataframe: 0.00% |          | Rows 0/14 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_forecast_fg_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/178324/jobs/named/weather_forecast_fg_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x13f2f31d0>, None)

In [10]:
# Save the forecast locally as CSV. to_csv keeps its default index=True here,
# so the dataframe's row index is written as an unnamed first column.
df_forecasts.to_csv('../resources/forecast.csv')

In [11]:
# Upload the locally saved forecast CSV to the "Resources/weather_forecast" folder
# on the Hopsworks cluster, for use by Hugging Face; overwrite=True replaces
# a copy left there by a previous run.
dataset_api = project.get_dataset_api()
dataset_api.upload("../resources/forecast.csv", "Resources/weather_forecast", overwrite=True)

Uploading: 0.000%|          | 0/1419 elapsed<00:00 remaining<?

'Resources/weather_forecast/forecast.csv'