In [1]:
import datetime
import pandas as pd
import hopsworks
import datetime
import json
import warnings
import sys
import time
from helpers import util
from helpers import config
warnings.filterwarnings("ignore")

In [2]:
# Setup
settings = config.HopsworksSettings(_env_file="./.env")
project = hopsworks.login(engine="python")
AQICN_API_KEY = settings.AQICN_API_KEY.get_secret_value() 

# Replace any existing secret with the new value
secrets = hopsworks.get_secrets_api()
try:
    secret = secrets.get_secret("AQICN_API_KEY")
    secret.delete()
    print("Replacing existing AQICN_API_KEY")
except hopsworks.RestAPIError:
    print("Creating new AQICN_API_KEY")
secrets.create_secret("AQICN_API_KEY", AQICN_API_KEY)

today = datetime.date.today()
sensor_params = []
for url, country, city, street in zip(settings.AQICN_URLS, settings.AQICN_COUNTRIES, settings.AQICN_CITIES, settings.AQICN_STREETS):
    # Test if API works for all sensors
    try:
        aq_today_df = util.get_pm25(url, country, city, street, today, AQICN_API_KEY)
        print(aq_today_df.head())
    except hopsworks.RestAPIError:
        print(f"It looks like the AQICN_API_KEY doesn't work for sensor {country} {city} {street}")
    latitude, longitude = util.get_city_coordinates(city)
    time.sleep(5) # Avoid access blocking by nominatim geocoding which is needed for coordinates retrieval
    # Create param groups for all sensors
    sensor_params.append({
        "aqicn_url": url,
        "country": country,
        "city": city,
        "street": street,
        "latitude": latitude,
        "longitude": longitude,
    })

HopsworksSettings initialized!
2025-11-10 14:59:23,972 INFO: Initializing external client
2025-11-10 14:59:23,974 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-10 14:59:25,457 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1272014
Replacing existing AQICN_API_KEY
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
   pm25 country      city         street       date  \
0  20.0  sweden  skoghall  folkungavägen 2025-11-10   

                                  url  
0  https://api.waqi.info/feed/A401314  
   pm25 country     city   street       date  \
0   1.0  sweden  jonsbyn  acksjön 2025-11-10   

                                  url  
0  https://api.waqi.info/feed/A121810  
   pm25 country    city  street       date                                url
0   6.0  sweden  årjäng  strand 2025-11-10  https://api.waqi.info/feed/A87319
   pm25 country      city        street       date  \
0   3.0  sweden  nykroppa  timmersvägen 2025-11-10   

                                  url  
0  https://api.waqi.info/feed/A208483  


In [3]:
def prepare_historical_data(aqicn_url, country, city, street, **kwargs):
    # Read historical air quality data from csv
    csv_file = f"./data/{street}_{city}_{country}.csv"
    aq_df = pd.read_csv(csv_file,  parse_dates=['date'], skipinitialspace=True)
    aq_df = aq_df[['date', 'pm25']]
    aq_df['pm25'] = aq_df['pm25'].astype('float32')

    # Make sure date is the index, sorted ascending
    aq_df.set_index('date', inplace=True)
    aq_df = aq_df.sort_index()

    # Create continuous daily date range from min to max date
    full_date_range = pd.date_range(start=aq_df.index.min(), end=aq_df.index.max(), freq='D')

    # Reindex to add missing dates as NaN rows
    aq_df = aq_df.reindex(full_date_range)

    # Compute lagged columns (pm25 might have NaNs for missing days)
    aq_df['lagged_1'] = aq_df['pm25'].shift(1).astype('float32')
    aq_df['lagged_2'] = aq_df['pm25'].shift(2).astype('float32')
    aq_df['lagged_3'] = aq_df['pm25'].shift(3).astype('float32')
    aq_df.dropna(inplace=True)

    # Add general info
    aq_df['country']=country
    aq_df['city']=city
    aq_df['street']=street
    aq_df['url']=aqicn_url

    # Get date feature back
    aq_df.reset_index(inplace=True)
    aq_df.rename(columns={'index': 'date'}, inplace=True)
    aq_df.info()
    
    # Retrieve weather data
    earliest_aq_date = pd.Series.min(aq_df['date'])
    earliest_aq_date = earliest_aq_date.strftime('%Y-%m-%d')
    weather_df = util.get_historical_weather(city, earliest_aq_date, str(today), latitude, longitude)
    weather_df.info()
    return aq_df, weather_df

