## Pipeline for backfill.

In [None]:
import sys
import os
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", module="IPython")

# Make the directory two levels above the current working directory importable,
# so that the `backend` package used by this notebook can be found.
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir, os.pardir))
if project_root not in sys.path:
    sys.path.append(project_root)

# Build the settings object, pointing it at the `.env` file.
from backend.models import config
settings = config.HopsworksSettings(_env_file=".env")

HopsworksSettings initialized!


In [57]:
import datetime
import requests
import pandas as pd
import hopsworks
from backend import util
import datetime
from pathlib import Path
import json
import re
import great_expectations as ge
import warnings
warnings.filterwarnings("ignore")

### Initialize project.

In [58]:
# Log in to Hopsworks with the Python engine. The returned `project` handle is
# what later cells use to obtain the feature store.
project = hopsworks.login(engine="python")

2025-11-14 22:05:47,476 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-14 22:05:47,479 INFO: Initializing external client
2025-11-14 22:05:47,479 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-14 22:05:48,725 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1279162


Initialize the parameters: values already defined (for example injected by papermill) take precedence; otherwise they are read from `.env`.

In [None]:
today = datetime.date.today()

# Honour a `csv_file` that is already defined (e.g. injected by papermill), the
# same way the other parameters below do; otherwise fall back to the default.
# Previously the default was assigned unconditionally, which silently discarded
# any injected value.
csv_file = locals().get("csv_file", f"{project_root}/backend/data/rahu-air-quality.csv")

util.check_file_path(csv_file)

# taken from ~/.env. You can also replace settings.AQICN_API_KEY with the api key value as a string "...."
if settings.AQICN_API_KEY is None:
    print("You need to set AQICN_API_KEY either in this cell or in ~/.env")
    sys.exit(1)

AQICN_API_KEY = settings.AQICN_API_KEY.get_secret_value()

# Each parameter keeps a value that already exists in the namespace and only
# falls back to the settings object when it has not been defined yet.
country = locals().get("country", settings.AQICN_COUNTRY)
city = locals().get("city", settings.AQICN_CITY)
street = locals().get("street", settings.AQICN_STREET)
aqicn_url = locals().get("aqicn_url", settings.AQICN_URL)

# latitude, longitude = util.get_city_coordinates("tallinn")
latitude, longitude = 59.33, 18.07
print(latitude, longitude)

# country = "sweden"
# city = "stockholm"
# street = "stockholm-st-eriksgatan-83"
# csv_file = f"{project_root}/backend/data/stockholm-st-eriksgatan-83.csv"

# Never echo the key itself: cell outputs are stored with the notebook and
# would leak the secret to anyone who can read the file.
print("Found AQICN_API_KEY: <redacted>")

secrets = hopsworks.get_secrets_api()
# Replace any existing secret with the new value
secret = secrets.get_secret("AQICN_API_KEY")
if secret is not None:
    secret.delete()
    print("Replacing existing AQICN_API_KEY")

secrets.create_secret("AQICN_API_KEY", AQICN_API_KEY)

File successfully found at the path: /Users/jonaslorenz/Desktop/Code_KTH/ScalableML/AirQualityPrediction/backend/data/rahu-air-quality.csv
59.33 18.07
Found AQICN_API_KEY: <redacted>
Replacing existing AQICN_API_KEY
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('AQICN_API_KEY', 'PRIVATE')

In [60]:
# Fetch today's PM2.5 reading for the configured sensor.
try:
    aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQICN_API_KEY)
except hopsworks.RestAPIError:
    print("It looks like the AQICN_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?")
    # Re-raise so the cell stops on the real error. Swallowing it would leave
    # `aq_today_df` undefined and fail below with an unrelated NameError.
    raise

aq_today_df.head()

Unnamed: 0,pm25,country,city,street,date,url
0,4.0,sweden,stockholm,stockholm-st-eriksgatan-83,2025-11-14,https://api.waqi.info/feed/@9461


### Read historical data.

In [61]:
# Load the historical readings. The `date` column is parsed into datetimes and
# blanks following a delimiter are skipped.
df = pd.read_csv(
    csv_file,
    skipinitialspace=True,
    parse_dates=['date'],
)
df

Unnamed: 0,date,pm25,pm10,no2
0,2025-11-01,8.0,9.0,3.0
1,2025-11-02,26.0,8.0,5.0
2,2025-11-03,17.0,3.0,3.0
3,2025-11-04,10.0,7.0,4.0
4,2025-11-05,18.0,11.0,6.0
...,...,...,...,...
2537,2019-11-23,,22.0,5.0
2538,2018-12-31,,8.0,4.0
2539,2018-04-08,,36.0,18.0
2540,2018-06-04,,28.0,14.0


In [62]:
# Work on an explicit copy of the two columns we need, so that the column
# assignments below never write into a slice of the raw frame `df`.
df_aq = df[['date', 'pm25']].copy()
df_aq['pm25'] = df_aq['pm25'].astype('float32')

# Drop rows that have no pm25 reading.
df_aq = df_aq.dropna()

