In [1]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    """Report whether the active IPython shell is a Google Colab runtime.

    The check looks for the substring ``google.colab`` in the string form of
    the object returned by ``get_ipython()``.
    """
    return "google.colab" in str(get_ipython())

def clone_repository() -> None:
    """Clone the mlfs-book repository and make the clone the working directory.

    Uses IPython's ``!`` shell escape and ``%cd`` magic, so it only works
    inside a notebook / IPython session.
    """
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    """Install the project's dependencies into the system Python environment.

    Upgrades ``uv`` with pip, then runs ``uv pip install`` against the
    ``pyproject.toml`` in the current working directory with all extras enabled.
    """
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    # On Colab the repository is not present yet: clone it, install its
    # dependencies, and use the checkout (now the working directory) as root.
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Walk up to the repository root when the notebook was started from an
    # `airquality` and/or `notebooks` subdirectory.
    for subdir in ("airquality", "notebooks"):
        if root_dir.name == subdir:
            root_dir = root_dir.parent
    root_dir = str(root_dir)
    print("Local environment")

# Put the repository root on `sys.path` so the `mlfs` package can be imported from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Set the environment variables from the file <root_dir>/.env
from mlfs import config
#settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: c:\Users\Abdul Rahman\Desktop\Air-Quality-App-team\mlfs-book


<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 02: Daily Feature Pipeline for Air Quality (aqicn.org) and weather (openmeteo)</span>

## 🗒️ This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python Orchestration tool to schedule this program to run daily.

### <span style='color:#ff5f27'> 📝 Imports </span>

In [2]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util
from mlfs import config
import json
import os
import warnings
warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> 🌍 Get the Sensor URL, Country, City, Street names from Hopsworks </span>

__Update the values in the cell below.__

__These should be the same values as in notebook 1 - the feature backfill notebook__


In [None]:
# project = hopsworks.login()
# fs = project.get_feature_store() 
# secrets = hopsworks.get_secrets_api()

# # This line will fail if you have not registered the AQICN_API_KEY as a secret in Hopsworks
# AQICN_API_KEY = secrets.get_secret("AQICN_API_KEY").value
# location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
# location = json.loads(location_str)

# country=location['country']
# city=location['city']
# street=location['street']
# aqicn_url=location['aqicn_url']
# latitude=location['latitude']
# longitude=location['longitude']

# today = datetime.date.today()

# location_str

2025-11-05 17:15:04,107 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-05 17:15:04,111 INFO: Initializing external client
2025-11-05 17:15:04,111 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-05 17:15:05,624 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1278100


'{"country": "sweden", "city": "lund", "street": "bankgatan", "aqicn_url": "https://api.waqi.info/feed/A530461", "latitude": 55.74, "longitude": 13.18}'

In [3]:
import json, datetime, hopsworks

# Log in to the Hopsworks project and get handles to its feature store and secrets API.
project = hopsworks.login(project="air_quality_prediction")
fs = project.get_feature_store()
secrets = hopsworks.get_secrets_api()

# This lookup fails if AQICN_API_KEY has not been registered as a secret in Hopsworks.
AQICN_API_KEY = secrets.get_secret("AQICN_API_KEY").value

# One Hopsworks secret per sensor; each value is a JSON object describing the sensor.
sensor_secret_names = [
    "SENSOR_LOCATION_bankgatan_JSON",
    "SENSOR_LOCATION_linakersvagen_JSON",
    "SENSOR_LOCATION_trollebergsvagen_JSON",
]

# Keys every sensor description must contain.
REQUIRED_SENSOR_KEYS = ("country", "city", "street", "aqicn_url", "latitude", "longitude")

# Maps the short sensor id (secret name without prefix/suffix) to its parsed description.
sensors = {}
for name in sensor_secret_names:
    raw = secrets.get_secret(name).value
    data = json.loads(raw)

    # Validate explicitly instead of with `assert` (which is skipped under
    # `python -O`) and report every missing key in one error.
    missing = [k for k in REQUIRED_SENSOR_KEYS if k not in data]
    if missing:
        raise ValueError(f"{name}: missing key(s) {missing}")

    # "SENSOR_LOCATION_<id>_JSON" -> "<id>"
    sensors[name.removeprefix("SENSOR_LOCATION_").removesuffix("_JSON")] = data


