In [53]:
import sys
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", module="IPython")

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

print(f"Root dir: {root_dir}")

# Add the root directory to the `PYTHONPATH` 
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Root dir: /Users/erik/Code/Skola/ID2223/mlfs-book
HopsworksSettings initialized!


In [54]:
import datetime
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util
import datetime
from pathlib import Path
import json
import re
import os
import warnings
warnings.filterwarnings("ignore")

In [55]:
project = hopsworks.login()

2025-11-18 11:59:52,971 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-18 11:59:52,976 INFO: Initializing external client
2025-11-18 11:59:52,977 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-18 11:59:54,312 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1279161


In [56]:
import requests
import csv
import os

AQICN_API_KEY = settings.AQICN_API_KEY.get_secret_value()


secrets = hopsworks.get_secrets_api()
# Replace any existing secret with the new value
secret = secrets.get_secret("AQICN_API_KEY")
if secret is not None:
    secret.delete()
    print("Replacing existing AQICN_API_KEY")

secrets.create_secret("AQICN_API_KEY", AQICN_API_KEY)

urls = []
sensor_street = pd.DataFrame()
# open root/sensors.json and read urls
with open(f"{root_dir}/sensors.json") as f:
    sensors = json.load(f)
    for sensor in sensors["sensors"]:
        urls.append(sensor['url'])
        temp_df = pd.DataFrame({
            "url": [sensor['url']],
            "street": [sensor['street']]
        })
        sensor_street = pd.concat([sensor_street, temp_df], ignore_index=True)
# make a table with url street
print(sensor_street)
stations = []

for url in urls:
    query = f"{url}/?token={AQICN_API_KEY}"
    print("Querying URL:", query)
    data = util.trigger_request(query)
    print(data)
    # get the pm25 value
    loc = data['data']['city']['name']
    street = loc.split(",")[0].strip()
    city = "Regensburg"
    country = "Germany"
    stations.append({
        "url": url,
        "country": country,
        "city": city,
        "street": street,
        "url":url
    })
    latitude = data['data']['city']['geo'][0]
    longitude = data['data']['city']['geo'][1]





Replacing existing AQICN_API_KEY
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
                                  url               street
