In [1]:
import sys
from pathlib import Path
import os

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Set the environment variables from the file <root_dir>/.env
from mlfs import config
if os.path.exists(f"{root_dir}/.env"):
    settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: d:\KTH\Scalable Machine Learning and Deep Learning\LAB\mlfs-book
HopsworksSettings initialized!


<span style="font-width:bold; font-size: 3rem; color:#333;">- Part 02: Daily Feature Pipeline for Air Quality (aqicn.org) and weather (openmeteo)</span>

## üóíÔ∏è This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python Orchestration tool to schedule this program to run daily.

### <span style='color:#ff5f27'> üìù Imports

In [3]:
# ========== Cell 1: ÂØºÂÖ•ÂíåËøûÊé• ==========
import hopsworks
import pandas as pd
from datetime import datetime, timedelta
import sys
sys.path.append('../../mlfs')
from mlfs.airquality import util
import json

project = hopsworks.login()
fs = project.get_feature_store()
secrets = hopsworks.get_secrets_api()

# Ëé∑ÂèñAPI key
AQICN_API_KEY = secrets.get_secret("AQICN_API_KEY").value

2025-11-13 15:35:19,557 INFO: Initializing external client
2025-11-13 15:35:19,558 INFO: Base URL: https://c.app.hopsworks.ai:443
To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'







2025-11-13 15:35:26,301 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1298583


In [4]:
# ========== Cell 2: ËØªÂèñ‰º†ÊÑüÂô®ÈÖçÁΩÆ ==========
location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
location = json.loads(location_str)

country = location['country']
city = location['city']
latitude = location['latitude']
longitude = location['longitude']
sensors = location['sensors']  # ÊâÄÊúâ5‰∏™‰º†ÊÑüÂô®

print(f"City: {city}")
print(f"Found {len(sensors)} sensors:")
for s in sensors:
    print(f"  - {s['street']}")

today = datetime.now().date()
yesterday = today - timedelta(days=1)

City: tampere
Found 5 sensors:
  - tampere
  - kaleva
  - pirkankatu
  - epila-2
  - linja-autoasema


In [14]:
# ========== Cell 3: Ëé∑ÂèñÂ§©Ê∞îÊï∞ÊçÆ ==========
print(f"\nFetching weather for {city}...")

# Ëé∑ÂèñhourlyÂ§©Ê∞îÈ¢ÑÊµã
hourly_df = util.get_hourly_weather_forecast(city, latitude, longitude)
hourly_df = hourly_df.set_index('date')

# Âè™ÂèñÊØèÂ§©12ÁÇπÁöÑÊï∞ÊçÆ‰Ωú‰∏∫daily forecast
daily_df = hourly_df.between_time('11:59', '12:01')
daily_df = daily_df.reset_index()
daily_df['date'] = pd.to_datetime(daily_df['date']).dt.date
daily_df['date'] = pd.to_datetime(daily_df['date'])
daily_df['city'] = city

print(f"Weather data: {len(daily_df)} days")
daily_df.head()

weather_df = daily_df  # ÈáçÂëΩÂêç‰∏∫weather_dfÔºåÂêéÈù¢Áªü‰∏Ä‰ΩøÁî®


Fetching weather for tampere...
Coordinates 61.5¬∞N 23.75¬∞E
Elevation 102.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Weather data: 7 days


In [15]:
# ========== Cell 4: Ëé∑ÂèñFeature Groups ==========
air_quality_fg = fs.get_feature_group(name='air_quality_sensors', version=1)
weather_fg = fs.get_feature_group(name='weather_sensors', version=1)

In [16]:
# ========== Cell 5: Âæ™ÁéØËé∑ÂèñÊâÄÊúâ‰º†ÊÑüÂô®ÁöÑAir Quality ==========
all_aq_data = []