# Pretty-print each sensor's full description as indented JSON.
for sensor_name, info in sensors.items():
    print(json.dumps({sensor_name: info}, indent=2))


# Then print a compact one-line summary per sensor.
for sensor_name, info in sensors.items():
    print(
        f"{sensor_name}: "
        f"{info.get('city')} ‚Äì {info.get('street')} | "
        f"{info.get('latitude')} , {info.get('longitude')} | "
        f"{info.get('country')} | {info.get('aqicn_url')}"
    )


2025-11-13 14:03:31,831 INFO: Initializing external client
2025-11-13 14:03:31,832 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-13 14:03:36,058 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1271977
{
  "bankgatan": {
    "country": "sweden",
    "city": "lund",
    "street": "bankgatan",
    "aqicn_url": "https://api.waqi.info/feed/A530461",
    "latitude": 55.74,
    "longitude": 13.18
  }
}
{
  "linakersvagen": {
    "country": "sweden",
    "city": "lund",
    "street": "lin\u00e5kersv\u00e4gen",
    "aqicn_url": "https://api.waqi.info/feed/A415507",
    "latitude": 55.74,
    "longitude": 13.18
  }
}
{
  "trollebergsvagen": {
    "country": "sweden",
    "city": "lund",
    "street": "trollebergsv\u00e4gen",
    "aqicn_url": "https://api.waqi.info/feed/@10017",
    "latitude": 55.74,
    "longitude": 13.18
  }
}
bankgatan: lund ‚Äì bankgatan | 55.74 , 13.18 | sweden | https://api.waqi.info/feed/A530461
linakersvagen: lund ‚Äì lin√•kersv√§gen | 55.74 , 13.18 | sweden | https://api.waqi.info/feed/A415507
trollebergsvagen: lund ‚Äì trollebergsv

In [None]:
# print(json.dumps(json.loads(sensor_json), indent=2))

{
  "country": "sweden",
  "city": "lund",
  "street": "trollebergsv\u00e4gen",
  "aqicn_url": "https://api.waqi.info/feed/@10017",
  "latitude": 55.74,
  "longitude": 13.18
}


In [4]:
secrets = hopsworks.get_secrets_api()
# Fetch the stored AQICN_API_KEY secret object (nothing is created or replaced here).
secret = secrets.get_secret("AQICN_API_KEY")

### <span style="color:#ff5f27;"> 🔮 Get references to the Feature Groups </span>

In [5]:
# Handles to version 1 of the air-quality and weather feature groups.
air_quality_fg = fs.get_feature_group(name="air_quality", version=1)
weather_fg = fs.get_feature_group(name="weather", version=1)

---

## <span style='color:#ff5f27'> 🌫 Retrieve Today's Air Quality data (PM2.5) from the AQI API</span>


In [6]:
import requests
import pandas as pd

# aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQICN_API_KEY)
# aq_today_df

In [7]:
import pandas as pd
import numpy as np
import unicodedata as ud

def nfc(s: str) -> str:
    """Return ``s`` in Unicode NFC form; non-string values are passed through unchanged."""
    if not isinstance(s, str):
        return s
    return ud.normalize("NFC", s)

# Midnight at the start of the current local day (tz-naive); read by compute_and_insert_today().
today = pd.Timestamp.today().normalize()

