## Install package requirements and import dependencies

In [131]:
!pip install -r requirements.txt --quiet

import openmeteo_requests
import numpy as np
from dotenv import load_dotenv
import pandas as pd
import requests_cache
import subprocess
from retry_requests import retry
from io import StringIO
import hopsworks
import great_expectations as ge
from datetime import date
import json
import time
import statistics
import time
import copy


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


#### Enable debug mode?

In [170]:
debug = False

## Load environment variables from the .env file

In [3]:
load_dotenv()

True

## Connect to hopsworks

In [4]:
project = hopsworks.login()

2025-12-28 08:25:54,377 INFO: Initializing external client
2025-12-28 08:25:54,378 INFO: Base URL: https://c.app.hopsworks.ai:443
To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'







2025-12-28 08:25:56,838 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1271967


In [5]:
fs = project.get_feature_store()

### Define expectation suites for ski weather

In [40]:
min_year = 1940
max_year = date.today().year - 1
# the columns for the pandas dataframe which this notebook will upload to hopsworks
yw_columns = ["date", "ski_resort_id", "closed", "mean_week_temperature"]

In [None]:
id_expectation = ge.core.ExpectationConfiguration(
    expectation_type="expect_column_min_to_be_between",
    kwargs={
        "column":"ski_resort_id",
        "min_value":0,
        "max_value": 1000000
    }
)

temperature_expectation = year_expectation = ge.core.ExpectationConfiguration(
    expectation_type="expect_column_min_to_be_between",
    kwargs={
        "column":"mean_week_temperature",
        "min_value":-100,
        "max_value":20
    }
)

temperature_expectation_2 = year_expectation = ge.core.ExpectationConfiguration(
    expectation_type="expect_column_max_to_be_between",
    kwargs={
        "column":"mean_week_temperature",
        "min_value":0,
        "max_value":100
    }
)

In [None]:
ski_weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="ski_weather_expectation_suite"
)
ski_weather_expectation_suite.add_expectation(id_expectation)
ski_weather_expectation_suite.add_expectation(temperature_expectation)
ski_weather_expectation_suite.add_expectation(temperature_expectation_2)

In [43]:
# Setup the Open-Meteo API client with cache and retry on error
cache_session = requests_cache.CachedSession('.cache', expire_after = -1)
retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
openmeteo = openmeteo_requests.Client(session = retry_session)
meteo_url = "https://archive-api.open-meteo.com/v1/archive"

## synchronize former ski resorts data

In [33]:
closed_resorts_fg = fs.get_feature_group(name='former_resorts', version=1)
cr_df = closed_resorts_fg.read(dataframe_type="pandas")

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.33s) 


In [34]:
cr_df

Unnamed: 0,id,name,year_closed,latitude,longitude
0,262,Lutsen North Shore Ski Area,1989,47.6676,-90.7590
1,159,Schroon Lake Ski Center,1975,43.8081,-73.7594
2,186,Manning Park Ski Area,1982,48.7770,-121.4070
3,131,Mount Whittier Ski Area,1985,43.8000,-71.2000
4,523,Bielatal Ski Area,1999,50.8800,14.2100
...,...,...,...,...,...
236,216,Mount Joseph Ski Area,1984,44.3721,-72.3459
237,171,Chalet Ski Area,1985,43.7086,-85.0530
238,43,Bovensmolen Ski Resort,2005,50.9083,5.8333
239,284,White Birch Ski Area,1976,43.9712,-71.7084


### Get start year and end year

For measuring temperature. Every closed down ski resort

In [35]:
cr_df.insert(2, "start_year", min_year)
cr_df.insert(4, "end_year", max_year)

