In [1]:
import sys
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", module="IPython")

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/ccfraud from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

print(f"Root dir: {root_dir}")

# Add the root directory to the `PYTHONPATH` 
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Root dir: /home/federica_lorenzini/ml-lab1/mlfs-book
Added the following directory to the PYTHONPATH: /home/federica_lorenzini/ml-lab1/mlfs-book
HopsworksSettings initialized!


### <span style='color:#ff5f27'>Imports

In [2]:
import datetime
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util
import datetime
from pathlib import Path
import json
import re
import os
import warnings
warnings.filterwarnings("ignore")

### Login into Hopsworks


In [3]:
project = hopsworks.login()

2025-11-18 11:02:31,804 INFO: Initializing external client
2025-11-18 11:02:31,806 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-18 11:02:33,341 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1279175


## <span style='color:#ff5f27'> Get the AQICN_URL and API key. Enter country, city, street names for your Sensor.</span>

In [4]:
today = datetime.date.today()

# taken from ~/.env.
if settings.AQICN_API_KEY is None:
    print("You need to set AQICN_API_KEY either in this cell or in ~/.env")
    sys.exit(1)

AQICN_API_KEY = settings.AQICN_API_KEY.get_secret_value() 

print(f"Found AQICN_API_KEY: {AQICN_API_KEY}")

secrets = hopsworks.get_secrets_api()

# Replace any existing secret with the new value
secret = secrets.get_secret("AQICN_API_KEY")
if secret is not None:
    secret.delete()
    print("Replacing existing AQICN_API_KEY")

secrets.create_secret("AQICN_API_KEY", AQICN_API_KEY)

Found AQICN_API_KEY: b7b6ef9c10e47611f567a3080c006412df782712
Replacing existing AQICN_API_KEY
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('AQICN_API_KEY', 'PRIVATE')

In [5]:
# Berlin Coordinates
city = 'Berlin'
latitude = 52.52
longitude = 13.40

### Validate that the AQICN_API_KEY works

In [6]:
try:
    aq_today_df = util.get_pm25('https://api.waqi.info/feed/@6132', 'Germany', 'Berlin', 'karl-liebknecht-strasse', today, AQICN_API_KEY)
except hopsworks.RestAPIError:
    print("It looks like the AQICN_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?")

aq_today_df.head()

Unnamed: 0,pm25,country,city,street,date,url
0,30.0,Germany,Berlin,karl-liebknecht-strasse,2025-11-18,https://api.waqi.info/feed/@6132


## <span style='color:#ff5f27'> Read your CSV file into a DataFrame </span>

### Extract sensor list

In [7]:
csv_file=f"{root_dir}/data/berlin_air_quality/all_berlin_sensors.csv"
sensors_df = pd.read_csv(csv_file, skipinitialspace=True)
sensors_df

Unnamed: 0,url,file_name,country,city,street
0,https://api.waqi.info/feed/@6132,berlin.csv,Germany,Berlin,karl-liebknecht-strasse
1,https://api.waqi.info/feed/@10033,buch.csv,Germany,Berlin,buch
2,https://api.waqi.info/feed/@10034,friedrichshagen.csv,Germany,Berlin,friedrichshagen
3,https://api.waqi.info/feed/@13851,karl-marx.csv,Germany,Berlin,karl-marx-strasse
4,https://api.waqi.info/feed/@13852,leipziger.csv,Germany,Berlin,leipziger-strasse
5,https://api.waqi.info/feed/@10040,mariendorf.csv,Germany,Berlin,mariendorf--mariendorfer-damm
6,https://api.waqi.info/feed/@10039,mitte.csv,Germany,Berlin,mitte--bruckenstrasse
7,https://api.waqi.info/feed/@10032,neukolln-nansen.csv,Germany,Berlin,neukolln-nansenstrasse
8,https://api.waqi.info/feed/@10036,neukolln-silbersteinstr.csv,Germany,Berlin,neukolln-silbersteinstrasse
9,https://api.waqi.info/feed/@10030,wedding.csv,Germany,Berlin,wedding-amrumer-strasse


### Read all sensors

In [8]:
df_aq = pd.DataFrame()