0  https://api.waqi.info/feed/A191545           Steyrerweg
1  https://api.waqi.info/feed/A474136          Pfeilstra√üe
2  https://api.waqi.info/feed/A469291          Hafnersteig
3   https://api.waqi.info/feed/A66394  Fichtelgebirgstra√üe
Querying URL: https://api.waqi.info/feed/A191545/?token=f8c9b0aad7e624da4286392f37a47c05f9322983
{'status': 'ok', 'data': {'aqi': 11, 'idx': -191545, 'attributions': [{'url': 'https://sensor.community/', 'name': 'Citizen Science project sensor.community', 'station': '56075'}, {'url': 'https://waqi.info/', 'name': 'World Air Quality Index Project'}], 'city': {'geo': [49.022, 12.066], 'name': 'Steyrerweg', 'url': 'https://aqicn.org/station/@191545', 'location': 'Steyrerweg, Westheim, K√∂nigswiesen, Regensburg, Bavaria, 93049, Germany'}, 'dominentpol': 'pm25', 'iaqi': {'pm10':

In [57]:
# open all csv in aqi_csv folder, check the filenames
big_df = pd.DataFrame()
for file in Path(f"./aqi_csv").glob("*.csv"):
    print(f"Processing file: {file.name}")
    street_name = file.name.replace(".csv", "")
    df = pd.read_csv(file)
    df['street'] = street_name
    df["city"] = "Regensburg"
    df["country"] = "Germany"
    df["url"] = sensor_street[sensor_street['street'] == street_name]['url'].values[0]
    print(f"Assigned ID: {df['url'].iloc[0]} for street: {street_name}")
    # if median in columns, rename to pm25
    if 'median' in df.columns:
        df = df.rename(columns={"median": "pm25"})
        
    big_df = pd.concat([big_df, df], ignore_index=True)
    

big_df.columns = big_df.columns.str.strip()
# drop where pm25 is NaN or empty
big_df = big_df.dropna(subset=['pm25'])
big_df = big_df[big_df['pm25'] != ' ']
big_df


Processing file: Steyrerweg.csv
Assigned ID: https://api.waqi.info/feed/A191545 for street: Steyrerweg
Processing file: Hafnersteig.csv
Assigned ID: https://api.waqi.info/feed/A469291 for street: Hafnersteig
Processing file: Pfeilstra√üe.csv
Assigned ID: https://api.waqi.info/feed/A474136 for street: Pfeilstra√üe
Processing file: Fichtelgebirgstra√üe.csv
Assigned ID: https://api.waqi.info/feed/A66394 for street: Fichtelgebirgstra√üe


Unnamed: 0,date,min,max,pm25,q1,q3,stdev,count,street,city,country,url
0,2021-01-18T00:00:00.000Z,3.05,8.82,6.47,5.57,7.17,1.333,56,Steyrerweg,Regensburg,Germany,https://api.waqi.info/feed/A191545
1,2021-01-19T00:00:00.000Z,2.70,16.60,6.05,4.72,7.18,1.975,215,Steyrerweg,Regensburg,Germany,https://api.waqi.info/feed/A191545
2,2021-01-20T00:00:00.000Z,5.50,18.95,11.03,9.66,14.05,2.805,260,Steyrerweg,Regensburg,Germany,https://api.waqi.info/feed/A191545
3,2021-01-21T00:00:00.000Z,9.27,18.35,12.38,11.04,14.73,2.114,241,Steyrerweg,Regensburg,Germany,https://api.waqi.info/feed/A191545
4,2021-01-22T00:00:00.000Z,4.34,42.63,8.01,7.09,10.69,3.302,256,Steyrerweg,Regensburg,Germany,https://api.waqi.info/feed/A191545
...,...,...,...,...,...,...,...,...,...,...,...,...
5075,2025-11-14T00:00:00.000Z,1.14,8.75,2.47,2.00,2.92,0.787,592,Fichtelgebirgstra√üe,Regensburg,Germany,https://api.waqi.info/feed/A66394
5076,2025-11-15T00:00:00.000Z,2.60,8.00,4.51,4.09,5.03,0.780,594,Fichtelgebirgstra√üe,Regensburg,Germany,https://api.waqi.info/feed/A66394
5077,2025-11-16T00:00:00.000Z,3.83,14.38,5.60,5.05,6.22,0.996,594,Fichtelgebirgstra√üe,Regensburg,Germany,https://api.waqi.info/feed/A66394
5078,2025-11-17T00:00:00.000Z,0.00,6.40,0.42,0.20,4.32,2.170,592,Fichtelgebirgstra√üe,Regensburg,Germany,https://api.waqi.info/feed/A66394


In [58]:
import datetime

today = datetime.date.today()

for station in stations:
    aqicn_url = station["url"]
    street = station["street"]
    city = station["city"]
    country = station["country"]
    print(f"Fetching data for station: {country}/{city}/{street}")
    try:
        # replace spaces with hyphens in street
        street = street.replace(" ", "-").lower()
        aq_today_df = util.get_pm25(station["url"], country, city, street, today, AQICN_API_KEY, id=station["url"])
    except hopsworks.RestAPIError:
        print("It looks like the AQICN_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?")
    

    print(aq_today_df.head())

Fetching data for station: Germany/Regensburg/Steyrerweg
Fetching AQI data from URL with ID: https://api.waqi.info/feed/A191545/?token=f8c9b0aad7e624da4286392f37a47c05f9322983
   pm25  country        city      street       date  \
0  11.0  Germany  Regensburg  steyrerweg 2025-11-18   

                                  url  
0  https://api.waqi.info/feed/A191545  
Fetching data for station: Germany/Regensburg/Pfeilstra√üe
Fetching AQI data from URL with ID: https://api.waqi.info/feed/A474136/?token=f8c9b0aad7e624da4286392f37a47c05f9322983
   pm25  country        city       street       date  \
0   7.0  Germany  Regensburg  pfeilstra√üe 2025-11-18   

                                  url  
0  https://api.waqi.info/feed/A474136  
Fetching data for station: Germany/Regensburg/Hafnersteig
Fetching AQI data from URL with ID: https://api.waqi.info/feed/A469291/?token=f8c9b0aad7e624da4286392f37a47c05f9322983
   pm25  country        city       street       date  \
0   3.0  Germany  Regensburg

## <span style='color:#ff5f27'> üåç STEP 5: Read your CSV file into a DataFrame </span>

The cell below will read up historical air quality data as a CSV file into a Pandas DataFrame

## <span style='color:#ff5f27'> üåç STEP 6: Data cleaning</span>


### Rename columns if needed and drop unneccessary columns

We want to have a DataFrame with 2 columns - `date` and `pm25` after this cell below:

## Check the data types for the columns in your DataFrame

 * `date` should be of type   datetime64[ns] 
 * `pm25` should be of type float64

In [59]:
big_df.columns = big_df.columns.str.strip()
big_df.columns


Index(['date', 'min', 'max', 'pm25', 'q1', 'q3', 'stdev', 'count', 'street',
       'city', 'country', 'url'],
      dtype='object')

In [60]:
df_aq = big_df[['date', 'pm25', 'street', 'city', 'url']].copy()
# convert date to datetime
df_aq['date'] = pd.to_datetime(df_aq['date'])
df_aq['pm25'] = df_aq['pm25'].astype('float32')
# rename all city to turku and all country to finland
df_aq['city'] = 'Regensburg'
df_aq['country'] = 'Germany'

df_aq

Unnamed: 0,date,pm25,street,city,url,country
0,2021-01-18 00:00:00+00:00,6.47,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
1,2021-01-19 00:00:00+00:00,6.05,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
2,2021-01-20 00:00:00+00:00,11.03,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
3,2021-01-21 00:00:00+00:00,12.38,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
4,2021-01-22 00:00:00+00:00,8.01,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
...,...,...,...,...,...,...
5075,2025-11-14 00:00:00+00:00,2.47,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
5076,2025-11-15 00:00:00+00:00,4.51,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
5077,2025-11-16 00:00:00+00:00,5.60,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
5078,2025-11-17 00:00:00+00:00,0.42,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany


In [61]:
# Cast the pm25 column to be a float32 data type
df_aq.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5080 entries, 0 to 5079
Data columns (total 6 columns):
 #   Column   Non-Null Count  Dtype              
---  ------   --------------  -----              
 0   date     5080 non-null   datetime64[ns, UTC]
 1   pm25     5080 non-null   float32            
 2   street   5080 non-null   object             
 3   city     5080 non-null   object             
 4   url      5080 non-null   object             
 5   country  5080 non-null   object             
dtypes: datetime64[ns, UTC](1), float32(1), object(4)
memory usage: 218.4+ KB


## <span style='color:#ff5f27'> üåç STEP 7: Drop any rows with missing data </span>
It will make the model training easier if there is no missing data in the rows, so we drop any rows with missing data.

In [62]:
df_aq.dropna(inplace=True)
df_aq

Unnamed: 0,date,pm25,street,city,url,country
0,2021-01-18 00:00:00+00:00,6.47,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
1,2021-01-19 00:00:00+00:00,6.05,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
2,2021-01-20 00:00:00+00:00,11.03,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
3,2021-01-21 00:00:00+00:00,12.38,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
4,2021-01-22 00:00:00+00:00,8.01,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
...,...,...,...,...,...,...
5075,2025-11-14 00:00:00+00:00,2.47,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
5076,2025-11-15 00:00:00+00:00,4.51,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
5077,2025-11-16 00:00:00+00:00,5.60,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
5078,2025-11-17 00:00:00+00:00,0.42,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany


---

In [63]:
df_aq = df_aq.sort_values(by='date')

In [64]:
df_aq

Unnamed: 0,date,pm25,street,city,url,country
2958,2019-12-09 00:00:00+00:00,2.310000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
2959,2019-12-10 00:00:00+00:00,4.700000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
2960,2019-12-11 00:00:00+00:00,10.900000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
2961,2019-12-12 00:00:00+00:00,19.959999,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
2962,2019-12-13 00:00:00+00:00,14.520000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
...,...,...,...,...,...,...
5078,2025-11-17 00:00:00+00:00,0.420000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany
1748,2025-11-18 00:00:00+00:00,1.350000,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany
2363,2025-11-18 00:00:00+00:00,0.250000,Hafnersteig,Regensburg,https://api.waqi.info/feed/A469291,Germany
2957,2025-11-18 00:00:00+00:00,1.800000,Pfeilstra√üe,Regensburg,https://api.waqi.info/feed/A474136,Germany


In [65]:


for street in df_aq['street'].unique():
    print("Processing street:", street)
    street_df = df_aq[df_aq['street'] == street]
    street_df = street_df.sort_values(by='date')
    df_aq.loc[street_df.index, "lag_1_pm25"] = street_df["pm25"].shift(1)
    df_aq.loc[street_df.index, "lag_2_pm25"] = street_df["pm25"].shift(2)
    df_aq.loc[street_df.index, "lag_3_pm25"] = street_df["pm25"].shift(3)

# drop the last 3 rows with NaN values due to lag features
df_aq = df_aq.dropna()

Processing street: Fichtelgebirgstra√üe
Processing street: Steyrerweg
Processing street: Hafnersteig
Processing street: Pfeilstra√üe


In [66]:
df_aq

Unnamed: 0,date,pm25,street,city,url,country,lag_1_pm25,lag_2_pm25,lag_3_pm25
2961,2019-12-12 00:00:00+00:00,19.959999,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,10.900000,4.700000,2.310000
2962,2019-12-13 00:00:00+00:00,14.520000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,19.959999,10.900000,4.700000
2963,2019-12-14 00:00:00+00:00,2.000000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,14.520000,19.959999,10.900000
2964,2019-12-15 00:00:00+00:00,3.500000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,2.000000,14.520000,19.959999
2965,2019-12-16 00:00:00+00:00,8.070000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,3.500000,2.000000,14.520000
...,...,...,...,...,...,...,...,...,...
5078,2025-11-17 00:00:00+00:00,0.420000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,5.600000,4.510000,2.470000
1748,2025-11-18 00:00:00+00:00,1.350000,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany,1.800000,16.049999,12.950000
2363,2025-11-18 00:00:00+00:00,0.250000,Hafnersteig,Regensburg,https://api.waqi.info/feed/A469291,Germany,1.830000,2.900000,3.200000
2957,2025-11-18 00:00:00+00:00,1.800000,Pfeilstra√üe,Regensburg,https://api.waqi.info/feed/A474136,Germany,1.800000,13.950000,10.850000


## <span style='color:#ff5f27'> üå¶ Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs)

## <span style='color:#ff5f27'> üåç STEP 9: Download the Historical Weather Data </span>

https://open-meteo.com/en/docs/historical-weather-api#hourly=&daily=temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant

We will download the historical weather data for your `city` by first extracting the earliest date from your DataFrame containing the historical air quality measurements.

We will download all daily historical weather data measurements for your `city` from the earliest date in your air quality measurement DataFrame. It doesn't matter if there are missing days of air quality measurements. We can store all of the daily weather measurements, and when we build our training dataset, we will join up the air quality measurements for a given day to its weather features for that day. 

The weather features we will download are:

 * `temperature (average over the day)`
 * `precipitation (the total over the day)`
 * `wind speed (average over the day)`
 * `wind direction (the most dominant direction over the day)`


In [67]:
earliest_aq_date = pd.Series.min(df_aq['date'])
earliest_aq_date = earliest_aq_date.strftime('%Y-%m-%d')
earliest_aq_date

weather_df = util.get_historical_weather(city, earliest_aq_date, str(today), latitude, longitude)

Coordinates 49.033390045166016¬∞N 12.112436294555664¬∞E
Elevation 353.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [68]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2169 entries, 0 to 2168
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         2169 non-null   datetime64[ns]
 1   temperature_2m_mean          2169 non-null   float32       
 2   precipitation_sum            2169 non-null   float32       
 3   wind_speed_10m_max           2169 non-null   float32       
 4   wind_direction_10m_dominant  2169 non-null   float32       
 5   city                         2169 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 67.9+ KB


In [69]:
weather_df

Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city
0,2019-12-12,1.273250,2.400000,11.681987,199.359039,Regensburg
1,2019-12-13,0.998250,4.800001,27.897097,186.033127,Regensburg
2,2019-12-14,4.981583,5.000000,35.369884,241.848740,Regensburg
3,2019-12-15,7.102417,6.000000,20.592503,207.694717,Regensburg
4,2019-12-16,4.950334,0.300000,13.392774,130.563416,Regensburg
...,...,...,...,...,...,...
2164,2025-11-14,5.467000,0.000000,13.752047,138.572876,Regensburg
2165,2025-11-15,6.267000,0.000000,12.870944,127.157738,Regensburg
2166,2025-11-16,5.908667,1.800000,11.966954,139.407562,Regensburg
2167,2025-11-17,5.052416,2.800000,23.298702,306.918243,Regensburg


## <span style='color:#ff5f27'> üåç STEP 10: Define Data Validation Rules </span>

We will validate the air quality measurements (`pm25` values) before we write them to Hopsworks.

We define a data validation rule (an expectation in Great Expectations) that ensures that `pm25` values are not negative or above the max value available by the sensor.

We will attach this expectation to the air quality feature group, so that we validate the `pm25` data every time we write a DataFrame to the feature group. We want to prevent garbage-in, garbage-out.

In [70]:
import great_expectations as ge
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pm25",
            "min_value":-0.1,
            "max_value":500.0,
            "strict_min":True
        }
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 500.0, "strict_min": true}, "meta": {}}