# Add the columns that identify the sensor (country, city, street) and its URL.
df_aq = df_aq.assign(country=country, city=city, street=street, url=aqicn_url)

group_cols = ['country', 'city', 'street']

# Readings in chronological order per sensor. sort_values keeps the original
# index labels, so every Series derived from this frame aligns back onto df_aq
# by index and no manual bookkeeping of the original row positions is needed.
aq_history = df_aq.sort_values(group_cols + ['date'])
pm25_by_sensor = aq_history.groupby(group_cols)['pm25']

derived_features = {}

# Rolling mean over the previous `window` readings of the same sensor.
# shift(1) moves the window back one row so the current reading is excluded.
for window in [3, 2]:
    derived_features[f'pm25_rolling_{window}d'] = (
        pm25_by_sensor
        .transform(lambda s, w=window: s.rolling(w, min_periods=1).mean().shift(1))
        .astype('float32')
    )

# ---- Add lags 1, 2, 3 ----
# pm25 value of the reading `lag` rows earlier for the same sensor.
for lag in [1, 2, 3]:
    derived_features[f'pm25_lag_{lag}'] = pm25_by_sensor.shift(lag).astype('float32')

# Index-aligned assignment; columns are added in the order rolling_3d,
# rolling_2d, lag_1, lag_2, lag_3.
df_aq = df_aq.assign(**derived_features)

# Drop the earliest rows of each sensor, whose lag features are NaN.
df_aq = df_aq.dropna()

df_aq.info()
df_aq.head()

<class 'pandas.core.frame.DataFrame'>
Index: 2517 entries, 0 to 2519
Data columns (total 11 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   date             2517 non-null   datetime64[ns]
 1   pm25             2517 non-null   float32       
 2   country          2517 non-null   object        
 3   city             2517 non-null   object        
 4   street           2517 non-null   object        
 5   url              2517 non-null   object        
 6   pm25_rolling_3d  2517 non-null   float32       
 7   pm25_rolling_2d  2517 non-null   float32       
 8   pm25_lag_1       2517 non-null   float32       
 9   pm25_lag_2       2517 non-null   float32       
 10  pm25_lag_3       2517 non-null   float32       
dtypes: datetime64[ns](1), float32(6), object(4)
memory usage: 177.0+ KB


Unnamed: 0,date,pm25,country,city,street,url,pm25_rolling_3d,pm25_rolling_2d,pm25_lag_1,pm25_lag_2,pm25_lag_3
0,2025-11-01,8.0,sweden,stockholm,stockholm-st-eriksgatan-83,https://api.waqi.info/feed/@9461,14.333333,13.0,10.0,16.0,17.0
1,2025-11-02,26.0,sweden,stockholm,stockholm-st-eriksgatan-83,https://api.waqi.info/feed/@9461,11.333333,9.0,8.0,10.0,16.0
2,2025-11-03,17.0,sweden,stockholm,stockholm-st-eriksgatan-83,https://api.waqi.info/feed/@9461,14.666667,17.0,26.0,8.0,10.0
3,2025-11-04,10.0,sweden,stockholm,stockholm-st-eriksgatan-83,https://api.waqi.info/feed/@9461,17.0,21.5,17.0,26.0,8.0
4,2025-11-05,18.0,sweden,stockholm,stockholm-st-eriksgatan-83,https://api.waqi.info/feed/@9461,17.666666,13.5,10.0,17.0,26.0


### Get weather data.

In [63]:
# The weather history is requested from the earliest date present in the
# air-quality frame up to today.
earliest_aq_date = df_aq['date'].min().strftime('%Y-%m-%d')

weather_df = util.get_historical_weather(
    city,
    earliest_aq_date,
    str(today),
    latitude,
    longitude,
)

Coordinates 59.29701232910156°N 18.163265228271484°E
Elevation 18.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


### Data validation

In [64]:
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

# pm25 and every feature derived from it get the same rule: the column minimum
# must be strictly greater than -0.1 and no larger than 500.
aq_checked_columns = (
    ["pm25"]
    + [f"pm25_rolling_{window}d" for window in (2, 3)]
    + [f"pm25_lag_{step}" for step in (1, 2, 3)]
)

for column_name in aq_checked_columns:
    column_bounds = {
        "column": column_name,
        "min_value": -0.1,
        "max_value": 500.0,
        "strict_min": True,
    }
    aq_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs=column_bounds,
        )
    )

In [65]:
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

def expect_greater_than_zero(col):
    """Add a lower-bound check for column `col` to `weather_expectation_suite`.

    The registered expectation requires the column minimum to be strictly
    greater than -0.1 and at most 1000.0.
    """
    bounds = {
        "column": col,
        "min_value": -0.1,
        "max_value": 1000.0,
        "strict_min": True,
    }
    expectation = ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs=bounds,
    )
    weather_expectation_suite.add_expectation(expectation)

# Register the check for both weather columns.
for weather_column in ("precipitation_sum", "wind_speed_10m_max"):
    expect_greater_than_zero(weather_column)

Get the feature store

