```
Copyright 2022 IBM Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```

# Graph Feature Extraction for Anti-Money Laundering

The Snap ML GraphFeaturePreprocessor is a scikit-learn compatible preprocessor that enables scalable, real-time feature extraction from graph-structured data. It provides utilities for creating and updating in-memory graphs and for extracting new features from these graphs. The goal of this example is to show how to use the API of this preprocessor. As input, we use a synthetic dataset in tabular format where each row represents a financial transaction. For each transaction, four basic features are used to build the graph: transaction ID, source account ID, target account ID, and transaction timestamp. The remaining columns (amounts, currencies, payment format) are passed along as additional raw features.
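
As a minimal, dependency-free sketch of the input layout described above, each transaction can be thought of as a row of four numbers, with the rows ordered by timestamp. The values below are made up for illustration and are not taken from the dataset.

```python
# Hypothetical toy transactions: (transaction ID, source account ID,
# target account ID, timestamp in seconds). Values are illustrative only.
transactions = [
    (2.0, 11.0, 12.0, 300.0),
    (0.0, 10.0, 11.0, 100.0),
    (1.0, 12.0, 10.0, 200.0),
]

# Order the rows by timestamp (column 3), oldest first.
transactions_sorted = sorted(transactions, key=lambda row: row[3])

for row in transactions_sorted:
    print(row)
```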

In [1]:
# Import the Graph Feature Preprocessor from Snap ML
from snapml import GraphFeaturePreprocessor

# Import other libraries
import numpy as np
import json
import time

import pandas as pd
pd.options.display.max_columns = None

from sklearn.model_selection import train_test_split

In [2]:
mon_laun_path = "datasets/HI-Small_Trans.csv"
# mon_laun_df_init = pd.read_csv(mon_laun_path, nrows = 1000)
mon_laun_df_init = pd.read_csv(mon_laun_path)

mon_laun_df_init = mon_laun_df_init.sort_values(by = ["Timestamp"], ascending = True)

# display(mon_laun_df_init)

In [3]:
x1 = mon_laun_df_init['Receiving Currency'].unique()
x2 = mon_laun_df_init['Payment Currency'].unique()
x3 = mon_laun_df_init['Payment Format'].unique()

x4 = pd.unique(mon_laun_df_init[["From Bank", "To Bank"]].values.ravel('K'))
x5 = pd.unique(mon_laun_df_init[["Account", "Account.1"]].values.ravel('K'))
x6 = mon_laun_df_init.loc[mon_laun_df_init['Is Laundering'] == 1]

# print(x5)
# print(len(x5))

In [4]:
mon_laun_df_init["transactionID"] = mon_laun_df_init.index.astype(float)
mon_laun_df_init["sourceAccount"] = mon_laun_df_init["From Bank"].astype(str) + "_" + mon_laun_df_init["Account"]
mon_laun_df_init["targetAccount"] = mon_laun_df_init["To Bank"].astype(str) + "_" + mon_laun_df_init["Account.1"]

mon_laun_df_init["Timestamp"] = pd.to_datetime(mon_laun_df_init["Timestamp"], format = '%Y/%m/%d %H:%M')
# The output is in nanoseconds. Convert to seconds by dividing by 10**9
mon_laun_df_init["Timestamp_float"] = mon_laun_df_init["Timestamp"].astype('int64') // 10 ** 9
# display(mon_laun_df_init)

x7 = pd.unique(mon_laun_df_init[["sourceAccount", "targetAccount"]].values.ravel('K'))
# print(x7)
# print(len(x7))
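
The timestamp conversion in the cell above relies on pandas storing datetimes as nanoseconds. As a small cross-check that does not depend on that internal resolution, the sketch below converts a timestamp string in the same `'%Y/%m/%d %H:%M'` format to epoch seconds using only the standard library. Treating the timestamps as UTC is an assumption made here, and the helper name `to_epoch_seconds` is hypothetical.

```python
from datetime import datetime, timezone

def to_epoch_seconds(text, fmt="%Y/%m/%d %H:%M"):
    """Parse a timestamp string and return whole seconds since 1970-01-01 UTC."""
    # Assumption: the string carries no timezone, so it is interpreted as UTC.
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())

print(to_epoch_seconds("1970/01/01 00:01"))  # 60
print(to_epoch_seconds("2022/09/01 00:20"))
```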

In [5]:
# Replacing source and target with unique id
unique_account_df = mon_laun_df_init[["sourceAccount", "targetAccount"]]
x = unique_account_df.stack()
x[:] = x.factorize()[0]
mon_laun_df_init = mon_laun_df_init.join(x.unstack().add_suffix('ID'))

# Replacing currency with unique id
unique_currency_df = mon_laun_df_init[["Receiving Currency", "Payment Currency"]]
x = unique_currency_df.stack()
x[:] = x.factorize()[0]
mon_laun_df_init = mon_laun_df_init.join(x.unstack().add_suffix('ID'))


# Replacing payment format with unique id
mon_laun_df_init['Payment FormatID'], mapping = mon_laun_df_init['Payment Format'].factorize()

x8 = pd.unique(mon_laun_df_init[["sourceAccountID", "targetAccountID"]].values.ravel('K'))
# print(x8)
# print(len(x8))

x9 = pd.unique(mon_laun_df_init[["Receiving CurrencyID", "Payment CurrencyID"]].values.ravel('K'))
# print(x9)
# print(len(x9))

x10 = pd.unique(mon_laun_df_init["Payment Format"].values.ravel('K'))
# print(x10)
# print(len(x10))

# display(mon_laun_df_init)
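
To make the ID-replacement step above easier to follow, here is a dependency-free sketch of the idea behind stacking two columns and factorizing them together: every distinct value gets one integer ID, assigned in order of first appearance, and the same ID is reused whether the value appears as a source or as a target. The account strings and the helper name `encode_shared_ids` are made up for illustration.

```python
def encode_shared_ids(pairs):
    """Map each distinct value in (source, target) pairs to an integer ID.

    IDs are assigned in order of first appearance, scanning row by row,
    so a value keeps the same ID in both columns.
    """
    ids = {}
    encoded = []
    for source, target in pairs:
        source_id = ids.setdefault(source, len(ids))
        target_id = ids.setdefault(target, len(ids))
        encoded.append((source_id, target_id))
    return encoded, ids

pairs = [("10_A", "20_B"), ("20_B", "30_C"), ("30_C", "10_A")]
encoded, ids = encode_shared_ids(pairs)
print(encoded)  # [(0, 1), (1, 2), (2, 0)]
```

Sharing one ID space across both columns matters here because source and target accounts are vertices of the same graph.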

In [6]:
mon_laun_df = mon_laun_df_init.copy()

In [7]:
# mon_laun_df = mon_laun_df[mon_laun_df["sourceAccountID"] != mon_laun_df["targetAccountID"]]
# display(mon_laun_df)

In [8]:
mon_laun_df_labels = mon_laun_df["Is Laundering"]

x_train, x_test, y_train, y_test = train_test_split(mon_laun_df, mon_laun_df_labels, 
                                                    test_size = 0.2, 
                                                    random_state = 42, 
                                                    shuffle = False, 
                                                    stratify = None
                                                   )
# display(x_train)
# display(x_test)
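
Because the transactions are sorted by timestamp and `shuffle = False` is used, the split above is chronological: the earliest 80% of the rows form the training set and the most recent 20% form the test set. A dependency-free sketch of the same idea follows. The helper name `chronological_split` is hypothetical, and rounding the test size up is a choice made for this sketch.

```python
import math

def chronological_split(rows, test_fraction=0.2):
    """Split time-ordered rows into (train, test) without shuffling."""
    n_test = math.ceil(len(rows) * test_fraction)
    n_train = len(rows) - n_test
    return rows[:n_train], rows[n_train:]

rows = list(range(10))  # stand-in for 10 time-ordered transactions
train, test = chronological_split(rows)
print(train)  # [0, 1, 2, 3, 4, 5, 6, 7]
print(test)   # [8, 9]
```

Keeping the order avoids training on transactions that happen after the ones being predicted.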

In [9]:
non_float_columns = ['transactionID', 'Timestamp', \
                     "sourceAccount", "Account", \
                     "targetAccount", "Account.1", \
                     "Receiving Currency", "Payment Currency", "Payment Format"
                    ]

correct_columns_order = ['transactionID', 'sourceAccountID', 'targetAccountID', \
                         'Timestamp_float', \
                         "Is Laundering", \
                         "From Bank", \
                         "To Bank", \
                         "Amount Received", "Receiving CurrencyID", \
                         "Amount Paid", "Payment CurrencyID", \
                         "Payment FormatID"
                        ]
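# Indices into correct_columns_order: Timestamp_float, Amount Received, Receiving CurrencyID,
# Amount Paid, Payment CurrencyID, Payment FormatID
columns_for_vertex_stats = [3, 7, 8, 9, 10, 11]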

x_train_meta = x_train[non_float_columns]
x_train = x_train[correct_columns_order]

x_test_meta = x_test[non_float_columns]
x_test = x_test[correct_columns_order]

# display(x_train)

In [10]:
x_train = pd.DataFrame(x_train)
print(x_train.dtypes)
print(x_train.shape)
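# Cast explicitly to float64: the factorized ID columns are object dtype,
# which would otherwise yield an object array
x_train = np.ascontiguousarray(x_train, dtype = np.float64)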

transactionID           float64
sourceAccountID          object
targetAccountID          object
Timestamp_float           int64
Is Laundering             int64
From Bank                 int64
To Bank                   int64
Amount Received         float64
Receiving CurrencyID     object
Amount Paid             float64
Payment CurrencyID       object
Payment FormatID          int64
dtype: object
(4062676, 12)


In [11]:
x_test = pd.DataFrame(x_test)
print(x_test.dtypes)
print(x_test.shape)
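# Cast explicitly to float64: the factorized ID columns are object dtype,
# which would otherwise yield an object array
x_test = np.ascontiguousarray(x_test, dtype = np.float64)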

transactionID           float64
sourceAccountID          object
targetAccountID          object
Timestamp_float           int64
Is Laundering             int64
From Bank                 int64
To Bank                   int64
Amount Received         float64
Receiving CurrencyID     object
Amount Paid             float64
Payment CurrencyID       object
Payment FormatID          int64
dtype: object
(1015669, 12)


In [12]:
# The following dictionary defines the configuration parameters of the Graph Feature Preprocessor

# transaction (edgeID), source-destination (vertex), Timestamp --> first 4 columns
    # other raw features, and new graph-based and vertex features

# timestamp --> is also needed for sorting input data in increasing order
# unit of time is the same as the one used for timestamps of edges -- seconds

# batch size of 128 -- ??

#################################################
hw = 5  # histogram size
hw_bin_range = 5 # histogram bin size

# Array used for specifying the bins of the pattern histogram
histogram_range_init = [y * hw_bin_range + hw_bin_range + 1 for y in range(-1, hw)]
print(histogram_range_init)
      
vertex_stats_feats_types = [i for i in range(11)]

# PAPER: not specified values --> vertex_stats_tw, time_window, max_no_edges
params = {
    "num_threads": 12,            # number of software threads to be used (important for performance)
    "vertex_stats": True,         # produce vertex statistics

    "vertex_stats_tw": 12 * 60 * 60,        # time window to consider
    "time_window": 12 * 60 * 60,            # time window used if no pattern was specified
                                            # transactions in largest time - time window
    
    "max_no_edges": 10,           # limit number of edges in detecting simple cycles
                                  # -1 means it is defined only using the time window
    
    "vertex_stats_cols": columns_for_vertex_stats,     
    # produce vertex statistics using the selected input columns
    # NOTE: Columns of the input numpy array used for generating vertex statistics features
    # NOTE: money amount, currency, etc. -- of source and target
    
    # features: 0:fan,1:deg,2:ratio,3:avg,4:sum,5:min,6:max,7:median,8:var,9:skew,10:kurtosis
    "vertex_stats_feats": vertex_stats_feats_types, # PAPER: computed based on Amount and Timestamp??
    
    # fan in/out parameters -- PAPER: NOT USED
    "fan": True,
    "fan_tw": 12 * 60 * 60, # seconds to hours (12 or 24)
    "fan_bins": histogram_range_init,
    
    # in/out degree parameters -- PAPER: NOT USED
    "degree": True,
    "degree_tw": 12 * 60 * 60, # seconds to hours (12 or 24)
    "degree_bins": histogram_range_init,
    
    # scatter gather parameters
    "scatter-gather": True,
    "scatter-gather_tw": 6 * 60 * 60, # seconds to hours
    "scatter-gather_bins": histogram_range_init,

    # # gather scatter parameters -- UNSUPPORTED
    # "gather-scatter": True,
    # "gather-scatter_tw": 6 * 60 * 60, # seconds to hours
    # "gather-scatter_bins": histogram_range_init,
    
    # temporal cycle parameters
    "temp-cycle": True,
    "temp-cycle_tw": 12 * 60 * 60, # seconds to hours (12 or 24)
    "temp-cycle_bins": histogram_range_init,
    
    # length-constrained simple cycle parameters
    "lc-cycle": True,
    "lc-cycle_tw": 12 * 60 * 60, # seconds to hours (12 or 24)
    "lc-cycle_bins": histogram_range_init,
    "lc-cycle_len": 10
}

[1, 6, 11, 16, 21, 26]
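
The printed list gives the lower edges of the pattern-histogram bins. Judging by the column names built later in this example (`1-6`, `6-11`, ..., `26-inf`), a pattern of a given size falls into the bin whose range contains it. The sketch below illustrates that lookup with the standard library. Treating each bin as including its lower edge and excluding its upper edge is an assumption, and the helper name `bin_label` is hypothetical.

```python
from bisect import bisect_right

bin_edges = [1, 6, 11, 16, 21, 26]

def bin_label(size, edges):
    """Return the label of the bin containing `size`, or None if below the first edge."""
    # Assumption: bin i covers edges[i] <= size < edges[i + 1]; the last bin is open-ended.
    index = bisect_right(edges, size) - 1
    if index < 0:
        return None
    upper = edges[index + 1] if index + 1 < len(edges) else "inf"
    return f"{edges[index]}-{upper}"

for size in (1, 5, 6, 25, 40):
    print(size, bin_label(size, bin_edges))
```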


In [13]:
# Create a Graph Feature Preprocessor, set its configuration using the above dictionary and verify it

print("Creating a graph feature preprocessor ")
gp = GraphFeaturePreprocessor()

print("Setting the parameters of the graph feature preprocessor ")
gp.set_params(params)

print("Graph feature preprocessor parameters: ", json.dumps(gp.get_params(), indent = 4))

Creating a graph feature preprocessor 
Setting the parameters of the graph feature preprocessor 
Graph feature preprocessor parameters:  {
    "num_threads": 12,
    "time_window": 43200,
    "max_no_edges": 10,
    "vertex_stats": true,
    "vertex_stats_tw": 43200,
    "vertex_stats_cols": [
        3,
        7,
        8,
        9,
        10,
        11
    ],
    "vertex_stats_feats": [
        0,
        1,
        2,
        3,
        4,
        5,
        6,
        7,
        8,
        9,
        10
    ],
    "fan": true,
    "fan_tw": 43200,
    "fan_bins": [
        1,
        6,
        11,
        16,
        21,
        26
    ],
    "degree": true,
    "degree_tw": 43200,
    "degree_bins": [
        1,
        6,
        11,
        16,
        21,
        26
    ],
    "scatter-gather": true,
    "scatter-gather_tw": 21600,
    "scatter-gather_bins": [
        1,
        6,
        11,
        16,
        21,
        26
    ],
    "temp-cycle": true,
    "temp-cyc

In [14]:
print("Enriching the transactions with new graph features ")
# the fit_transform and transform functions are equivalent
# these functions can run on single transactions or on batches of transactions

start = time.time()
x_train_to_graph = gp.transform(x_train)
end = time.time()
print("train wait 1: " + str(end - start) + " sec")

print(x_train_to_graph.shape)

Enriching the transactions with new graph features 
train wait 1: 1103.5973081588745 sec
(4062676, 258)


In [15]:
start = time.time()
gp.fit(x_train)
# gp.partial_fit(x_train) # updating the graph
end = time.time()
print("test wait 1: " + str(end - start) + " sec")

start = time.time()
x_test_to_graph = gp.transform(x_test)
# x_test_to_graph = gp.fit_transform(x_test) # fit + transform 
end = time.time()
print("test wait 2: " + str(end - start) + " sec")

print(x_test_to_graph.shape)

test wait 1: 24.258920907974243 sec
test wait 2: 178.39623188972473 sec
(1015669, 258)


We define a helper function to inspect the newly generated graph-based features for a given transaction:

In [16]:
def print_enriched_transaction(transaction, params):
    # add raw features names
    colnames = correct_columns_order.copy()
    
    # add features names for the graph patterns
    for pattern in ['fan', 'degree', 'scatter-gather', 'temp-cycle', 'lc-cycle']:
        if pattern in params:
            if params[pattern]:
                bins = len(params[pattern + '_bins'])
                if pattern in ['fan', 'degree']:
                    for i in range(bins - 1):
                        colnames.append(pattern + "_in_bins_" + str(params[pattern + '_bins'][i]) + 
                                        "-" + str(params[pattern + '_bins'][i + 1]))
                    colnames.append(pattern + "_in_bins_" + str(params[pattern +'_bins'][i + 1]) + "-inf")

                    for i in range(bins - 1):
                        colnames.append(pattern + "_out_bins_" + str(params[pattern + '_bins'][i]) + 
                                        "-" + str(params[pattern + '_bins'][i + 1]))
                    colnames.append(pattern + "_out_bins_" + str(params[pattern + '_bins'][i + 1]) + "-inf")
                else:
                    for i in range(bins - 1):
                        colnames.append(pattern + "_bins_" + str(params[pattern + '_bins'][i]) + 
                                        "-" + str(params[pattern + '_bins'][i + 1]))
                    colnames.append(pattern + "_bins_" + str(params[pattern + '_bins'][i + 1]) + "-inf")

    vert_feat_names = ["fan","deg","ratio","avg","sum","min","max","median","var","skew","kurtosis"]

    # add features names for the vertex statistics
    for orig in ['source', 'dest']:
        for direction in ['out', 'in']:
            # add fan, deg, and ratio features
            for k in [0, 1, 2]:
                if k in params["vertex_stats_feats"]:
                    feat_name = orig + "_" + vert_feat_names[k] + "_" + direction
                    colnames.append(feat_name)
            for col in params["vertex_stats_cols"]:
                # add avg, sum, min, max, median, var, skew, and kurtosis features
                for k in [3, 4, 5, 6, 7, 8, 9, 10]:
                    if k in params["vertex_stats_feats"]:
                        feat_name = orig + "_" + vert_feat_names[k] + "_col" + str(col) + "_" + direction
                        colnames.append(feat_name)

    df = pd.DataFrame(transaction, columns = colnames)
    return df

In [17]:
print("Enriched transactions: ")
df_train_enriched = print_enriched_transaction(x_train_to_graph, gp.get_params())
df_test_enriched = print_enriched_transaction(x_test_to_graph, gp.get_params())

print(df_train_enriched.shape)
print(df_test_enriched.shape)

# display(df_train_enriched)

Enriched transactions: 
(4062676, 258)
(1015669, 258)
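
The 258 columns reported above can be accounted for from the configuration alone. The sketch below redoes that arithmetic following the layout used by `print_enriched_transaction`: the raw columns, then one histogram per enabled pattern (counted separately for incoming and outgoing edges for `fan` and `degree`), then the vertex statistics for the source and destination accounts in both directions. The helper name `expected_num_columns` is hypothetical, and the `params` dictionary is a trimmed copy of the one defined earlier.

```python
def expected_num_columns(num_raw_columns, params):
    """Count output columns: raw features + pattern histograms + vertex statistics."""
    total = num_raw_columns

    for pattern in ["fan", "degree", "scatter-gather", "temp-cycle", "lc-cycle"]:
        if params.get(pattern):
            bins = len(params[pattern + "_bins"])
            # fan and degree produce one histogram per direction (in and out)
            total += 2 * bins if pattern in ("fan", "degree") else bins

    if params.get("vertex_stats"):
        feats = params["vertex_stats_feats"]
        count_feats = sum(1 for k in (0, 1, 2) if k in feats)      # fan, deg, ratio
        column_feats = sum(1 for k in range(3, 11) if k in feats)  # avg ... kurtosis
        per_group = count_feats + column_feats * len(params["vertex_stats_cols"])
        total += 4 * per_group  # (source, dest) x (out, in)

    return total

bins = [1, 6, 11, 16, 21, 26]
params = {
    "vertex_stats": True,
    "vertex_stats_cols": [3, 7, 8, 9, 10, 11],
    "vertex_stats_feats": list(range(11)),
    "fan": True, "fan_bins": bins,
    "degree": True, "degree_bins": bins,
    "scatter-gather": True, "scatter-gather_bins": bins,
    "temp-cycle": True, "temp-cycle_bins": bins,
    "lc-cycle": True, "lc-cycle_bins": bins,
}
print(expected_num_columns(12, params))  # 258
```

Broken down, that is 12 raw columns, 42 pattern-histogram columns (12 + 12 + 6 + 6 + 6), and 204 vertex-statistics columns (4 groups of 3 + 8 × 6 = 51).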


This newly enriched set of transactions can now be used to train an ML model. Once trained, the model can be used for prediction (e.g., to detect anomalies) on new (unlabeled) transactions. The main steps associated with this use case are shown below:

In [18]:
df_xgboost_train_data = pd.merge(x_train_meta, df_train_enriched, on = 'transactionID')
df_xgboost_test_data = pd.merge(x_test_meta, df_test_enriched, on = 'transactionID')

output_column_order = ['transactionID', 'Is Laundering', \
                       'Timestamp', 'Timestamp_float', \
                       'sourceAccountID', 'sourceAccount', 'From Bank', 'Account', \
                       'targetAccountID', 'targetAccount', 'To Bank', 'Account.1', \
                       'Amount Received', 'Receiving Currency', 'Receiving CurrencyID', \
                       'Amount Paid', 'Payment Currency', 'Payment CurrencyID', \
                       'Payment Format', 'Payment FormatID'
                        ]
for col in df_xgboost_train_data.columns[len(output_column_order):]:
    output_column_order.append(col)
    
df_xgboost_train_data = df_xgboost_train_data[output_column_order]
df_xgboost_test_data = df_xgboost_test_data[output_column_order]

print(df_xgboost_train_data.shape)
print(df_xgboost_test_data.shape)

# display(df_xgboost_train_data)

(4062676, 266)
(1015669, 266)
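
The merge adds the eight metadata columns other than the `transactionID` join key to the 258 enriched columns, which gives the 266 columns shown above. The dependency-free sketch below illustrates the same kind of key-based join on toy records and checks that the key is unique on both sides, so that the number of rows is preserved. The records and the helper name `merge_on_key` are made up for illustration.

```python
def merge_on_key(left_rows, right_rows, key):
    """Inner-join two lists of dicts on `key`, requiring unique keys on both sides."""
    left_keys = [row[key] for row in left_rows]
    if len(set(left_keys)) != len(left_keys):
        raise ValueError("duplicate keys on the left side")
    right_by_key = {}
    for row in right_rows:
        if row[key] in right_by_key:
            raise ValueError("duplicate keys on the right side")
        right_by_key[row[key]] = row
    # Keep left rows that have a match; columns from both sides are combined.
    return [{**row, **right_by_key[row[key]]}
            for row in left_rows if row[key] in right_by_key]

meta = [{"transactionID": 0.0, "Payment Format": "Cheque"},
        {"transactionID": 1.0, "Payment Format": "ACH"}]
enriched = [{"transactionID": 1.0, "fan_in_bins_1-6": 2.0},
            {"transactionID": 0.0, "fan_in_bins_1-6": 0.0}]

merged = merge_on_key(meta, enriched, "transactionID")
print(len(merged), sorted(merged[0]))
```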


In [19]:
df_xgboost_train_data.to_csv('datasets/merged_dataset_train.csv', index = False)
df_xgboost_test_data.to_csv('datasets/merged_dataset_test.csv', index = False)

The enriched transactions saved above can now be used as input to an ML model: the training split for fitting the model and the test split for prediction with the trained model.