In [36]:
# set start_year and end_year for every resort
for i, resort in cr_df.iterrows():
    resort["start_year"] = resort["year_closed"] - 15
    resort["end_year"] = resort["year_closed"] + 15

    # enforce year interval to be in between dates which have data on open-meteo
    if resort["start_year"] < min_year:
        resort["start_year"] = min_year
    if resort["end_year"] > max_year:
        resort["end_year"] = max_year

    cr_df.loc[cr_df['id']==resort["id"], 'start_year'] = resort["start_year"]
    cr_df.loc[cr_df['id']==resort["id"], 'end_year'] = resort["end_year"]

print(cr_df)

      id                                  name  start_year  year_closed  \
0    262           Lutsen North Shore Ski Area        1974         1989   
1    159               Schroon Lake Ski Center        1960         1975   
2    186                 Manning Park Ski Area        1967         1982   
3    131               Mount Whittier Ski Area        1970         1985   
4    523                     Bielatal Ski Area        1984         1999   
..   ...                                   ...         ...          ...   
236  216                 Mount Joseph Ski Area        1969         1984   
237  171                       Chalet Ski Area        1970         1985   
238   43                Bovensmolen Ski Resort        1990         2005   
239  284                  White Birch Ski Area        1961         1976   
240  248  Whitefish Mountain Resort (Old Area)        1965         1980   

     end_year  latitude  longitude  
0        2004   47.6676   -90.7590  
1        1990   43.8081  

## Synchronize current ski resort data

In [212]:
current_resorts_fg = fs.get_feature_group(name='current_resorts', version=1)
or_df = current_resorts_fg.read(dataframe_type="pandas")

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.75s) 


In [213]:
or_df

Unnamed: 0,id,name,latitude,longitude
0,1226505097,Torgnon,45.814452,7.554285
1,601135063,Font d'Urle Chaud Clapier,44.910152,5.323491
2,1254287966,Ristolas en Queyras,44.771783,6.960893
3,601115623,Alpe Devero,46.307671,8.252052
4,7752047,San Martino di Castrozza - Passo Rolle,46.268927,11.792439
...,...,...,...,...
834,45409595,Antagnod,45.822300,7.682800
835,1227121146,Gitschenen – Isenthal,46.899355,8.501497
836,601131935,Saint Luc - Chandolin,46.236511,7.625363
837,642545662,Seefeld - Gschwandtkopf,47.317186,11.171586


## Prepare request to open meteo

For closed resorts, we call open-meteo once for every year from 1940 to the year before the current year. For each year call, we specify what ski resorts (coordinates) that need temperature measurements for the winter season.

For open resorts, we call for every year the last 15 years.

In [222]:
# for each year, get all resorts which need temperature for each date
def get_years_lat_long(start_year: int, end_year: int, resort_df: pd.DataFrame, closed: bool):
    years = range(start_year, end_year)
    years_lat_long = pd.DataFrame()
    for year in years:
        pd.date_range(start=f"{year}-01-01", end=f"{year}-12-31")
        ids = []
        latitudes = []
        longitudes = []
        for i, resort in resort_df.iterrows():
            # open resorts
            if(not closed):
                ids.append(resort["id"])
                latitudes.append(resort["latitude"])
                longitudes.append(resort["longitude"])
            # closed resorts
            elif year >= resort["start_year"] and year <= resort["end_year"]:
                ids.append(resort["id"])
                latitudes.append(resort["latitude"])
                longitudes.append(resort["longitude"])
    
        years_lat_long_elem = { 
             "year": year,
             "ski_resort_ids": ids,
             "latitudes": latitudes, 
             "longitudes": longitudes
        }
        
        years_lat_long_elem = pd.DataFrame(years_lat_long_elem)
        years_lat_long = pd.concat([years_lat_long, years_lat_long_elem])
    
    if debug:
        years_lat_long_json = json.dumps(years_lat_long)
        print(years_lat_long_json)
    return years_lat_long

### Get latitudes and longitudes for dates for open and closed resorts

In [223]:
closed_years_lat_long = get_years_lat_long(min_year, max_year+1, cr_df, True)
closed_years_lat_long

