# Part 1: Data ingestion


> **Note:** This demo works with the online feature store, which is currently not part of the Open Source default deployment.
This demo showcases financial fraud prevention using the MLRun feature store to define complex features that help identify 
fraud. Fraud prevention is a particular challenge because it requires processing raw transactions and events in real time, and 
responding quickly enough to block transactions before they go through.

To address this, you create a development pipeline and a production pipeline. Both pipelines share the same feature 
engineering and model code, but serve data very differently. Furthermore, you automate the data and model monitoring 
process, identify drift and trigger retraining in a CI/CD pipeline. This process is described in the diagram below:

![Feature store demo diagram - fraud prevention](images/feature_store_demo_diagram.png)

By the end of this tutorial you’ll learn how to:

- Create an ingestion pipeline for each data source.
- Define the preprocessing, aggregation, and validation steps of the pipeline.
- Run the pipeline locally within the notebook.
- Launch a real-time function to ingest live data.
- Schedule a cron job to run the ingestion task when needed.

The raw data is described as follows:

**Transactions**

| Column          | Description                                                     |
|-----------------|-----------------------------------------------------------------|
| **age**         | Age group value 0-6. Some values are marked as U for unknown.   |
| **gender**      | A single character that defines the gender                      |
| **zipcodeOri**  | ZIP code of the person originating the transaction              |
| **zipMerchant** | ZIP code of the merchant receiving the transaction              |
| **category**    | Category of the transaction (e.g., transportation, food, etc.)  |
| **amount**      | Total amount of the transaction                                 |
| **fraud**       | Whether the transaction is fraudulent                           |
| **timestamp**   | Date and time at which the transaction took place               |
| **source**      | ID of the party/entity performing the transaction               |
| **target**      | ID of the party/entity receiving the transaction                |
| **device**      | ID of the device used to perform the transaction                |

**User events**

| Column          | Description                                                     |
|-----------------|-----------------------------------------------------------------|
| **source**      | The party/entity related to the event                           |
| **event**       | Event type, such as login or password change                    |
| **timestamp**   | Date and time of the event                                      |

This notebook shows how to **ingest** different data sources into the **feature store**.

The following FeatureSets are created:
- **Transactions**: Monetary transactions between a source and a target.
- **Events**: Account events such as account login or a password change.
- **Label**: Fraud label for the data.

## Step 1 - Fetch, process and ingest the datasets

## 1.1 - Transactions

### Transactions - fetching

```python
import pandas as pd
from src.date_adjust import adjust_data_timespan  # local helper module from the demo's src/ directory

# Fetch the transactions dataset from the server
transactions_data = pd.read_csv(
    'https://s3.wasabisys.com/iguazio/data/fraud-demo-mlrun-fs-docs/data.csv',
    parse_dates=['timestamp'],
)

# Use only the first 10k rows (after sorting by source)
transactions_data = transactions_data.sort_values(by='source').head(10000)

# Shift the sample timestamps into the past 2 days
transactions_data = adjust_data_timespan(transactions_data, new_period='2d')

# Preview
transactions_data.head(3)
```

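|        | step | age | gender | zipcodeOri | zipMerchant | category          | amount | fraud | timestamp                     | source      | target      | device                           |
|--------|------|-----|--------|------------|-------------|-------------------|--------|-------|-------------------------------|-------------|-------------|----------------------------------|
| 274633 | 91   | 5   | F      | 28007      | 28007       | es_transportation | 26.92  | 0     | 2023-06-17 12:50:29.363804000 | C1022153336 | M1823072687 | 33832bb8607545df97632a7ab02d69c4 |
| 286902 | 94   | 2   | M      | 28007      | 28007       | es_transportation | 48.22  | 0     | 2023-06-17 12:50:47.657429913 | C1006176917 | M348934600  | fadd829c49e74ffa86c8da3be75ada53 |
| 416998 | 131  | 3   | M      | 28007      | 28007       | es_transportation | 17.56  | 0     | 2023-06-17 12:50:52.764599939 | C1010936270 | M348934600  | 58d0422a50bc40c89d2b4977b2f1beea |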
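`adjust_data_timespan` comes from the demo's local `src/date_adjust.py`, whose source is not shown here. As a rough mental model only, the hypothetical `adjust_data_timespan_sketch` below assumes the helper linearly rescales the timestamps so that they span `new_period` and end at the current time; the real helper may work differently.

```python
import pandas as pd


def adjust_data_timespan_sketch(df, new_period="2d", time_column="timestamp", now=None):
    """Hypothetical stand-in for src.date_adjust.adjust_data_timespan.

    Linearly rescales `time_column` so the samples span `new_period`
    and the latest sample lands on `now` (defaults to the current time).
    Returns a new DataFrame; `df` is not modified.
    """
    now = pd.Timestamp.now() if now is None else pd.Timestamp(now)
    new_span = pd.Timedelta(new_period)
    old_min = df[time_column].min()
    old_span = df[time_column].max() - old_min

    out = df.copy()
    if old_span == pd.Timedelta(0):
        # All samples share one timestamp: move them all to `now`
        out[time_column] = now
        return out

    # Relative position of each sample within the original span, in [0, 1]
    frac = (df[time_column] - old_min) / old_span
    out[time_column] = (now - new_span) + frac * new_span
    return out


# Usage with made-up timestamps spread over 20 days
sample = pd.DataFrame({"timestamp": pd.to_datetime(["2020-01-01", "2020-01-11", "2020-01-21"])})
print(adjust_data_timespan_sketch(sample, new_period="2d", now="2023-06-19"))
```

With these inputs, the three samples land on 2023-06-17, 2023-06-18, and 2023-06-19, preserving their relative spacing.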


```python
transactions_data.columns
```

```
Index(['step', 'age', 'gender', 'zipcodeOri', 'zipMerchant', 'category',
       'amount', 'fraud', 'timestamp', 'source', 'target', 'device'],
      dtype='object')
```

### Transactions - create a feature set and preprocessing pipeline
Create the feature set (data pipeline) definition for the **credit transaction processing** that describes the 
offline/online data transformations and aggregations.
The feature store automatically adds an offline `parquet` target and an online `NoSQL` target when you call `set_targets()`.

The data pipeline consists of:

* **Extracting** the date components (hour, day of week)
* **Mapping** the age values
* **One-hot encoding** the transaction category and the gender
* **Aggregating** the amount (mean, sum, count, max over 2/12/24-hour time windows)
* **Aggregating** the transactions per category (over 14-day time windows)
* **Writing** the results to **offline** (Parquet) and **online** (NoSQL) targets
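The first three steps can be seen on a toy frame. The three rows below are made up for illustration; the operations mirror the preprocessing cell (`dt.weekday`, `dt.hour`, the mapping of age `U` to `0`, and `pd.get_dummies`), except that `dtype=int` is passed so the one-hot columns print as 0/1 rather than booleans.

```python
import pandas as pd

# Three made-up transactions with the raw columns the preprocessing uses
toy = pd.DataFrame({
    "timestamp": pd.to_datetime(["2023-06-17 12:50", "2023-06-18 08:15", "2023-06-19 23:05"]),
    "age": ["5", "U", "2"],
    "gender": ["F", "M", "F"],
    "category": ["es_food", "es_travel", "es_food"],
})

# Extract date components (weekday: Monday=0 ... Sunday=6)
toy["timestamp_day_of_week"] = toy["timestamp"].dt.weekday
toy["timestamp_hour"] = toy["timestamp"].dt.hour

# Map the unknown age group 'U' to '0', keep all other values
toy["age_mapped"] = toy["age"].map(lambda x: {"U": "0"}.get(x, x))

# One-hot encode category and gender; the original columns are replaced
toy = pd.get_dummies(toy, columns=["category", "gender"], dtype=int)
print(toy)
```

For example, 2023-06-17 is a Saturday, so its `timestamp_day_of_week` is 5, matching the value in the transactions preview.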

```python
# Main transaction categories, used for the per-category 14-day aggregations
main_categories = ["es_transportation", "es_health", "es_otherservices",
                   "es_food", "es_hotelservices", "es_barsandrestaurants",
                   "es_tech", "es_sportsandtoys", "es_wellnessandbeauty",
                   "es_hyper", "es_fashion", "es_home", "es_contents",
                   "es_travel", "es_leisure"]

# Work on a copy so that the raw transactions_data stays untouched
transactions_data_p = transactions_data.copy()

# Extract date components
transactions_data_p['timestamp_day_of_week'] = transactions_data_p['timestamp'].dt.weekday
transactions_data_p['timestamp_hour'] = transactions_data_p['timestamp'].dt.hour

# Map the unknown age group 'U' to '0'
transactions_data_p['age_mapped'] = transactions_data_p['age'].map(lambda x: {'U': '0'}.get(x, x))

# One-hot encode the transaction category and the gender
transactions_data_p = pd.get_dummies(transactions_data_p, columns=['category', 'gender'])
transactions_data_for_agg = transactions_data_p.set_index('timestamp')

# Aggregate the amount per source over 2/12/24-hour windows
windows = ['2H', '12H', '24H']
operations = ['mean', 'sum', 'count', 'max']
for window in windows:
    for op in operations:
        transactions_data_p[f'amount_{op}_{window}'] = (
            transactions_data_for_agg
            .groupby(['source', pd.Grouper(freq=window)])['amount']
            .transform(op)
            .values
        )

# Count the transactions per category and source over 14-day windows
for category in main_categories:
    transactions_data_p[f'{category}_sum_14D'] = (
        transactions_data_for_agg
        .groupby(['source', pd.Grouper(freq='14D')])[f'category_{category}']
        .transform('sum')
        .values
    )

transactions_data_p.set_index('source', inplace=True)
transactions_data_p.head()
```

| source      | step | age | zipcodeOri | zipMerchant | amount | fraud | timestamp                     | target      | device                           | timestamp_day_of_week | ... | es_barsandrestaurants_sum_14D | es_tech_sum_14D | es_sportsandtoys_sum_14D | es_wellnessandbeauty_sum_14D | es_hyper_sum_14D | es_fashion_sum_14D | es_home_sum_14D | es_contents_sum_14D | es_travel_sum_14D | es_leisure_sum_14D |
|-------------|------|-----|------------|-------------|--------|-------|-------------------------------|-------------|----------------------------------|-----------------------|-----|-------------------------------|-----------------|--------------------------|------------------------------|------------------|--------------------|-----------------|---------------------|-------------------|--------------------|
| C1022153336 | 91   | 5   | 28007      | 28007       | 26.92  | 0     | 2023-06-17 12:50:29.363804000 | M1823072687 | 33832bb8607545df97632a7ab02d69c4 | 5                     | ... | 1                             | 1               | 1                        | 1                            | 0                | 1                  | 0               | 0                   | 0                 | 0                  |
| C1006176917 | 94   | 2   | 28007      | 28007       | 48.22  | 0     | 2023-06-17 12:50:47.657429913 | M348934600  | fadd829c49e74ffa86c8da3be75ada53 | 5                     | ... | 4                             | 0               | 1                        | 1                            | 0                | 2                  | 0               | 0                   | 0                 | 0                  |
| C1010936270 | 131  | 3   | 28007      | 28007       | 17.56  | 0     | 2023-06-17 12:50:52.764599939 | M348934600  | 58d0422a50bc40c89d2b4977b2f1beea | 5                     | ... | 4                             | 0               | 0                        | 6                            | 6                | 0                  | 0               | 0                   | 0                 | 0                  |
| C1033736586 | 108  | 4   | 28007      | 28007       | 4.5    | 0     | 2023-06-17 12:51:05.057351118 | M1823072687 | 30b269ae55984e5584f1dd5f642ac1a3 | 5                     | ... | 3                             | 2               | 0                        | 1                            | 3                | 0                  | 2               | 0                   | 1                 | 0                  |
| C1019071188 | 72   | 4   | 28007      | 28007       | 1.83   | 0     | 2023-06-17 12:51:38.946433001 | M348934600  | 97bee3503a984f59aa6139b59f933c0b | 5                     | ... | 1                             | 0               | 0                        | 0                            | 1                | 4                  | 0               | 1                   | 1                 | 0                  |
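The aggregation cell above uses `pd.Grouper(freq=window)`, which splits each source's transactions into fixed, clock-aligned bins (tumbling windows). Every row therefore receives the total of its whole bin, including transactions that come later in the same bin, and the total resets at each bin boundary. If you want a trailing window that ends at each transaction instead, a time-based `rolling` window is one pandas alternative. The toy rows below are made up, and the sketch only contrasts the two pandas behaviors; it makes no claim about how the MLRun feature store computes its own aggregations.

```python
import pandas as pd

# Made-up transactions for two sources, indexed by time like transactions_data_for_agg
tx = pd.DataFrame({
    "source": ["C1", "C1", "C1", "C2"],
    "timestamp": pd.to_datetime(["2023-06-17 00:30", "2023-06-17 01:50",
                                 "2023-06-17 02:10", "2023-06-17 02:15"]),
    "amount": [10.0, 20.0, 40.0, 5.0],
}).set_index("timestamp")

# Tumbling 2-hour bins per source (the pd.Grouper approach): bins are aligned to
# midnight ([00:00, 02:00), [02:00, 04:00), ...) and each row gets its bin's total,
# including rows that come later in the same bin
binned = (
    tx.groupby(["source", pd.Grouper(freq="2h")])["amount"]
      .transform("sum")
      .tolist()
)

# Sliding 2-hour window per source: each row gets the total of its own source's
# rows in (t - 2h, t], i.e. only the current and earlier transactions
sliding = (
    tx.groupby("source")["amount"]
      .rolling("2h")
      .sum()
      .tolist()
)

print(binned)   # [30.0, 30.0, 40.0, 5.0]
print(sliding)  # [10.0, 30.0, 70.0, 5.0]
```

Note the first C1 row: its binned total (30.0) already includes the 01:50 transaction, whereas the sliding total (10.0) only sees the past.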


## 1.2 - User events

### User events - fetching

```python
# Fetch the user_events dataset from the server
user_events_data = pd.read_csv(
    'https://s3.wasabisys.com/iguazio/data/fraud-demo-mlrun-fs-docs/events.csv',
    index_col=0, quotechar="'", parse_dates=['timestamp'],
)

# Shift the timestamps into the last 2 days so that the latest aggregations appear in the online feature vectors
user_events_data = adjust_data_timespan(user_events_data, new_period='2d')

# Preview
user_events_data.head(3)
```

|   | source      | event          | timestamp                     |
|---|-------------|----------------|-------------------------------|
| 0 | C1974668487 | details_change | 2023-06-18 21:57:33.072706086 |
| 1 | C1973547259 | login          | 2023-06-19 00:59:07.329846508 |
| 2 | C515668508  | login          | 2023-06-18 21:32:13.611023302 |


```python
# One-hot encode the event type and index the events by source
user_events_data_p = pd.get_dummies(user_events_data, columns=['event'])
user_events_data_p.set_index('source', inplace=True)
user_events_data_p.head()
```

| source      | timestamp                     | event_details_change | event_login | event_password_change |
|-------------|-------------------------------|----------------------|-------------|-----------------------|
| C1974668487 | 2023-06-18 21:57:33.072706086 | 1                    | 0           | 0                     |
| C1973547259 | 2023-06-19 00:59:07.329846508 | 0                    | 1           | 0                     |
| C515668508  | 2023-06-18 21:32:13.611023302 | 0                    | 1           | 0                     |
| C1721541488 | 2023-06-18 23:39:43.720240221 | 1                    | 0           | 0                     |
| C394979408  | 2023-06-19 01:17:31.767663667 | 0                    | 0           | 1                     |


## Step 2 - Create a labels data set for model training

### Label set - create a feature set
This feature set contains the label for the fraud demo. It is ingested directly into the default targets without any changes.

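```python
def create_labels(df):
    """Build the label set: the fraud flag (renamed to `label`) and the timestamp, indexed like `df`."""
    labels = df[['fraud', 'timestamp']].copy()
    labels = labels.rename(columns={'fraud': 'label'})
    # Store timestamps at millisecond precision
    labels['timestamp'] = labels['timestamp'].astype('datetime64[ms]')
    labels['label'] = labels['label'].astype(int)
    return labels
```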

```python
labels_set = create_labels(transactions_data_p)
labels_set.head()
```

| source      | label | timestamp               |
|-------------|-------|-------------------------|
| C1022153336 | 0     | 2023-06-17 12:50:29.363 |
| C1006176917 | 0     | 2023-06-17 12:50:47.657 |
| C1010936270 | 0     | 2023-06-17 12:50:52.764 |
| C1033736586 | 0     | 2023-06-17 12:51:05.057 |
| C1019071188 | 0     | 2023-06-17 12:51:38.946 |


## Step 3 - Train a model

```python
# Model features, plus the timestamp needed for the as-of joins
features = ['amount_max_2H', 'amount_sum_2H', 'amount_count_2H', 'amount_mean_2H',
            'amount_max_12H', 'amount_sum_12H', 'amount_count_12H', 'amount_mean_12H',
            'amount_max_24H', 'amount_sum_24H', 'amount_count_24H', 'amount_mean_24H',
            'es_transportation_sum_14D', 'es_health_sum_14D', 'es_otherservices_sum_14D',
            'es_food_sum_14D', 'es_hotelservices_sum_14D', 'es_barsandrestaurants_sum_14D',
            'es_tech_sum_14D', 'es_sportsandtoys_sum_14D', 'es_wellnessandbeauty_sum_14D',
            'es_hyper_sum_14D', 'es_fashion_sum_14D', 'es_home_sum_14D',
            'es_travel_sum_14D', 'es_leisure_sum_14D',
            'gender_F', 'gender_M',
            'step', 'amount', 'timestamp_hour', 'timestamp_day_of_week',
            'timestamp']

# merge_asof needs both sides sorted by the `on` key; reset_index turns `source` into a column for `by`
transactions_sorted = transactions_data_p[features].reset_index().sort_values(by='timestamp')
user_events_sorted = user_events_data_p.reset_index().sort_values(by='timestamp')
labels_sorted = labels_set.reset_index().sort_values(by='timestamp')

# Attach to each transaction the latest user event (at or before it) from the same source
merged_df = pd.merge_asof(transactions_sorted, user_events_sorted, on='timestamp', by='source')

# Attach the label, then drop the join keys and the rows without a matching event
data_for_train = pd.merge_asof(
    merged_df, labels_sorted, on='timestamp', by='source'
).drop(columns=['source', 'timestamp']).dropna()
data_for_train
```


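|      | amount_max_2H | amount_sum_2H | amount_count_2H | amount_mean_2H | amount_max_12H | amount_sum_12H | amount_count_12H | amount_mean_12H | amount_max_24H | amount_sum_24H | ... | gender_F | gender_M | step | amount | timestamp_hour | timestamp_day_of_week | event_details_change | event_login | event_password_change | label |
|------|---------------|---------------|-----------------|----------------|----------------|----------------|------------------|-----------------|----------------|----------------|-----|----------|----------|------|--------|----------------|-----------------------|----------------------|-------------|-----------------------|-------|
| 14   | 74.89         | 205.50        | 5               | 41.100000      | 167.98         | 1212.11        | 36               | 33.669722       | 167.98         | 1212.11        | ... | 0        | 1        | 55   | 74.89  | 12             | 5                     | 0.0                  | 1.0         | 0.0                   | 0     |
| 85   | 21.73         | 21.73         | 1               | 21.730000      | 4241.70        | 4439.54        | 9                | 493.282222      | 4241.70        | 4439.54        | ... | 0        | 1        | 145  | 21.73  | 13             | 5                     | 0.0                  | 1.0         | 0.0                   | 0     |
| 86   | 29.29         | 121.46        | 6               | 20.243333      | 207.93         | 1317.79        | 40               | 32.944750       | 207.93         | 1317.79        | ... | 0        | 1        | 29   | 3.08   | 13             | 5                     | 0.0                  | 0.0         | 1.0                   | 0     |
| 89   | 29.29         | 121.46        | 6               | 20.243333      | 207.93         | 1317.79        | 40               | 32.944750       | 207.93         | 1317.79        | ... | 0        | 1        | 139  | 29.29  | 13             | 5                     | 0.0                  | 0.0         | 1.0                   | 0     |
| 96   | 37.30         | 55.76         | 2               | 27.880000      | 380.42         | 1081.83        | 31               | 34.897742       | 380.42         | 1081.83        | ... | 1        | 0        | 23   | 18.46  | 13             | 5                     | 0.0                  | 0.0         | 1.0                   | 0     |
| ...  | ...           | ...           | ...             | ...            | ...            | ...            | ...              | ...             | ...            | ...            | ... | ...      | ...      | ...  | ...    | ...            | ...                   | ...                  | ...         | ...                   | ...   |
| 9995 | 54.55         | 82.91         | 3               | 27.636667      | 54.55          | 82.91          | 3                | 27.636667       | 70.47          | 916.66         | ... | 1        | 0        | 116  | 26.00  | 12             | 0                     | 0.0                  | 1.0         | 0.0                   | 0     |
| 9996 | 31.14         | 31.14         | 1               | 31.140000      | 31.14          | 31.14          | 1                | 31.140000       | 119.50         | 150.64         | ... | 0        | 1        | 92   | 31.14  | 12             | 0                     | 0.0                  | 1.0         | 0.0                   | 0     |
| 9997 | 218.48        | 283.71        | 3               | 94.570000      | 218.48         | 283.71         | 3                | 94.570000       | 218.48         | 1184.56        | ... | 0        | 1        | 128  | 218.48 | 12             | 0                     | 0.0                  | 1.0         | 0.0                   | 0     |
| 9998 | 34.93         | 94.44         | 4               | 23.610000      | 34.93          | 94.44          | 4                | 23.610000       | 79.16          | 999.34         | ... | 0        | 1        | 95   | 34.93  | 12             | 0                     | 0.0                  | 0.0         | 1.0                   | 0     |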
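The training set is built with two as-of joins. `pd.merge_asof` attaches to each left-hand row the most recent right-hand row whose timestamp is at or before it (the default `direction="backward"`), matching only rows with the same `by` key, here `source`. A transaction with no earlier event for its source gets `NaN` in the event columns, so `.dropna()` removes it, which can leave gaps in the index of `data_for_train`. A toy illustration with made-up rows:

```python
import pandas as pd

# Made-up transactions and one-hot encoded user events
transactions = pd.DataFrame({
    "timestamp": pd.to_datetime(["2023-06-17 10:00", "2023-06-17 11:00", "2023-06-17 12:00"]),
    "source": ["C1", "C2", "C1"],
    "amount": [10.0, 20.0, 30.0],
})
events = pd.DataFrame({
    "timestamp": pd.to_datetime(["2023-06-17 09:30", "2023-06-17 11:30"]),
    "source": ["C1", "C1"],
    "event_login": [1, 0],
    "event_password_change": [0, 1],
})

# Both frames must be sorted by the `on` key
merged = pd.merge_asof(
    transactions.sort_values("timestamp"),
    events.sort_values("timestamp"),
    on="timestamp",
    by="source",           # only match events from the same source
    direction="backward",  # the default: latest event at or before each transaction
)
print(merged)
# C1 at 10:00 gets the 09:30 login, C2 has no events (NaN), and
# C1 at 12:00 gets the 11:30 password change
print(len(merged.dropna()))  # 2
```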


```python
from sklearn.model_selection import train_test_split

# Separate the target column from the features
X = data_for_train.drop(columns=['label'])
y = data_for_train['label']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
```
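Fraud cases are a small minority of the samples, so a plain random split can leave very few of them in the test set, which makes precision and recall noisy. One option, not used in this notebook, is the `stratify` parameter of scikit-learn's `train_test_split`, which keeps the class ratio the same in the train and test parts. A toy illustration with made-up labels, 5 positives out of 100:

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Made-up imbalanced labels: 95 legitimate (0) and 5 fraudulent (1) samples
y_toy = np.array([0] * 95 + [1] * 5)
X_toy = np.arange(len(y_toy)).reshape(-1, 1)

# stratify=y_toy keeps the 95:5 class ratio in both the train and the test part
X_tr, X_te, y_tr, y_te = train_test_split(
    X_toy, y_toy, test_size=0.2, random_state=42, stratify=y_toy
)
print(int(y_tr.sum()), int(y_te.sum()))  # 4 1
```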

```python
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV

# Hyperparameter search space. For classifiers, max_features='auto' was an alias for 'sqrt'
# and was removed in scikit-learn 1.3, so 'sqrt' alone covers the same setting.
grid_search = {'bootstrap': [True, False],
               'max_depth': [10, 30, 50, 100],
               'max_features': ['sqrt'],
               'min_samples_leaf': [1, 2, 4],
               'min_samples_split': [2, 5, 10],
               'n_estimators': [50, 100, 500]}

rf = RandomForestClassifier()
# Evaluate 100 random combinations with 3-fold cross-validation, using all CPU cores
rfc = RandomizedSearchCV(estimator=rf, param_distributions=grid_search, n_iter=100, cv=3,
                         verbose=2, random_state=42, n_jobs=-1)
rfc.fit(X_train, y_train)
```

```
Fitting 3 folds for each of 100 candidates, totalling 300 fits
```
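As the log shows, the search runs 100 candidates × 3 folds = 300 fits rather than the full grid. `RandomizedSearchCV` draws its `n_iter` candidates with scikit-learn's `ParameterSampler`, and when every parameter is given as a list, as in the cell above, the candidates are sampled without replacement from the full grid. A small sketch with a reduced, illustrative grid:

```python
from sklearn.model_selection import ParameterGrid, ParameterSampler

# Reduced, illustrative search space (all values given as lists)
grid = {"bootstrap": [True, False],
        "max_depth": [10, 30],
        "min_samples_leaf": [1, 2, 4]}

# Size of the full grid: 2 * 2 * 3 = 12 combinations
print(len(ParameterGrid(grid)))  # 12

# Draw 5 distinct candidates, the way RandomizedSearchCV(n_iter=5) samples them
candidates = list(ParameterSampler(grid, n_iter=5, random_state=42))
for params in candidates:
    print(params)
```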


```python
# Show the best model found by the search
rfc.best_estimator_
```

```python
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Make predictions on the test set
y_pred = rfc.best_estimator_.predict(X_test)

# Calculate evaluation metrics
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

# Print the evaluation metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)
```

```
Accuracy: 0.9947643979057592
Precision: 0.8
Recall: 0.5
F1 Score: 0.6153846153846154
```
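A very high accuracy next to a recall of 0.5 is what you would expect when fraud is rare: accuracy is dominated by the many legitimate transactions, while recall = TP / (TP + FN) only looks at the actual frauds and precision = TP / (TP + FP) only at the flagged transactions. A toy example with made-up labels shows the effect, using `confusion_matrix` to read off the counts:

```python
from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score

# Made-up labels: 8 legitimate transactions (0) and 2 frauds (1)
y_true = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
# A model that catches one of the two frauds and raises no false alarms
y_pred = [0, 0, 0, 0, 0, 0, 0, 0, 1, 0]

# For binary labels, ravel() yields the counts in the order TN, FP, FN, TP
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
print(tn, fp, fn, tp)                   # 8 0 1 1
print(accuracy_score(y_true, y_pred))   # 0.9, i.e. (TP + TN) / total = 9 / 10
print(precision_score(y_true, y_pred))  # 1.0, i.e. TP / (TP + FP) = 1 / 1
print(recall_score(y_true, y_pred))     # 0.5, i.e. TP / (TP + FN) = 1 / 2
```

Missing one of only two frauds barely moves accuracy but halves recall, which is why recall and F1 are the more informative metrics here.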