In [4]:
import great_expectations as ge

# Data validation rules for air quality data
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pm25",
            "min_value":-0.1,
            "max_value":500.0,
            "strict_min":True
        }
    )
)

# Data validation rules for weather data
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

def expect_greater_than_zero(col):
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")

In [5]:
fs = project.get_feature_store() 

# Get or create air quality feature group in hopsworks
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality',
    description='Air Quality characteristics of each day',
    version=1,
    primary_key=['country','city','street'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)

# Get or create weather feature group in hopsworks
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city'],
    event_time="date",
    expectation_suite=weather_expectation_suite
) 


In [6]:
# Create hopsworks secret for sensor metadata
try:
    secret = secrets.get_secret("SENSORS_JSON")
    secret.delete()
    print("Replacing existing SENSORS_JSON")
except hopsworks.RestAPIError:
    print("Creating new SENSORS_JSON")
secrets.create_secret("SENSORS_JSON", json.dumps(sensor_params))

# Run pipline for all sensors
aq_df_all = []
weather_df_all = []
for param_dict in sensor_params:
    aq_df, weather_df = prepare_historical_data(**param_dict)
    aq_df_all.append(aq_df)
    weather_df_all.append(weather_df)
aq_df_all = pd.concat(aq_df_all, ignore_index=True)
weather_df_all = pd.concat(weather_df_all, ignore_index=True)
print(aq_df_all.tail(20))
print(weather_df_all.tail(20))

Replacing existing SENSORS_JSON
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 792 entries, 0 to 791
Data columns (total 9 columns):
 #   Column    Non-Null Count  Dtype              
---  ------    --------------  -----              
 0   date      792 non-null    datetime64[ns, UTC]
 1   pm25      792 non-null    float32            
 2   lagged_1  792 non-null    float32            
 3   lagged_2  792 non-null    float32            
 4   lagged_3  792 non-null    float32            
 5   country   792 non-null    object             
 6   city      792 non-null    object             
 7   street    792 non-null    object             
 8   url       792 non-null    object             
dtypes: datetime64[ns, UTC](1), float32(4), object(4)
memory usage: 43.4+ KB
Coordinates 59.64850616455078°N 14.243119239807129°E
Elevation 146.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
<class 'p

In [8]:
air_quality_fg.insert(aq_df_all)
weather_fg.insert(weather_df_all, wait=True)

2025-11-10 15:01:57,078 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1272014/fs/1258613/fg/1668629


Uploading Dataframe: 100.00% |██████████| Rows 6099/6099 | Elapsed Time: 00:01 | Remaining Time: 00:00


2025-11-10 15:02:06,188 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1272014/fs/1258613/fg/1668630


Uploading Dataframe: 100.00% |██████████| Rows 6564/6564 | Elapsed Time: 00:02 | Remaining Time: 00:00


(Job('weather_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "precipitation_sum",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 739377
         }
       },
       "result": {
         "observed_value": 0.0,
         "element_count": 6564,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-10T02:02:06.000187Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_colu

In [9]:
# Add feature descriptions
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in acqcn.org)")
air_quality_fg.update_feature_description("city", "City where the air quality was measured")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")
air_quality_fg.update_feature_description("url", "Url to sensor live data")
air_quality_fg.update_feature_description("lagged_1", "Pm25 from the previous day")
air_quality_fg.update_feature_description("lagged_2", "Pm25 from the second to last day")
air_quality_fg.update_feature_description("lagged_3", "Pm25 from the third to last day")

weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m abouve ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant Wind direction over the dayd")

<hsfs.feature_group.FeatureGroup at 0x7f15ab6c6110>