Unnamed: 0,year,ski_resort_ids,latitudes,longitudes
0,1941,151.0,46.6378,-121.3910
0,1942,151.0,46.6378,-121.3910
0,1943,151.0,46.6378,-121.3910
0,1944,151.0,46.6378,-121.3910
0,1945,151.0,46.6378,-121.3910
...,...,...,...,...
10,2024,129.0,44.2022,-72.9411
11,2024,125.0,42.8128,-76.0214
12,2024,841.0,44.5000,6.1500
13,2024,306.0,43.0059,-72.2193


In [224]:
open_years_lat_long = get_years_lat_long(max_year-15, max_year+1, or_df, False)
open_years_lat_long

Unnamed: 0,year,ski_resort_ids,latitudes,longitudes
0,2009,1226505097,45.814452,7.554285
1,2009,601135063,44.910152,5.323491
2,2009,1254287966,44.771783,6.960893
3,2009,601115623,46.307671,8.252052
4,2009,7752047,46.268927,11.792439
...,...,...,...,...
834,2024,45409595,45.822300,7.682800
835,2024,1227121146,46.899355,8.501497
836,2024,601131935,46.236511,7.625363
837,2024,642545662,47.317186,11.171586


### Function definition for converson from daily temperatures to weekly

* Each month gets divided into 4-5 weeks.
* The first week of a month always starts on the first date of the month (contrary to how weeks work in the usual sense). 
* The fifth week of the month either consists of 0, 2 or 3 days, depending on the number of days of the month.
* Leap years are not taken into account (i.e all february 29:th days are skipped)

In [196]:

# Convert the year_daily_ski_weather_dataframe to weekly weather
def daily_to_weekly(yd_df: pandas.DataFrame, current_year: int, yw_columns: list):
    months = [1, 2, 3, 11, 12]
    weeks = [1,2,3,4,5]
    yw_df = pd.DataFrame(columns=yw_columns)
    
    for sr_id in yd_df["resort_id"].unique():
        for month in months:
            for week in weeks:
                # capture all days, no matter which month
                if  week < weeks[len(weeks)-1]:
                    days_in_week = range(7*(week-1)+1, 7*(week)+1)
                else:
                    if month == 2:
                        days_in_week = []
                    elif month == 11:
                        days_in_week = [29,30]
                    else:
                        days_in_week = [29,30,31]
                # calculate the mean week temperature and insert it into dataframe   
                week_temps = []
                for day in days_in_week: 
                    today = date(current_year, month, day)
                    temperature_today = yd_df.loc[(yd_df['resort_id'] == sr_id) & (yd_df['date'].dt.date == today)]["temperature_2m_mean"]
                    if not temperature_today.size == 1:
                        if debug:
                            print(f"The week is: {week}. current date: {current_year}-{month}-{day}")
                            print("Temperature today:", temperature_today)
                        raise Exception("Multiple or no temperature values found for resort_id and date!")
                    week_temps.append(temperature_today.values[0])
                # edge case for february, who doesn'ẗ have a fifth week
                if len(week_temps) == 0:
                    continue
                mean_week_temp = statistics.mean(week_temps)
                week_date = f"{current_year}-{month}-{week}"
                
                elem_data = np.array([[week_date, sr_id, True, mean_week_temp]])
                elem_row = pd.DataFrame(data=elem_data, columns=yw_columns)
                yw_df = pd.concat([yw_df, elem_row])
    return yw_df
        


### Function definition for calling open meteo API and reformat to weekly dates

In [None]:
ski_weather_df = pd.DataFrame()
num_requests = 0
debug = False

