This is the first part of the quick start series of tutorials about the Hopsworks Feature Store. In this first module, you will work with data related to credit card transactions.
The objective of this tutorial is to demonstrate how to work with the **Hopsworks Feature Store** for online data, with the goal of training and deploying a model that can predict fraudulent transactions.

## <span style='color:#ff5f27'> 📝 Imports </span>

In [6]:
import hopsworks

import great_expectations as ge
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

from mlopstemplate.features import transactions, profile
from mlopstemplate.synthetic_data import synthetic_data
from mlopstemplate.synthetic_data.data_sources import get_datasets

First of all you will load the data and do some feature engineering on it.

## <span style="color:#ff5f27;"> 💽 Loading the Data </span>

The data you will use comes from 2 different CSV files:

- `transactions.csv`: events containing information about when a credit card was used, such as a timestamp, location, and the amount spent. A boolean `fraud_label` variable (True/False) tells us whether a transaction was fraudulent or not.
- `profiles.csv`: credit card user information such as birthdate and city of residence.

In a production system, these CSV files would originate from separate data sources or tables, and probably separate data pipelines. **These files have a common credit card number column, `cc_num`, which you will use later to join features together from the different datasets.**
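To make the role of `cc_num` concrete, here is a minimal sketch of such a join in pandas. The two toy DataFrames and their values are made up for illustration; only the `cc_num` key comes from the tutorial data.

```python
import pandas as pd

# Toy stand-ins for the two sources (values are illustrative only).
toy_trans = pd.DataFrame({
    "cc_num": [1111, 2222, 1111],
    "amount": [10.0, 25.5, 7.25],
})
toy_profiles = pd.DataFrame({
    "cc_num": [1111, 2222],
    "city": ["Greenfield", "Jackson"],
})

# A left join keeps every transaction and attaches the matching profile row.
joined = toy_trans.merge(toy_profiles, on="cc_num", how="left")
print(joined)
```

A left join is used here so that no transaction is dropped when a profile is missing; the affected profile columns would simply be null.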

Now, you can go ahead and load the data.

In [2]:
# get data from the source
trans_df, labels_df, profiles_df = get_datasets()

---

## <span style="color:#ff5f27;"> 🛠️ Feature Engineering </span>

Fraudulent transactions can differ from regular ones in many ways. A typical red flag is, for instance, a large transaction volume or frequency within a span of a few hours. Fraudsters may also target elderly people in particular. To facilitate model learning, you will create additional features based on these patterns. In particular, you will create two types of features:
1. **Features that aggregate data from different data sources**. This could for instance be the age of a customer at the time of a transaction, which combines the `birthdate` feature from `profiles.csv` with the `datetime` feature from `transactions.csv`.
2. **Features that aggregate data from multiple time steps**. An example of this could be the transaction frequency of a credit card in the span of a few hours, which is computed using a window function.
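The second category relies on a window function. As a minimal sketch (not the tutorial's own implementation), the snippet below counts the transactions of each card within a trailing 4-hour window using pandas. The toy data, the 4-hour window length, and the `trans_freq_4h` column name are assumptions made for illustration.

```python
import pandas as pd

# Toy transactions (illustrative values only).
toy = pd.DataFrame({
    "cc_num": [1, 1, 1, 2],
    "datetime": pd.to_datetime([
        "2023-04-24 10:00", "2023-04-24 11:00",
        "2023-04-24 16:00", "2023-04-24 10:30",
    ]),
    "amount": [10.0, 20.0, 30.0, 5.0],
})

# Time-based windows need rows ordered by time within each card.
toy = toy.sort_values(["cc_num", "datetime"]).reset_index(drop=True)

# Count the transactions of each card in a trailing 4-hour window.
freq = (
    toy.set_index("datetime")
       .groupby("cc_num")["amount"]
       .rolling("4h")
       .count()
)

# The result is ordered by (cc_num, datetime), matching the sorted frame.
toy["trans_freq_4h"] = freq.to_numpy()
print(toy)
```

Swapping `.count()` for `.mean()` or `.sum()` would give the average or total amount spent in the same window.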

Let's start with the first category.

Begin by computing the distance between consecutive transactions; let's call it `loc_delta`.
Here you will use the [Haversine distance](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.haversine_distances.html?highlight=haversine#sklearn.metrics.pairwise.haversine_distances) to quantify the distance between two longitude and latitude coordinates.
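For intuition, the Haversine formula can be sketched in a few lines of NumPy. The helper name `haversine_rad` and the toy coordinates are hypothetical; the `mlopstemplate` implementation is not shown in this notebook, so whether it returns radians or kilometres is not assumed here.

```python
import numpy as np
import pandas as pd

def haversine_rad(lat1, lon1, lat2, lon2):
    """Great-circle distance in radians between points given in degrees."""
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    a = (
        np.sin((lat2 - lat1) / 2) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2
    )
    return 2 * np.arcsin(np.sqrt(a))

# Toy transactions, already ordered by time within each card.
toy = pd.DataFrame({
    "cc_num": [1, 1, 2],
    "latitude": [0.0, 0.0, 10.0],
    "longitude": [0.0, 90.0, 20.0],
})

# Previous location of the same card; the first transaction has none.
prev = toy.groupby("cc_num")[["latitude", "longitude"]].shift(1)

# Distance to the previous transaction, 0 when there is no previous one.
toy["loc_delta"] = haversine_rad(
    prev["latitude"], prev["longitude"], toy["latitude"], toy["longitude"]
).fillna(0.0)
print(toy)
```

Multiplying the result by the Earth's mean radius (about 6371 km) converts the angular distance to kilometres.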

In [3]:
# compute profile features
# select final features
profiles_df = profile.select_features(profiles_df)

In [4]:
# Compute transaction features

# compute previous location of the transaction
trans_df = transactions.loc_delta_t_minus_1(trans_df)

# Computes time difference between current and previous transaction
trans_df = transactions.time_delta_t_minus_1(trans_df)

# Compute year and month string from datetime column.
trans_df["month"] = transactions.get_year_month(trans_df.datetime)

# compute on demand features
# customer's age at transaction
trans_df = transactions.card_owner_age(trans_df, profiles_df)

# days until card expires at the time of transaction
trans_df = transactions.expiry_days(trans_df, profiles_df)


In [5]:
# labels
labels_df["month"] = transactions.get_year_month(labels_df.datetime)
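The `month` column computed above is later used as the partition key of the feature groups. The helper's implementation is not shown in this notebook; the sketch below is a hypothetical stand-in that reproduces the `2023-4` style visible in the data further down.

```python
import pandas as pd

def year_month(datetimes: pd.Series) -> pd.Series:
    """Hypothetical stand-in for transactions.get_year_month."""
    dt = pd.to_datetime(datetimes)
    # Year and month joined by a dash, without zero padding (e.g. "2023-4").
    return dt.dt.year.astype(str) + "-" + dt.dt.month.astype(str)

sample = pd.Series(["2023-04-24 20:18:24", "2023-07-31 20:14:55"])
months = year_month(sample)
print(months.tolist())
```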

## <span style="color:#ff5f27;"> Great Expectations </span>


In [7]:
ge_trans_df = ge.from_pandas(trans_df)

expectation_suite_trans = ge_trans_df.get_expectation_suite()
expectation_suite_trans.expectation_suite_name = "transaction_suite"
print(expectation_suite_trans)


2023-10-21 20:16:14,922 INFO: 	0 expectation(s) included in expectation_suite.
{
  "expectations": [],
  "data_asset_type": "Dataset",
  "meta": {
    "great_expectations_version": "0.15.12"
  },
  "expectation_suite_name": "transaction_suite",
  "ge_cloud_id": null
}


In [8]:
# Check for errors which could lead to technical issues
expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column":"tid", "result_format":"COMPLETE"}
    )
)

# Assess data correctness
expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"amount",
            "min_value": 0
        }
    )
)

expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"age_at_transaction",
            "min_value": 17,
            "max_value": 101
        }
    )
)

# Monitor data statistics and quality, e.g. the number of null values
expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
      expectation_type="expect_column_mean_to_be_between",
      kwargs={"column":"age_at_transaction", "min_value": 55, "max_value": 70}
    )
)

expectation_suite_trans.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column":"category"}
    )
)

ge_trans_df = ge.from_pandas(trans_df, expectation_suite=expectation_suite_trans)

validation_report_trans = ge_trans_df.validate()

2023-10-21 20:16:19,686 INFO: 	5 expectation(s) included in expectation_suite.
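For intuition, the five expectations registered above correspond roughly to the plain-pandas checks below. This is only a sketch on made-up data; Great Expectations additionally records detailed results for each check, as the validation report shows.

```python
import pandas as pd

# Toy data with the columns referenced by the suite (illustrative values).
toy = pd.DataFrame({
    "tid": ["a", "b", "c"],
    "amount": [83.99, 68.39, 67.95],
    "age_at_transaction": [45.1, 62.0, 85.0],
    "category": ["Grocery", "Grocery", "Clothing"],
})

checks = {
    # expect_column_values_to_be_unique on tid
    "tid_unique": bool(toy["tid"].is_unique),
    # expect_column_min_to_be_between on amount (min_value=0)
    "amount_min_ok": bool(toy["amount"].min() >= 0),
    # expect_column_min_to_be_between on age_at_transaction (17 to 101)
    "age_min_ok": bool(17 <= toy["age_at_transaction"].min() <= 101),
    # expect_column_mean_to_be_between on age_at_transaction (55 to 70)
    "age_mean_ok": bool(55 <= toy["age_at_transaction"].mean() <= 70),
    # expect_column_values_to_not_be_null on category
    "category_not_null": bool(toy["category"].notna().all()),
}
print(checks)
```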


In [9]:
validation_report_trans

{
  "evaluation_parameters": {},
  "meta": {
    "great_expectations_version": "0.15.12",
    "expectation_suite_name": "transaction_suite",
    "run_id": {
      "run_time": "2023-10-21T20:16:19.686401+00:00",
      "run_name": null
    },
    "batch_kwargs": {
      "ge_batch_id": "b256bdc6-704e-11ee-b70f-06e8b8b59184"
    },
    "batch_markers": {},
    "batch_parameters": {},
    "validation_time": "20231021T201619.686326Z",
    "expectation_suite_meta": {
      "great_expectations_version": "0.15.12"
    }
  },
  "results": [
    {
      "result": {
        "element_count": 71322,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "unexpected_percent_total": 0.0,
        "unexpected_percent_nonmissing": 0.0,
        "partial_unexpected_list": [],
        "partial_unexpected_index_list": [],
        "partial_unexpected_counts": [],
        "unexpected_list": [],
        "unexpected_index_list": []
  

In [10]:
#Generate an Expectation Suite from your data using Great Expectations Profiler
#For complex DataFrames, Great Expectations offers a profiler which generates a basic expectation suite tailored to your data. You can then use this suite as you would any other expectation suite with Hopsworks.

# ignore deprecation warnings
expectation_suite_profiled, validation_report = ge_trans_df.profile(profiler=ge.profile.BasicSuiteBuilderProfiler)

print(f"The suite contains {len(expectation_suite_profiled['expectations'])} expectations for {len(trans_df.columns.values)} columns. See sample below\n" + ge_trans_df.get_expectation_suite().__repr__()[:455])

#Note that you cannot register the report generated by the profiler as the suite was not registered with Hopsworks before the validation was run.



Profiling Columns:   0%|          | 0/14 [00:00<?, ?it/s, tid]



2023-10-21 20:17:45,824 INFO: 	76 expectation(s) included in expectation_suite.




2023-10-21 20:17:46,259 INFO: 	68 expectation(s) included in expectation_suite. Omitting 8 expectation(s) that failed when last run; set discard_failed_expectations=False to include them. result_format settings filtered.
The suite contains 76 expectations for 14 columns. See sample below
{
  "expectations": [
    {
      "kwargs": {
        "column": "tid"
      },
      "expectation_type": "expect_column_values_to_be_unique",
      "meta": {}
    },
    {
      "kwargs": {
        "column": "amount",
        "min_value": -0.99,
        "max_value": 1.01
      },
      "expectation_type": "expect_column_min_to_be_between",
      "meta": {}
    },
    {
      "kwargs": {
        "column": "age_at_transaction",
        "min_value": 16.5


## <span style="color:#ff5f27;"> 📡 Connecting to Hopsworks Feature Store </span>

### <span style="color:#ff5f27;"> 🪄 Creating Feature Groups </span>

A [feature group](https://docs.hopsworks.ai/3.0/concepts/fs/feature_group/fg_overview/) can be seen as a collection of conceptually related features. In this case, you will create feature groups for the transaction data, the fraud labels, and the card holder profiles. All of them have `cc_num` as primary key, which will allow you to join them when creating a dataset in the next tutorial.

Feature groups can also be used to define a namespace for features. For instance, in a real-life setting you would likely want to experiment with different window lengths. In that case, you can create feature groups with identical schema for each window length. 

Before you can create a feature group, you need to connect to the Hopsworks Feature Store.

In [11]:
import hopsworks

project = hopsworks.login()
fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119
Connected. Call `.close()` to terminate connection gracefully.


To create a feature group you need to give it a name and specify a primary key. It is also good practice to provide a description of its contents and a version number; if the version is not defined, it is set automatically, starting at `1`.

In [13]:
# get or create feature group
trans_fg = fs.get_or_create_feature_group(
    name="transactions",
    version=1,
    description="Transaction data",
    primary_key=['cc_num'],
    event_time='datetime',
    partition_key=['month'],
    stream=True,
    online_enabled=True
)

In [14]:
# Set up automatic validation on insert and upload a DataFrame
# Register the expectation suite corresponding to a Feature Group with the backend

# The "ALWAYS" ingestion policy inserts data even when validation fails, 
# ideal to avoid data loss and rapid prototyping

trans_fg.save_expectation_suite(expectation_suite_trans, validation_ingestion_policy="ALWAYS")

# Once the suite is registered in the backend, data validation runs on every insert
# without additional boilerplate. The suite is retrieved from the backend and used to
# validate the DataFrame, and the resulting validation report is uploaded. Depending on
# the ingestion policy and the validation outcome, the data are then inserted into the
# Feature Group. With the "ALWAYS" policy used here, insertion is performed even if
# validation fails.

# materialize feature data into the feature group
trans_fg.insert(trans_df)

Feature Group created successfully, explore it at 
https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119/fs/67/fg/13
2023-10-21 20:19:04,029 INFO: 	5 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119/fs/67/fg/13


Uploading Dataframe: 0.00% |          | Rows 0/71322 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: transactions_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119/jobs/named/transactions_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7fb9dc723ee0>,
 {
   "evaluation_parameters": {},
   "meta": {
     "great_expectations_version": "0.15.12",
     "expectation_suite_name": "transaction_suite",
     "run_id": {
       "run_time": "2023-10-21T20:19:04.029520+00:00",
       "run_name": null
     },
     "batch_kwargs": {
       "ge_batch_id": "144b9632-704f-11ee-b70f-06e8b8b59184"
     },
     "batch_markers": {},
     "batch_parameters": {},
     "validation_time": "20231021T201904.029435Z",
     "expectation_suite_meta": {
       "great_expectations_version": "0.15.12"
     }
   },
   "results": [
     {
       "result": {
         "element_count": 71322,
         "unexpected_count": 0,
         "unexpected_percent": 0.0,
         "unexpected_percent_total": 0.0,
         "partial_unexpected_list": []
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       },
       "meta": {
         "ingestionRes

Here you have also set `online_enabled=True`, which enables low latency access to the data. A full list of arguments can be found in the [documentation](https://docs.hopsworks.ai/feature-store-api/latest/generated/api/feature_store_api/#create_feature_group).
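The `"ALWAYS"` ingestion policy chosen when registering the expectation suite can be pictured as a simple gate in front of the insert. The function below is a hypothetical illustration of that decision, not Hopsworks code; it assumes that the alternative `"STRICT"` policy only inserts data when validation succeeds.

```python
def should_insert(policy: str, validation_succeeded: bool) -> bool:
    """Hypothetical sketch of how an ingestion policy gates an insert."""
    policy = policy.upper()
    if policy == "ALWAYS":
        # Data are inserted regardless of the validation outcome.
        return True
    if policy == "STRICT":
        # Assumed behaviour: data are inserted only when validation passes.
        return validation_succeeded
    raise ValueError(f"Unknown ingestion policy: {policy}")

print(should_insert("ALWAYS", validation_succeeded=False))
print(should_insert("STRICT", validation_succeeded=False))
```

`"ALWAYS"` suits prototyping because no data are lost, while a stricter gate is the safer choice once downstream models depend on the feature group.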

In [29]:
feature_descriptions = [
    {"name": "tid", "description": "Transaction id"},
    {"name": "datetime", "description": "Transaction time"},
    {"name": "cc_num", "description": "Number of the credit card performing the transaction"},
    {"name": "category", "description": "Category of item purchased"},
    {"name": "amount", "description": "Dollar amount of the transaction"},
    {"name": "latitude", "description": "latitude of the place where merchant or ATM is located"},
    {"name": "longitude", "description": "longitude of the place where merchant or ATM is located"},
    {"name": "city", "description": "City where the merchant or ATM is located"},
    {"name": "country", "description": "Country in which the transaction was made"},
    {"name": "loc_delta_t_minus_1", "description": "Distance between the locations of the current and previous transaction"},
    {"name": "time_delta_t_minus_1", "description": "Time elapsed between the current and previous transaction"},
    {"name": "month", "description": "Month of the transaction"},    
    {"name": "age_at_transaction", "description": "Age of card holder at the time of transaction"},
    {"name": "days_until_card_expires", "description": "Days left till card expires"},
]

for desc in feature_descriptions: 
    trans_fg.update_feature_description(desc["name"], desc["description"])

In [28]:
trans_df

Unnamed: 0,tid,datetime,cc_num,category,amount,latitude,longitude,city,country,loc_delta_t_minus_1,time_delta_t_minus_1,month,age_at_transaction,days_until_card_expires
0,20bdda6375fa1ffa28b680fa2e819b27,2023-04-24 20:18:24,4175202677382064,Grocery,83.99,39.785040,-85.769420,Greenfield,US,0.791870,1.675000,2023-4,45.084693,798.153889
1,1a84155d6ef055273a5e31574faadf39,2023-04-24 20:26:51,4972349737471042,Grocery,68.39,35.614520,-88.813950,Jackson,US,1.807929,4.414873,2023-4,49.996515,1255.148021
2,fb60a6ef0487323a777794ee73837320,2023-04-24 20:38:12,4343568566831578,Grocery,67.95,40.659950,-111.996330,Kearns,US,1.430287,1.239641,2023-4,85.019843,1528.140139
3,f44baa37d3df75acbd83f6ff0b98a5c9,2023-04-24 20:38:14,4205094877256105,Grocery,67.10,47.762320,-122.205400,Bothell,US,2.150129,3.378657,2023-4,51.830934,433.140116
4,d0dacdca2622701481e232cb598e4b0d,2023-04-24 20:43:20,4176332408257688,Clothing,65.18,30.166880,-96.397740,Brenham,US,1.177476,1.976528,2023-4,22.513435,1528.136574
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
71317,877c7f95fcf37a58e6fd9d9a04171b5d,2023-07-31 20:14:55,4169557131607200,Cash Withdrawal,704.83,39.457010,-77.980960,Martinsburg,US,0.004960,-4.000000,2023-7,90.818685,823.156308
71318,94ba051c30e554eebccfc57d08682386,2023-07-27 20:14:55,4169557131607200,Cash Withdrawal,50.53,39.461648,-77.990295,Martinsburg,US,0.005862,-4.000000,2023-7,90.807734,827.156308
71319,cfc5724caa2de3f4190974ef3fcbed10,2023-07-23 20:14:55,4169557131607200,Cash Withdrawal,61.86,39.455896,-77.984271,Martinsburg,US,0.001625,-4.000000,2023-7,90.796782,831.156308
71320,48a127777c7b275f1eebfa43ad66551b,2023-07-19 20:14:55,4169557131607200,Cash Withdrawal,412.33,39.455715,-77.975533,Martinsburg,US,0.002846,-4.000000,2023-7,90.785830,835.156308


In [16]:
# get or create feature group
labels_fg = fs.get_or_create_feature_group(
    name="fraud_labels",
    version=1,
    description="Transaction fraud labels",
    primary_key=['cc_num'],
    event_time='datetime',
    partition_key=['month'],
    stream=True,
    online_enabled=True
)

In [17]:
# materialize feature data into the feature group
labels_fg.insert(labels_df)

Feature Group created successfully, explore it at 
https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119/fs/67/fg/14


Uploading Dataframe: 0.00% |          | Rows 0/71323 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: fraud_labels_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119/jobs/named/fraud_labels_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7fb9dc73cfd0>, None)

You can now do the same thing for the profile feature group.

In [22]:
# get or create feature group
profile_fg = fs.get_or_create_feature_group(
    name="profile",
    version=1,
    description="Credit card holder demographic data",
    primary_key=["cc_num"],
    partition_key=["cc_provider"],
    stream=True,
    online_enabled=True
)


In [23]:
# materialize feature data into the feature group
profile_fg.insert(profiles_df)

Feature Group created successfully, explore it at 
https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119/fs/67/fg/15


Uploading Dataframe: 0.00% |          | Rows 0/1000 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: profile_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119/jobs/named/profile_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7fb9d99c97b0>, None)

In [25]:
feature_descriptions = [
    {"name": "cc_num", "description": "Number of the credit card performing the transaction"},
    {"name": "cc_provider", "description": "Company name that issued the card"},
    {"name": "cc_type", "description": "Type of the card, debit or credit"},
    {"name": "cc_expiration_date", "description": "Date when this card expires"},    
    {"name": "birthdate", "description": "Birth date"},
    {"name": "country_of_residence", "description": "Country of residence of the card holder"},    
]

for desc in feature_descriptions: 
    profile_fg.update_feature_description(desc["name"], desc["description"])

Click on the hyperlink printed in the cell output above to inspect your feature group in the UI.

### <span style="color:#ff5f27;"> 👓  Exploration</span>
In the Hopsworks Feature Store, the metadata allows for multiple levels of exploration and review. Here you will explore a few of those capabilities.

### <span style="color:#ff5f27;"> 🔎 Search</span>
Using the search function in the UI, you can query any aspect of the feature groups, feature views, and training data that were previously created.

### <span style="color:#ff5f27;"> 📊 Statistics</span>
You can also enable statistics in one or all the feature groups.

In [26]:
trans_fg = fs.get_or_create_feature_group("transactions", version=1)
trans_fg.statistics_config = {
    "enabled": True,
    "histograms": True,
    "correlations": True
}

trans_fg.update_statistics_config()
trans_fg.compute_statistics()

Statistics Job started successfully, you can follow the progress at 
https://63fce2f0-6ea9-11ee-94f8-f759e1f32264.cloud.hopsworks.ai/p/119/jobs/named/transactions_1_compute_stats_21102023203443/executions


### <span style="color:#ff5f27;"> ⛓️ Lineage </span>
For every feature group and feature view, you can inspect how the abstractions relate to one another: which feature group produced which training dataset, and which model that dataset was used to train.
This gives a clear understanding of how each element fits into the pipeline.