# Part 01: Feature Backfill for **all sensors of the city**


In [2]:
import sys
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", module="IPython")

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir)
    print("Local environment")

print(f"Root dir: {root_dir}")

# Add the root directory to the `PYTHONPATH`
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Root dir: /Users/appbites/Desktop/mlfs-book
HopsworksSettings initialized!


## Imports

In [3]:
import datetime
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util
import datetime
from pathlib import Path
import json
import re
import os
import warnings
warnings.filterwarnings("ignore")

In [4]:
project = hopsworks.login(engine="python")

2025-11-12 16:35:01,127 INFO: Initializing external client
2025-11-12 16:35:01,128 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-12 16:35:02,970 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1289364


In [5]:
# taken from ~/.env. You can also replace settings.AQICN_API_KEY with the api key value as a string "...."
if settings.AQICN_API_KEY is None:
    print("You need to set AQICN_API_KEY either in this cell or in ~/.env")
    sys.exit(1)

AQICN_API_KEY = settings.AQICN_API_KEY.get_secret_value()
aqicn_url = settings.AQICN_URL
country = settings.AQICN_COUNTRY
city = settings.AQICN_CITY
street = settings.AQICN_STREET
latitude, longitude = util.get_city_coordinates(city)
# Uncomment this if API call to get longitude and latitude
# latitude = "48.2167394444444"
# longitude = "16.3809180555556"

secrets = hopsworks.get_secrets_api()
# Replace any existing secret with the new value
secret = secrets.get_secret("AQICN_API_KEY")
if secret is not None:
    secret.delete()
    print("Replacing existing AQICN_API_KEY")

secrets.create_secret("AQICN_API_KEY", AQICN_API_KEY)

Replacing existing AQICN_API_KEY
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('AQICN_API_KEY', 'PRIVATE')

### Validate that the AQICN_API_KEY you added earlier works

In [None]:
try:
    aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQICN_API_KEY)
except hopsworks.RestAPIError:
    print("It looks like the AQICN_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?")

aq_today_df.head()

### All sensors of the city

In [11]:
# --- Step 1: Search for stations in the city ---

# Approx Vienna bounding box
south, west, north, east = 48.08, 16.18, 48.33, 16.58

bounds_url = "https://api.waqi.info/map/bounds/"
params = {"token": AQICN_API_KEY, "latlng": f"{south},{west},{north},{east}"}
r = requests.get(bounds_url, params=params, timeout=20)
r.raise_for_status()
stations_geo = r.json().get("data", []) or []



keyword = f"{city}"
search_url = "https://api.waqi.info/search/"
search_params = {"token": AQICN_API_KEY, "keyword": keyword}

search_resp = requests.get(search_url, params=search_params, timeout=15)
search_resp.raise_for_status()
search_data = search_resp.json()

stations = search_data.get("data", [])
print(f"Found {len(stations)} stations matching '{keyword}'")

# --- Step 2: Convert to Pandas DataFrame ---
records = []
for s in stations:
    station = s.get("station", {})
    records.append({
        "uid": s.get("uid"),
        "name": station.get("name"),
        "country": station.get("country"),
        "lat": station.get("geo", [None, None])[0],
        "lon": station.get("geo", [None, None])[1],
        "aqi": s.get("aqi"),
        "url": f"https://aqicn.org/city/{station.get('url')}"
    })

df = pd.DataFrame(records)

# --- Step 3: Display ---
display(df.style.set_caption(f"WAQI stations found for {city}"))
# df.to_csv(f"../../data/list_stations_{city}.csv", index=False)
# print(f"Saved results to waqi_stations_{city}.csv")

Found 14 stations matching 'vienna'


