[![Open In
Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/alibaba/feathub/blob/master/docs/examples/fraud_detection.ipynb)

# Fraud Detection

This notebook illustrates the use of FeatHub to create a model that predicts the
fraud status of transactions based on user account data and transaction data.
The main focus of this notebook is to depict:

- How a feature designer can use FeatHub to define heterogeneous features from
  different data sources (user account data and transaction data) with
  different keys, and
- How a feature consumer can extract features using multiple FeatureViews.

The sample fraud transaction datasets that are used in the notebook can be found
here: https://github.com/microsoft/r-server-fraud-detection.

Please feel free to view this example interactively with Colab by clicking the
badge at the top left corner of this notebook.

## Install dependencies

This example has been verified in Python 3.7 with the following libraries.

- feathub-nightly[spark]
- plotly
- matplotlib

Execute the following cells to install these dependencies. **If the notebook is
executed in Colab, restart the runtime after the following cells are executed,
in order to make sure Python 3.7 is correctly configured to execute the Python
cells.**

In [None]:
%%bash
python_version=`python -V`
if [[ $python_version != *"3.7"* ]]; then
    # install python 3.7
    sudo apt-get update -y
    sudo apt-get install -y python3.7 python3-pip python3.7-distutils python3-apt

    # change alternatives
    sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.7 0
    sudo update-alternatives --set python3 /usr/bin/python3.7
fi

In [None]:
%%bash
feathub_dependencies=`pip list | grep feathub`
if [[ -z "$feathub_dependencies" ]]; then
    pip install "feathub-nightly[spark]"
fi

pip install plotly matplotlib scikit-learn

## Import Python dependencies

In [None]:
import os
from datetime import timedelta
from typing import OrderedDict
from urllib.parse import urlparse

import numpy as np
import pandas as pd
import requests

from feathub.common import types
from feathub.feathub_client import FeathubClient
from feathub.feature_tables.sources.file_system_source import FileSystemSource
from feathub.feature_views.derived_feature_view import DerivedFeatureView
from feathub.feature_views.feature import Feature
from feathub.feature_views.transforms.over_window_transform import OverWindowTransform
from feathub.table.schema import Schema

## Initialize FeatHub client

In [None]:
client = FeathubClient(
    props={
        "processor": {
            "type": "spark",
            "spark": {
                "master": "local[1]",
            },
        },
        "registry": {
            "type": "local",
            "local": {
                "namespace": "default",
            },
        },
        "feature_service": {
            "type": "local",
            "local": {},
        },
    }
)

## Define features

### Download and preprocess source data

We prepare the fraud detection dataset as follows:

1. Download the account info data, fraud transactions data, and untagged
   transactions data.
2. Tag the transaction data based on the fraud transactions data.
   1. Aggregate the fraud table at the account level, computing the start and
      end timestamps of each account's fraudulent activity.
   2. Join this data with the untagged data.
   3. Tag the data: is_fraud = 0 for non-fraud, 1 for fraud.
3. Save the resulting data files to the local filesystem so that FeatHub jobs
   can consume them.

To learn more about the fraud detection scenario, the dataset we use, and how we
tag the transactions, please refer to
[this guide](https://microsoft.github.io/r-server-fraud-detection/data-scientist.html).

In [None]:
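def maybe_download_file(url):
    """Download `url` into the current directory unless the file already exists."""
    parsed_url = urlparse(url)
    file_name = os.path.basename(parsed_url.path)

    if os.path.isfile(file_name):
        return

    r = requests.get(url)
    r.raise_for_status()  # fail loudly instead of saving an HTTP error page
    with open(file_name, "wb") as f:
        f.write(r.content)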


file_path = "https://raw.github.com/microsoft/r-server-fraud-detection/master/Data/"
maybe_download_file(file_path + "Account_Info.csv")
maybe_download_file(file_path + "Fraud_Transactions.csv")
maybe_download_file(file_path + "Untagged_Transactions.csv")

In [None]:
# Load datasets
fraud_df = pd.read_csv("Fraud_Transactions.csv")
obs_df = pd.read_csv("Untagged_Transactions.csv")

# Combine transactionDate and transactionTime into one column. E.g. "20130903", "013641" -> "20130903 013641"
fraud_df["timestamp"] = (
    fraud_df["transactionDate"].astype(str)
    + " "
    + fraud_df["transactionTime"].astype(str).str.zfill(6)
)
obs_df["timestamp"] = (
    obs_df["transactionDate"].astype(str)
    + " "
    + obs_df["transactionTime"].astype(str).str.zfill(6)
)
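
The `zfill(6)` padding matters because `transactionTime` is parsed as a number, so a time such as 01:36:41 loses its leading zero and arrives as `13641`. The small stdlib sketch below (the helper `make_timestamp` is hypothetical) shows that padding restores the `HHMMSS` layout, that the result parses with the `%Y%m%d %H%M%S` format later passed to `FileSystemSource`, and that zero-padded strings sort in chronological order, which is what makes plain string comparison of `timestamp` values meaningful when tagging frauds.

```python
from datetime import datetime


def make_timestamp(date: int, time: int) -> str:
    """Combine a YYYYMMDD date and an HHMMSS time whose leading zeros were lost."""
    return f"{date} {str(time).zfill(6)}"


ts = make_timestamp(20130903, 13641)
print(ts)  # 20130903 013641

# The combined string parses with the same format string given to FileSystemSource.
parsed = datetime.strptime(ts, "%Y%m%d %H%M%S")
print(parsed)  # 2013-09-03 01:36:41

# Zero-padded strings compare in chronological order; unpadded ones do not.
print("20130903 013641" < "20130903 101500")  # True
print("20130903 13641" < "20130903 101500")  # False
```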

In this step, we use the transaction-level fraud data to compute, for each
account, the timestamp range in which its fraudulent transactions happened.
Based on these ranges, we add the label `is_fraud` to the untagged transaction
data.

In [None]:
# For each account in the fraud transaction data, get the timestamp range in which its fraud transactions happened.
fraud_labels_df = fraud_df.groupby("accountID").agg({"timestamp": ["min", "max"]})
fraud_labels_df.columns = ["_".join(col) for col in fraud_labels_df.columns]
fraud_labels_df.head()

In [None]:
# Combine fraud and untagged transaction data to generate the tagged transaction data.
transactions_df = pd.concat([fraud_df, obs_df], ignore_index=True).merge(
    fraud_labels_df,
    on="accountID",
    how="outer",
)

# Data cleaning
transactions_df.dropna(
    subset=[
        "accountID",
        "transactionID",
        "transactionAmount",
        "localHour",
        "timestamp",
    ],
    inplace=True,
)
transactions_df.sort_values("timestamp", inplace=True)
transactions_df.drop_duplicates(inplace=True)

# is_fraud = 1 if the transaction falls within its account's fraud timestamp range, otherwise 0.
transactions_df["is_fraud"] = np.logical_and(
    transactions_df["timestamp_min"] <= transactions_df["timestamp"],
    transactions_df["timestamp"] <= transactions_df["timestamp_max"],
).astype(int)

transactions_df.head()
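
The labelling rule above can be restated in plain Python: collect the earliest and latest fraud timestamp per account, then mark a transaction as fraud when its timestamp lies inside its account's range. Accounts without fraud records end up with label 0, just as pandas treats comparisons against the `NaN` bounds produced by the outer merge as false. The helper names and sample rows below are hypothetical, and the string comparison relies on the zero-padded `YYYYMMDD HHMMSS` format.

```python
from typing import Dict, List, Tuple


def fraud_ranges(fraud_rows: List[Tuple[str, str]]) -> Dict[str, Tuple[str, str]]:
    """Map each accountID to the (min, max) timestamp of its fraud transactions."""
    ranges: Dict[str, Tuple[str, str]] = {}
    for account_id, ts in fraud_rows:
        if account_id in ranges:
            lo, hi = ranges[account_id]
            ranges[account_id] = (min(lo, ts), max(hi, ts))
        else:
            ranges[account_id] = (ts, ts)
    return ranges


def tag(rows: List[Tuple[str, str]], ranges: Dict[str, Tuple[str, str]]) -> List[int]:
    """Return 1 if a transaction lies within its account's fraud range, else 0."""
    labels = []
    for account_id, ts in rows:
        bounds = ranges.get(account_id)
        labels.append(int(bounds is not None and bounds[0] <= ts <= bounds[1]))
    return labels


fraud = [("A1", "20130903 013641"), ("A1", "20130905 120000")]
transactions = [
    ("A1", "20130901 080000"),  # before the fraud range
    ("A1", "20130904 090000"),  # inside the fraud range
    ("A2", "20130904 090000"),  # account with no fraud records
]
print(tag(transactions, fraud_ranges(fraud)))  # [0, 1, 0]
```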

In [None]:
transactions_df["is_fraud"].value_counts()

In [None]:
transactions_df.describe().T

In [None]:
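# Trim transactions_df to speed up computation. All transactions of account
# A1055520452832600 are kept, because that account is queried from the online
# store at the end of this notebook.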
sampled_df = transactions_df.sample(n=10000, random_state=1)

transactions_df = pd.concat(
    [
        sampled_df[sampled_df["accountID"] != "A1055520452832600"],
        transactions_df[transactions_df["accountID"] == "A1055520452832600"],
    ]
)

In [None]:
transactions_df.to_csv("transactions.csv", index=False, header=False)

In [None]:
account_df = pd.read_csv("Account_Info.csv")
account_df["timestamp"] = (
    account_df["transactionDate"].astype(str)
    + " "
    + account_df["transactionTime"].astype(str).str.zfill(6)
)
account_df.to_csv("accounts.csv", index=False, header=False)
account_df.head()

In [None]:
account_df.describe().T

Now we can define the following features:

- Account features: Account-level features that will be joined to observation
  data on accountID
- Transaction features: The features that will be joined to observation data on
  transactionID
- Transaction aggregated features: The features aggregated by accountID
- Derived features: The features derived from other features

### Define account features

In [None]:
account_schema = (
    Schema.new_builder()
    .column("accountID", types.String)
    .column("transactionDate", types.String)
    .column("transactionTime", types.String)
    .column("accountOwnerName", types.String)
    .column("accountAddress", types.String)
    .column("accountPostalCode", types.String)
    .column("accountCity", types.String)
    .column("accountState", types.String)
    .column("accountCountry", types.String)
    .column("accountOpenDate", types.String)
    .column("accountAge", types.Float64)
    .column("isUserRegistered", types.Bool)
    .column("paymentInstrumentAgeInAccount", types.String)
    .column("numPaymentRejects1dPerUser", types.Float64)
    .column("timestamp", types.String)
    .build()
)

account_source = FileSystemSource(
    name="account_source",
    path="accounts.csv",
    data_format="csv",
    schema=account_schema,
    keys=["accountID"],
    timestamp_field="timestamp",
    timestamp_format="%Y%m%d %H%M%S",
)

Here, we use `accountCountry`, `isUserRegistered`, `numPaymentRejects1dPerUser`,
and `accountAge` as the account features.

In [None]:
account_feature_view_1 = DerivedFeatureView(
    name="account_feature_view_1",
    source=account_source,
    features=[
        Feature(
            name=feature_name,
            transform=feature_name,
            keys=["accountID"],
        )
        for feature_name in [
            "accountID",
            "accountCountry",
            "isUserRegistered",
            "numPaymentRejects1dPerUser",
            "accountAge",
        ]
    ],
    keep_source_fields=True,
    filter_expr="accountID IS NOT NULL",
)

account_feature_view_2 = DerivedFeatureView(
    name="account_feature_view_2",
    source=account_feature_view_1,
    features=[
        Feature(
            name="account_country_code",
            keys=["accountID"],
            transform="accountCountry",
        ),
        Feature(
            name="is_user_registered",
            keys=["accountID"],
            transform="isUserRegistered",
        ),
        Feature(
            name="num_payment_rejects_1d_per_user",
            keys=["accountID"],
            transform="numPaymentRejects1dPerUser",
        ),
        Feature(
            name="account_age",
            keys=["accountID"],
            transform="accountAge",
        ),
    ],
    keep_source_fields=True,
)

### Define transaction features

We already inspected the transaction dataset when we tagged the fraud label
`is_fraud`, so let's jump straight to defining features.

In [None]:
transaction_schema = (
    Schema.new_builder()
    .column("transactionID", types.String)
    .column("accountID", types.String)
    .column("transactionAmount", types.Float32)
    .column("transactionCurrencyCode", types.String)
    .column("transactionDate", types.String)
    .column("transactionTime", types.Float32)
    .column("localHour", types.Float32)
    .column("transactionDeviceId", types.String)
    .column("transactionIPaddress", types.String)
    .column("timestamp", types.String)
    .column("transactionAmountUSD", types.String)
    .column("transactionCurrencyConversionRate", types.String)
    .column("transactionScenario", types.String)
    .column("transactionType", types.String)
    .column("transactionMethod", types.String)
    .column("transactionDeviceType", types.String)
    .column("ipState", types.String)
    .column("ipPostcode", types.String)
    .column("ipCountryCode", types.String)
    .column("isProxyIP", types.Bool)
    .column("browserType", types.String)
    .column("browserLanguage", types.String)
    .column("paymentInstrumentType", types.String)
    .column("cardType", types.String)
    .column("cardNumberInputMethod", types.String)
    .column("paymentInstrumentID", types.String)
    .column("paymentBillingAddress", types.String)
    .column("paymentBillingPostalCode", types.String)
    .column("paymentBillingState", types.String)
    .column("paymentBillingCountryCode", types.String)
    .column("paymentBillingName", types.String)
    .column("shippingAddress", types.String)
    .column("shippingPostalCode", types.String)
    .column("shippingCity", types.String)
    .column("shippingState", types.String)
    .column("shippingCountry", types.String)
    .column("cvvVerifyResult", types.String)
    .column("responseCode", types.String)
    .column("digitalItemCount", types.String)
    .column("physicalItemCount", types.String)
    .column("purchaseProductType", types.String)
    .column("timestamp_min", types.String)
    .column("timestamp_max", types.String)
    .column("is_fraud", types.String)
    .build()
)

transaction_source = FileSystemSource(
    name="transaction_source",
    path="transactions.csv",
    data_format="csv",
    schema=transaction_schema,
    timestamp_field="timestamp",
    timestamp_format="%Y%m%d %H%M%S",
)

### Define transaction aggregation features

In [None]:
transaction_feature_view = DerivedFeatureView(
    name="transaction_feature_view",
    source=transaction_source,
    features=[
        Feature(
            name="transaction_amount",
            keys=["transactionID"],
            transform="transactionAmount",
        ),
        Feature(
            name="transaction_country_code",
            keys=["transactionID"],
            transform="ipCountryCode",
        ),
        Feature(
            name="transaction_time",
            keys=["transactionID"],
            transform="localHour",  # Local time of the transaction
        ),
        Feature(
            name="is_proxy_ip",
            keys=["transactionID"],
            transform="isProxyIP",  # [nan, True, False]
        ),
        Feature(
            name="cvv_verify_result",
            keys=["transactionID"],
            transform="cvvVerifyResult",  # [nan, 'M', 'P', 'N', 'X', 'U', 'S', 'Y']
        ),
    ],
    keep_source_fields=True,
)

transaction_agg_feature_view = DerivedFeatureView(
    name="transaction_agg_feature_view",
    source=transaction_source,
    features=[
        # average transaction amount in the past week
        Feature(
            name="avg_transaction_amount",
            keys=["accountID"],
            transform=OverWindowTransform(
                expr="transactionAmount",
                agg_func="AVG",
                window_size=timedelta(days=7),
                group_by_keys=["accountID"],
            ),
        ),
        # number of transactions in the past day
        Feature(
            name="num_transaction_count_in_day",
            keys=["accountID"],
            transform=OverWindowTransform(
                expr="transactionID",
                agg_func="COUNT",
                window_size=timedelta(days=1),
                group_by_keys=["accountID"],
            ),
        ),
        # number of transactions in the past week
        Feature(
            name="num_transaction_count_in_week",
            keys=["accountID"],
            transform=OverWindowTransform(
                expr="transactionID",
                agg_func="COUNT",
                window_size=timedelta(days=7),
                group_by_keys=["accountID"],
            ),
        ),
        # total transaction amount in the past day
        Feature(
            name="total_transaction_amount_in_day",
            keys=["accountID"],
            transform=OverWindowTransform(
                expr="transactionAmount",
                agg_func="SUM",
                window_size=timedelta(days=1),
                group_by_keys=["accountID"],
            ),
        ),
        # average local hour of transactions in the past week
        Feature(
            name="avg_transaction_time_in_week",
            keys=["accountID"],
            transform=OverWindowTransform(
                expr="localHour",
                agg_func="AVG",
                window_size=timedelta(days=7),
                group_by_keys=["accountID"],
            ),
        ),
    ],
    keep_source_fields=True,
)
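
An `OverWindowTransform` computes, for every row, an aggregate over rows of the same group (here `accountID`) that fall within a trailing time window. The stdlib sketch below approximates what `num_transaction_count_in_day` and `total_transaction_amount_in_day` compute for a single account, assuming the window covers rows with timestamps in `(t - window_size, t]` and includes the current row. FeatHub's exact boundary handling may differ, so treat this as an illustration of the idea rather than of the library's implementation; the helper `over_window` and the data are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def over_window(
    rows: List[Tuple[datetime, float]], window_size: timedelta
) -> List[Tuple[int, float]]:
    """For each (timestamp, amount) row of ONE group, return (count, sum) over the
    trailing window (t - window_size, t], including the row itself."""
    results = []
    for t, _ in rows:
        in_window = [amt for ts, amt in rows if t - window_size < ts <= t]
        results.append((len(in_window), sum(in_window)))
    return results


rows = [
    (datetime(2013, 9, 3, 1, 0), 10.0),
    (datetime(2013, 9, 3, 20, 0), 5.0),
    (datetime(2013, 9, 4, 2, 0), 7.0),  # 25h after the first row, so that row drops out
]
print(over_window(rows, timedelta(days=1)))  # [(1, 10.0), (2, 15.0), (2, 12.0)]
```

The quadratic scan keeps the sketch short; a streaming engine would instead maintain a sliding buffer per key.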

### Derive features from account and transaction features

In [None]:
online_feature_view = DerivedFeatureView(
    name="online_feature_view",
    source=transaction_agg_feature_view,
    features=[
        "account_feature_view_2.account_country_code",
        "account_feature_view_2.is_user_registered",
        "account_feature_view_2.num_payment_rejects_1d_per_user",
        "account_feature_view_2.account_age",
    ],
    keep_source_fields=True,
)

diff_feature_view = DerivedFeatureView(
    name="diff_feature_view",
    source=online_feature_view,
    features=[
        "transaction_feature_view.transaction_amount",
        Feature(
            name="diff_between_current_and_avg_amount",
            keys=["accountID"],
            transform="transaction_amount - avg_transaction_amount",
        ),
    ],
    keep_source_fields=True,
)

derived_feature_view = DerivedFeatureView(
    name="derived_feature_view",
    source=diff_feature_view,
    features=[
        "account_country_code",
        "is_user_registered",
        "num_payment_rejects_1d_per_user",
        "account_age",
        "avg_transaction_amount",
        "num_transaction_count_in_day",
        "num_transaction_count_in_week",
        "total_transaction_amount_in_day",
        "avg_transaction_time_in_week",
        "transaction_amount",
        "transaction_feature_view.transaction_country_code",
        "transaction_feature_view.transaction_time",
        "transaction_feature_view.is_proxy_ip",
        "transaction_feature_view.cvv_verify_result",
        "diff_between_current_and_avg_amount",
        "is_fraud",
        "timestamp",
    ],
    keep_source_fields=False,
)
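
Listing a feature as `"account_feature_view_2.account_country_code"` asks FeatHub to join it onto each transaction row by `accountID`. A common way to make such a join point-in-time correct, and the assumption behind this sketch, is to take, for each transaction, the most recent account record whose timestamp is not later than the transaction's; consult the FeatHub documentation for the exact semantics. The `bisect`-based helper below is hypothetical.

```python
from bisect import bisect_right
from typing import Dict, List, Optional, Tuple


def point_in_time_lookup(
    history: Dict[str, List[Tuple[str, str]]], key: str, ts: str
) -> Optional[str]:
    """Return the latest value for `key` recorded at or before `ts`, or None.

    `history[key]` must be a list of (timestamp, value) pairs sorted by timestamp.
    Timestamps are zero-padded "YYYYMMDD HHMMSS" strings, so string order is time order.
    """
    records = history.get(key, [])
    timestamps = [t for t, _ in records]
    idx = bisect_right(timestamps, ts)  # number of records at or before ts
    return records[idx - 1][1] if idx > 0 else None


account_history = {
    "A1": [("20130101 000000", "US"), ("20130601 000000", "CA")],
}
print(point_in_time_lookup(account_history, "A1", "20130301 120000"))  # US
print(point_in_time_lookup(account_history, "A1", "20130901 120000"))  # CA
print(point_in_time_lookup(account_history, "A1", "20121231 235959"))  # None
print(point_in_time_lookup(account_history, "A2", "20130901 120000"))  # None
```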

## Build and materialize features

In [None]:
_ = client.build_features(
    [
        account_feature_view_2,
        transaction_feature_view,
        transaction_agg_feature_view,
        online_feature_view,
        derived_feature_view,
    ]
)

In [None]:
derived_df = client.get_features(derived_feature_view).to_pandas()

In [None]:
derived_df.sort_values("transactionID").head(5)

## Build a Fraud Detection Model

We use [Random Forest
Classifier](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html)
to build a fraud detection model.

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    PrecisionRecallDisplay,
)
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder

In [None]:
plottable_df = derived_df[
    [
        "account_country_code",
        "is_user_registered",
        "num_payment_rejects_1d_per_user",
        "account_age",
        "avg_transaction_amount",
        "num_transaction_count_in_day",
        "num_transaction_count_in_week",
        "total_transaction_amount_in_day",
        "avg_transaction_time_in_week",
        "transaction_amount",
        "transaction_country_code",
        "transaction_time",
        "is_proxy_ip",
        "cvv_verify_result",
        "diff_between_current_and_avg_amount",
        "is_fraud",
        "timestamp",
    ]
]

plottable_df

### Understand the dataset

In [None]:
plottable_df.describe().T

In [None]:
plottable_df.nunique()

In [None]:
# plot only sub-samples for simplicity
NUM_SAMPLES_TO_PLOT = 5000

fig = px.scatter_matrix(
    plottable_df.sample(n=NUM_SAMPLES_TO_PLOT, random_state=42),
    dimensions=plottable_df.columns[:-2],  # exclude the label and timestamp
    color="is_fraud",
    labels={
        col: col.replace("_", " ") for col in plottable_df.columns
    },  # remove underscore
)
fig.update_traces(
    diagonal_visible=False, showupperhalf=False, marker_size=3, marker_opacity=0.5
)
fig.update_layout(
    width=2000,
    height=2000,
    title={"text": "Scatter matrix for transaction dataset", "font_size": 20},
    font_size=6,
)
fig.show()

### Split training and validation sets

In [None]:
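# Sort by time so that the model is trained on past transactions and validated on later ones.
time_sorted_df = plottable_df.sort_values("timestamp")
n_train = int(len(time_sorted_df) * 0.7)

train_df = time_sorted_df.iloc[:n_train]
test_df = time_sorted_df.iloc[n_train:]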

print(
    f"""Training set:
{train_df["is_fraud"].value_counts()}

Validation set:
{test_df["is_fraud"].value_counts()}
"""
)

In [None]:
# Check that the time ranges of the training and validation sets don't overlap
train_df["timestamp"].max(), test_df["timestamp"].min()

### Train and test a machine learning model

In [None]:
# Get labels as integers
y_train = train_df["is_fraud"].astype(int).to_numpy()
y_test = test_df["is_fraud"].astype(int).to_numpy()

In [None]:
# Convert categorical features into numeric values using one-hot encoding and ordinal encoding
categorical_feature_names = [
    "account_country_code",
    "transaction_country_code",
    "cvv_verify_result",
]
ordinal_feature_names = [
    "is_user_registered",
    "is_proxy_ip",
]

In [None]:
one_hot_encoder: OneHotEncoder = OneHotEncoder(sparse=False).fit(
    plottable_df[categorical_feature_names]
)
ordinal_encoder: OrdinalEncoder = OrdinalEncoder().fit(
    plottable_df[ordinal_feature_names]
)

In [None]:
ordinal_encoder.categories_

In [None]:
one_hot_encoder.categories_
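
The two encoders differ in the shape of their output: `OneHotEncoder` turns a column with k categories into k binary columns, while `OrdinalEncoder` replaces each category with its integer position in the category list. A minimal stdlib sketch of both, assuming sorted categories (as scikit-learn determines them by default) and ignoring missing values; `one_hot` and `ordinal` are hypothetical helpers:

```python
from typing import List, Sequence


def one_hot(values: Sequence[str]) -> List[List[float]]:
    """Encode each value as a 0/1 vector over the sorted distinct categories."""
    categories = sorted(set(values))
    return [[1.0 if v == c else 0.0 for c in categories] for v in values]


def ordinal(values: Sequence[str]) -> List[float]:
    """Encode each value as its index in the sorted distinct categories."""
    index = {c: float(i) for i, c in enumerate(sorted(set(values)))}
    return [index[v] for v in values]


cvv = ["M", "N", "M", "P"]
print(one_hot(cvv))  # [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [1.0, 0.0, 0.0], [0.0, 0.0, 1.0]]
print(ordinal(["False", "True", "True"]))  # [0.0, 1.0, 1.0]
```

One-hot suits nominal features such as country codes, where an integer order would be meaningless; for boolean-like flags such as `is_user_registered` and `is_proxy_ip`, a single ordinal column is compact and a random forest can split on it directly.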

In [None]:
def to_feature_matrix(df):
    """Build the model input: one-hot, then ordinal, then numeric feature columns."""
    return np.concatenate(
        (
            one_hot_encoder.transform(df[categorical_feature_names]),
            ordinal_encoder.transform(df[ordinal_feature_names]),
            df.drop(
                categorical_feature_names
                + ordinal_feature_names
                + ["is_fraud", "timestamp"],
                axis="columns",
            )
            .fillna(0)
            .to_numpy(),
        ),
        axis=1,
    )


X_train = to_feature_matrix(train_df)
X_test = to_feature_matrix(test_df)

In [None]:
clf = RandomForestClassifier(
    n_estimators=50,
    random_state=42,
).fit(X_train, y_train)

In [None]:
clf.score(X_test, y_test)

In [None]:
y_pred = clf.predict(X_test)
y_pred

In [None]:
y_prob = clf.predict_proba(X_test)
y_prob

In [None]:
display = PrecisionRecallDisplay.from_predictions(
    y_test, y_prob[:, 1], name="RandomForestClassifier"
)
_ = display.ax_.set_title("Fraud Detection Precision-Recall Curve")
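
Each point on the precision-recall curve corresponds to one decision threshold applied to `y_prob[:, 1]`: scores at or above the threshold are predicted as fraud. The stdlib sketch below (hypothetical helper `pr_points`, toy data) computes those points by hand; scikit-learn's `precision_recall_curve` performs essentially the same sweep over the distinct scores.

```python
from typing import List, Sequence, Tuple


def pr_points(
    y_true: Sequence[int], scores: Sequence[float]
) -> List[Tuple[float, float, float]]:
    """Return (threshold, precision, recall) for each distinct score used as a threshold."""
    total_pos = sum(y_true)
    points = []
    for thr in sorted(set(scores)):
        pred = [s >= thr for s in scores]  # predict fraud at or above the threshold
        tp = sum(1 for p, y in zip(pred, y_true) if p and y == 1)
        fp = sum(1 for p, y in zip(pred, y_true) if p and y == 0)
        precision = tp / (tp + fp) if tp + fp else 1.0
        recall = tp / total_pos if total_pos else 0.0
        points.append((thr, precision, recall))
    return points


y_true = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
for thr, p, r in pr_points(y_true, scores):
    print(f"threshold={thr:.2f} precision={p:.2f} recall={r:.2f}")
```

Raising the threshold trades recall for precision, which is the trade-off the curve above visualizes for the fraud model.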

In [None]:
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)

print(
    f"""Precision: {precision},
Recall: {recall},
F1: {f1}"""
)

In [None]:
confusion_matrix(y_test, y_pred)
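
For binary labels 0/1, scikit-learn's `confusion_matrix` is laid out as `[[TN, FP], [FN, TP]]`, and the three scores printed earlier follow directly from its entries: precision = TP / (TP + FP), recall = TP / (TP + FN), and F1 is their harmonic mean. A stdlib sketch with a hypothetical helper and toy labels:

```python
from typing import Sequence, Tuple


def binary_scores(
    y_true: Sequence[int], y_pred: Sequence[int]
) -> Tuple[float, float, float]:
    """Compute precision, recall and F1 for the positive class (label 1)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


y_true = [0, 0, 0, 1, 1, 1, 1]
y_pred = [0, 1, 0, 1, 1, 0, 0]  # TP=2, FP=1, FN=2
print(binary_scores(y_true, y_pred))
```

With heavily imbalanced labels such as fraud, these scores are more informative than the accuracy returned by `clf.score`.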

### Feature importance

In [None]:
numeric_feature_names = [
    name
    for name in train_df.columns
    if name
    not in set(
        categorical_feature_names + ordinal_feature_names + ["is_fraud", "timestamp"]
    )
]
numeric_feature_names

In [None]:
# Columns of X_train are ordered as [one-hot categorical, ordinal, numeric], so the numeric features come last
importances = clf.feature_importances_[-len(numeric_feature_names) :]
std = np.std(
    [
        tree.feature_importances_[-len(numeric_feature_names) :]
        for tree in clf.estimators_
    ],
    axis=0,
)

fig = px.bar(
    pd.DataFrame(
        [numeric_feature_names, importances, std],
        index=["Numeric features", "importances", "std"],
    ).T,
    y="Numeric features",
    x="importances",
    error_x="std",
    orientation="h",
    title="Importance of the numeric features",
)
fig.update_layout(showlegend=False, width=1000)
fig.update_xaxes(title_text="Mean decrease in impurity", range=[0, 0.5])
fig.update_yaxes(title_text="Numeric features")
fig.show()

In [None]:
feature_names = categorical_feature_names + ordinal_feature_names
categories = one_hot_encoder.categories_ + ordinal_encoder.categories_

start_i = 0
n_rows = len(feature_names)

fig = make_subplots(
    rows=n_rows,
    cols=1,
    subplot_titles=[name.replace("_", " ") for name in feature_names],
    x_title="Mean decrease in impurity",
)

for i in range(n_rows):
    category = categories[i]
    end_i = start_i + len(category)

    fig.add_trace(
        go.Bar(
            x=clf.feature_importances_[start_i:end_i],
            y=category,
            width=0.2,
            error_x=dict(
                type="data",
                array=np.std(
                    [
                        tree.feature_importances_[start_i:end_i]
                        for tree in clf.estimators_
                    ],
                    axis=0,
                ),
            ),
            orientation="h",
        ),
        row=i + 1,
        col=1,
    )

    start_i = end_i

fig.update_layout(
    title="Importance of the categorical features",
    showlegend=False,
    width=1000,
    height=1000,
)
fig.update_xaxes(range=[0, 0.5])
fig.show()
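
Because each one-hot category gets its own column, a categorical feature's influence is spread across several bars. If a single score per original feature is preferred, one option (not used in this notebook) is to sum the impurity-based importances of its columns, walking the same `[one-hot, ordinal, numeric]` column order used to build `X_train`. A stdlib sketch with a hypothetical helper and made-up numbers:

```python
from typing import Dict, List, Sequence


def group_importances(
    importances: Sequence[float], feature_names: List[str], widths: List[int]
) -> Dict[str, float]:
    """Sum consecutive importance entries; widths[i] is how many columns feature i occupies."""
    if sum(widths) != len(importances):
        raise ValueError("widths must cover every importance entry")
    grouped: Dict[str, float] = {}
    start = 0
    for name, width in zip(feature_names, widths):
        grouped[name] = sum(importances[start : start + width])
        start += width
    return grouped


# Hypothetical: a 3-category one-hot feature, one ordinal feature, one numeric feature.
imps = [0.1, 0.05, 0.05, 0.3, 0.5]
names = ["cvv_verify_result", "is_proxy_ip", "transaction_amount"]
print(group_importances(imps, names, [3, 1, 1]))
```

In the notebook, the widths would be `[len(c) for c in one_hot_encoder.categories_]` followed by 1 for each ordinal and each numeric feature.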

## Materialize features into an in-memory online store

In [None]:
from feathub.feature_tables.sinks.memory_store_sink import MemoryStoreSink
from feathub.feature_tables.sources.memory_store_source import MemoryStoreSource
from feathub.feature_views.on_demand_feature_view import OnDemandFeatureView

sink = MemoryStoreSink(table_name="table_name_1")

job = client.materialize_features(
    feature_descriptor=online_feature_view,
    sink=sink,
    allow_overwrite=True,
)
job.wait(timeout_ms=300000)
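
Conceptually, an online store like the memory store used here behaves as a key-value table: materialization writes one row per key (`accountID`), a later row for the same key replaces the earlier one, and a lookup returns the most recently written feature values. The dict-based sketch below illustrates that model under the assumption that rows arrive in timestamp order; it is not FeatHub's `MemoryStoreSink` implementation, and the helper names are hypothetical.

```python
from typing import Dict, Iterable, List, Optional


def materialize(rows: Iterable[dict], key: str) -> Dict[str, dict]:
    """Keep only the last row written for each key, as a key-value online store would."""
    table: Dict[str, dict] = {}
    for row in rows:  # assumed to arrive in timestamp order
        table[row[key]] = row
    return table


def lookup(
    table: Dict[str, dict], keys: List[str], fields: List[str]
) -> List[Optional[dict]]:
    """Return the requested fields for each key, or None when the key is absent."""
    out: List[Optional[dict]] = []
    for k in keys:
        row = table.get(k)
        out.append({f: row[f] for f in fields} if row is not None else None)
    return out


store = materialize(
    [
        {"accountID": "A1", "timestamp": "20130901 080000", "num_transaction_count_in_day": 1},
        {"accountID": "A1", "timestamp": "20130901 090000", "num_transaction_count_in_day": 2},
    ],
    key="accountID",
)
print(lookup(store, ["A1", "A2"], ["num_transaction_count_in_day"]))
```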

In [None]:
source = MemoryStoreSource(
    name="online_store_source",
    keys=["accountID"],
    table_name="table_name_1",
)
on_demand_feature_view = OnDemandFeatureView(
    name="on_demand_feature_view",
    features=[
        "online_store_source." + x
        for x in [
            "is_user_registered",
            "num_payment_rejects_1d_per_user",
            "account_age",
            "avg_transaction_amount",
            "num_transaction_count_in_day",
            "num_transaction_count_in_week",
            "total_transaction_amount_in_day",
            "avg_transaction_time_in_week",
        ]
    ],
    request_schema=Schema.new_builder().column("accountID", types.String).build(),
)
client.build_features([source, on_demand_feature_view])

request_df = pd.DataFrame(
    np.array([["A1055520452832600"]]),
    columns=["accountID"],
)
online_features = client.get_online_features(
    request_df=request_df,
    feature_view=on_demand_feature_view,
)

online_features