## Expectations for Weather Data
Here, we define an expectation for 2 columns in our weather DataFrame - `precipitation_sum` and `wind_speed_10m_max`, where we expect both values to be greater than zero, but less than 1000.

In [71]:
import great_expectations as ge
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

def expect_greater_than_zero(col):
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")

---

### <span style="color:#ff5f27;"> üîÆ STEP 11: Connect to Hopsworks and save the sensor country, city, street names as a secret</span>

In [72]:
fs = project.get_feature_store() 

#### Save country, city, street names as a secret

These will be downloaded from Hopsworks later in the (1) daily feature pipeline and (2) the daily batch inference pipeline

In [73]:
dict_obj = {
    "country": country,
    "city": city,
    "street": street,
    "aqicn_url": aqicn_url,
    "latitude": latitude,
    "longitude": longitude
}

# Convert the dictionary to a JSON string
str_dict = json.dumps(dict_obj)

# Replace any existing secret with the new value
secret = secrets.get_secret("SENSOR_LOCATION_JSON_A")
if secret is not None:
    secret.delete()
    print("Replacing existing SENSOR_LOCATION_JSON")

secrets.create_secret("SENSOR_LOCATION_JSON_A", str_dict)

Replacing existing SENSOR_LOCATION_JSON
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('SENSOR_LOCATION_JSON_A', 'PRIVATE')