Unnamed: 0,uid,name,country,lat,lon,aqi,url
0,4738,"Floridsdorf, Gerichtsgasse 1a (Prager Str. 65m), Austria",AT,48.261086,16.396954,63,https://aqicn.org/city/austria/floridsdorf--gerichtsgasse-1a-prager-str.-65m
1,4739,"4, Schafbergbad, Josef Redl Gasse 2, Gstr.Nr. 698, Austria",AT,48.23537,16.301563,57,https://aqicn.org/city/austria/schafbergbad--josef-redl-gasse-2--gstr.nr.-698/4
2,4736,"250, Wehlistra√üe 366, Gstr.Nr.2157, Austria",AT,48.20306,16.43455,55,https://aqicn.org/city/austria/wehlistrasse-366--gstr.nr.2157/250
3,14537,"Allgemeines Krankenhaus, Ostringweg (zwischen Geb√§uden BT25), Austria",AT,48.21911,16.349818,53,https://aqicn.org/city/austria/allgemeines-krankenhaus--ostringweg-zwischen-gebauden-bt25
4,2855,"1, Hausgrundweg 23, Gstr. 254, Austria",AT,48.226361,16.458345,53,https://aqicn.org/city/austria/hausgrundweg-23--gstr.-254/1
5,2857,"Umspannwerk Gaudenzdorfer G√ºrtel, Austria",AT,48.187147,16.339331,50,https://aqicn.org/city/austria/umspannwerk-gaudenzdorfer-gurtel
6,2850,"Kendlerstra√üe 40 (Umspannwerk), Austria",AT,48.205,16.30975,50,https://aqicn.org/city/austria/kendlerstrasse-40-umspannwerk
7,2870,"252, Belgradplatz (S√ºdostecke), Gstr.Nr. 816, Austria",AT,48.174353,16.361417,50,https://aqicn.org/city/austria/belgradplatz-sudostecke--gstr.nr.-816/252
8,2860,"Ecke Taborstra√üe - Glockengasse, Austria",AT,48.216739,16.380918,46,https://aqicn.org/city/austria/ecke-taborstrasse-glockengasse
9,2813,"Laaer Berg, Theodor Sickel-Gasse 1, Austria",AT,48.161036,16.39292,-,https://aqicn.org/city/austria/laaer-berg--theodor-sickel-gasse-1


### Filter the stations that report PM2.5

In [8]:
# --- Step 2: Check which stations report PM2.5 ---
stations_with_pm25 = []

for item in stations:
    uid = item["uid"]
    name = item["station"]["name"]
    geo = item["station"]["geo"]

    feed_url = f"https://api.waqi.info/feed/@{uid}/"
    feed_resp = requests.get(feed_url, params={"token": AQICN_API_KEY}, timeout=15)
    feed_resp.raise_for_status()
    feed_data = feed_resp.json().get("data", {})
    iaqi = feed_data.get("iaqi", {}) or {}

    if "pm25" in iaqi:
        stations_with_pm25.append({
            "uid": uid,
            "name": name,
            "latitude": geo[0],
            "longitude": geo[1],
            "AQI": feed_data.get("aqi"),
            "PM2.5": iaqi["pm25"].get("v") if isinstance(iaqi["pm25"], dict) else iaqi["pm25"],
            "URL": f"https://aqicn.org/city/@{uid}"
        })

df = pd.DataFrame(stations_with_pm25)
display(df.style.set_caption(f"PM2.5 Stations in {city}, {country}"))

Unnamed: 0,uid,name,latitude,longitude,AQI,PM2.5,URL
0,4738,"Floridsdorf, Gerichtsgasse 1a (Prager Str. 65m), Austria",48.261086,16.396954,63,63,https://aqicn.org/city/@4738
1,4739,"4, Schafbergbad, Josef Redl Gasse 2, Gstr.Nr. 698, Austria",48.23537,16.301563,57,57,https://aqicn.org/city/@4739
2,4736,"250, Wehlistra√üe 366, Gstr.Nr.2157, Austria",48.20306,16.43455,55,55,https://aqicn.org/city/@4736
3,14537,"Allgemeines Krankenhaus, Ostringweg (zwischen Geb√§uden BT25), Austria",48.21911,16.349818,53,53,https://aqicn.org/city/@14537
4,2855,"1, Hausgrundweg 23, Gstr. 254, Austria",48.226361,16.458345,53,53,https://aqicn.org/city/@2855
5,2857,"Umspannwerk Gaudenzdorfer G√ºrtel, Austria",48.187147,16.339331,50,50,https://aqicn.org/city/@2857
6,2850,"Kendlerstra√üe 40 (Umspannwerk), Austria",48.205,16.30975,50,50,https://aqicn.org/city/@2850
7,2870,"252, Belgradplatz (S√ºdostecke), Gstr.Nr. 816, Austria",48.174353,16.361417,50,50,https://aqicn.org/city/@2870
8,2860,"Ecke Taborstra√üe - Glockengasse, Austria",48.216739,16.380918,46,46,https://aqicn.org/city/@2860
9,2813,"Laaer Berg, Theodor Sickel-Gasse 1, Austria",48.161036,16.39292,89,89,https://aqicn.org/city/@2813


In [37]:
# Get the street names from the stations.json in order to insert that info in the Feature Group table
stations_path = Path(f"{root_dir}/stations.json")

with open(stations_path, "r") as f:
    stations_meta = json.load(f)

print(f"Loaded {len(stations_meta)} stations.")
for s in stations_meta:
    print(f"{s['uid']}: {s['name']} ‚Äî {s['street']}")

# Convert list -> dict for quick lookup by UID
station_meta_dict = {s["uid"]: s for s in stations_meta}


# Directory with all historical CSVs
data_dir = Path(f"{root_dir}/data")

print("Looking for CSV files in:", data_dir.resolve())

# Empty list to collect data from each station
all_stations_data = []

