In [1]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    """Return True when the active IPython shell's string form mentions ``google.colab``."""
    shell_description = str(get_ipython())
    return "google.colab" in shell_description

def clone_repository() -> None:
    """Clone the mlfs-book repository and change into the cloned directory.

    Uses IPython syntax (``!`` shell escape and the ``%cd`` magic), so it only
    works when executed by an IPython/Jupyter kernel.
    """
    !git clone https://github.com/featurestorebook/mlfs-book.git
    # %cd changes the kernel's own working directory to the fresh clone
    %cd mlfs-book

def install_dependencies() -> None:
    """Install/upgrade ``uv`` with pip, then use it to install the requirements in pyproject.toml.

    Relies on IPython's ``!`` shell escape; ``pyproject.toml`` is resolved
    relative to the current working directory.
    """
    !pip install --upgrade uv
    # NOTE(review): --system presumably targets the active interpreter rather than a virtualenv — confirm in uv docs
    !uv pip install --all-extras --system --requirement pyproject.toml

if not is_google_colab():
    # Local run: the kernel may have been started in <repo>/notebooks/airquality
    # or in <repo>/notebooks; walk up so that root_dir is the repository root.
    repo_path = Path().absolute()
    for trailing_dir in ('airquality', 'notebooks'):
        if repo_path.parts[-1:] == (trailing_dir,):
            repo_path = Path(*repo_path.parts[:-1])
    root_dir = str(repo_path)
    print("Local environment")
else:
    # Colab run: fetch the repository (which also changes into it) and install its dependencies.
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")

# Make the repository root importable so the `mlfs` package resolves from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Load the settings from the environment file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: /Users/kcah/Documents/code-repo/air-quality-prediction
HopsworksSettings initialized!


<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 02: Daily Feature Pipeline for Air Quality (aqicn.org) and weather (openmeteo)</span>

## 🗒️ This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python Orchestration tool to schedule this program to run daily.

### <span style='color:#ff5f27'> 📝 Imports </span>

In [2]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util
from mlfs import config
import json
import os
import warnings
warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> 🌍 Get the Sensor URL, Country, City, Street names from Hopsworks </span>

__Update the values in the cell below.__

__These should be the same values as in notebook 1 - the feature backfill notebook__


In [3]:
# Log in to Hopsworks and grab handles to the feature store and the secrets API.
project = hopsworks.login()
fs = project.get_feature_store()
secrets = hopsworks.get_secrets_api()

# Fails if AQICN_API_KEY has not been registered as a secret in Hopsworks
AQICN_API_KEY = secrets.get_secret("AQICN_API_KEY").value
location_str = secrets.get_secret("SENSOR_LOCATION_HELSINKI_JSON").value
locations = json.loads(location_str)

# The secret may hold one location (JSON object) or several (JSON array);
# normalise to a list so the rest of the notebook can always iterate.
if isinstance(locations, dict):
    locations = [locations]
elif not isinstance(locations, list):
    raise ValueError("SENSOR_LOCATION_HELSINKI_JSON must be either a JSON object or a JSON array")

print(f"Found {len(locations)} location(s) to process:")
for position, entry in enumerate(locations, start=1):
    street_label = entry.get('street', 'unknown')
    city_label = entry.get('city', 'unknown')
    print(f"  {position}. {street_label} - {city_label}")

# Midnight at the start of the current local day (naive datetime)
today = datetime.datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)

2025-11-18 11:51:14,751 INFO: Initializing external client
2025-11-18 11:51:14,751 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-18 11:51:16,275 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1279136
Found 4 location(s) to process:
  1. kluuvi - helsinki
  2. kallio-2 - helsinki
  3. mannerheimintie - helsinki
  4. vartiokyla-huivipolku - helsinki


### <span style="color:#ff5f27;"> 🔮 Get references to the Feature Groups </span>

In [4]:
# Look up version 1 of the Helsinki air quality and weather feature groups
air_quality_fg = fs.get_feature_group(name='air_quality_helsinki', version=1)
weather_fg = fs.get_feature_group(name='weather_helsinki', version=1)

---

## <span style='color:#ff5f27'> 🌫 Retrieve Today's Air Quality data (PM2.5) from the AQI API</span>


In [5]:
import requests
import pandas as pd
from datetime import timedelta
from datetime import timezone