def compute_and_insert_today(sensor_key: str, s: dict) -> pd.DataFrame:
    """Fetch today's PM2.5 reading for one sensor, add its 3-day rolling mean, and insert it.

    The two most recent rows stored for the same (country, city, street) before
    ``today`` are read back from ``air_quality_fg`` and combined with today's
    reading to compute ``pm25_3day_avg``; with fewer than two earlier rows the
    average is NaN. The finished row is then inserted into ``air_quality_fg``.

    Reads the notebook globals ``util``, ``today``, ``AQICN_API_KEY``,
    ``air_quality_fg`` and ``nfc``.

    Args:
        sensor_key: Short sensor identifier; only used in error messages.
        s: Sensor description with at least the keys ``country``, ``city``,
            ``street`` and ``aqicn_url``.

    Returns:
        The DataFrame that was inserted, with columns ``date``, ``pm25``,
        ``country``, ``city``, ``street``, ``url`` and ``pm25_3day_avg``.

    Raises:
        ValueError: If ``util.get_pm25`` returns no rows for this sensor.
    """
    # Normalise the identifying strings the same way on both sides of the
    # history filter below, so equal names always compare equal.
    s_country = nfc(str(s["country"]).strip().lower())
    s_city = nfc(str(s["city"]).strip().lower())
    s_street = nfc(str(s["street"]).strip())

    today_df = util.get_pm25(
        s["aqicn_url"], s_country, s_city, s_street, today.date(), AQICN_API_KEY
    )
    if today_df.empty:
        # Fail with a clear message instead of an IndexError at the .iloc[0] lookup below.
        raise ValueError(f"[{sensor_key}] no PM2.5 reading returned for {today.date()}")

    today_df["date"] = pd.to_datetime(today_df["date"]).dt.tz_localize(None)
    today_df["pm25"] = pd.to_numeric(today_df["pm25"], errors="coerce")

    # Fill in identifying columns the fetched frame may lack. DataFrame.get()
    # returns the plain-string default for a missing column, on which .map()
    # fails, so the fallback is materialised as a column before normalising.
    fallbacks = {
        "country": s_country,
        "city": s_city,
        "street": s_street,
        "url": s["aqicn_url"],
    }
    for col, default in fallbacks.items():
        if col not in today_df.columns:
            today_df[col] = default
    today_df["country"] = today_df["country"].map(nfc).str.lower()
    today_df["city"] = today_df["city"].map(nfc).str.lower()
    today_df["street"] = today_df["street"].map(nfc)

    try:
        hist = air_quality_fg.read()[["country", "city", "street", "date", "pm25"]].copy()
    except Exception as e:
        # Best effort: without history the row is still inserted, just with a NaN average.
        print(f"[warn] offline read failed, proceeding with empty hist: {e}")
        hist = pd.DataFrame(columns=["country", "city", "street", "date", "pm25"])

    hist["country"] = hist["country"].astype(str).map(nfc).str.lower()
    hist["city"] = hist["city"].astype(str).map(nfc).str.lower()
    hist["street"] = hist["street"].astype(str).map(nfc)
    hist["date"] = pd.to_datetime(hist["date"]).dt.tz_localize(None)
    # Keeps pm25 numeric even when the empty fallback frame (object dtype) is in use.
    hist["pm25"] = pd.to_numeric(hist["pm25"], errors="coerce")

    # The two latest earlier rows for this sensor.
    hist = hist[
        (hist["country"] == s_country)
        & (hist["city"] == s_city)
        & (hist["street"] == s_street)
        & (hist["date"] < today)
    ].sort_values("date").tail(2)

    # Rolling mean over (two previous rows + today); NaN unless all three are present.
    tmp = pd.concat(
        [hist[["date", "pm25"]], today_df[["date", "pm25"]]],
        ignore_index=True,
    ).sort_values("date")
    tmp["pm25_3day_avg"] = tmp["pm25"].rolling(window=3, min_periods=3).mean()

    rm_today = tmp.loc[tmp["date"] == today_df["date"].iloc[0], "pm25_3day_avg"]
    today_df["pm25_3day_avg"] = rm_today.values[0] if not rm_today.empty else np.nan

    cols = ["date", "pm25", "country", "city", "street", "url", "pm25_3day_avg"]
    to_insert = today_df[cols].copy()

    # Cast to the dtypes used for insertion: strings, tz-naive datetime, float32.
    for c in ["country", "city", "street", "url"]:
        to_insert[c] = to_insert[c].astype(str)
    to_insert["date"] = pd.to_datetime(to_insert["date"]).dt.tz_localize(None)
    to_insert["pm25"] = pd.to_numeric(to_insert["pm25"], errors="coerce").astype("float32")
    to_insert["pm25_3day_avg"] = pd.to_numeric(
        to_insert["pm25_3day_avg"], errors="coerce"
    ).astype("float32")

    air_quality_fg.insert(to_insert, wait=True)
    return to_insert