# Loop over every CSV in the folder
for csv_file in data_dir.glob("vienna_station_*.csv"):
    # Extract the station UID from the filename (e.g. vienna_station_4738_floridsdorf.csv)
    try:
        station_uid = int(csv_file.stem.split("_")[2])
    except (IndexError, ValueError):
        print(f" Could not parse UID from {csv_file.name}, skipping.")
        continue

    # STEP 5: Read CSV
    df = pd.read_csv(csv_file, parse_dates=['date'], skipinitialspace=True)

    # STEP 6: Keep only relevant columns
    if 'pm25' in df.columns:
        df_aq = df[['date', 'pm25']].copy()
    elif 'pm2.5' in df.columns:
        df_aq = df[['date', 'pm2.5']].rename(columns={'pm2.5': 'pm25'})
    else:
        print(f" No PM2.5 column found in {csv_file.name}, skipping.")
        continue

    # STEP 7: Drop missing data
    df_aq.dropna(subset=['pm25'], inplace=True)
    df_aq['pm25'] = df_aq['pm25'].astype('float32')

    # STEP 8: Add metadata columns
    meta = station_meta_dict.get(station_uid, {})
    df_aq['country'] = country
    df_aq['city'] = city
    df_aq["street"] = meta.get("street", "Unknown")
    df_aq['station_uid'] = station_uid
    df_aq['source_file'] = csv_file.name

    # Sdd the AQICN station URL
    df_aq['url'] = f"https://aqicn.org/station/@{station_uid}/"

    # Append to master list
    all_stations_data.append(df_aq)

# Combine all into a single DataFrame
df_all = pd.concat(all_stations_data, ignore_index=True)

print(f" Combined data from {len(all_stations_data)} stations ({len(df_all)} total rows).")
df_all.head()


Loaded 9 stations.
4738: Floridsdorf ‚Äî Floridsdorf, Gerichtsgasse 1a (Prager Str. 65m)
4739: Schafbergbad ‚Äî Schafbergbad, Josef Redl Gasse 2, Gstr.Nr. 698
4736: Wehlistra√üe ‚Äî Wehlistra√üe 366, Gstr.Nr.2157
14537: Allgemeines Krankenhaus (AKH) ‚Äî Ostringweg (zwischen Geb√§uden BT25)
2855: Hausgrundweg ‚Äî Hausgrundweg 23, Gstr. 254
2857: Gaudenzdorfer G√ºrtel ‚Äî Umspannwerk Gaudenzdorfer G√ºrtel
2850: Kendlerstra√üe ‚Äî Kendlerstra√üe 40 (Umspannwerk)
2870: Belgradplatz ‚Äî Belgradplatz (S√ºdostecke), Gstr.Nr. 816
2860: Taborstra√üe ‚Äî Ecke Taborstra√üe - Glockengasse
Looking for CSV files in: /Users/appbites/Desktop/mlfs-book/data
 Combined data from 9 stations (27673 total rows).


Unnamed: 0,date,pm25,country,city,street,station_uid,source_file,url
0,2025-11-02,47.0,austria,vienna,Kendlerstra√üe 40 (Umspannwerk),2850,vienna_station_2850_kendlerstrasse.csv,https://aqicn.org/station/@2850/
1,2025-11-03,9.0,austria,vienna,Kendlerstra√üe 40 (Umspannwerk),2850,vienna_station_2850_kendlerstrasse.csv,https://aqicn.org/station/@2850/
2,2025-11-04,33.0,austria,vienna,Kendlerstra√üe 40 (Umspannwerk),2850,vienna_station_2850_kendlerstrasse.csv,https://aqicn.org/station/@2850/
3,2025-11-05,40.0,austria,vienna,Kendlerstra√üe 40 (Umspannwerk),2850,vienna_station_2850_kendlerstrasse.csv,https://aqicn.org/station/@2850/
4,2025-11-06,50.0,austria,vienna,Kendlerstra√üe 40 (Umspannwerk),2850,vienna_station_2850_kendlerstrasse.csv,https://aqicn.org/station/@2850/


In [38]:
today = datetime.date.today()

earliest_aq_date = pd.Series.min(df_aq['date'])
earliest_aq_date = earliest_aq_date.strftime('%Y-%m-%d')
earliest_aq_date

weather_df = util.get_historical_weather(city, earliest_aq_date, str(today), latitude, longitude)

Coordinates 48.18980407714844¬∞N 16.377296447753906¬∞E
Elevation 194.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [39]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2506 entries, 0 to 2505
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         2506 non-null   datetime64[ns]
 1   temperature_2m_mean          2506 non-null   float32       
 2   precipitation_sum            2506 non-null   float32       
 3   wind_speed_10m_max           2506 non-null   float32       
 4   wind_direction_10m_dominant  2506 non-null   float32       
 5   city                         2506 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 78.4+ KB