for sensor in sensors:
    street = sensor['street']
    aqicn_url = sensor['api_url']
    
    print(f"\nProcessing {street}...")
    
    try:
        # Ëé∑ÂèñÊò®Â§©ÁöÑPM2.5Êï∞ÊçÆ
        aq_today = util.get_pm25(
            aqicn_url, country, city, street, today, AQICN_API_KEY  # Áî®today
        )
        
        # ========== Ê∑ªÂä†lag features ==========
        # ‰ªéFeature GroupËØªÂèñËØ•‰º†ÊÑüÂô®ÊúÄËøë3Â§©ÁöÑÊï∞ÊçÆ
        historical_df = air_quality_fg.read()
        sensor_history = historical_df[
            (historical_df['city'] == city) & 
            (historical_df['street'] == street)
        ].sort_values('date').tail(3)
        
        if len(sensor_history) >= 3:
            aq_today['pm25_lag_1'] = sensor_history.iloc[-1]['pm25']
            aq_today['pm25_lag_2'] = sensor_history.iloc[-2]['pm25']
            aq_today['pm25_lag_3'] = sensor_history.iloc[-3]['pm25']
        elif len(sensor_history) > 0:
            # ‰∏çË∂≥3Â§©ÔºåÁî®ÊúÄÂêé‰∏ÄÂ§©
            last_pm25 = sensor_history.iloc[-1]['pm25']
            aq_today['pm25_lag_1'] = last_pm25
            aq_today['pm25_lag_2'] = last_pm25
            aq_today['pm25_lag_3'] = last_pm25
        else:
            # È¶ñÊ¨°ËøêË°å
            aq_today['pm25_lag_1'] = aq_today['pm25'].values[0]
            aq_today['pm25_lag_2'] = aq_today['pm25'].values[0]
            aq_today['pm25_lag_3'] = aq_today['pm25'].values[0]
        
        all_aq_data.append(aq_today)
        print(f"  ‚úÖ {street}: PM2.5={aq_today['pm25'].values[0]}")
        
    except Exception as e:
        print(f"  ‚ùå {street}: Error - {e}")
        continue

# ÂêàÂπ∂ÊâÄÊúâ‰º†ÊÑüÂô®Êï∞ÊçÆ
if len(all_aq_data) > 0:
    aq_df_all = pd.concat(all_aq_data, ignore_index=True)
    print(f"\n‚úÖ Collected data from {len(all_aq_data)} sensors")
else:
    print("\n‚ùå No air quality data collected!")
    sys.exit(1)

aq_df_all


Processing tampere...
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (2.31s) 
  ‚úÖ tampere: PM2.5=4.0

Processing kaleva...
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.71s) 
  ‚úÖ kaleva: PM2.5=2.0

Processing pirkankatu...
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.57s) 
  ‚úÖ pirkankatu: PM2.5=3.0

Processing epila-2...
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.82s) 
  ‚úÖ epila-2: PM2.5=12.0

Processing linja-autoasema...
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.72s) 
  ‚úÖ linja-autoasema: PM2.5=7.0

‚úÖ Collected data from 5 sensors


Unnamed: 0,pm25,country,city,street,date,url,pm25_lag_1,pm25_lag_2,pm25_lag_3
0,4.0,Finland,tampere,tampere,2025-11-13,https://api.waqi.info/feed/@5719/,4.0,13.0,15.0
1,2.0,Finland,tampere,kaleva,2025-11-13,https://api.waqi.info/feed/@4919/,2.0,11.0,8.0
2,3.0,Finland,tampere,pirkankatu,2025-11-13,https://api.waqi.info/feed/@4921/,4.0,3.0,11.0
3,12.0,Finland,tampere,epila-2,2025-11-13,https://api.waqi.info/feed/@4918/,1.0,12.0,8.0
4,7.0,Finland,tampere,linja-autoasema,2025-11-13,https://api.waqi.info/feed/@4920/,2.0,7.0,10.0


In [17]:
# ========== Cell 6: ÊèíÂÖ•Feature Groups ==========
# ÊèíÂÖ•Air Quality
air_quality_fg.insert(aq_df_all, write_options={"wait_for_job": True})
print("‚úÖ Air quality data inserted")

# ÊèíÂÖ•Weather
weather_fg.insert(weather_df, write_options={"wait_for_job": True})
print("‚úÖ Weather data inserted")

print("\nüéâ Daily pipeline completed!")

2025-11-13 15:53:38,993 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1298583/fs/1286215/fg/1703378


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 5/5 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: air_quality_sensors_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1298583/jobs/named/air_quality_sensors_1_offline_fg_materialization/executions
2025-11-13 15:53:55,353 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-13 15:53:58,546 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-13 15:55:46,979 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-13 15:55:47,150 INFO: Waiting for log aggregation to finish.
2025-11-13 15:56:02,512 INFO: Execution finished successfully.
‚úÖ Air quality data inserted
2025-11-13 15:56:02,724 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1298583/fs/1286215/fg/1703380


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 7/7 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_sensors_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1298583/jobs/named/weather_sensors_1_offline_fg_materialization/executions
2025-11-13 15:56:18,735 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-13 15:56:21,907 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-13 15:57:54,260 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-13 15:57:54,431 INFO: Waiting for log aggregation to finish.
2025-11-13 15:58:03,040 INFO: Execution finished successfully.
‚úÖ Weather data inserted

üéâ Daily pipeline completed!