# Run for all sensors, one after another; each call returns the row it inserted,
# kept under a per-sensor name for inspection in later cells.
aq_today_df_bankgatan = compute_and_insert_today("bankgatan", sensors["bankgatan"])
aq_today_df_linakersvagen = compute_and_insert_today("linakersvagen", sensors["linakersvagen"])
aq_today_df_trollebergsvagen = compute_and_insert_today("trollebergsvagen", sensors["trollebergsvagen"])


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.47s) 
2025-11-13 14:03:49,674 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271977/fs/1258579/fg/1637780


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271977/jobs/named/air_quality_1_offline_fg_materialization/executions
2025-11-13 14:04:06,444 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-13 14:04:09,669 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-13 14:05:50,924 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-13 14:05:51,085 INFO: Waiting for log aggregation to finish.
2025-11-13 14:05:59,778 INFO: Execution finished successfully.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.74s) 
2025-11-13 14:06:03,356 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271977/fs/1258579/fg/16377

Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271977/jobs/named/air_quality_1_offline_fg_materialization/executions
2025-11-13 14:06:19,624 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-13 14:06:22,871 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-13 14:08:00,643 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-13 14:08:00,815 INFO: Waiting for log aggregation to finish.
2025-11-13 14:08:09,619 INFO: Execution finished successfully.
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.41s) 
2025-11-13 14:08:12,665 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1271977/fs/1258579/fg/16377

Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271977/jobs/named/air_quality_1_offline_fg_materialization/executions
2025-11-13 14:08:28,736 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-13 14:08:31,990 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-13 14:10:28,817 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-13 14:10:28,969 INFO: Waiting for log aggregation to finish.
2025-11-13 14:10:40,992 INFO: Execution finished successfully.


In [8]:
# Show today's inserted row for each sensor. A blank line between frames keeps
# one frame's last line from running into the next frame's header (the default
# separator is a single space).
print(
    aq_today_df_bankgatan.head(),
    aq_today_df_linakersvagen.head(),
    aq_today_df_trollebergsvagen.head(),
    sep="\n\n",
)

        date  pm25 country  city     street  \
0 2025-11-13  18.0  sweden  lund  bankgatan   

                                  url  pm25_3day_avg  
0  https://api.waqi.info/feed/A530461          11.46           date  pm25 country  city         street  \
0 2025-11-13   9.0  sweden  lund  lin√•kersv√§gen   

                                  url  pm25_3day_avg  
0  https://api.waqi.info/feed/A415507       4.433333           date  pm25 country  city            street  \
0 2025-11-13  35.0  sweden  lund  trollebergsv√§gen   

                                 url  pm25_3day_avg  
0  https://api.waqi.info/feed/@10017      41.666668  


In [9]:


# Read the whole air-quality feature group and normalise it for comparison:
# tz-naive dates, lower-cased country/city, NFC-normalised text columns.
df = (
    air_quality_fg.read()
    .assign(
        date=lambda f: pd.to_datetime(f["date"]).dt.tz_localize(None),
        country=lambda f: f["country"].astype(str).str.lower().map(nfc),
        city=lambda f: f["city"].astype(str).str.lower().map(nfc),
        street=lambda f: f["street"].astype(str).map(nfc),
    )
    .loc[:, ["date", "country", "city", "street", "pm25", "pm25_3day_avg", "url"]]
)