### Define Data Validation Rules

In [40]:
import great_expectations as ge
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pm25",
            "min_value":-0.1,
            "max_value":500.0,
            "strict_min":True
        }
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 500.0, "strict_min": true}, "meta": {}}

## Expectations for Weather Data
Here, we define an expectation for 2 columns in our weather DataFrame - `precipitation_sum` and `wind_speed_10m_max`, where we expect both values to be greater than zero, but less than 1000.

In [41]:
import great_expectations as ge
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

def expect_greater_than_zero(col):
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")

### <span style="color:#ff5f27;"> üîÆ Connect to Hopsworks and save the sensors</span>

In [42]:
fs = project.get_feature_store()

#### Save country, city, street names as a secret

In [56]:
for s in stations_meta:
    uid = s["uid"]
    country = s["country"]
    city = s["city"]
    street = s["street"]
    url = s["url"]
    lat = s["lat"]
    lon = s["lon"]

    # Normalize the street name for the secret name
    safe_street = re.sub(r"[^a-zA-Z0-9]", "-", street.lower()).strip("-")

    # Build the secret name and payload
    secret_name = f"SENSOR_LOCATION_JSON_{safe_street}"
    dict_obj = {
        "station_uid": uid,
        "country": country,
        "city": city,
        "street": street,
        "url": url,
        "latitude": str(lat),
        "longitude": str(lon),
    }
    str_dict = json.dumps(dict_obj)

    # Replace existing secret if it already exists
    try:
        existing = secrets.get_secret(secret_name)
        if existing is not None:
            existing.delete()
            print(f"Replacing existing secret: {secret_name}")
    except Exception:
        pass  # Secret does not exist yet

    # Create the new secret
    secrets.create_secret(secret_name, str_dict)
    print(f"Created secret: {secret_name}")

Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
Created secret: SENSOR_LOCATION_JSON_floridsdorf--gerichtsgasse-1a--prager-str--65m
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
Created secret: SENSOR_LOCATION_JSON_schafbergbad--josef-redl-gasse-2--gstr-nr--698
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
Created secret: SENSOR_LOCATION_JSON_wehlistra-e-366--gstr-nr-2157
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
Created secret: SENSOR_LOCATION_JSON_ostringweg--zwischen-geb-uden-bt25
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
Created secret: SENSOR_LOCATION_JSON_hausgrundweg-23--gstr--254
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
Created secret: SENSOR_LOCATION_JSON_umspannwerk-gaudenzdorfer-g-rtel
Secret created succ

### Create (or get) the shared Feature Group for Air Quality

In [48]:
aq_fg = fs.get_or_create_feature_group(
    name="stations_air_quality_daily",
    version=1,
    primary_key=["station_uid", "date"],
    event_time="date",
    description="Daily PM2.5 per station for Vienna",
    online_enabled=False,
)

In [50]:
df_all["date"] = pd.to_datetime(df_all["date"]).dt.normalize()

In [51]:
df_to_insert = df_all[["station_uid", "date", "pm25", "country", "city", "street", "url"]]

In [52]:
print(f"Inserting {len(df_to_insert)} rows into stations_air_quality_daily ...")
aq_fg.insert(df_to_insert, write_options={"wait_for_job": True})
print("Air quality data successfully inserted into Hopsworks!")

Inserting 27673 rows into air_quality_daily ...
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1289364/fs/1278019/fg/1668785


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 27673/27673 | Elapsed Time: 00:02 | Remaining Time: 00:00


Launching job: stations_air_quality_daily_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1289364/jobs/named/stations_air_quality_daily_1_offline_fg_materialization/executions
2025-11-12 18:23:28,910 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-12 18:23:32,080 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-12 18:25:53,066 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-12 18:25:53,231 INFO: Waiting for log aggregation to finish.
2025-11-12 18:26:12,080 INFO: Execution finished successfully.
Air quality data successfully inserted into Hopsworks!


### Enter a description for each Feature in the Feature Group

In [53]:
aq_fg.update_feature_description("date", "Date of measurement of air quality")
aq_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in acqcn.org)")
aq_fg.update_feature_description("city", "City where the air quality was measured")
aq_fg.update_feature_description("street", "Street in the city where the air quality was measured")
aq_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")
aq_fg.update_feature_description("station_uid", "ID of the station")
aq_fg.update_feature_description("url", "Station url")

<hsfs.feature_group.FeatureGroup at 0x169cdb430>

### Weather Data

In [None]:
# Get or create feature group
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city'],
    event_time="date",
    expectation_suite=weather_expectation_suite
)

#### Insert the DataFrame into the Feature Group

In [None]:
# Insert data
weather_fg.insert(weather_df, wait=True)

#### Enter a description for each feature in the Feature Group

In [None]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m abouve ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant Wind direction over the dayd")