In [74]:
df_aq

Unnamed: 0,date,pm25,street,city,url,country,lag_1_pm25,lag_2_pm25,lag_3_pm25
2961,2019-12-12 00:00:00+00:00,19.959999,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,10.900000,4.700000,2.310000
2962,2019-12-13 00:00:00+00:00,14.520000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,19.959999,10.900000,4.700000
2963,2019-12-14 00:00:00+00:00,2.000000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,14.520000,19.959999,10.900000
2964,2019-12-15 00:00:00+00:00,3.500000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,2.000000,14.520000,19.959999
2965,2019-12-16 00:00:00+00:00,8.070000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,3.500000,2.000000,14.520000
...,...,...,...,...,...,...,...,...,...
5078,2025-11-17 00:00:00+00:00,0.420000,Fichtelgebirgstra√üe,Regensburg,https://api.waqi.info/feed/A66394,Germany,5.600000,4.510000,2.470000
1748,2025-11-18 00:00:00+00:00,1.350000,Steyrerweg,Regensburg,https://api.waqi.info/feed/A191545,Germany,1.800000,16.049999,12.950000
2363,2025-11-18 00:00:00+00:00,0.250000,Hafnersteig,Regensburg,https://api.waqi.info/feed/A469291,Germany,1.830000,2.900000,3.200000
2957,2025-11-18 00:00:00+00:00,1.800000,Pfeilstra√üe,Regensburg,https://api.waqi.info/feed/A474136,Germany,1.800000,13.950000,10.850000


