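# Daily Pipeline

This notebook fetches today's avalanche warnings and hourly weather forecasts for each ski resort. It adds lag and terrain-interaction features, then appends the results to the Hopsworks feature groups `avalanche_warning_with_lags` and `weather_terrain_sensor`.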

In [1]:
import hopsworks
import sys
from pathlib import Path
import warnings
from dotenv import load_dotenv
import os
from util import *  # project helpers; keep it above the explicit imports below so those take precedence
import datetime
import time
import pandas as pd
from locations import resorts
from dateutil.relativedelta import relativedelta
warnings.filterwarnings("ignore", module="IPython")

In [25]:
# Authenticate against the EU-West Hopsworks cluster and open the project's feature store.
# NOTE(review): `load_dotenv` is imported but never called in the visible cells; confirm
# the Hopsworks API key is picked up from the environment as intended.
project = hopsworks.login(
    host="eu-west.cloud.hopsworks.ai",  # DNS of your Hopsworks instance
    project="ID2223_Project",
)
fs = project.get_feature_store()

# Run date of this pipeline execution as a 'YYYY-MM-DD' string.
today = datetime.date.today().isoformat()

# Handles to the feature groups this pipeline reads from / writes to (all version 2).
warning_fg, weather_fg, terrain_fg = (
    fs.get_feature_group(name=fg_name, version=2)
    for fg_name in ('avalanche_warning_with_lags', "weather_terrain_sensor", "terrain_data")
)

2026-01-01 13:00:36,955 INFO: Closing external client and cleaning up certificates.
2026-01-01 13:00:36,958 INFO: Connection closed.
2026-01-01 13:00:36,960 INFO: Initializing external client
2026-01-01 13:00:36,961 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2026-01-01 13:00:37,653 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/2173


## Warnings

In [17]:
# Load the full historical warning feature group into memory; a later cell filters it
# per resort and date to look up the 1-3 day lag values for today's warnings.
warning_data_df = warning_fg.read()
warning_data_df

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.39s) 


Unnamed: 0,location,latitude,longitude,date,warning_level,warning_level_lag_1,warning_level_lag_2,warning_level_lag_3
0,Rauland Skisenter,59.819122,8.161512,2025-05-01 00:00:00+00:00,1,1.0,1.0,1.0
1,Voss Resort Fjellheisar,60.666798,6.416377,2025-06-06 00:00:00+00:00,0,0.0,0.0,0.0
2,Hovden Alpinsenter,59.595451,7.326028,2021-12-14 00:00:00+00:00,2,2.0,1.0,1.0
3,Nedre fjellheisstasjon Narvik,68.480972,17.404409,2024-06-29 00:00:00+00:00,0,0.0,0.0,0.0
4,Myrkdalen Fjellandsby,60.916092,6.531734,2022-05-01 00:00:00+00:00,1,1.0,1.0,1.0
...,...,...,...,...,...,...,...,...
21846,Sauda Ski Centre,59.653477,6.223121,2021-04-22 00:00:00+00:00,2,2.0,2.0,3.0
21847,Hemsedal Skisenter,60.910915,8.494637,2022-07-30 00:00:00+00:00,0,0.0,0.0,0.0
21848,Voss Resort Fjellheisar,60.666798,6.416377,2021-12-26 00:00:00+00:00,1,1.0,2.0,2.0
21849,Narvik Ski Resort,68.473755,17.429653,2023-11-18 00:00:00+00:00,0,0.0,0.0,0.0


In [24]:
# Fetch today's avalanche warning(s) for every resort. `rows` is a list of plain dicts,
# turned into a DataFrame in the next cell.
rows = []
for location, (lat, lon) in resorts.items():
    # Not named `warnings`: that would shadow the `warnings` module imported in the first cell.
    daily_warnings = get_warning_data(today, today, lat, lon)
    rows.extend(
        {
            "location": location,
            "latitude": lat,
            "longitude": lon,
            "date": w.get("ValidFrom"),
            "warning_level": w.get("DangerLevel"),
        }
        for w in daily_warnings
    )
    # Short pause between requests -- presumably to avoid hammering the warning API.
    time.sleep(0.2)


In [26]:
# Explicit columns keep the schema stable even when no warnings came back: an empty
# `rows` list would otherwise produce a frame without a 'location' column.
warning_data_today_df = pd.DataFrame(
    rows,
    columns=["location", "latitude", "longitude", "date", "warning_level"],
)

def match_by_date(df, target_date):
    """Look up the warning level recorded on a given calendar day.

    Args:
        df: Frame with a datetime-like ``date`` column and a ``warning_level`` column.
        target_date: ``datetime.date`` to match against ``df['date'].dt.date``.

    Returns:
        The first ``warning_level`` whose date falls on ``target_date``, or NaN if none does.
    """
    hits = df.loc[df['date'].dt.date == target_date, 'warning_level']
    if hits.empty:
        return float('nan')
    return hits.iloc[0]

# Attach warning_level_lag_{1,2,3}: each resort's warning level 1-3 days before today,
# looked up in the historical feature group. Rows missing any value are dropped,
# dtypes are aligned with the feature group, and the result is inserted.
if warning_data_today_df.empty:
    print(f"No avalanche warnings returned for {today}; nothing to insert.")
else:
    today_date = datetime.datetime.strptime(today, '%Y-%m-%d').date()  # parse once, not per lag
    for loc in resorts.keys():
        warning_data_df_loc = warning_data_df[warning_data_df['location'] == loc]
        loc_mask = warning_data_today_df['location'] == loc
        for lag in range(1, 4):
            lag_date = today_date - datetime.timedelta(days=lag)
            warning_data_today_df.loc[loc_mask, f'warning_level_lag_{lag}'] = match_by_date(
                warning_data_df_loc, lag_date
            )

    warning_data_today_df = (
        warning_data_today_df
        .dropna()  # any NaN (e.g. a lag day absent from history) drops the row
        .astype({
            'latitude': 'float32',
            'longitude': 'float32',
            'warning_level': 'int32',
            'warning_level_lag_1': 'int32',
            'warning_level_lag_2': 'int32',
            'warning_level_lag_3': 'int32',
        })
        .assign(date=lambda d: pd.to_datetime(d['date'], format='%Y-%m-%dT%H:%M:%S').dt.normalize())
    )

    if warning_data_today_df.empty:
        print("No resort has a complete 3-day warning history for today; nothing to insert.")
    else:
        warning_fg.insert(warning_data_today_df)

Uploading Dataframe: 100.00% |██████████| Rows 12/12 | Elapsed Time: 00:00 | Remaining Time: 00:00


Job started successfully, you can follow the progress at 


 None)

## Weather

In [7]:
def _noon_snapshot(hourly_df):
    """Reduce an hourly forecast frame to its rows stamped between 11:59 and 12:01."""
    return (
        hourly_df
        .set_index('date')
        .between_time('11:59', '12:01')
        .reset_index()
    )


# One noon sample per forecast day per resort.
# NOTE(review): arguments are passed as (loc, lon, lat) while `resorts` yields (lat, lon);
# confirm this matches get_hourly_weather_forecast's signature in util.
dfs = [
    _noon_snapshot(get_hourly_weather_forecast(loc, lon, lat))
    for loc, (lat, lon) in resorts.items()
]

terrain_df = terrain_fg.read()
weather_daily_df = pd.concat(dfs, ignore_index=True)
# Convert the noon timestamp to 00:00 -- presumably so dates align with the
# midnight-stamped dates in the warning feature group.
weather_daily_df['date'] = weather_daily_df['date'].dt.normalize()
weather_daily_df = add_terrain_weather_interactions(
    weather_daily_df,
    terrain_df
)
# DataFrame.info() prints its report itself and returns None; wrapping it in print()
# only appended a stray "None" to the output.
weather_daily_df.info()
weather_fg.insert(weather_daily_df, wait=True)


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.13s) 
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 84 entries, 0 to 83
Data columns (total 13 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         84 non-null     datetime64[ns]
 1   temperature_2m_mean          84 non-null     float32       
 2   precipitation_sum            84 non-null     float32       
 3   rain_sum                     84 non-null     float32       
 4   snowfall_sum                 84 non-null     float32       
 5   wind_speed_10m_max           84 non-null     float32       
 6   wind_direction_10m_dominant  84 non-null     float32       
 7   location                     84 non-null     object        
 8   snow_load_steep              84 non-null     float64       
 9   wind_snow_transport          84 non-null     float64       
 10  rain_on_snow_risk         

Uploading Dataframe: 100.00% |██████████| Rows 84/84 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: weather_terrain_sensor_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/2173/jobs/named/weather_terrain_sensor_2_offline_fg_materialization/executions
2025-12-31 15:55:33,982 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-12-31 15:55:37,194 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-12-31 15:58:10,861 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-12-31 15:58:11,864 INFO: Waiting for log aggregation to finish.
2025-12-31 15:58:21,364 INFO: Execution finished successfully.


(Job('weather_terrain_sensor_2_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 3124
         }
       },
       "result": {
         "observed_value": 0.5091168880462646,
         "element_count": 84,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-12-31T03:55:20.000391Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expe