for i in range(sensors_df.shape[0]):
    file_name = sensors_df.iloc[i]['file_name']
    df_i = pd.read_csv(f"{root_dir}/data/berlin_air_quality/{file_name}",  parse_dates=['date'], skipinitialspace=True)
    
    # Data cleaning and check the data types for the columns in your DataFrame
    df_i = df_i[['date', 'pm25']]
    df_i['pm25'] = df_i['pm25'].astype('float32')

    # Adding lagged features
    lagged_days = 3

    df_i_lagged = pd.DataFrame()
    for j in range(1,lagged_days+1):
        df_i_lagged['date'] = df_i['date'] + datetime.timedelta(days=j)
        df_i_lagged['pm25'] = df_i['pm25']
        suffix='_lagged_' + str(j)
        df_i = df_i.merge(df_i_lagged, on='date', how='left', suffixes=['',suffix])

    # Drop any rows with missing data
    df_i.dropna(inplace=True)

    # Add country, city, street, url to the DataFrame
    df_i['country'] = sensors_df.iloc[i]['country']
    df_i['city'] = sensors_df.iloc[i]['city']
    df_i['street'] = sensors_df.iloc[i]['street']
    df_i['url'] = sensors_df.iloc[i]['url']

    df_aq = pd.concat([df_aq, df_i], ignore_index=True)

df_aq

Unnamed: 0,date,pm25,pm25_lagged_1,pm25_lagged_2,pm25_lagged_3,country,city,street,url
0,2025-11-01,57.0,50.0,30.0,38.0,Germany,Berlin,karl-liebknecht-strasse,https://api.waqi.info/feed/@6132
1,2025-11-02,59.0,57.0,50.0,30.0,Germany,Berlin,karl-liebknecht-strasse,https://api.waqi.info/feed/@6132
2,2025-11-03,45.0,59.0,57.0,50.0,Germany,Berlin,karl-liebknecht-strasse,https://api.waqi.info/feed/@6132
3,2025-11-04,46.0,45.0,59.0,57.0,Germany,Berlin,karl-liebknecht-strasse,https://api.waqi.info/feed/@6132
4,2025-11-05,53.0,46.0,45.0,59.0,Germany,Berlin,karl-liebknecht-strasse,https://api.waqi.info/feed/@6132
...,...,...,...,...,...,...,...,...,...
12909,2022-03-27,55.0,73.0,88.0,83.0,Germany,Berlin,wedding-amrumer-strasse,https://api.waqi.info/feed/@10030
12910,2022-03-28,49.0,55.0,73.0,88.0,Germany,Berlin,wedding-amrumer-strasse,https://api.waqi.info/feed/@10030
12911,2022-03-29,52.0,49.0,55.0,73.0,Germany,Berlin,wedding-amrumer-strasse,https://api.waqi.info/feed/@10030
12912,2022-03-30,37.0,52.0,49.0,55.0,Germany,Berlin,wedding-amrumer-strasse,https://api.waqi.info/feed/@10030


In [9]:
df_aq.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12914 entries, 0 to 12913
Data columns (total 9 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   date           12914 non-null  datetime64[ns]
 1   pm25           12914 non-null  float32       
 2   pm25_lagged_1  12914 non-null  float32       
 3   pm25_lagged_2  12914 non-null  float32       
 4   pm25_lagged_3  12914 non-null  float32       
 5   country        12914 non-null  object        
 6   city           12914 non-null  object        
 7   street         12914 non-null  object        
 8   url            12914 non-null  object        
dtypes: datetime64[ns](1), float32(4), object(4)
memory usage: 706.4+ KB


## <span style='color:#ff5f27'> Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs)

### Download the Historical Weather Data

https://open-meteo.com/en/docs/historical-weather-api#hourly=&daily=temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant

We will download the historical weather data for your `city` by first extracting the earliest date from your DataFrame containing the historical air quality measurements.

We will download all daily historical weather data measurements for your `city` from the earliest date in your air quality measurement DataFrame. It doesn't matter if there are missing days of air quality measurements. We can store all of the daily weather measurements, and when we build our training dataset, we will join up the air quality measurements for a given day to its weather features for that day. 

The weather features we will download are:

 * `temperature (average over the day)`
 * `precipitation (the total over the day)`
 * `wind speed (average over the day)`
 * `wind direction (the most dominant direction over the day)`


In [10]:
earliest_aq_date = pd.Series.min(df_aq['date'])
earliest_aq_date = earliest_aq_date.strftime('%Y-%m-%d')
earliest_aq_date

weather_df = util.get_historical_weather(city, earliest_aq_date, str(today), latitude, longitude)

Coordinates 52.5483283996582°N 13.407821655273438°E
Elevation 30.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [11]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1674 entries, 0 to 1673
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         1674 non-null   datetime64[ns]
 1   temperature_2m_mean          1674 non-null   float32       
 2   precipitation_sum            1674 non-null   float32       
 3   wind_speed_10m_max           1674 non-null   float32       
 4   wind_direction_10m_dominant  1674 non-null   float32       
 5   city                         1674 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 52.4+ KB


## <span style='color:#ff5f27'> Define Data Validation Rules </span>

### Expectations for Weather Data
We will validate the air quality measurements (`pm25` values) before we write them to Hopsworks.

We define a data validation rule (an expectation in Great Expectations) that ensures that `pm25` values are not negative or above the max value available by the sensor.

We will attach this expectation to the air quality feature group, so that we validate the `pm25` data every time we write a DataFrame to the feature group. We want to prevent garbage-in, garbage-out.


In [12]:
import great_expectations as ge
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pm25",
            "min_value":-0.1,
            "max_value":500.0,
            "strict_min":True
        }
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 500.0, "strict_min": true}, "meta": {}}

