<a href="https://colab.research.google.com/github/xrisaD/ScalableMLProject/blob/main/1_backfill_feature_groups.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# <span style="font-weight:bold; font-size: 3rem; color:#1EB182;"><img src="../../images/icon102.png" width="38px"></img> **Hopsworks Feature Store** </span><span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 01: Backfill Features to the Feature Store</span>


## 🗒️ This notebook is divided into the following sections:
1. Fetch historical data
2. Connect to the Hopsworks feature store
3. Create feature groups and insert them into the feature store

## <span style='color:#ff5f27'> 📝 Imports </span>

In [1]:
import sys
sys.path.append('../')

In [2]:
import hopsworks
from urllib.request import urlopen
import pandas as pd
import json

from features.feature_engineering import timestamp_2_time

---

## <span style='color:#ff5f27'> Locations </span>


In [3]:
place_list = ['Abisko', 'Uppsala', 'Spånga']

place_streamflow = [2357, 2609, 2212]

lat_long = [[68.35, 18.82], [59.87, 17.60], [58.00, 12.73]]

## <span style='color:#ff5f27'> 💽 Loading Historical Data</span>


#### <span style='color:#ff5f27'> 👩🏻‍🔬 Streamflow Data </span>

In [4]:
streamflow_api = 'https://opendata-download-hydroobs.smhi.se/api/version/latest/parameter/1/station/{}/period/corrected-archive/data.csv'
streamflow_files = [streamflow_api.format(station_id) for station_id in place_streamflow]
streamflow_files

['https://opendata-download-hydroobs.smhi.se/api/version/latest/parameter/1/station/2357/period/corrected-archive/data.csv',
 'https://opendata-download-hydroobs.smhi.se/api/version/latest/parameter/1/station/2609/period/corrected-archive/data.csv',
 'https://opendata-download-hydroobs.smhi.se/api/version/latest/parameter/1/station/2212/period/corrected-archive/data.csv']

In [5]:
df_streamflow = pd.DataFrame()

for file, place in zip(streamflow_files, place_list):
    # Skip the first 7 lines of each file and keep only the first two columns (date, streamflow)
    df_new = pd.read_csv(file, delimiter=';', skiprows=7, names=["date", "streamflow"]+[str(x) for x in range(4)], usecols=range(2))
    df_new['place'] = place
    df_streamflow = pd.concat([df_streamflow, df_new])
df_streamflow.head()

|  | date | streamflow | place |
|---|---|---|---|
| 0 | 1984-08-17 | 106.7242 | Abisko |
| 1 | 1984-08-18 | 103.7052 | Abisko |
| 2 | 1984-08-19 | 99.4413 | Abisko |
| 3 | 1984-08-20 | 93.1354 | Abisko |
| 4 | 1984-08-21 | 93.786 | Abisko |


In [6]:
print(df_streamflow.date.unique())
print(len(df_streamflow.date.unique()))

['1984-08-17' '1984-08-18' '1984-08-19' ... '1984-08-14' '1984-08-15'
 '1984-08-16']
16151


In [7]:
df_streamflow['date'] = pd.to_datetime(df_streamflow['date'])

We keep only the data from 2021-01-01 onward.

In [8]:
# Keep rows on or after 2021-01-01; .copy() gives an independent DataFrame, so the
# column assignments in the next cells do not trigger SettingWithCopyWarning
df_streamflow_2 = df_streamflow[df_streamflow['date'] >= '2021-01-01'].copy()

In [9]:
df_streamflow_2['date'] = df_streamflow_2['date'].dt.date
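Assigning to a column of a DataFrame that came out of boolean-mask filtering can make pandas emit `SettingWithCopyWarning`, because pandas cannot tell whether the filtered result is a view or a copy. The sketch below shows one common way around this: take an explicit `.copy()` right after filtering. The three rows reuse values from the outputs above; the names `df` and `recent` are illustrative only.

```python
import pandas as pd

# Tiny frame with values taken from the outputs above
df = pd.DataFrame({
    "date": pd.to_datetime(["1984-08-17", "2021-01-01", "2021-01-02"]),
    "streamflow": [106.7242, 28.7626, 28.0917],
    "place": ["Abisko", "Abisko", "Abisko"],
})

# Filter with a boolean mask, then copy so that `recent` owns its data
recent = df[df["date"] >= "2021-01-01"].copy()

# Assigning to a column of the copy leaves `df` untouched and raises no warning
recent["date"] = recent["date"].dt.date

print(recent)
```

The original frame keeps its `datetime64` column, while the filtered copy holds plain `datetime.date` objects.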


In [10]:
df_streamflow_2.head()

|  | date | streamflow | place |
|---|---|---|---|
| 13286 | 2021-01-01 | 28.7626 | Abisko |
| 13287 | 2021-01-02 | 28.0917 | Abisko |
| 13288 | 2021-01-03 | 27.6631 | Abisko |
| 13289 | 2021-01-04 | 27.3627 | Abisko |
| 13290 | 2021-01-05 | 27.288 | Abisko |


In [11]:
# Convert each date with the project's timestamp_2_time helper, then sort by place and date
df_streamflow_2['date'] = df_streamflow_2['date'].apply(timestamp_2_time)
df_streamflow_2 = df_streamflow_2.sort_values(by=['place', 'date'], ignore_index=True)

df_streamflow_2.head()


|  | date | streamflow | place |
|---|---|---|---|
| 0 | 1609455600000 | 28.7626 | Abisko |
| 1 | 1609542000000 | 28.0917 | Abisko |
| 2 | 1609628400000 | 27.6631 | Abisko |
| 3 | 1609714800000 | 27.3627 | Abisko |
| 4 | 1609801200000 | 27.288 | Abisko |
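The `timestamp_2_time` helper is imported from the project's `features.feature_engineering` module and is not shown in this notebook. Judging only from the output above, where 2021-01-01 becomes 1609455600000, it appears to return epoch milliseconds for local midnight in a UTC+1 zone. The sketch below reproduces that mapping under the assumption that the zone is `Europe/Stockholm`. The function name `date_to_epoch_ms` and its `tz` parameter are hypothetical, and the real helper may differ.

```python
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo


def date_to_epoch_ms(day, tz=None):
    """Return epoch milliseconds for midnight of `day` in time zone `tz`.

    `day` may be an ISO date string or a date/datetime object.
    Assumption: the default zone is Europe/Stockholm (not confirmed by the source).
    """
    if tz is None:
        tz = ZoneInfo("Europe/Stockholm")
    if isinstance(day, str):
        day = date.fromisoformat(day)
    # Attach the zone to midnight of that calendar day
    midnight = datetime.combine(day, time.min, tzinfo=tz)
    return int(midnight.timestamp() * 1000)


# Example with an explicit fixed UTC+1 offset, which avoids the zone database
cet = timezone(timedelta(hours=1))
print(date_to_epoch_ms("2021-01-01", tz=cet))  # 1609455600000
```

With a fixed UTC+1 offset the result matches the first row of the table above. A zone-aware default such as `Europe/Stockholm` would additionally handle summer time.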


#### <span style='color:#ff5f27'> 🌦 Weather Data </span>

To load the weather data, we call the Open-Meteo archive API once per location, using that location's latitude and longitude, and concatenate the daily results into a single DataFrame.

In [27]:
weather_api = "https://archive-api.open-meteo.com/v1/era5?timezone=Europe/Berlin&latitude={}&longitude={}&start_date=2021-01-01&end_date=2022-12-17&daily=temperature_2m_max,temperature_2m_min,precipitation_sum,rain_sum,snowfall_sum,precipitation_hours,windspeed_10m_max,windgusts_10m_max,winddirection_10m_dominant,et0_fao_evapotranspiration"

In [28]:
weather_files = [weather_api.format(lat, lon) for lat, lon in lat_long]
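The request URLs above are built by filling a latitude and a longitude into one long template string. As an alternative sketch, the same kind of URL can be assembled from a parameter dictionary with the standard library's `urlencode`, which makes the date range and the variable list easier to change. The helper name `build_weather_url` is hypothetical, and only three of the daily variables are listed to keep the example short.

```python
from urllib.parse import urlencode

BASE_URL = "https://archive-api.open-meteo.com/v1/era5"

# Subset of the daily variables requested in the notebook (shortened for the example)
DAILY_VARIABLES = ["temperature_2m_max", "temperature_2m_min", "precipitation_sum"]


def build_weather_url(lat, lon, start_date, end_date):
    """Assemble an archive request URL for one location (illustrative helper)."""
    params = {
        "timezone": "Europe/Berlin",
        "latitude": lat,
        "longitude": lon,
        "start_date": start_date,
        "end_date": end_date,
        "daily": ",".join(DAILY_VARIABLES),
    }
    # safe="/," leaves the slash in the time zone and the commas in the
    # variable list unescaped, matching the template string used above
    return f"{BASE_URL}?{urlencode(params, safe='/,')}"


url = build_weather_url(68.35, 18.82, "2021-01-01", "2022-12-17")
print(url)
```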

In [29]:
df_weather = pd.DataFrame()

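for url, place in zip(weather_files, place_list):
    # Fetch the JSON response and close the connection when done
    with urlopen(url) as response:
        data_json = json.load(response)
    # The 'daily' object holds one list of values per weather variable
    df_new = pd.DataFrame.from_dict(data_json['daily'], orient="columns")
    df_new['place'] = place
    df_weather = pd.concat([df_weather, df_new])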
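The loop above reads the `daily` object of each response, in which every key is a variable name and every value is a list with one entry per day. The sketch below shows how such a column-oriented object maps to one record per day, using only the standard library. The payload is hand-written rather than a real API response: the temperature values are the first two Abisko rows of the table further down, and the ISO date strings under `time` are an assumption about the response format. `daily_to_rows` is a hypothetical helper.

```python
import json

# Hand-written payload in the shape the loop reads (not a real API response)
sample = json.loads("""
{
  "daily": {
    "time": ["2021-01-01", "2021-01-02"],
    "temperature_2m_max": [-3.3, -0.8],
    "temperature_2m_min": [-10.9, -10.5]
  }
}
""")


def daily_to_rows(payload, place):
    """Turn the column-oriented 'daily' object into one dict per day."""
    daily = payload["daily"]
    columns = list(daily)
    rows = []
    # zip(*lists) walks the parallel lists day by day
    for values in zip(*(daily[column] for column in columns)):
        row = dict(zip(columns, values))
        row["place"] = place
        rows.append(row)
    return rows


rows = daily_to_rows(sample, "Abisko")
for row in rows:
    print(row)
```

`pd.DataFrame.from_dict(..., orient="columns")` performs the same pivot in one call; the explicit loop only makes the structure visible.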

In [30]:
df_weather = df_weather.rename(columns={"time": "date"})

In [31]:
# Convert each date with the project's timestamp_2_time helper, then sort by place and date
df_weather['date'] = df_weather['date'].apply(timestamp_2_time)
df_weather = df_weather.sort_values(by=['place', 'date'], ignore_index=True)

df_weather.head(10)

|  | date | temperature_2m_max | temperature_2m_min | precipitation_sum | rain_sum | snowfall_sum | precipitation_hours | windspeed_10m_max | windgusts_10m_max | winddirection_10m_dominant | et0_fao_evapotranspiration | place |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1609455600000 | -3.3 | -10.9 | 0.0 | 0.0 | 0.0 | 0.0 | 11.3 | 38.9 | 123.0 | 0.0 | Abisko |
| 1 | 1609542000000 | -0.8 | -10.5 | 0.3 | 0.0 | 0.77 | 3.0 | 5.4 | 22.3 | 207.0 | 0.0 | Abisko |
| 2 | 1609628400000 | -1.1 | -4.1 | 0.0 | 0.0 | 0.14 | 0.0 | 5.2 | 22.3 | 228.0 | 0.0 | Abisko |
| 3 | 1609714800000 | 0.5 | -3.7 | 1.1 | 0.0 | 0.91 | 8.0 | 6.9 | 29.9 | 241.0 | 0.0 | Abisko |
| 4 | 1609801200000 | 1.6 | 0.3 | 0.3 | 0.0 | 1.12 | 3.0 | 4.6 | 24.1 | 210.0 | 0.0 | Abisko |
| 5 | 1609887600000 | 2.1 | -3.5 | 0.3 | 0.0 | 0.56 | 3.0 | 8.7 | 27.4 | 161.0 | 0.0 | Abisko |
| 6 | 1609974000000 | -3.1 | -11.8 | 0.0 | 0.0 | 0.0 | 0.0 | 9.0 | 27.0 | 134.0 | 0.0 | Abisko |
| 7 | 1610060400000 | -11.7 | -14.8 | 0.0 | 0.0 | 0.0 | 0.0 | 8.7 | 23.0 | 140.0 | 0.0 | Abisko |
| 8 | 1610146800000 | -11.4 | -14.5 | 0.0 | 0.0 | 0.0 | 0.0 | 13.1 | 43.2 | 130.0 | 0.01 | Abisko |
| 9 | 1610233200000 | -5.9 | -11.3 | 0.0 | 0.0 | 0.21 | 0.0 | 13.1 | 44.6 | 130.0 | 0.03 | Abisko |


---

## <span style="color:#ff5f27;"> 🔮 Connecting to Hopsworks Feature Store </span>

In [32]:
project = hopsworks.login()
fs = project.get_feature_store() 

Copy your Api Key (first register/login): https://c.app.hopsworks.ai/account/api/generated

Paste it here: ········
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/5318






---

## <span style="color:#ff5f27;">🪄 Creating Feature Groups</span>

#### <span style='color:#ff5f27'> 👩🏻‍🔬 Streamflow Data </span>

In [33]:
# Create the streamflow feature group (or fetch it if it already exists)
streamflow_fg = fs.get_or_create_feature_group(
    name='streamflow_fg',
    description='Streamflow characteristics of each day',
    version=1,
    primary_key=['place', 'date'],
    online_enabled=True,
    event_time='date',
)

streamflow_fg.insert(df_streamflow_2)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/5318/fs/5238/fg/9557


Uploading Dataframe: 0.00% |          | Rows 0/2080 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/5318/jobs/named/streamflow_fg_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x2e848026d00>, None)
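The feature group above declares `['place', 'date']` as its primary key, which implies one row per place and day. A quick check for repeated key combinations before calling `insert` can catch accidental overlaps between stations or date ranges. The sketch below does this with pandas. `find_duplicate_keys` is a hypothetical helper and not part of the Hopsworks API, and the sample frame deliberately repeats one row from the output above to show a hit.

```python
import pandas as pd


def find_duplicate_keys(df, key_columns):
    """Return every row whose key_columns combination occurs more than once."""
    return df[df.duplicated(subset=key_columns, keep=False)]


# Illustrative frame: the second row is repeated on purpose
sample = pd.DataFrame({
    "place": ["Abisko", "Abisko", "Abisko"],
    "date": [1609455600000, 1609542000000, 1609542000000],
    "streamflow": [28.7626, 28.0917, 28.0917],
})

dupes = find_duplicate_keys(sample, ["place", "date"])
print(dupes)
```

An empty result means every key is unique. In the notebook the check could be run as `find_duplicate_keys(df_streamflow_2, ['place', 'date'])`.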

#### <span style='color:#ff5f27'> 🌦 Weather Data </span>

In [34]:
weather_fg = fs.get_or_create_feature_group(
    name='weather_fg',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['place', 'date'],
    online_enabled=True,
    event_time='date',
)

weather_fg.insert(df_weather)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/5318/fs/5238/fg/9558


Uploading Dataframe: 0.00% |          | Rows 0/2148 | Elapsed Time: 00:00 | Remaining Time: ?

Launching offline feature group backfill job...
Backfill Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/5318/jobs/named/weather_fg_1_offline_fg_backfill/executions


(<hsfs.core.job.Job at 0x2e84805ad90>, None)

---