def _pm25_lags(history: pd.DataFrame, reference_day: pd.Timestamp, lag_days: tuple = (1, 2, 3)) -> dict:
    """Look up the pm25 readings recorded ``lag_days`` days before ``reference_day``.

    Args:
        history: DataFrame with a ``date`` and a ``pm25`` column.
        reference_day: tz-aware (UTC) midnight Timestamp of the day being inserted.
        lag_days: day offsets to look up.

    Returns:
        dict mapping ``pm25_lag_<k>d`` to the first pm25 value whose date equals
        ``reference_day - k days``, or ``None`` when there is no such row.
    """
    lags = {f'pm25_lag_{k}d': None for k in lag_days}
    if history.empty:
        return lags
    # utc=True converts tz-aware dates to UTC and treats naive ones as UTC, so the
    # equality test below never mixes naive and aware timestamps (which would raise).
    history_dates = pd.to_datetime(history['date'], utc=True)
    for k in lag_days:
        matches = history.loc[history_dates == reference_day - pd.Timedelta(days=k), 'pm25']
        if not matches.empty:
            lags[f'pm25_lag_{k}d'] = matches.iloc[0]
    return lags


# Anchor the lag dates on `today` (the day passed to util.get_pm25) instead of on
# "now in UTC": the two can fall on different calendar days around midnight, which
# would shift every lag by one day.
# NOTE(review): assumes util.get_pm25 stamps the returned row with `today` and that
# stored dates are UTC midnights — confirm against mlfs.airquality.util.
reference_day = pd.Timestamp(today)
if reference_day.tzinfo is None:
    reference_day = reference_day.tz_localize('UTC')
else:
    reference_day = reference_day.tz_convert('UTC')
reference_day = reference_day.normalize()
# NOTE(review): the window starts 4 days back although only lags 1-3 are used;
# the lower bound is kept as in the original query.
history_start = reference_day - pd.Timedelta(days=4)

LAG_COLUMNS = ('pm25_lag_1d', 'pm25_lag_2d', 'pm25_lag_3d')
all_aq_data = []  # one dataframe per location, kept for the summary below

for location_idx, location in enumerate(locations, 1):
    print(f"\n{'='*60}")
    print(f"Processing location {location_idx}/{len(locations)}: {location['street']}")
    print(f"{'='*60}")

    # Extract location variables
    country = location['country']
    city = location['city']
    street = location['street']
    aqicn_url = location['aqicn_url']

    # Retrieve today's air quality data
    aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQICN_API_KEY)
    print(f"Retrieved air quality data for {street}")

    # Best effort: if the feature store cannot be read, the lags stay missing (NaN)
    # and today's measurement is still inserted.
    lags = dict.fromkeys(LAG_COLUMNS)
    try:
        historical_data = air_quality_fg.filter(
            (air_quality_fg.country == country) &
            (air_quality_fg.city == city) &
            (air_quality_fg.street == street) &
            (air_quality_fg.date >= history_start) &
            (air_quality_fg.date < reference_day)
        ).read()
        lags = _pm25_lags(historical_data[['date', 'pm25']], reference_day)
    except Exception as e:
        print(f"Warning: Could not retrieve lagged features for {street}: {e}")
        print("Lagged features will be set to None for this day")

    # Broadcast each lag to every row and store it as float32 (None becomes NaN)
    for column in LAG_COLUMNS:
        aq_today_df[column] = lags[column]
        aq_today_df[column] = pd.to_numeric(aq_today_df[column], errors='coerce').astype('float32')

    # Store for the summary after the loop
    all_aq_data.append(aq_today_df)

    # Insert air quality data for this location
    print(f"Inserting air quality data for {street}...")
    air_quality_fg.insert(aq_today_df)
    print(f"‚úì Successfully inserted air quality data for {street}")

# Combine all air quality dataframes for display
if all_aq_data:
    combined_aq_df = pd.concat(all_aq_data, ignore_index=True)
    print(f"\n{'='*60}")
    print(f"Summary: Processed {len(all_aq_data)} location(s)")
    print(f"{'='*60}")
    # A bare expression inside an if-block is not rendered by Jupyter; display it explicitly
    display(combined_aq_df)


Processing location 1/4: kluuvi
Retrieved air quality data for kluuvi
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.73s) 
Inserting air quality data for kluuvi...
2025-11-18 11:51:24,331 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279136/fs/1265746/fg/1718962


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_helsinki_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279136/jobs/named/air_quality_helsinki_1_offline_fg_materialization/executions
‚úì Successfully inserted air quality data for kluuvi

Processing location 2/4: kallio-2
Retrieved air quality data for kallio-2
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.65s) 
Inserting air quality data for kallio-2...
2025-11-18 11:51:41,466 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279136/fs/1265746/fg/1718962


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


‚úì Successfully inserted air quality data for kallio-2