for year_lat_long_index in range(0, 39):
    year_lat_long = years_lat_long[year_lat_long_index]
    current_year = year_lat_long["year"]
    # parameters to send to open-meteo
    params = {
        "latitude": year_lat_long["latitudes"],
        "longitude": year_lat_long["longitudes"],
        "start_date": f"{current_year}-01-01",
        "end_date": f"{current_year}-12-31",
        "daily": "temperature_2m_mean",
    }
    # send api request
    successful_request = False
    num_requests += 1
    while(not successful_request):
        try:
            responses = openmeteo.weather_api(meteo_url, params=params)
            successful_request = True
        except:
            print(f"rate limit exceeded at {num_requests} requests. Sleeping for 60 seconds...")
            time.sleep(60)
            print("Sleep done")
    if debug:
        print("current year:", current_year, ", num responses: ", len(responses))
    y_d_ski_weather_df = pd.DataFrame()
    # Process
    for resort_index in range(0, len(responses)):
        response = responses[resort_index]
        daily = response.Daily()
        daily_temperature_2m_mean = daily.Variables(0).ValuesAsNumpy()
    
        daily_data = {"date": pd.date_range(
        	start = pd.to_datetime(daily.Time(), unit = "s", utc = True),
        	end =  pd.to_datetime(daily.TimeEnd(), unit = "s", utc = True),
        	freq = pd.Timedelta(seconds = daily.Interval()),
        	inclusive = "left"
        )}
        daily_data["temperature_2m_mean"] = daily_temperature_2m_mean
        daily_data["closed"] = True
        daily_data["resort_id"] = year_lat_long["ski_resort_ids"][resort_index]
        daily_dataframe = pd.DataFrame(data = daily_data)
    
        y_d_ski_weather_df = daily_dataframe

        #start = time.time()
        y_w_ski_weather_df = daily_to_weekly(y_d_ski_weather_df, current_year)
        #end = time.time()
        #print((end - start) * 1000)
        ski_weather_df = pd.concat([ski_weather_df, y_w_ski_weather_df])
    print(f"year {current_year} done")

print("\nski weather data\n", ski_weather_df)

year 1940 done
year 1941 done
year 1942 done
year 1943 done
year 1944 done
year 1945 done
year 1946 done
year 1947 done
year 1948 done
year 1949 done
year 1950 done
year 1951 done
year 1952 done
year 1953 done
year 1954 done
year 1955 done
year 1956 done
year 1957 done
year 1958 done
year 1959 done
year 1960 done
year 1961 done
year 1962 done
year 1963 done
year 1964 done
year 1965 done
year 1966 done
year 1967 done
year 1968 done
year 1969 done
year 1970 done
year 1971 done
year 1972 done
year 1973 done
year 1974 done


### Check/fetch if any years for ski resorts are missing

In [None]:
# create/get feature store
closed_resorts_fg = fs.get_or_create_feature_group(
    name='ski_weather',
    description='weekly ski resort weather for both closed and open resorts',
    version=1,
    primary_key=['ski_resort_id', 'closed'],
    expectation_suite=ski_weather_expectation_suite
)

hw_ski_weather_df = closed_resorts_fg.read(dataframe_type="pandas")

In [None]:
fetched_year = -1
fetched_resorts = "None"
for year in range(min_year, max_year+1):
    # check closed resorts
    year_elems = hw_ski_weather_df.loc[hw_ski_weather_df['date'].dt.date.year == year & hw_ski_weather_df['closed'] == True]
    if year_elems.empty():
        fetched_year = year
        fetched_resorts = "closed"
        year_lat_long = closed_years_lat_long.loc[cr_df['year']==year]
        ski_weather_df = get_winter_year_temperatures(year_lat_long)
        break

    break
    # check open resorts
    year_elems = hw_ski_weather_df.loc[hw_ski_weather_df['date'].dt.date.year == year & hw_ski_weather_df['closed'] == False]
    if year_elems.empty():
        fetched_year = year
        fetched_resorts = "open"
        year_lat_long = open_years_lat_long.loc[cr_df['year']==year]
        ski_weather_df = get_winter_year_temperatures(year_lat_long)
        break

print(f"done with fetching weekly temperature data on year {fetched_year} for {fetched_resorts} resorts")

## Upload fetched year data to hopsworks

In [None]:
if not fetched_year == -1:
    # Insert Dataframe into ski weather feature group
    ski_weather_fg.insert(ski_weather_df)