def last_10_for_sensor(df_all: pd.DataFrame, sensor: dict) -> pd.DataFrame:
    """Return the 10 most recent rows of ``df_all`` that belong to ``sensor``.

    Rows are matched on ``country``, ``city`` and ``street``. The sensor's
    values are normalised before comparison (stripped, NFC-normalised, and
    country/city lower-cased), matching the normalisation used when rows are
    inserted, so a description that differs only in case, surrounding
    whitespace or Unicode composition still matches.

    Args:
        df_all: Air-quality rows with ``country``, ``city``, ``street`` and
            ``date`` columns; the text columns are expected to be normalised
            already.
        sensor: Sensor description with ``country``, ``city`` and ``street`` keys.

    Returns:
        Up to 10 rows sorted by ascending ``date`` with a fresh 0..n-1 index.
    """
    want_country = nfc(str(sensor["country"]).strip().lower())
    want_city = nfc(str(sensor["city"]).strip().lower())
    want_street = nfc(str(sensor["street"]).strip())

    m = (
        (df_all["country"] == want_country)
        & (df_all["city"] == want_city)
        & (df_all["street"] == want_street)
    )
    return df_all.loc[m].sort_values("date").tail(10).reset_index(drop=True)


# Per-sensor views holding (up to) the 10 most recent rows each.
last10_bankgatan = last_10_for_sensor(df, sensors["bankgatan"])
last10_linakersvagen = last_10_for_sensor(df, sensors["linakersvagen"])
last10_trollebergsvagen = last_10_for_sensor(df, sensors["trollebergsvagen"])


def tag(df_part: pd.DataFrame, tag: str) -> pd.DataFrame:
    """Return a copy of ``df_part`` with a ``sensor_id`` column set to ``tag`` on every row."""
    return df_part.assign(sensor_id=tag)

# Stack the three per-sensor frames, each labelled with a sensor_id, and order
# the result by sensor and then by date.
last10_all = pd.concat([
    tag(last10_bankgatan, "bankgatan"),
    tag(last10_linakersvagen, "lin√•kersv√§gen"),   
    tag(last10_trollebergsvagen, "trollebergsv√§gen")
], ignore_index=True).sort_values(["sensor_id","date"]).reset_index(drop=True)


# Report how many rows each sensor contributed and the date range they cover.
print("Rows per sensor (expect up to 10 each):")
for k, d in {
    "bankgatan": last10_bankgatan,
    "lin√•kersv√§gen": last10_linakersvagen,
    "trollebergsv√§gen": last10_trollebergsvagen
}.items():
    if d.empty:
        print(f"  {k}: 0  (no rows yet)")
    else:
        print(f"  {k}: {len(d)}  | range: {d['date'].min().date()} ‚Üí {d['date'].max().date()}")

# Show the first 12 rows of the combined frame as plain text.
print("\nSample (combined):")
print(last10_all.head(12).to_string(index=False))



Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.54s) 
Rows per sensor (expect up to 10 each):
  bankgatan: 10  | range: 2025-11-04 ‚Üí 2025-11-13
  lin√•kersv√§gen: 10  | range: 2025-11-04 ‚Üí 2025-11-13
  trollebergsv√§gen: 10  | range: 2025-11-04 ‚Üí 2025-11-13

Sample (combined):
      date country city        street  pm25  pm25_3day_avg                                url     sensor_id
2025-11-04  sweden lund     bankgatan  7.10       7.026667 https://api.waqi.info/feed/A530461     bankgatan
2025-11-05  sweden lund     bankgatan  2.22       5.483334 https://api.waqi.info/feed/A530461     bankgatan
2025-11-06  sweden lund     bankgatan  4.55       4.623333 https://api.waqi.info/feed/A530461     bankgatan
2025-11-07  sweden lund     bankgatan 16.00       7.590000 https://api.waqi.info/feed/A530461     bankgatan
2025-11-08  sweden lund     bankgatan 18.01      12.853333 https://api.waqi.info/feed/A530461     bankgatan
2025-11-09  sweden lund     bankgata

In [10]:
# DataFrames for inspecting the (up to) 10 most recent air quality rows per sensor:
#   last10_bankgatan
#   last10_linakersvagen
#   last10_trollebergsvagen
#   last10_all  (all three combined, with a sensor_id column)

last10_trollebergsvagen.head(10)