Processing location 3/4: mannerheimintie
Retrieved air quality data for mannerheimintie
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.69s) 
Inserting air quality data for mannerheimintie...
2025-11-18 11:51:52,065 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279136/fs/1265746/fg/1718962


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


‚úì Successfully inserted air quality data for mannerheimintie

Processing location 4/4: vartiokyla-huivipolku
Retrieved air quality data for vartiokyla-huivipolku
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.75s) 
Inserting air quality data for vartiokyla-huivipolku...
2025-11-18 11:52:03,175 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279136/fs/1265746/fg/1718962


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1/1 | Elapsed Time: 00:01 | Remaining Time: 00:00


‚úì Successfully inserted air quality data for vartiokyla-huivipolku

Summary: Processed 4 location(s)


In [6]:
# Print a schema summary (column dtypes and non-null counts) of the combined air quality frame.
# The guard skips the call when combined_aq_df is not defined.
if 'combined_aq_df' in locals():
    combined_aq_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 9 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   pm25         4 non-null      float32       
 1   country      4 non-null      object        
 2   city         4 non-null      object        
 3   street       4 non-null      object        
 4   date         4 non-null      datetime64[us]
 5   url          4 non-null      object        
 6   pm25_lag_1d  4 non-null      float32       
 7   pm25_lag_2d  4 non-null      float32       
 8   pm25_lag_3d  4 non-null      float32       
dtypes: datetime64[us](1), float32(4), object(4)
memory usage: 356.0+ bytes


## Add Lagged Air Quality Features

The loop above retrieved the previous 1, 2, and 3 days of air quality data from the feature store and added them as lagged features. The cell below shows today's rows together with those lagged values.


In [7]:
# Show the rows of combined_aq_df whose date is the current local day (all columns kept)
if 'combined_aq_df' in locals():
    # Use a dedicated name: reassigning `today` here would replace the
    # datetime.datetime defined earlier with a pd.Timestamp for any cell re-run afterwards.
    current_day = pd.Timestamp.today().normalize()
    # to_datetime accepts both datetime values and 'YYYY-MM-DD' strings
    row_days = pd.to_datetime(combined_aq_df['date']).dt.normalize()
    filtered_today_df = combined_aq_df[row_days == current_day]
    # Rich display instead of print() so the frame renders as a table
    display(filtered_today_df)

   pm25  country      city                 street       date  \
0  21.0  finland  helsinki                 kluuvi 2025-11-18   
1   5.0  finland  helsinki               kallio-2 2025-11-18   
2  21.0  finland  helsinki        mannerheimintie 2025-11-18   
3   5.0  finland  helsinki  vartiokyla-huivipolku 2025-11-18   

                                url  pm25_lag_1d  pm25_lag_2d  pm25_lag_3d  
0  https://api.waqi.info/feed/@5717          5.0          9.0         13.0  
1  https://api.waqi.info/feed/@4908         13.0          8.0          7.0  
2  https://api.waqi.info/feed/@4909         16.0          9.0         13.0  
3  https://api.waqi.info/feed/@4910         10.0          8.0          7.0  


In [None]:
# Sanity check: read back the stored rows for one sensor, newest first, and show the
# ten most recent ones. The frame is named after its purpose, not after a street,
# so the name cannot disagree with the filter.
sanity_check_street = "kluuvi"
sanity_check_aq_df = (
    air_quality_fg
    .filter(air_quality_fg.street == sanity_check_street)
    .read()
    .sort_values('date', ascending=False)
)
sanity_check_aq_df.head(10)


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.10s) 


Unnamed: 0,date,pm25,street,url,country,city,pm25_lag_1d,pm25_lag_2d,pm25_lag_3d
1881,2025-11-17 00:00:00+00:00,5.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,9.0,13.0,13.0
3087,2025-11-16 00:00:00+00:00,9.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,13.0,13.0,18.0
2724,2025-11-15 00:00:00+00:00,13.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,13.0,18.0,28.0
1609,2025-11-14 00:00:00+00:00,13.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,18.0,28.0,30.0
387,2025-11-13 00:00:00+00:00,18.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,28.0,30.0,17.0
1031,2025-11-12 00:00:00+00:00,28.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,30.0,17.0,38.0
2520,2025-11-11 00:00:00+00:00,30.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,17.0,38.0,45.0
2460,2025-11-10 00:00:00+00:00,17.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,38.0,45.0,28.0
3112,2025-11-09 00:00:00+00:00,38.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,45.0,28.0,32.0
4152,2025-11-08 00:00:00+00:00,45.0,kluuvi,https://api.waqi.info/feed/@5717,finland,helsinki,28.0,32.0,21.0


