<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 01: Feature Backfill for Air Quality Data</span>


## 🗒️ You have the following tasks
1. Choose an Air Quality Sensor
2. Update the country, city, and street information to point to YOUR chosen Air Quality Sensor
3. Download historical measures for your Air Quality Sensor as a CSV file
4. Update the path of the CSV file in this notebook to point to the one that you downloaded
5. Create an account on www.hopsworks.ai and get your HOPSWORKS_API_KEY
6. Run this notebook



### <span style='color:#ff5f27'> 📝 Imports </span>

In [1]:
import datetime
import requests
import pandas as pd
import hopsworks
from pathlib import Path
from functions import util
import json
import re
import os
import warnings
import holidays
warnings.filterwarnings("ignore")



### IF YOU WANT TO WIPE OUT ALL OF YOUR FEATURES AND MODELS, run the cell below

In [2]:
# If you haven't set the env variable 'HOPSWORKS_API_KEY', then uncomment the next line and enter your API key
# os.environ["HOPSWORKS_API_KEY"] = ""
# proj = hopsworks.login()
# util.purge_project(proj)

---

In [2]:
csv_file = "../../data/bikes_oct.csv"
util.check_file_path(csv_file)

File successfully found at the path: ../../data/bikes_oct.csv


In [3]:
station_id = 42

bikes_url = 'https://opendata-ajuntament.barcelona.cat/data/dataset/estat-estacions-bicing/resource/1b215493-9e63-4a12-8980-2d7e0fa19f85/download/recurs.json'

# Station 42 latitude and longitude
latitude = "41.404511"
longitude = "2.189881"
city = "Barcelona"

today = datetime.date.today()

In [4]:
bicing_api_key_file = '../../data/bicing-api-key.txt'
util.check_file_path(bicing_api_key_file)

with open(bicing_api_key_file, 'r') as file:
    BICING_API_KEY = file.read().rstrip()

File successfully found at the path: ../../data/bicing-api-key.txt


## Hopsworks API Key
You need to have registered an account on app.hopsworks.ai.
You will be prompted to enter your API key here, unless you set it as the environment variable HOPSWORKS_API_KEY (my preferred approach).
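
The environment-variable approach can be sketched as below. This is a minimal sketch and not part of the Hopsworks API: `resolve_api_key` and its arguments are hypothetical helper names, and it assumes that a key file, if used, holds the key on a single line.

```python
import os
from pathlib import Path


def resolve_api_key(env_var="HOPSWORKS_API_KEY", key_file=None):
    """Return the API key from the environment, else from a file, else None."""
    # Prefer the environment variable when it is set and non-empty
    key = os.environ.get(env_var, "").strip()
    if key:
        return key
    # Fall back to a local key file (hypothetical path supplied by the caller)
    if key_file is not None and Path(key_file).is_file():
        return Path(key_file).read_text().strip() or None
    return None
```

If the helper returns `None`, `hopsworks.login()` can still prompt for the key interactively, as described above.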

In [5]:
with open('../../data/hopsworks-api-key.txt', 'r') as file:
    os.environ["HOPSWORKS_API_KEY"] = file.read().rstrip()
    
project = hopsworks.login(project="juls_first_project")

2025-01-03 11:56:11,446 INFO: Initializing external client
2025-01-03 11:56:11,446 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-01-03 11:56:12,873 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1164440


In [6]:
secrets = hopsworks.get_secrets_api()
try:
    secrets.create_secret("BICING_API_KEY", BICING_API_KEY)
except hopsworks.RestAPIError:
    BICING_API_KEY = secrets.get_secret("BICING_API_KEY").value

In [7]:
try:
    bikes_now_df = util.fetch_station_data(bikes_url, BICING_API_KEY, station_id)
except hopsworks.RestAPIError:
    print("It looks like the BICING_API_KEY doesn't work. Is the API key correct? Is the URL correct?")
bikes_now_df.head()

Unnamed: 0,station_id,num_bikes_available,last_reported
0,42,19,2025-01-03 10:55:37 UTC


## <span style='color:#ff5f27'> 🌍 STEP 5: Read your CSV file into a DataFrame </span>

The cell below reads the historical measurements from your CSV file into a Pandas DataFrame.
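
As a rough illustration of this read-and-filter step, here is a minimal sketch on in-memory data. It assumes the columns `station_id`, `num_bikes_available`, and `last_updated` (epoch seconds) that appear in this notebook's CSV; the sample values are made up.

```python
import io
import pandas as pd

# Tiny in-memory stand-in for the downloaded CSV (values are made up)
sample_csv = io.StringIO(
    "station_id,num_bikes_available,last_updated\n"
    "42,1,1727733600\n"
    "7,5,1727733600\n"
    "42,0,1727733901\n"
)

sample_df = pd.read_csv(sample_csv, skipinitialspace=True)
# Keep only the rows for one station; copy so later assignments are safe
sample_df = sample_df[sample_df["station_id"] == 42].copy()
# Convert epoch seconds to timezone-aware UTC timestamps
sample_df["last_updated"] = pd.to_datetime(sample_df["last_updated"], unit="s", utc=True)
print(sample_df)
```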

In [8]:
#get only the station_id=42 from the csv file
df = pd.read_csv(csv_file,  parse_dates=['last_updated'] , skipinitialspace=True)
df = df[df['station_id'] == station_id]
df.head()

Unnamed: 0,station_id,num_bikes_available,num_bikes_available_types.mechanical,num_bikes_available_types.ebike,num_docks_available,last_reported,is_charging_station,status,is_installed,is_renting,is_returning,traffic,last_updated,ttl,V1
39,42.0,1.0,0.0,1.0,21.0,1727733000.0,True,IN_SERVICE,1.0,1.0,1.0,,1727733600,0.0,
552,42.0,0.0,0.0,0.0,22.0,1727734000.0,True,IN_SERVICE,1.0,1.0,1.0,,1727733901,0.0,
1064,42.0,0.0,0.0,0.0,22.0,1727734000.0,True,IN_SERVICE,1.0,1.0,1.0,,1727734198,0.0,
1576,42.0,0.0,0.0,0.0,22.0,1727734000.0,True,IN_SERVICE,1.0,1.0,1.0,,1727734501,0.0,
2088,42.0,0.0,0.0,0.0,22.0,1727735000.0,True,IN_SERVICE,1.0,1.0,1.0,,1727734804,0.0,


## <span style='color:#ff5f27'> 🌍 STEP 6: Data cleaning</span>


### Rename columns if needed and drop unnecessary columns

We want to have a DataFrame with 2 columns - `date` and `pm25` after this cell below:

## Check the data types for the columns in your DataFrame

 * `date` should be of type `datetime64[ns]`
 * `pm25` should be of type `float64`
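
The dtype check described above could be automated along these lines. This is a minimal sketch: `check_dtypes` is a hypothetical helper, and the demo DataFrame contains made-up values.

```python
import pandas as pd
from pandas.api import types as ptypes


def check_dtypes(frame, datetime_col, float_col):
    """Return a list of human-readable dtype problems (empty if none)."""
    problems = []
    if not ptypes.is_datetime64_any_dtype(frame[datetime_col]):
        problems.append(f"{datetime_col} is {frame[datetime_col].dtype}, expected datetime64")
    if not ptypes.is_float_dtype(frame[float_col]):
        problems.append(f"{float_col} is {frame[float_col].dtype}, expected float")
    return problems


demo = pd.DataFrame({
    "date": pd.to_datetime(["2024-10-01", "2024-10-02"]),
    "pm25": [12.0, 15.5],
})
print(check_dtypes(demo, "date", "pm25"))
```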

In [9]:
# These commands will succeed if your CSV file didn't have a `median` or `timestamp` column
#df = df.rename(columns={"median": "pm25"})
#df = df.rename(columns={"timestamp": "date"})

# Copy the selection so the assignments below do not modify a view of `df`
station_df = df[['last_updated', 'num_bikes_available']].copy()
station_df['last_updated'] = pd.to_datetime(
    station_df['last_updated'], unit='s', utc=True, errors='coerce'
)

# Create a 'day' column with the date (YYYY-MM-DD)
station_df['day'] = station_df['last_updated'].dt.strftime('%Y-%m-%d')
# Create a 'time' column with the hour (00-23)
station_df['time'] = station_df['last_updated'].dt.strftime('%H')
station_df = station_df.rename(columns={"last_updated": "date"})
# Average the number of available bikes for each day and hour
hourly_avg_df = station_df.groupby(['day', 'time'])['num_bikes_available'].mean().reset_index()

## Add a column date with the date based on the columns day and time, type datetime
hourly_avg_df['date'] = hourly_avg_df['day'] + ' ' + hourly_avg_df['time'] + ':00:00'
hourly_avg_df['date'] = pd.to_datetime(hourly_avg_df['date'], format='%Y-%m-%d %H:%M:%S')

## Adding a new boolean column if the date is weekend or not
hourly_avg_df['is_weekend'] = hourly_avg_df['date'].dt.dayofweek > 4
## Adding a new boolean column if the date is holiday or not
holidays_es = holidays.Spain()
hourly_avg_df['is_holiday'] = hourly_avg_df['date'].dt.date.astype(str).map(lambda x: x in holidays_es)

hourly_avg_df

# Get the column with date 2024



Unnamed: 0,day,time,num_bikes_available,date,is_weekend,is_holiday
0,2024-09-30,22,0.250000,2024-09-30 22:00:00,False,False
1,2024-09-30,23,0.333333,2024-09-30 23:00:00,False,False
2,2024-10-01,00,1.230769,2024-10-01 00:00:00,False,False
3,2024-10-01,01,1.333333,2024-10-01 01:00:00,False,False
4,2024-10-01,02,1.166667,2024-10-01 02:00:00,False,False
...,...,...,...,...,...,...
718,2024-10-31,19,19.818182,2024-10-31 19:00:00,False,False
719,2024-10-31,20,18.583333,2024-10-31 20:00:00,False,False
720,2024-10-31,21,15.833333,2024-10-31 21:00:00,False,False
721,2024-10-31,22,18.166667,2024-10-31 22:00:00,False,False


In [10]:
# Inspect the column dtypes and non-null counts
hourly_avg_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 723 entries, 0 to 722
Data columns (total 6 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   day                  723 non-null    object        
 1   time                 723 non-null    object        
 2   num_bikes_available  723 non-null    float64       
 3   date                 723 non-null    datetime64[ns]
 4   is_weekend           723 non-null    bool          
 5   is_holiday           723 non-null    bool          
dtypes: bool(2), datetime64[ns](1), float64(1), object(2)
memory usage: 24.1+ KB


## <span style='color:#ff5f27'> 🌍 STEP 7: Drop any rows with missing data </span>
It will make the model training easier if there is no missing data in the rows, so we drop any rows with missing data.
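
A minimal sketch of this step on made-up data, which also reports how many rows were removed so that an unexpectedly large loss is easy to spot:

```python
import pandas as pd

demo = pd.DataFrame({
    "date": pd.to_datetime(["2024-10-01", "2024-10-02", "2024-10-03"]),
    "num_bikes_available": [3.0, None, 7.5],
})

rows_before = len(demo)
# Drop every row that has at least one missing value
clean = demo.dropna().reset_index(drop=True)
print(f"Dropped {rows_before - len(clean)} of {rows_before} rows")
```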

In [11]:
hourly_avg_df.dropna(inplace=True)
hourly_avg_df

Unnamed: 0,day,time,num_bikes_available,date,is_weekend,is_holiday
0,2024-09-30,22,0.250000,2024-09-30 22:00:00,False,False
1,2024-09-30,23,0.333333,2024-09-30 23:00:00,False,False
2,2024-10-01,00,1.230769,2024-10-01 00:00:00,False,False
3,2024-10-01,01,1.333333,2024-10-01 01:00:00,False,False
4,2024-10-01,02,1.166667,2024-10-01 02:00:00,False,False
...,...,...,...,...,...,...
718,2024-10-31,19,19.818182,2024-10-31 19:00:00,False,False
719,2024-10-31,20,18.583333,2024-10-31 20:00:00,False,False
720,2024-10-31,21,15.833333,2024-10-31 21:00:00,False,False
721,2024-10-31,22,18.166667,2024-10-31 22:00:00,False,False


---

## <span style='color:#ff5f27'> 🌦 Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs) </span>

## <span style='color:#ff5f27'> 🌍 STEP 9: Download the Historical Weather Data </span>

https://open-meteo.com/en/docs/historical-weather-api#hourly=&daily=temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant

We will download the historical weather data for your `city` by first extracting the earliest date from your DataFrame containing the historical air quality measurements.

We will download all daily historical weather data measurements for your `city` from the earliest date in your air quality measurement DataFrame. It doesn't matter if there are missing days of air quality measurements. We can store all of the daily weather measurements, and when we build our training dataset, we will join up the air quality measurements for a given day to its weather features for that day. 
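
The join described above can be sketched with made-up data as follows. The column names `day`, `value`, and `temperature` are illustrative assumptions; the actual training dataset is built later from the feature groups.

```python
import pandas as pd

# Made-up measurements with a gap on 2024-10-02
measurements = pd.DataFrame({
    "day": ["2024-10-01", "2024-10-03"],
    "value": [12.0, 9.5],
})
# Made-up weather rows covering every day
weather = pd.DataFrame({
    "day": ["2024-10-01", "2024-10-02", "2024-10-03"],
    "temperature": [18.2, 17.9, 16.4],
})

# An inner join keeps only the days that have both a measurement and weather
training = measurements.merge(weather, on="day", how="inner")
print(training)
```

The extra weather row for the missing day is simply left out of the result, which is why storing all of the weather measurements is harmless.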

The weather features we will download are:

 * `temperature (average over the day)`
 * `precipitation (the total over the day)`
 * `wind speed (average over the day)`
 * `wind direction (the most dominant direction over the day)`
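
To make the download step more concrete, here is a minimal sketch that only builds the request URL and makes no network call. It assumes the Open-Meteo archive endpoint `https://archive-api.open-meteo.com/v1/archive` and uses the daily variable names from the docs link above; `build_archive_url` is a hypothetical helper and not necessarily what `util.get_historical_weather` does internally.

```python
from urllib.parse import urlencode

# Assumed endpoint of the Open-Meteo historical weather API
ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"


def build_archive_url(latitude, longitude, start_date, end_date):
    """Build a request URL for daily historical weather (no network call)."""
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,  # ISO format, YYYY-MM-DD
        "end_date": end_date,
        # Daily variables taken from the docs link above
        "daily": ",".join([
            "temperature_2m_mean",
            "precipitation_sum",
            "wind_speed_10m_max",
            "wind_direction_10m_dominant",
        ]),
        # Explicit timezone for the daily aggregates (assumption: GMT)
        "timezone": "GMT",
    }
    return f"{ARCHIVE_URL}?{urlencode(params)}"


print(build_archive_url("41.404511", "2.189881", "2024-09-30", "2024-10-31"))
```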


In [13]:
earliest_bikes_date = hourly_avg_df['day'].min()

weather_df = util.get_historical_weather(city, earliest_bikes_date, str(today), latitude, longitude)

weather_df['date'] = pd.to_datetime(
    weather_df['date'], unit='s', utc=True, errors='coerce'
)

weather_df['time'] = weather_df['date'].dt.strftime('%H')
weather_df['day'] = weather_df['date'].dt.strftime('%Y-%m-%d')
weather_df

Coordinates 41.37082290649414°N 2.068965435028076°E
Elevation 13.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


Unnamed: 0,date,precipitation,temperature,city,time,day
0,2024-09-30 00:00:00+00:00,0.0,15.441501,Barcelona,00,2024-09-30
1,2024-09-30 01:00:00+00:00,0.0,15.791500,Barcelona,01,2024-09-30
2,2024-09-30 02:00:00+00:00,0.0,15.791500,Barcelona,02,2024-09-30
3,2024-09-30 03:00:00+00:00,0.0,15.641500,Barcelona,03,2024-09-30
4,2024-09-30 04:00:00+00:00,0.0,15.291500,Barcelona,04,2024-09-30
...,...,...,...,...,...,...
2252,2025-01-01 20:00:00+00:00,0.0,6.691500,Barcelona,20,2025-01-01
2253,2025-01-01 21:00:00+00:00,0.0,4.941500,Barcelona,21,2025-01-01
2254,2025-01-01 22:00:00+00:00,0.0,3.391500,Barcelona,22,2025-01-01
2255,2025-01-01 23:00:00+00:00,0.0,2.941500,Barcelona,23,2025-01-01


In [14]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2257 entries, 0 to 2256
Data columns (total 6 columns):
 #   Column         Non-Null Count  Dtype              
---  ------         --------------  -----              
 0   date           2257 non-null   datetime64[ns, UTC]
 1   precipitation  2257 non-null   float32            
 2   temperature    2257 non-null   float32            
 3   city           2257 non-null   object             
 4   time           2257 non-null   object             
 5   day            2257 non-null   object             
dtypes: datetime64[ns, UTC](1), float32(2), object(3)
memory usage: 105.8+ KB


## <span style='color:#ff5f27'> 🌍 STEP 10: Define Data Validation Rules </span>

We will validate the air quality measurements (`pm25` values) before we write them to Hopsworks.

We define a data validation rule (an expectation in Great Expectations) that ensures that `pm25` values are not negative or above the max value available by the sensor.

We will attach this expectation to the air quality feature group, so that we validate the `pm25` data every time we write a DataFrame to the feature group. We want to prevent garbage-in, garbage-out.

## Expectations for Weather Data
Here, we define an expectation for 2 columns in our weather DataFrame - `precipitation_sum` and `wind_speed_10m_max`, where we expect both values to be greater than zero, but less than 1000.
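
The idea behind such a range rule can be illustrated with plain pandas. This is a minimal sketch and deliberately not the Great Expectations API: `out_of_range_rows` is a hypothetical helper, the inclusive bounds are an assumption, and the values are made up.

```python
import pandas as pd


def out_of_range_rows(frame, column, min_value, max_value):
    """Return the rows whose `column` is missing or outside [min_value, max_value]."""
    # between() is inclusive on both ends and evaluates to False for NaN
    in_range = frame[column].between(min_value, max_value)
    return frame[~in_range]


demo = pd.DataFrame({"precipitation_sum": [0.0, 3.2, -1.0, 1500.0]})
bad_rows = out_of_range_rows(demo, "precipitation_sum", 0.0, 1000.0)
print(bad_rows)
```

An expectation attached to a feature group plays the same role, except that the check runs automatically on every insert.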

---

### <span style="color:#ff5f27;"> 🔮 STEP 11: Connect to Hopsworks and save the sensor country, city, street names as a secret</span>

In [15]:
fs = project.get_feature_store() 

#### Save country, city, street names as a secret

These will be downloaded from Hopsworks later in the (1) daily feature pipeline and (2) the daily batch inference pipeline

In [16]:
dict_obj = {
    "city": city,
    "station_id": station_id,
    "bikes_url": bikes_url,
    "latitude": latitude,
    "longitude": longitude
}

# Convert the dictionary to a JSON string
str_dict = json.dumps(dict_obj)

try:
    secrets.create_secret("STATION_PARAMS_JSON", str_dict)
except hopsworks.RestAPIError:
    print("STATION_PARAMS_JSON already exists. To update, delete the secret in the UI (https://c.app.hopsworks.ai/account/secrets) and re-run this cell.")
    existing_key = secrets.get_secret("STATION_PARAMS_JSON").value
    print(f"{existing_key}")

Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets
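
The later pipelines need to turn the stored JSON string back into a dictionary. A minimal sketch of that reading side, assuming the secret value is the string produced by `json.dumps` above; `parse_station_params` is a hypothetical helper and the example URL is a placeholder.

```python
import json


def parse_station_params(secret_value):
    """Decode the JSON string stored in the secret and check required keys."""
    params = json.loads(secret_value)
    required = {"city", "station_id", "bikes_url", "latitude", "longitude"}
    missing = required - params.keys()
    if missing:
        raise KeyError(f"Missing keys in station parameters: {sorted(missing)}")
    return params


# Stand-in for the value returned by secrets.get_secret("STATION_PARAMS_JSON").value
example_value = json.dumps({
    "city": "Barcelona",
    "station_id": 42,
    "bikes_url": "https://example.com/recurs.json",
    "latitude": "41.404511",
    "longitude": "2.189881",
})
params = parse_station_params(example_value)
print(params["city"], params["station_id"])
```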


### <span style="color:#ff5f27;"> 🔮 STEP 12: Create the Feature Groups and insert the DataFrames in them </span>

### <span style='color:#ff5f27'> 🌫 Air Quality Data </span>
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame by providing them as the `primary_key`. Here, each hourly bike availability measurement is uniquely identified by `time`, `date`, and `day`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules
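
Before inserting, it can be worth confirming that the chosen primary-key columns really are unique in the DataFrame. A minimal pandas sketch with made-up rows; `duplicated_keys` is a hypothetical helper, not a Hopsworks function.

```python
import pandas as pd


def duplicated_keys(frame, primary_key):
    """Return the rows whose primary-key combination appears more than once."""
    return frame[frame.duplicated(subset=primary_key, keep=False)]


demo = pd.DataFrame({
    "day": ["2024-10-01", "2024-10-01", "2024-10-01"],
    "time": ["00", "01", "01"],
    "num_bikes_available": [1.0, 2.0, 3.0],
})
print(duplicated_keys(demo, ["day", "time"]))
```

An empty result means the key columns identify each row uniquely.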

In [42]:
bikes_fg = fs.get_or_create_feature_group(
    name='bikes',
    description='Bikes available at a station every hour',
    version=1,
    primary_key=['time', 'date', 'day'],
    event_time="date",
)

#### Insert the DataFrame into the Feature Group

In [43]:
bikes_fg.insert(hourly_avg_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1164440/fs/1155143/fg/1394494


Uploading Dataframe: 100.00% |██████████| Rows 723/723 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: bikes_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1164440/jobs/named/bikes_2_offline_fg_materialization/executions


(Job('bikes_2_offline_fg_materialization', 'SPARK'), None)

#### Enter a description for each feature in the Feature Group

In [None]:
bikes_fg.update_feature_description("date", "Date of measurement of bikes availability")
bikes_fg.update_feature_description("time", "Hour of measurement of bikes availability")
bikes_fg.update_feature_description("num_bikes_available", "Available bikes at the station")
bikes_fg.update_feature_description("day", "Day (YYYY-MM-DD) of measurement of bikes availability")

<hsfs.feature_group.FeatureGroup at 0x11042a690>

### <span style='color:#ff5f27'> 🌦 Weather Data </span>
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame by providing them as the `primary_key`. Here, each weather measurement is uniquely identified by `date`, `time`, and `day`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [48]:
# Get or create feature group 
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each hour',
    version=1,
    primary_key=['date', 'time', 'day'],
    event_time="date",
) 

#### Insert the DataFrame into the Feature Group

In [49]:
# Insert data
weather_fg.insert(weather_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1164440/fs/1155143/fg/1393484


Uploading Dataframe: 100.00% |██████████| Rows 2257/2257 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1164440/jobs/named/weather_1_offline_fg_materialization/executions


(Job('weather_1_offline_fg_materialization', 'SPARK'), None)

#### Enter a description for each feature in the Feature Group

In [None]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("day", "Day of measurement of weather")
weather_fg.update_feature_description("time", "Time of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("precipitation", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_100m", "Wind speed at 100m above ground")

## <span style="color:#ff5f27;">⏭️ **Next:** Part 02: Daily Feature Pipeline 
 </span> 


## <span style="color:#ff5f27;">⏭️ **Exercises:** 
 </span> 

Extra Homework:

  * Try adding a new feature based on a rolling window of 3 days for 'pm25'
      * This is not easy, as forecasting more than 1 day in the future, you won't have the previous 3 days of pm25 measurements.
      * df.set_index("date").rolling(3).mean() is only the start....
  * Parameterize the notebook, so that you can provide the `country`/`street`/`city`/`url`/`csv_file` as parameters. 
      * Hint: this will also require parameterizing the secret name (`SENSOR_LOCATION_JSON`), e.g., by adding the street name as part of the secret name. You then have to pass that secret name as a parameter when running the operational feature pipeline and batch inference pipelines.
      * After you have done this, collect the street/city/url/csv files for all the sensors in your city or region and make dashboards for all of the air quality sensors in your city/region. You could even then add a dashboard for your city/region, as done [here for Poland](https://github.com/erno98/ID2223).
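
For the rolling-window exercise, a minimal starting sketch on made-up daily values is shown below. The `shift(1)` is a design choice assumed here so that each row only uses the three days before it; it does not solve the multi-day forecasting problem mentioned in the exercise.

```python
import pandas as pd

demo = pd.DataFrame({
    "date": pd.date_range("2024-10-01", periods=5, freq="D"),
    "pm25": [10.0, 20.0, 30.0, 40.0, 50.0],
})

daily = demo.set_index("date").sort_index()
# shift(1) so each row only sees the 3 days before it, not its own value
daily["pm25_rolling_3d"] = daily["pm25"].shift(1).rolling(3).mean()
print(daily)
```

The first three rows have no complete 3-day history, so their rolling value is missing and would need to be dropped or imputed before training.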

Improve this AI System
  * As of mid 2024, there is no API call available to download historical data from the AQIN website. You could improve this system by writing a PR to download the CSV file using Python Selenium and the URL for the sensor.


---