Unnamed: 0,date,country,city,street,pm25,pm25_3day_avg,url
0,2025-11-04,sweden,lund,trollebergsv√§gen,36.0,36.0,https://api.waqi.info/feed/@10017
1,2025-11-05,sweden,lund,trollebergsv√§gen,21.0,30.666666,https://api.waqi.info/feed/@10017
2,2025-11-06,sweden,lund,trollebergsv√§gen,33.0,30.0,https://api.waqi.info/feed/@10017
3,2025-11-07,sweden,lund,trollebergsv√§gen,71.0,41.666668,https://api.waqi.info/feed/@10017
4,2025-11-08,sweden,lund,trollebergsv√§gen,80.0,61.333332,https://api.waqi.info/feed/@10017
5,2025-11-09,sweden,lund,trollebergsv√§gen,67.0,72.666664,https://api.waqi.info/feed/@10017
6,2025-11-10,sweden,lund,trollebergsv√§gen,60.0,69.0,https://api.waqi.info/feed/@10017
7,2025-11-11,sweden,lund,trollebergsv√§gen,46.0,57.666668,https://api.waqi.info/feed/@10017
8,2025-11-12,sweden,lund,trollebergsv√§gen,44.0,50.0,https://api.waqi.info/feed/@10017
9,2025-11-13,sweden,lund,trollebergsv√§gen,35.0,41.666668,https://api.waqi.info/feed/@10017


In [11]:
# Check the column dtypes and null counts of the row inserted for bankgatan.
aq_today_df_bankgatan.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   date           1 non-null      datetime64[ns]
 1   pm25           1 non-null      float32       
 2   country        1 non-null      object        
 3   city           1 non-null      object        
 4   street         1 non-null      object        
 5   url            1 non-null      object        
 6   pm25_3day_avg  1 non-null      float32       
dtypes: datetime64[ns](1), float32(2), object(4)
memory usage: 176.0+ bytes


## <span style='color:#ff5f27'> 🌦 Get Weather Forecast data</span>

In [14]:
from datetime import date

# Location used for the weather forecast request.
city = "lund"
latitude = 55.70584
longitude = 13.19321

# Columns kept for the weather feature group, in insertion order.
cols = [
    "date",
    "temperature_2m_mean",
    "precipitation_sum",
    "wind_speed_10m_max",
    "wind_direction_10m_dominant",
    "city"
]

# Hourly forecast, indexed by timestamp so between_time() can select by clock time.
hourly_df = util.get_hourly_weather_forecast(city, latitude, longitude).set_index("date")

# Keep only the rows stamped between 11:59 and 12:01, truncate the timestamp
# to its calendar date, tag every row with the city, and select the feature columns.
daily_df = (
    hourly_df.between_time("11:59", "12:01")
    .reset_index()
    .assign(
        date=lambda f: pd.to_datetime(pd.to_datetime(f["date"]).dt.date),
        city=city.lower(),
    )
    .loc[:, cols]
)

# Final dtypes: lower-case NFC city, tz-naive datetime, float32 measurements.
daily_df["city"] = daily_df["city"].astype(str).map(nfc).str.lower()
daily_df["date"] = pd.to_datetime(daily_df["date"]).dt.tz_localize(None)
for c in cols[1:-1]:  # the four measurement columns between "date" and "city"
    daily_df[c] = pd.to_numeric(daily_df[c], errors="coerce").astype("float32")

print("Prepared daily weather data:")
print(daily_df)

# Insert into the weather feature group and wait for the insert to complete.
weather_fg.insert(daily_df, wait=True)
print(f"Weather data for {city.title()} on {date.today()} inserted successfully.")

Coordinates 55.75¬∞N 13.25¬∞E
Elevation 52.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Prepared daily weather data:
        date  temperature_2m_mean  precipitation_sum  wind_speed_10m_max  \