## <span style='color:#ff5f27'> 🌦 Get Weather Forecast data</span>

In [9]:
# All locations are assumed to share one city, so the forecast is fetched once,
# using the first location's city and coordinates.
if locations:
    first_location = locations[0]
    city = first_location['city']
    latitude = first_location['latitude']
    longitude = first_location['longitude']

    print(f"Processing weather data for {city} (latitude: {latitude}, longitude: {longitude})")

    # Forecast window: the current local day through 10 days ahead, as 'YYYY-MM-DD' strings
    forecast_start = pd.Timestamp.today().normalize()
    start_date = forecast_start.strftime('%Y-%m-%d')
    end_date = (forecast_start + pd.Timedelta(days=10)).strftime('%Y-%m-%d')
    hourly_df = util.get_hourly_weather_forecast(city, latitude, longitude, start_date, end_date)
    hourly_df = hourly_df.set_index('date')

    # Only one prediction is made per day, so keep a single row per day: the 12:00 forecast
    daily_df = hourly_df.between_time('11:59', '12:01')
    daily_df = daily_df.reset_index()
    # Drop the time-of-day component while keeping a datetime column
    daily_df['date'] = pd.to_datetime(daily_df['date']).dt.date
    daily_df['date'] = pd.to_datetime(daily_df['date'])
    daily_df['city'] = city

    print(f"‚úì Weather data processed for {city}")
    # A bare expression inside an if-block is not rendered by Jupyter; display it explicitly
    display(daily_df)
else:
    print("No locations found. Cannot process weather data.")

Processing weather data for helsinki (latitude: 60.1733244, longitude: 24.9410248)
Coordinates 60.25¬∞N 25.0¬∞E
Elevation 2.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
‚úì Weather data processed for helsinki


In [10]:
# Inspect the hourly forecast frame (indexed by date) from which the daily rows were selected
hourly_df

Unnamed: 0_level_0,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,rain_sum
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2025-11-18 00:00:00,-0.20,0.0,9.565437,340.201019,0.0
2025-11-18 01:00:00,-0.00,0.0,9.832680,336.250488,0.0
2025-11-18 02:00:00,0.15,0.0,10.315115,330.751282,0.0
2025-11-18 03:00:00,0.10,0.0,10.630672,331.699341,0.0
2025-11-18 04:00:00,-0.25,0.0,10.931203,342.758453,0.0
...,...,...,...,...,...
2025-11-28 19:00:00,-5.10,0.0,20.268990,56.592163,0.0
2025-11-28 20:00:00,-5.15,0.0,21.267441,56.040897,0.0
2025-11-28 21:00:00,-5.10,0.0,21.971800,55.007900,0.0
2025-11-28 22:00:00,-5.05,0.0,22.682856,54.039394,0.0


In [11]:
# Check the column dtypes and non-null counts of the daily forecast frame before it is inserted
daily_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 11 entries, 0 to 10
Data columns (total 7 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         11 non-null     datetime64[ns]
 1   temperature_2m_mean          11 non-null     float32       
 2   precipitation_sum            11 non-null     float32       
 3   wind_speed_10m_max           11 non-null     float32       
 4   wind_direction_10m_dominant  11 non-null     float32       
 5   rain_sum                     11 non-null     float32       
 6   city                         11 non-null     object        
dtypes: datetime64[ns](1), float32(5), object(1)
memory usage: 528.0+ bytes


## <span style="color:#ff5f27;">⬆️ Uploading new data to the Feature Store</span>

In [12]:
# Note: Air quality data insertion is now handled within the per-location loop in cell 5 above
# This cell is kept for reference but is no longer needed

In [13]:
# Insert weather data (only needs to be done once since all locations share the same city)
if 'daily_df' in locals() and daily_df is not None:
    print("Inserting weather data...")
    weather_fg.insert(daily_df, wait=True)
    print("‚úì Successfully inserted weather data")
else:
    print("Warning: daily_df not found. Weather data was not inserted.")

Inserting weather data...
2025-11-18 11:57:03,073 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279136/fs/1265746/fg/1721946


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 11/11 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_helsinki_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279136/jobs/named/weather_helsinki_1_offline_fg_materialization/executions
2025-11-18 11:57:20,180 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-18 11:57:26,503 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-18 11:58:42,828 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-18 11:58:42,978 INFO: Waiting for log aggregation to finish.
2025-11-18 11:58:51,547 INFO: Execution finished successfully.
‚úì Successfully inserted weather data


## <span style="color:#ff5f27;">⏭️ **Next:** Part 03: Training Pipeline
 </span> 

In the following notebook you will read from a feature group and create a training dataset within the feature store.