### Expectations for Weather Data
Here, we define an expectation for 2 columns in our weather DataFrame - `precipitation_sum` and `wind_speed_10m_max`, where we expect both values to be greater than zero, but less than 1000.

In [13]:
import great_expectations as ge
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

def expect_greater_than_zero(col):
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")

---

### <span style="color:#ff5f27;"> Connect to Hopsworks and save the sensor country, city, street names as a secret</span>

In [14]:
fs = project.get_feature_store() 

#### Save country, city as a secret

These will be downloaded from Hopsworks later in the (1) daily feature pipeline and (2) the daily batch inference pipeline

In [15]:
dict_obj = {
    "country": 'Germany',
    "city": city,
    "latitude": latitude,
    "longitude": longitude
}

# Convert the dictionary to a JSON string
str_dict = json.dumps(dict_obj)

# Replace any existing secret with the new value
secret = secrets.get_secret("SENSOR_LOCATION_BERLIN_JSON")
if secret is not None:
    secret.delete()
    print("Replacing existing SENSOR_LOCATION_BERLIN_JSON")

secrets.create_secret("SENSOR_LOCATION_BERLIN_JSON", str_dict)

Replacing existing SENSOR_LOCATION_BERLIN_JSON
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('SENSOR_LOCATION_BERLIN_JSON', 'PRIVATE')

### <span style="color:#ff5f27;"> Create the Feature Groups and insert the DataFrames in them </span>

### Air Quality Data
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each air quality sensor measurement is uniquely identified by `country`, `street`, and  `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [16]:
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality_berlin',
    description='Air Quality characteristics of each day for Berlin',
    version=1,
    primary_key=['country','city', 'street'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)

#### Insert the DataFrame into the Feature Group

In [17]:
air_quality_fg.insert(df_aq)

2025-11-18 11:02:39,001 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279175/fs/1265791/fg/1718740


Uploading Dataframe: 100.00% |██████████| Rows 12914/12914 | Elapsed Time: 00:02 | Remaining Time: 00:00


Launching job: air_quality_berlin_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279175/jobs/named/air_quality_berlin_1_offline_fg_materialization/executions


(Job('air_quality_berlin_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 758884
         }
       },
       "result": {
         "observed_value": 9.0,
         "element_count": 12914,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-18T10:02:39.000000Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successfu

#### Enter a description for each feature in the Feature Group

In [18]:
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in acqcn.org)")
air_quality_fg.update_feature_description("city", "City where the air quality was measured")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")

air_quality_fg.update_feature_description("url", "url to access the real time sensor data on acqcn.org")
air_quality_fg.update_feature_description("pm25_lagged_1", "pm25 value measured 1 day before the given date")
air_quality_fg.update_feature_description("pm25_lagged_2", "pm25 value measured 2 days before the given date")
air_quality_fg.update_feature_description("pm25_lagged_3", "pm25 value measured 3 days before the given date")

<hsfs.feature_group.FeatureGroup at 0x74267508b890>

### Weather Data
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each weather measurement is uniquely identified by `city` and  `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [19]:
# Get or create feature group 
weather_fg = fs.get_or_create_feature_group(
    name='weather_berlin',
    description='Weather characteristics of each day for Berlin',
    version=1,
    primary_key=['city'],
    event_time="date",
    expectation_suite=weather_expectation_suite
)

#### Insert the DataFrame into the Feature Group

In [20]:
# Insert data
weather_fg.insert(weather_df, wait=True)

2025-11-18 11:03:00,901 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279175/fs/1265791/fg/1724775


Uploading Dataframe: 100.00% |██████████| Rows 1674/1674 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_berlin_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279175/jobs/named/weather_berlin_1_offline_fg_materialization/executions
2025-11-18 11:03:17,786 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-18 11:03:20,990 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-18 11:05:08,571 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-18 11:05:08,820 INFO: Waiting for log aggregation to finish.
2025-11-18 11:05:42,430 INFO: Execution finished successfully.


(Job('weather_berlin_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "precipitation_sum",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 762908
         }
       },
       "result": {
         "observed_value": 0.0,
         "element_count": 1674,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-18T10:03:00.000900Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expe

#### Enter a description for each feature in the Feature Group

In [21]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m abouve ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant Wind direction over the dayd")

<hsfs.feature_group.FeatureGroup at 0x74267471ffe0>