### <span style="color:#ff5f27;"> üîÆ STEP 12: Create the Feature Groups and insert the DataFrames in them </span>

### <span style='color:#ff5f27'> üå´ Air Quality Data
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each air quality sensor measurement is uniquely identified by `country`, `street`, and  `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [75]:
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality_a',
    description='Air Quality characteristics of each day',
    version=1,
    primary_key=['country','city', 'street', 'url'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)

#### Insert the DataFrame into the Feature Group

In [76]:
air_quality_fg.insert(df_aq, wait=True)

2025-11-18 12:00:08,143 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279161/fs/1273929/fg/1737050


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 5068/5068 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_a_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279161/jobs/named/air_quality_a_1_offline_fg_materialization/executions
2025-11-18 12:00:24,911 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-18 12:00:28,165 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-18 12:01:54,295 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-18 12:01:54,461 INFO: Waiting for log aggregation to finish.
2025-11-18 12:02:06,384 INFO: Execution finished successfully.


(Job('air_quality_a_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 768010
         }
       },
       "result": {
         "observed_value": 0.0,
         "element_count": 5068,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-18T11:00:08.000143Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expe

#### Enter a description for each feature in the Feature Group

In [77]:
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in acqcn.org)")
air_quality_fg.update_feature_description("city", "City where the air quality was measured")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("url", "Unique identifier for the air quality measurement station")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")
air_quality_fg.update_feature_description("lag_1_pm25", "Lag feature: pm25 value for previous day")
air_quality_fg.update_feature_description("lag_2_pm25", "Lag feature: pm25 value for two days ago")
air_quality_fg.update_feature_description("lag_3_pm25", "Lag feature: pm25 value for three days ago")

<hsfs.feature_group.FeatureGroup at 0x1760c74d0>

### <span style='color:#ff5f27'> üå¶ Weather Data
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each weather measurement is uniquely identified by `city` and  `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [78]:
# Get or create feature group 
weather_fg = fs.get_or_create_feature_group(
    name='weather_a',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city'],
    event_time="date",
    expectation_suite=weather_expectation_suite
) 

