## Feature Store Basics

This notebook introduces the basics of working with the Hopsworks API and Pandas DataFrames.

First, we will define a Pandas DataFrame with 4 credit card transactions in 3 different cities with the same credit card. The last 2 credit card transactions are labeled as 'fraud', while the first 2 transactions are labeled as 'not fraud'.

In [1]:
import pandas as pd

data = { 
    'credit_card_number': ['1111 2222 3333 4444', '1111 2222 3333 4444','1111 2222 3333 4444',
                           '1111 2222 3333 4444'],
    'trans_datetime': ['2022-01-01 08:44', '2022-01-02 19:44', '2022-01-02 20:44', '2022-01-02 20:55'],
    'amount': [142.34, 12.34, 66.29, 112.33],
    'location': ['Sao Paolo', 'Rio De Janeiro', 'Stockholm', 'Stockholm'],
    'fraud': [False, False, True, True] 
}

df = pd.DataFrame.from_dict(data)
df['trans_datetime']= pd.to_datetime(df['trans_datetime'])
df

Unnamed: 0,credit_card_number,trans_datetime,amount,location,fraud
0,1111 2222 3333 4444,2022-01-01 08:44:00,142.34,Sao Paolo,False
1,1111 2222 3333 4444,2022-01-02 19:44:00,12.34,Rio De Janeiro,False
2,1111 2222 3333 4444,2022-01-02 20:44:00,66.29,Stockholm,True
3,1111 2222 3333 4444,2022-01-02 20:55:00,112.33,Stockholm,True


## Connect to Hopsworks

You need an API key to connect. First, log in to Hopsworks, then run this code. It prints a link where you can get your API key, which you then copy and paste into the text box that appears below this cell.

It is good practice to save this API key somewhere safe so you don't have to create a new one every time you use Hopsworks. If you run this code on your laptop, a copy of the API key is cached locally in this directory, in a file with restricted permissions, so you don't have to re-enter it each time.
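One way to keep the key out of the notebook is to read it from an environment variable. The sketch below is only an illustration: the helper names `resolve_api_key` and `login_from_env` are hypothetical, and the variable name `HOPSWORKS_API_KEY` is an assumption of this example. The only Hopsworks call it relies on is `hopsworks.login()` with its `api_key_value` keyword argument.

```python
import os


def resolve_api_key(env_var="HOPSWORKS_API_KEY"):
    """Return the API key stored in the given environment variable, or None."""
    value = os.environ.get(env_var, "").strip()
    return value or None


def login_from_env():
    """Log in with the key from the environment, else use the interactive prompt."""
    import hopsworks  # imported here so resolve_api_key works without the client

    key = resolve_api_key()
    # With no key set, fall back to the interactive login used in this notebook.
    return hopsworks.login(api_key_value=key) if key else hopsworks.login()
```

Calling `proj = login_from_env()` would then stand in for `hopsworks.login()` in the cell below.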

In [2]:
import hopsworks
proj = hopsworks.login()
fs = proj.get_feature_store()

2024-12-03 12:06:15,291 INFO: Initializing external client
2024-12-03 12:06:15,292 INFO: Base URL: https://c.app.hopsworks.ai:443
2024-12-03 12:06:16,203 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1193135


### Create a Feature Group

A feature group is a table of features that are computed together in the same feature pipeline and written as a DataFrame to the Feature Store. Each row should have a unique identifier, made up of one or more columns, which you define as the `primary_key`. You may also have a column that holds the timestamp or datetime at which the row values were observed. If so, you should specify it as the `event_time` column when creating the Feature Group.

Hopsworks has comprehensive documentation on Feature Groups. Click on these links to learn more.

* [Feature Group Concept](https://docs.hopsworks.ai/3.0/concepts/fs/feature_group/fg_overview/)
* [Feature Group Creation Guide](https://docs.hopsworks.ai/3.0/user_guides/fs/feature_group/create/)
* [Feature Group API Docs](https://docs.hopsworks.ai/feature-store-api/3.0/generated/api/feature_group_api/)
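Before creating a feature group, it can help to check the key columns locally. The following is a minimal sketch in plain Pandas, not part of the Hopsworks API: `check_feature_group_keys` is a hypothetical helper, and treating the primary key together with the event time as the row identifier is an assumption of this example.

```python
import pandas as pd


def check_feature_group_keys(frame, primary_key, event_time=None):
    """Report basic problems with the key columns of a DataFrame."""
    key_cols = list(primary_key) + ([event_time] if event_time else [])
    missing = [c for c in key_cols if c not in frame.columns]
    if missing:
        raise KeyError(f"missing key columns: {missing}")
    return {
        # Rows where any key column is null
        "null_keys": int(frame[key_cols].isna().any(axis=1).sum()),
        # Rows that repeat an earlier (primary key, event time) combination
        "duplicate_rows": int(frame.duplicated(subset=key_cols).sum()),
        # The event time column should hold real datetimes, not strings
        "event_time_is_datetime": bool(
            event_time is None
            or pd.api.types.is_datetime64_any_dtype(frame[event_time])
        ),
    }


sample = pd.DataFrame({
    "credit_card_number": ["1111 2222 3333 4444"] * 3,
    "trans_datetime": pd.to_datetime(
        ["2022-01-01 08:44", "2022-01-02 19:44", "2022-01-02 19:44"]
    ),
    "amount": [142.34, 12.34, 12.34],
})
report = check_feature_group_keys(sample, ["credit_card_number"], "trans_datetime")
print(report)
```

In this made-up sample the last two rows share the same card number and timestamp, so the report counts one duplicate row.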

In [3]:
fg = fs.get_or_create_feature_group(
     name="credit_card_transactions_test",
     version=1,
     description="Credit Card Transaction data",
     primary_key=['credit_card_number'],
     event_time='trans_datetime'
) 

In [7]:
len(df)

4

### Write your DataFrame to the Feature Group
When you write your DataFrame to the feature group, the DataFrame is first copied to Hopsworks.
Then a backfill ingestion job runs on Hopsworks to insert/append the DataFrame to the Feature Group.
The job is a Spark job, and the data is stored in an Apache Hudi table in Hopsworks.

It will take about 1 minute for the ingestion job to complete.
If you don't want to wait, you can run the ingestion job in the background with:


    fg.insert(df, write_options={"wait_for_job": False})

In [4]:
fg.insert(df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1193135/fs/1182813/fg/1377731


Uploading Dataframe: 100.00% |██████████| Rows 4/4 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: credit_card_transactions_test_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1193135/jobs/named/credit_card_transactions_test_1_offline_fg_materialization/executions


(Job('credit_card_transactions_test_1_offline_fg_materialization', 'SPARK'),
 None)

## Read using Feature Views

When you want to use features to train or serve models, you create a Feature View by first selecting features from Feature Groups. Here, we have only 1 Feature Group, and we select 3 features from it, returning a `query` object. The `query` object defines the set of features (or schema) for a Feature View.

You create a Feature View from a `query` object, which specifies the features and any extra columns that might be needed for inference but not for training. You also provide a name and version, and specify which columns are `labels`, that is, the target your machine learning algorithm will try to predict.

In [5]:
query = fg.select(["amount", "location", "fraud"])

# get_or_create_feature_view returns the existing feature view if this name and
# version already exist, so the cell can be re-run without a "already exists" error.
fv = fs.get_or_create_feature_view(name="credit_card_transactions",
                                   version=1,
                                   description="Features from the credit_card_transactions FG",
                                   labels=["fraud"],
                                   query=query)

### Splitting into Train/Test sets

With a Feature View, you can read train and test sets directly as Pandas DataFrames, similar to scikit-learn.
Here,

* `X_train` is the features of our train set, 
* `y_train` is the labels of our train set, 
* `X_test` is the features of our test set, 
* `y_test` is the labels of our test set.
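To make the four return values concrete, here is a rough local analogue of a random split in plain Pandas. It is only a sketch of the idea, not how Hopsworks implements the split: `split_features_and_labels` and its `seed` parameter are hypothetical names introduced for this example.

```python
import pandas as pd


def split_features_and_labels(frame, label, test_size=0.5, seed=42):
    """Randomly split a DataFrame into feature and label frames for train and test."""
    test = frame.sample(frac=test_size, random_state=seed)
    train = frame.drop(test.index)
    return (
        train.drop(columns=[label]),  # X_train
        test.drop(columns=[label]),   # X_test
        train[[label]],               # y_train
        test[[label]],                # y_test
    )


rows = pd.DataFrame({
    "amount": [142.34, 12.34, 66.29, 112.33],
    "location": ["Sao Paolo", "Rio De Janeiro", "Stockholm", "Stockholm"],
    "fraud": [False, False, True, True],
})
X_train, X_test, y_train, y_test = split_features_and_labels(rows, "fraud")
print(X_train)
print(y_train)
```

With a test size of 0.5 and four rows, two rows land in each set, and the label column appears only in the `y_` frames.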

In [None]:
X_train, X_test, y_train, y_test = fv.train_test_split(0.5)
X_train

### Saving training data as files
Sometimes, if you have a large volume of training data, it is better to save the training data as files and then read those files in your training pipeline. You can create training data as CSV files that are randomly split into train/test sets as follows. The `td_version` is the version of the training data for this feature view, and you can track the progress of the job that creates the training data using the `td_job` object.

In [None]:
td_version, td_job = fv.create_train_test_split(
    description = 'Transactions fraud batch training dataset',
    data_format = 'csv',
    test_size = 0.5,
    write_options = {'wait_for_job': True},
    coalesce = True,
)

## Training Data as files
The training data is now stored as a CSV file on Hopsworks under `Project Settings` -> `File Browser` -> `<username>_Training_Datasets`.

You can read the training data as split train/test sets with the following. Note the parameter `td_version` we pass here. A feature view can have many training datasets, so you need to supply the version you want.

In [None]:
# The split is returned in the order (X_train, X_test, y_train, y_test).
X_train, X_test, y_train, y_test = fv.get_train_test_split(td_version)
X_train

### Aggregations

Compute the total amount spent on the credit card by grouping together all rows with the same `credit_card_number` and then summing their amounts.

The code first creates a new DataFrame with only the `credit_card_number` and `amount` columns. The logic of the group-by can then be described as

    for-each (`credit_card_number`) do sum(amount)
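Spelled out as a plain Python loop, the same group-by-and-sum logic looks roughly like this. It is only an illustration of what the Pandas `groupby(...).sum()` call computes, using the four transactions defined at the top of this notebook.

```python
from collections import defaultdict

# (credit_card_number, amount) pairs from the DataFrame above
transactions = [
    ("1111 2222 3333 4444", 142.34),
    ("1111 2222 3333 4444", 12.34),
    ("1111 2222 3333 4444", 66.29),
    ("1111 2222 3333 4444", 112.33),
]

# One running total per card number
total_spent = defaultdict(float)
for card_number, amount in transactions:
    total_spent[card_number] += amount

for card_number, total in total_spent.items():
    print(card_number, round(total, 2))
```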

In [None]:
df2 = df[["credit_card_number", "amount"]].groupby("credit_card_number").sum()
df2.rename(columns={"amount": "total_spent"}, inplace=True)
df2.info()

In [None]:
df2

We might also want to know the point in time at which that total was valid, so we add a column with the datetime of the last (most recent) credit card transaction.

In [None]:
# Most recent transaction time per card, aligned to df2 by its credit_card_number index
df2["as_of_datetime"] = df.groupby("credit_card_number")["trans_datetime"].max()
df2

The `groupby` operation sets `credit_card_number` as the index of our DataFrame.
We want `credit_card_number` as a column, as Pandas indexes are not written to the Feature Group.
We can move the index to a column using `reset_index`.

In [None]:
df2.reset_index(inplace=True)
df2

We create a feature group to store the contents of `df2` with our aggregated credit card spending information.

In [None]:
fg2 = fs.get_or_create_feature_group(
     name="credit_card_spending",
     version=1,
     description="Credit Card Spending",
     primary_key=['credit_card_number'],
     event_time='as_of_datetime'
) 

In [None]:
fg2.insert(df2, write_options={"wait_for_job": False})

Let's add some more data to our original feature group.

In [None]:
more_data = { 
    'credit_card_number': ['9999 8888 7777 6666', '9999 8888 7777 6666','9999 8888 7777 6666',
                           '9999 8888 7777 6666'],
    'trans_datetime': ['2022-01-02 04:11', '2022-01-03 07:24', '2022-01-05 10:33', '2022-01-05 11:50'],
    'amount': [55.67, 84, 77.95, 183],
    'location': ['San Francisco', 'San Francisco', 'Dublin', 'Dublin'],
    'fraud': [False, False, False, False] 
}

df3 = pd.DataFrame.from_dict(more_data)
df3['trans_datetime']= pd.to_datetime(df3['trans_datetime'])

# Use the name the feature group was created with earlier in this notebook.
fg = fs.get_feature_group(name="credit_card_transactions_test", version=1)

fg.insert(df3, write_options={"wait_for_job": False})

Now let's compute how much money was spent on the cards over time, using the data that is now in the feature group.

## Time Series: Window Aggregations

Compute the maximum and the mean transaction amount over a rolling one-day window (window length '1D').
We need to set the `event_time` column as the index in order to use the built-in Pandas window aggregations.
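As a small local illustration of how a time-based window behaves, the sketch below applies a `'1D'` rolling window to the four transactions defined at the top of this notebook, without reading from Hopsworks. The column name `rolling_sum_1d` is introduced only for this example.

```python
import pandas as pd

# The four transactions defined at the top of this notebook
tx = pd.DataFrame({
    "trans_datetime": pd.to_datetime([
        "2022-01-01 08:44", "2022-01-02 19:44",
        "2022-01-02 20:44", "2022-01-02 20:55",
    ]),
    "amount": [142.34, 12.34, 66.29, 112.33],
})

# Time-based windows need a sorted DatetimeIndex
tx = tx.set_index("trans_datetime").sort_index()

# Each window covers the 24 hours up to and including the current row
window = tx["amount"].rolling("1D")
tx["rolling_sum_1d"] = window.sum()
tx["rolling_max_1d"] = window.max()
print(tx)
```

The first transaction is more than 24 hours older than the second, so it drops out of every later window. The last three transactions fall within one day of each other, so the final row aggregates all three.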

In [None]:
df5 = fg.read()
df5

In [None]:
df5 = df5.set_index('trans_datetime')

In [None]:
df5 = df5.sort_index()

In [None]:
df5['rolling_max_1d'] = df5.rolling('1D').amount.max()
df5

In [None]:
df5['rolling_mean_1d'] = df5.rolling('1D').amount.mean()
df5

In [None]:
df5.reset_index(inplace=True)

In [None]:
fg_agg = fs.get_or_create_feature_group(
     name="credit_card_rolling_windows",
     version=1,
     description="Daily Credit Card Spending",
     primary_key=['credit_card_number'],
     event_time='trans_datetime'
) 

In [None]:
fg_agg.insert(df5)

### Create a Feature View using features from multiple Feature Groups

We want to create a model that uses features from multiple feature groups. 
We will select features from the different feature groups and join them together to create a query object. 
We can read the data in the query object as a DataFrame to inspect it before we create the feature view. 
We will use the feature view to read the training data for the model.
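As a loose local analogy, joining features from two feature groups resembles merging two DataFrames on their shared key columns. The sketch below uses plain Pandas with made-up frames. Hopsworks resolves the join keys itself, so the explicit `on=` columns here are an assumption of this example and not a description of its implementation.

```python
import pandas as pd

card = "1111 2222 3333 4444"
times = pd.to_datetime(["2022-01-02 20:44", "2022-01-02 20:55"])

# Stand-in for the transactions feature group
transactions = pd.DataFrame({
    "credit_card_number": [card, card],
    "trans_datetime": times,
    "amount": [66.29, 112.33],
})

# Stand-in for the rolling-window feature group
windows = pd.DataFrame({
    "credit_card_number": [card, card],
    "trans_datetime": times,
    "rolling_max_1d": [66.29, 112.33],
})

# Keep only rows whose key columns match in both frames
joined = transactions.merge(
    windows, on=["credit_card_number", "trans_datetime"], how="inner"
)
print(joined)
```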

In [None]:
query = fg.select_all().join(fg_agg.select(['rolling_max_1d', 'rolling_mean_1d']))

training_data = query.read()
training_data.head()

In [None]:
fv = fs.create_feature_view(name="credit_card_fraud_rolling",
                            description="Features for a model to predict credit card fraud, including rolling windows",
                            version=1,
                            labels=["fraud"],  # without labels, the split has no y values
                            query=query)

In [None]:
# The split is returned in the order (X_train, X_test, y_train, y_test).
X_train, X_test, y_train, y_test = fv.train_test_split(0.5)
X_train

### Read from Feature Groups

You are also able to read data from Feature Groups as DataFrames.

In [None]:
# Use the name the feature group was created with earlier in this notebook.
fg = fs.get_feature_group(name="credit_card_transactions_test", version=1)
read_df = fg.read()

In [None]:
read_df

### Filters
You can apply filters to the `query` object or to Feature Groups when reading from them. Here, we read all rows where the transaction amount is greater than 100.

In [None]:
from hsfs.feature import Feature

big_amounts_df = fg.filter(Feature("amount") > 100).read()
big_amounts_df
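For comparison, the same predicate on a local Pandas DataFrame is a boolean mask. This sketch uses the four transactions defined at the top of this notebook and does not touch Hopsworks.

```python
import pandas as pd

rows = pd.DataFrame({
    "amount": [142.34, 12.34, 66.29, 112.33],
    "location": ["Sao Paolo", "Rio De Janeiro", "Stockholm", "Stockholm"],
})

# Keep only the rows where the amount is greater than 100
big_amounts = rows[rows["amount"] > 100]
print(big_amounts)
```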