In [66]:
# Feature store handle of the logged-in project; used below to create the
# feature groups.
fs = project.get_feature_store()

Create a secret for the sensor location.

In [67]:
# Collect the sensor's location and feed URL and serialise them as JSON, so
# they can be stored as a single secret value.
dict_obj = dict(
    country=country,
    city=city,
    street=street,
    aqicn_url=aqicn_url,
    latitude=latitude,
    longitude=longitude,
)
str_dict = json.dumps(dict_obj)

# A stored secret is replaced by deleting the old one (if there is one) and
# creating it again with the new value.
secret = secrets.get_secret("SENSOR_LOCATION_JSON")
if secret is not None:
    secret.delete()
    print("Replacing existing SENSOR_LOCATION_JSON")

secrets.create_secret("SENSOR_LOCATION_JSON", str_dict)

Replacing existing SENSOR_LOCATION_JSON
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('SENSOR_LOCATION_JSON', 'PRIVATE')

### Insert historical data into feature groups

In [68]:
# Fetch or create version 1 of the air-quality feature group. Rows are keyed by
# (country, city, street), `date` is the event time, and the air-quality
# expectation suite is attached to the group.
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality_with_all_lags',
    description='Air Quality characteristics of each day',
    version=1,
    primary_key=['country','city', 'street'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)

In [69]:
# Upload the engineered air-quality rows to the feature group. No `wait`
# argument is passed here, and the call's return value is displayed as the
# cell output.
air_quality_fg.insert(df_aq)

2025-11-14 22:05:54,110 INFO: 	6 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279162/fs/1265774/fg/1717611


Uploading Dataframe: 100.00% |██████████| Rows 2517/2517 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_with_all_lags_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279162/jobs/named/air_quality_with_all_lags_1_offline_fg_materialization/executions


(Job('air_quality_with_all_lags_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25_rolling_2d",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 757786
         }
       },
       "result": {
         "observed_value": 4.5,
         "element_count": 2517,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-14T09:05:54.000109Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type

In [70]:
# Descriptions stored with each feature of the air-quality feature group.
# The rolling and lag features previously reused the pm25 text verbatim; they
# now say how each one is derived from pm25.
aq_feature_descriptions = {
    "date": "Date of measurement of air quality",
    "country": "Country where the air quality was measured (sometimes a city in aqicn.org)",
    "city": "City where the air quality was measured",
    "street": "Street in the city where the air quality was measured",
    "pm25": "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk",
    "pm25_rolling_3d": "Mean pm25 of the 3 preceding measurements at the same sensor (current measurement excluded)",
    "pm25_rolling_2d": "Mean pm25 of the 2 preceding measurements at the same sensor (current measurement excluded)",
    "pm25_lag_1": "pm25 of the measurement 1 step before this one at the same sensor",
    "pm25_lag_2": "pm25 of the measurement 2 steps before this one at the same sensor",
    "pm25_lag_3": "pm25 of the measurement 3 steps before this one at the same sensor",
}

for feature_name, feature_description in aq_feature_descriptions.items():
    air_quality_fg.update_feature_description(feature_name, feature_description)

<hsfs.feature_group.FeatureGroup at 0x139a4a1e0>

In [71]:
# Fetch or create version 1 of the weather feature group. Rows are keyed by
# city, `date` is the event time, and the weather expectation suite is
# attached to the group.
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city'],
    event_time="date",
    expectation_suite=weather_expectation_suite
) 

In [72]:
# Upload the historical weather rows to the feature group, passing wait=True.
# The call's return value is displayed as the cell output.
weather_fg.insert(weather_df, wait=True)

2025-11-14 22:06:14,589 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279162/fs/1265774/fg/1638074


Uploading Dataframe: 100.00% |██████████| Rows 2774/2774 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279162/jobs/named/weather_1_offline_fg_materialization/executions
2025-11-14 22:06:32,244 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-14 22:06:35,489 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-11-14 22:08:35,759 INFO: Waiting for execution to finish. Current state: FINISHED. Final status: SUCCEEDED
2025-11-14 22:08:36,169 INFO: Waiting for log aggregation to finish.
2025-11-14 22:08:36,170 INFO: Execution finished successfully.


(Job('weather_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 735353
         }
       },
       "result": {
         "observed_value": 3.41525936126709,
         "element_count": 2774,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-14T09:06:14.000589Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type"

In [73]:
# Descriptions stored with each feature of the weather feature group
# (spelling of the wind descriptions corrected).
weather_feature_descriptions = {
    "date": "Date of measurement of weather",
    "city": "City where weather is measured/forecast for",
    "temperature_2m_mean": "Temperature in Celsius",
    "precipitation_sum": "Precipitation (rain/snow) in mm",
    "wind_speed_10m_max": "Wind speed at 10m above ground",
    "wind_direction_10m_dominant": "Dominant wind direction over the day",
}

for feature_name, feature_description in weather_feature_descriptions.items():
    weather_fg.update_feature_description(feature_name, feature_description)

<hsfs.feature_group.FeatureGroup at 0x133248d10>