#### Insert the DataFrame into the Feature Group

In [79]:
# Insert data
weather_fg.insert(weather_df, wait=True)

2025-11-18 12:02:13,524 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279161/fs/1273929/fg/1721945


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 2169/2169 | Elapsed Time: 00:02 | Remaining Time: 00:00


(Job('weather_a_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "precipitation_sum",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 762130
         }
       },
       "result": {
         "observed_value": 0.0,
         "element_count": 2169,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-18T11:02:13.000522Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_co

#### Enter a description for each feature in the Feature Group

In [80]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m abouve ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant Wind direction over the dayd")

<hsfs.feature_group.FeatureGroup at 0x1082f2310>

## <span style="color:#ff5f27;">‚è≠Ô∏è **Next:** Part 02: Daily Feature Pipeline 
 </span> 


## <span style="color:#ff5f27;">‚è≠Ô∏è **Exercises:** 
 </span> 

Extra Homework:

  * Try adding a new feature based on a rolling window of 3 days for 'pm25'
      * This is not easy, as forecasting more than 1 day in the future, you won't have the previous 3 days of pm25 measurements.
      * df.set_index("date").rolling(3).mean() is only the start....
  * Parameterize the notebook, so that you can provide the `country`/`street`/`city`/`url`/`csv_file` as parameters. 
      * Hint: this will also require making the secret name (`SENSOR_LOCATION_JSON`), e.g., add the street name as part of the secret name. Then you have to pass that secret name as a parameter when running the operational feature pipeline and batch inference pipelines.
      * After you have done this, collect the street/city/url/csv files for all the sensors in your city or region and you make dashboards for all of the air quality sensors in your city/region. You could even then add a dashboard for your city/region, as done [here for Poland](https://github.com/erno98/ID2223).

Improve this AI System
  * As of mid 2024, there is no API call available to download historical data from the AQIN website. You could improve this system by writing a PR to download the CSV file using Python Selenium and the URL for the sensor.


---