0 2025-11-13                12.05                0.0           19.615870   
1 2025-11-14                 7.55                0.0            2.811690   
2 2025-11-15                 3.50                0.0            9.255571   
3 2025-11-16                 5.55                0.0           15.379206   
4 2025-11-17                 3.80                0.0           17.654688   
5 2025-11-18                 5.05                0.0           14.618837   
6 2025-11-19                 3.85                0.0            4.843305   

   wind_direction_10m_dominant  city  
0                   227.231186  lund  
1                    39.805527  lund  
2                    76.504250  lund  
3                   249.443863  lund  
4                   320.792816  lund  
5                   232.0

Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 7/7 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1271977/jobs/named/weather_1_offline_fg_materialization/executions
2025-11-13 14:17:24,299 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-13 14:17:27,566 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-13 14:19:36,408 INFO: Waiting for log aggregation to finish.
2025-11-13 14:19:51,966 INFO: Execution finished successfully.
Weather data for Lund on 2025-11-13 inserted successfully.


In [15]:
# Check the column dtypes and null counts of the prepared weather frame.
daily_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         7 non-null      datetime64[ns]
 1   temperature_2m_mean          7 non-null      float32       
 2   precipitation_sum            7 non-null      float32       
 3   wind_speed_10m_max           7 non-null      float32       
 4   wind_direction_10m_dominant  7 non-null      float32       
 5   city                         7 non-null      object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 352.0+ bytes


In [16]:


# Columns to show, in display order.
cols = [
    "date",
    "city",
    "temperature_2m_mean",
    "precipitation_sum",
    "wind_speed_10m_max",
    "wind_direction_10m_dominant",
]

# Read the weather feature group back and normalise it
# (tz-naive dates, lower-case NFC city) before selecting the columns above.
df = weather_fg.read()
df = df.assign(
    date=pd.to_datetime(df["date"]).dt.tz_localize(None),
    city=df["city"].astype(str).str.lower().map(nfc),
)[cols]

# The 10 most recent rows (by date) for the configured city.
df_city = (
    df[df["city"] == city.lower()]
    .sort_values("date")
    .tail(10)
    .reset_index(drop=True)
)

print(f"Last {len(df_city)} weather rows for '{city}': {df_city['date'].min().date()} ‚Üí {df_city['date'].max().date()}")
print(df_city.to_string(index=False))

# Also render the rich table via `display`; any failure is deliberately
# ignored because the plain-text table has already been printed above.
try:
    display(df_city)
except Exception:
    pass

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.58s) 
Last 10 weather rows for 'lund': 2025-11-10 ‚Üí 2025-11-19
      date city  temperature_2m_mean  precipitation_sum  wind_speed_10m_max  wind_direction_10m_dominant
2025-11-10 lund             6.749416                0.0            4.896530                   104.204674
2025-11-11 lund             7.855667                0.9           13.306615                   179.409348
2025-11-12 lund             9.499417                0.7           25.925623                   191.821457
2025-11-13 lund            12.050000                0.0           19.615870                   227.231186
2025-11-14 lund             7.550000                0.0            2.811690                    39.805527
2025-11-15 lund             3.500000                0.0            9.255571                    76.504250
2025-11-16 lund             5.550000                0.0           15.379206                   249.443863
2025-11-17 lund

Unnamed: 0,date,city,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant
0,2025-11-10,lund,6.749416,0.0,4.89653,104.204674
1,2025-11-11,lund,7.855667,0.9,13.306615,179.409348
2,2025-11-12,lund,9.499417,0.7,25.925623,191.821457
3,2025-11-13,lund,12.05,0.0,19.61587,227.231186
4,2025-11-14,lund,7.55,0.0,2.81169,39.805527
5,2025-11-15,lund,3.5,0.0,9.255571,76.50425
6,2025-11-16,lund,5.55,0.0,15.379206,249.443863
7,2025-11-17,lund,3.8,0.0,17.654688,320.792816
8,2025-11-18,lund,5.05,0.0,14.618837,232.001205
9,2025-11-19,lund,3.85,0.0,4.843305,131.987137


## <span style="color:#ff5f27;">⏭️ **Next:** Part 03: Training Pipeline</span>

In the following notebook you will read from a feature group and create a training dataset within the feature store.
