# Feature Engineering Pipeline with Feature Store

## Overview
In this project I used the Hopsworks Feature Store. Hopsworks is an open-source, data-intensive AI platform for developing and operating machine learning models at scale. Its Feature Store provides the HSFS API, which lets clients:

- write features to feature groups in the feature store, and
- read features from feature views.
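To make the write/read split concrete, here is a toy in-memory model. It does not use the HSFS API: `ToyFeatureGroup` and `ToyFeatureView` are hypothetical classes that only mimic the idea that a feature group stores rows identified by a primary key, while a feature view reads a selected subset of those features.

```python
from dataclasses import dataclass, field


@dataclass
class ToyFeatureGroup:
    """Hypothetical stand-in for a feature group: rows keyed by primary key."""
    primary_key: tuple[str, ...]
    rows: dict = field(default_factory=dict)

    def insert(self, records: list[dict]) -> None:
        # Upsert: a record whose primary-key values already exist replaces the old row
        for record in records:
            key = tuple(record[col] for col in self.primary_key)
            self.rows[key] = dict(record)


@dataclass
class ToyFeatureView:
    """Hypothetical read-only projection that selects a subset of features."""
    group: ToyFeatureGroup
    features: list[str]

    def read(self) -> list[dict]:
        return [{f: row[f] for f in self.features} for row in self.group.rows.values()]


if __name__ == "__main__":
    fg = ToyFeatureGroup(primary_key=("date", "time"))
    fg.insert([{"date": "2025-03-10", "time": "12:57:02", "aqi": 1, "pm2_5": 5.6}])
    # Same (date, time) key, so this record overwrites the previous one
    fg.insert([{"date": "2025-03-10", "time": "12:57:02", "aqi": 2, "pm2_5": 7.1}])
    view = ToyFeatureView(fg, ["aqi", "pm2_5"])
    print(view.read())  # [{'aqi': 2, 'pm2_5': 7.1}]
```

Writing a second record with the same key replaces the first in this toy, which is the guarantee a primary key is meant to provide; the real feature store's storage and versioning details are not modelled here.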

![The Hopsworks Architecture!](../images/fs_architecture.jpg "The Hopsworks Architecture")

## Feature Pipelines
The feature pipeline is the foundation of the FTI (feature, training, inference) architecture. It transforms raw data into engineered features that are ready for both training and inference. This involves:

- **Data Extraction**: Retrieving raw data from various sources, such as relational databases, APIs, or data lakes. This step can also run separately from the feature pipeline.
- **Feature Engineering**: Performing transformations like aggregations, scaling, encoding, and computing derived metrics.
- **Feature Storage**: Saving the processed features in a feature store (e.g., Feast, Hopsworks) for reuse during training and inference.
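The three stages can be sketched as plain functions chained together. This is a minimal sketch, not the project's pipeline: the record layout, the `store` callback and the `AQI_LABELS` mapping are assumptions (only the `1 -> "Good"` pair is visible in this notebook's sample output).

```python
# Assumption: AQI is an integer on a 1-5 scale. Only 1 -> "Good" is confirmed
# by the sample output in this notebook; the other labels are illustrative.
AQI_LABELS = {1: "Good", 2: "Fair", 3: "Moderate", 4: "Poor", 5: "Very Poor"}


def extract(source):
    """Data extraction: pull raw records (dicts) from any iterable source."""
    return [dict(record) for record in source]


def engineer(records):
    """Feature engineering: encode the numeric AQI as a categorical bucket."""
    return [
        {**r, "aqi_bucket": AQI_LABELS.get(r["aqi"], "Unknown")}
        for r in records
    ]


def run_feature_pipeline(source, store):
    """Chain extraction -> engineering -> storage; `store` persists the rows."""
    features = engineer(extract(source))
    store(features)
    return features


if __name__ == "__main__":
    saved = []
    rows = run_feature_pipeline([{"aqi": 1, "pm2_5": 5.6}], saved.extend)
    print(rows)  # [{'aqi': 1, 'pm2_5': 5.6, 'aqi_bucket': 'Good'}]
```

Passing `store` in as a callable keeps the transformation logic testable without a feature store; in the notebook, that role is played by `fg.insert(...)`.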


![The ETL Architecture!](../images/ETL_architecture.png "The ETL Architecture")

**Steps:**

1. Run the pipeline in either "backfill" or "normal" mode.

2. If `BACKFILL == True`, load the DataFrame from the historical CSV file `weather_20200101_to_20250201.csv`.

3. Otherwise (`BACKFILL == False`), load a DataFrame with the latest air-quality data from the API.

4. Write the DataFrame to a feature group.
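The mode switch in steps 2 and 3 can be sketched as a single loader function. This is a minimal sketch, assuming records are plain dicts: `load_input` and its `fetch_latest` parameter are hypothetical, with `fetch_latest` standing in for the project's `extract_and_load_current_air_pollution_data`.

```python
import csv
from typing import Callable


def load_input(backfill: bool, csv_path: str, fetch_latest: Callable[[], list]) -> list:
    """Return raw records for the pipeline.

    backfill=True  -> read every row of the historical CSV export.
    backfill=False -> call the live-data function (a hypothetical stand-in
                      for extract_and_load_current_air_pollution_data).
    """
    if backfill:
        with open(csv_path, newline="") as f:
            # DictReader yields one dict per row; all values are strings
            return list(csv.DictReader(f))
    return fetch_latest()
```

Injecting `fetch_latest` rather than calling the API directly makes both branches easy to exercise without network access.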

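```python
import os
import sys
import time
from pathlib import Path

import hopsworks
import pandas as pd
from confluent_kafka import Producer
from dotenv import load_dotenv

# Make the project's src/ directory importable from the notebook
sys.path.insert(0, str(Path().resolve().parent / "src"))

from paths import TRANSFORMED_DATA_DIR
from components.extract_air_data_api import *  # provides extract_and_load_current_air_pollution_data()

# Load environment variables (e.g. the Hopsworks API key) from a .env file
load_dotenv()

HOPSWORK_LOGIN_API_KEY = os.getenv("HOPSWORK_LOGIN_API_KEY")
BACKFILL = False  # True: load the historical CSV; False: fetch the latest data from the API
```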

```python
# Check the installed Hopsworks client version
hopsworks.__version__
```

```
'4.1.8'
```

### Backfill or create new input data
You can run this pipeline in either backfill mode (historical data from the CSV file) or new-data mode (the latest readings from the API).

```python
# Build the input DataFrame
if BACKFILL:
    df = pd.read_csv(f"{TRANSFORMED_DATA_DIR}/weather_20200101_to_20250201.csv")
    # Parse the timestamp column as datetime
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    # Derive a 'date' column (datetime.date objects) from the timestamp
    df["date"] = df["timestamp"].dt.date
    # Derive a 'time' column and store it as an HH:MM:SS string
    df["time"] = df["timestamp"].dt.time.astype(str)
else:
    # Fetch the latest air-quality reading from the API
    df = extract_and_load_current_air_pollution_data()

df.head()
```

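|   | aqi | co | no | no2 | o3 | so2 | pm2_5 | pm10 | nh3 | timestamp | date | time | aqi_bucket |
|---|-----|----|----|-----|----|-----|-------|------|-----|-----------|------|------|------------|
| 0 | 1 | 460.63 | 0.36 | 1.56 | 33.62 | 1.45 | 5.6 | 15.63 | 4.18 | 2025-03-10 12:57:02 | 2025-03-10 | 12:57:02 | Good |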
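The backfill branch splits each timestamp into a `date` (a `datetime.date` object) and a `time` string, which together form the primary key used later. Below is a stdlib sketch of the same split for a single value; `split_timestamp` is a hypothetical helper, shown only to make the resulting key values explicit.

```python
from datetime import date, datetime


def split_timestamp(ts: str) -> tuple[date, str]:
    """Mirror the backfill branch for one value: keep the date as a
    datetime.date and the time of day as an 'HH:MM:SS' string."""
    parsed = datetime.fromisoformat(ts)
    # time.isoformat() omits the fractional part when microseconds are zero
    return parsed.date(), parsed.time().isoformat()


if __name__ == "__main__":
    print(split_timestamp("2025-03-10 12:57:02"))  # (datetime.date(2025, 3, 10), '12:57:02')
```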


```python
df.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 13 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   aqi         1 non-null      int64         
 1   co          1 non-null      float64       
 2   no          1 non-null      float64       
 3   no2         1 non-null      float64       
 4   o3          1 non-null      float64       
 5   so2         1 non-null      float64       
 6   pm2_5       1 non-null      float64       
 7   pm10        1 non-null      float64       
 8   nh3         1 non-null      float64       
 9   timestamp   1 non-null      datetime64[ns]
 10  date        1 non-null      object        
 11  time        1 non-null      object        
 12  aqi_bucket  1 non-null      object        
dtypes: datetime64[ns](1), float64(8), int64(1), object(3)
memory usage: 236.0+ bytes
```


### Authenticate with Hopsworks using your API Key


```python
# Log in to Hopsworks; hopsworks.login() returns a handle to the project
connection = hopsworks.login(
    host="c.app.hopsworks.ai",  # DNS of your Hopsworks instance
    port=443,
    project="air_quality_project",
    engine="python",
    api_key_value=HOPSWORK_LOGIN_API_KEY,
)
```


```
2025-03-10 13:57:07,729 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-10 13:57:07,817 INFO: Initializing external client
2025-03-10 13:57:07,820 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-10 13:57:10,687 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1214615
```
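`os.getenv()` returns `None` when a variable is missing, so a typo in the `.env` file goes unnoticed when the key is loaded. A small fail-fast helper could replace the plain `os.getenv` call; `require_env` is hypothetical and not part of the project.

```python
import os


def require_env(name: str) -> str:
    """Return an environment variable's value, or fail fast if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set; check your .env file.")
    return value
```

Usage, after `load_dotenv()`: `HOPSWORK_LOGIN_API_KEY = require_env("HOPSWORK_LOGIN_API_KEY")`.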


```python
# Connect to the project's feature store
project_name = "air_quality_project"

try:
    feature_store = connection.get_feature_store(name=project_name)
    print(f"✅ Successfully Connected to {feature_store.project_name}")
except Exception as e:
    # Re-raise so later cells do not fail with a confusing NameError
    print(f"❌ Feature store not available: {e}")
    raise
```

```
✅ Successfully Connected to air_quality_project
```


### Create and write to a feature group - primary keys

To prevent duplicate entries, Hopsworks requires each feature group to have a primary key: one or more columns that uniquely identify a row. Here, we assume that each air-quality measurement has a unique combination of `date` and `time` values.

The feature group infers its schema from the Pandas DataFrame.
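Before inserting, it is worth checking that the assumed key really is unique. In pandas this is `df.duplicated(subset=["date", "time"]).any()`; the stdlib sketch below (a hypothetical helper operating on a list of dicts) performs the same check and also reports which keys collide.

```python
from collections import Counter


def duplicate_keys(rows, primary_key=("date", "time")):
    """Return the primary-key tuples that occur more than once in `rows`."""
    counts = Counter(tuple(row[col] for col in primary_key) for row in rows)
    return [key for key, n in counts.items() if n > 1]


if __name__ == "__main__":
    rows = [
        {"date": "2025-03-10", "time": "12:57:02", "aqi": 1},
        {"date": "2025-03-10", "time": "12:57:02", "aqi": 2},
        {"date": "2025-03-10", "time": "13:57:02", "aqi": 1},
    ]
    print(duplicate_keys(rows))  # [('2025-03-10', '12:57:02')]
```

An empty result means `("date", "time")` is a valid key for this batch; a non-empty one shows exactly which readings would overwrite each other.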

```python
# Get or create a feature group
fg = feature_store.get_or_create_feature_group(
    name="air_quality_historical_data_2020_to_2025",
    version=1,
    description="Historical Data of Air Quality in Lagos",
    primary_key=["date", "time"],
    event_time="timestamp",
    online_enabled=True,
)
```

```python
# Save the DataFrame into the feature group
start_time = time.time()

try:
    # wait_for_job=False returns after the upload without waiting
    # for the materialization job to finish
    fg.insert(df, write_options={"wait_for_job": False})
except Exception as err:
    print(f"❌ Failed to insert data into feature group {fg.name}: {err}")
    raise

print("Upload time %s seconds ---" % (time.time() - start_time))
print("✅ Done!")
```

```
Uploading Dataframe: 100.00% |██████████| Rows 1/1 | Elapsed Time: 00:02 | Remaining Time: 00:00

Upload time 15.722429037094116 seconds ---
✅ Done!

Use fg.materialization_job.run(args=-op offline_fg_materialization -path hdfs:///Projects/air_quality_project/Resources/jobs/air_quality_historical_data_2020_to_2025_1_offline_fg_materialization/config_1741611083080) to trigger the materialization job again.
```


```python
df.columns
```

```
Index(['aqi', 'co', 'no', 'no2', 'o3', 'so2', 'pm2_5', 'pm10', 'nh3',
       'timestamp', 'date', 'time', 'aqi_bucket'],
      dtype='object')
```

```python
# Update the description of each feature in the feature group
feature_descriptions = [
    {"name": "aqi", "description": "Air Quality Index (AQI) value indicating the pollution level."},
    {"name": "co", "description": "Carbon Monoxide (CO) concentration in µg/m³."},
    {"name": "no", "description": "Nitric Oxide (NO) concentration in µg/m³."},
    {"name": "no2", "description": "Nitrogen Dioxide (NO₂) concentration in µg/m³."},
    {"name": "o3", "description": "Ozone (O₃) concentration in µg/m³."},
    {"name": "so2", "description": "Sulfur Dioxide (SO₂) concentration in µg/m³."},
    {"name": "pm2_5", "description": "Fine Particulate Matter (PM2.5) concentration in µg/m³."},
    {"name": "pm10", "description": "Coarse Particulate Matter (PM10) concentration in µg/m³."},
    {"name": "nh3", "description": "Ammonia (NH₃) concentration in µg/m³."},
    {"name": "timestamp", "description": "Timestamp of the recorded measurement (event time of the feature group)."},
    {"name": "date", "description": "The date of the recorded measurement (YYYY-MM-DD)."},
    {"name": "time", "description": "The time of the recorded measurement (HH:MM:SS)."},
    {"name": "aqi_bucket", "description": "Categorical label describing the AQI level."},
]

for desc in feature_descriptions:
    fg.update_feature_description(desc["name"], desc["description"])
```

```
RestAPIError: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/1214615/featurestores/1202247/featuregroups/1403822). Server response: 
HTTP code: 500, HTTP reason: Internal Server Error, body: b'{"errorCode":120003,"usrMsg":"Transaction marked for rollback.","errorMsg":"The last transaction did not complete as expected"}', error code: 120003, error msg: The last transaction did not complete as expected, user msg: Transaction marked for rollback.
```
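Because the loop makes one metadata call per feature, a malformed entry (for example one that uses `"aqi_bucket"` as a key instead of `"name"`) makes `desc["name"]` raise `KeyError` partway through, after some descriptions have already been updated. A pure-Python pre-flight check can catch this before any call is made; `check_descriptions` is a hypothetical helper, not part of HSFS.

```python
def check_descriptions(descriptions, columns):
    """Validate a list of {"name": ..., "description": ...} dicts against the
    DataFrame columns. Returns a list of problems (empty means OK)."""
    problems = []
    for i, desc in enumerate(descriptions):
        if "name" not in desc or "description" not in desc:
            problems.append(f"entry {i} is missing 'name' or 'description': {desc}")
    names = {d["name"] for d in descriptions if "name" in d}
    for col in columns:
        if col not in names:
            problems.append(f"column '{col}' has no description")
    for name in sorted(names - set(columns)):
        problems.append(f"description for unknown feature '{name}'")
    return problems
```

Usage: run `check_descriptions(feature_descriptions, df.columns)` and only start the update loop when it returns an empty list.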