In [2]:
! pip install proton-driver pycaret

Collecting proton-driver
  Downloading proton_driver-0.2.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting tzlocal (from proton-driver)
  Downloading tzlocal-5.2-py3-none-any.whl.metadata (7.8 kB)
Downloading proton_driver-0.2.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (990 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m990.3/990.3 kB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hDownloading tzlocal-5.2-py3-none-any.whl (17 kB)
Installing collected packages: tzlocal, proton-driver
Successfully installed proton-driver-0.2.10 tzlocal-5.2


### Connect to Proton

Connect to Proton and check how many records are in the `online_payments` stream.

In [8]:
import os

from proton_driver import client

# Proton connection settings; override with the PROTON_HOST / PROTON_PORT
# environment variables when Proton is not reachable at these defaults.
PROTON_HOST = os.environ.get('PROTON_HOST', 'proton')
PROTON_PORT = int(os.environ.get('PROTON_PORT', '8463'))

# `c` is the shared client used by every later cell in this notebook.
c = client.Client(host=PROTON_HOST, port=PROTON_PORT)

# Count the records stored in the `online_payments` stream. Wrapping the stream
# in `table(...)` queries its stored data (unlike the unbounded streaming query
# used for inference), so the iterator returns a single row and ends.
rows = c.execute_iter(
    "SELECT count(*) "
    "FROM table(online_payments)",
)
for row in rows:
    print(row)

(8134,)


### Build Features

#### Real-time features

Real-time features are attributes that can be computed in real time as each transaction arrives.

The following SQL creates four real-time features:

* type - transaction type, a categorical feature
* amount - transaction amount, a numerical feature
* previous_amount - amount of the current account's previous transaction, a numerical feature
* time_to_last_transaction - seconds elapsed since the current account's previous transaction (0 if there is none), a numerical feature

In [11]:
# Create (once) a view that emits real-time features for every transaction.
# - lag(...) combined with PARTITION BY account_from reads the same account's
#   previous transaction, giving previous_amount and previous_transaction_time.
# - time_to_last_transaction is the gap in seconds between that previous
#   transaction and this one, or 0 when previous_transaction_time is not after
#   earliest_timestamp() (presumably the default lag() returns for an account's
#   first transaction - confirm).
# - `_tp_time > earliest_timestamp()` presumably makes the streaming query start
#   from the earliest stored event instead of only new ones - confirm in Proton docs.
# Because of IF NOT EXISTS, editing this SQL has no effect once the view exists;
# drop the view first to change its definition.
# NOTE: the view name is misspelled ("reatime"); it is kept as-is because
# mv_fraud_all_features references it by this exact name.
sql_realtime_feature_view = '''CREATE VIEW IF NOT EXISTS v_fraud_reatime_features AS
WITH cte AS
  (
    SELECT 
      _tp_time, 
      id, 
      type, 
      account_from, 
      amount, 
      lag(amount) AS previous_amount, 
      lag(_tp_time) AS previous_transaction_time
    FROM 
      default.online_payments
    WHERE 
      _tp_time > earliest_timestamp()
    PARTITION BY 
      account_from
  )
SELECT 
  _tp_time, 
  id, 
  type, 
  account_from, 
  amount, 
  previous_amount, 
  previous_transaction_time, 
  if(previous_transaction_time > earliest_timestamp(), date_diff('second', previous_transaction_time, _tp_time), 0) AS time_to_last_transaction
FROM 
  cte
'''

c.execute(sql_realtime_feature_view)

[]

#### Near real-time features

Near real-time features are calculated over a short time window.

The following two SQL statements create near real-time features using tumbling windows:

* 1m transaction count per account
* 1m transaction max amount per account
* 1m transaction min amount per account
* 1m transaction average amount per account
* 5m number of distinct target accounts per account

In [14]:
# Per-account statistics over non-overlapping (tumbling) windows.
# 1-minute windows: transaction count plus max / min / average amount.
# NOTE: min_amount is computed here but mv_fraud_all_features does not select
# it, so it never reaches the model.
sql_1m_feature = '''CREATE VIEW IF NOT EXISTS v_fraud_1m_features AS
SELECT 
  window_start, 
  account_from, 
  count(*) AS count, 
  max(amount) AS max_amount, 
  min(amount) AS min_amount, 
  avg(amount) AS avg_amount
FROM 
  tumble(default.online_payments, 60s)
WHERE 
  _tp_time > earliest_timestamp()
GROUP BY 
  window_start, account_from
'''

# 5-minute windows: number of distinct destination accounts (account_to) each
# source account sent money to.
sql_5m_feature = '''CREATE VIEW IF NOT EXISTS v_fraud_5m_features AS
SELECT 
  window_start, 
  account_from, 
  count_distinct(account_to) AS target_counts
FROM 
  tumble(default.online_payments, 5m)
WHERE 
  _tp_time > earliest_timestamp()
GROUP BY 
  window_start, account_from
'''

# IF NOT EXISTS: re-running does not update views that already exist.
c.execute(sql_1m_feature)
c.execute(sql_5m_feature)

[]

#### Historical features

Historical features are features built on top of historical data.

The following SQL creates a feature view that aggregates each account's transactions in 1-day tumbling windows, then runs a global aggregation over all historical data to get per-account daily averages:

* daily average transaction count per account
* daily average max transaction amount per account

In [18]:
# Historical per-account features.
# Inner query (agg1d): per account and 1-day tumbling window, the transaction
# count and the maximum amount.
# Outer query: averages those daily values across all windows per account,
# giving avg_count (average daily transaction COUNT) and avg_max_amount
# (average daily maximum AMOUNT).
# NOTE(review): ts is now64(), i.e. the wall-clock time at which each result row
# is produced, not an event time. mv_fraud_all_features joins on
# `_tp_time >= ts`, so transactions timestamped before that moment cannot match
# and get 0 for the 1d features; the sample training rows indeed show 0.0 for
# both. Confirm this train/serve behavior is intended.
sql_1d_global_feature = '''CREATE VIEW  IF NOT EXISTS v_fraud_1d_features AS
WITH agg1d AS
  (
    SELECT 
      window_start, account_from, count(*) AS count, max(amount) AS max_amount
    FROM 
      tumble(default.online_payments, 1d)
    WHERE 
      _tp_time > earliest_timestamp()
    GROUP BY 
      window_start, account_from
  )
SELECT 
  now64() as ts, account_from, avg(count) AS avg_count, avg(max_amount) AS avg_max_amount
FROM 
  agg1d
GROUP BY 
  account_from
'''

c.execute(sql_1d_global_feature)

[]

#### Joining all features

Using ASOF JOINs, we create a materialized view that attaches all features to each transaction record.

The materialized view runs continuously in the background and outputs features as soon as new data comes in.


In [19]:
# Materialized view with one row per transaction: the real-time features plus the
# windowed / historical features for the same account.
# Each ASOF LEFT JOIN matches on account and picks the feature row whose
# window_start (or ts, for the 1d view) is the closest one not later than the
# transaction's _tp_time. LEFT means transactions with no matching feature row
# are kept with default values (the zeros seen in the sample training rows).
# keep_versions presumably bounds how many versions of the joined right-hand
# rows are retained - confirm in Proton docs.
# NOTE: `avg_max_transaction_count_1d` is a misnomer. It holds avg_max_amount
# (average daily maximum transaction AMOUNT), not a count. The name is kept
# because the training and inference cells and the saved model use it.
sql_all_features = '''
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_fraud_all_features AS
SELECT 
  _tp_time AS time, 
  v_fraud_reatime_features.id AS id, 
  v_fraud_reatime_features.type AS type, 
  v_fraud_reatime_features.account_from AS account, 
  v_fraud_reatime_features.amount AS amount, 
  v_fraud_reatime_features.previous_amount AS previous_amount, 
  v_fraud_reatime_features.time_to_last_transaction AS time_to_last_transaction, 
  v_fraud_1m_features.count AS transaction_count_1m, 
  v_fraud_1m_features.max_amount AS max_transaction_amount_1m, 
  v_fraud_1m_features.avg_amount AS avg_transaction_amount_1m, 
  v_fraud_5m_features.target_counts AS distinct_transaction_target_count_5m, 
  v_fraud_1d_features.avg_count AS avg_transaction_count_1d, 
  v_fraud_1d_features.avg_max_amount AS avg_max_transaction_count_1d
FROM 
  v_fraud_reatime_features
ASOF LEFT JOIN v_fraud_1m_features ON (v_fraud_reatime_features.account_from = v_fraud_1m_features.account_from) AND (v_fraud_reatime_features._tp_time >= v_fraud_1m_features.window_start)
ASOF LEFT JOIN v_fraud_5m_features ON (v_fraud_reatime_features.account_from = v_fraud_5m_features.account_from) AND (v_fraud_reatime_features._tp_time >= v_fraud_5m_features.window_start)
ASOF LEFT JOIN v_fraud_1d_features ON (v_fraud_reatime_features.account_from = v_fraud_1d_features.account_from) AND (v_fraud_reatime_features._tp_time >= v_fraud_1d_features.ts)
SETTINGS 
  keep_versions = 100
'''

# IF NOT EXISTS: an existing materialized view is left unchanged on re-run.
c.execute(sql_all_features)

[]

### Training

#### Prepare training dataset by joining label data with features

The following SQL joins all features with the label stream, which records which transactions are fraudulent. The query result is used as the training dataset.

In the following cell, we run a Proton query and turn the result into a pandas DataFrame.

In [61]:
import pandas as pd

# Join every feature row from the materialized view with its fraud label (by
# transaction id); the result becomes the training set.
# NOTE(review): LEFT JOIN keeps transactions that have no label row. Their
# is_fraud then takes the column's default value (presumably false), so
# unlabeled transactions would train as "not fraud". Confirm that labels cover
# every transaction id.
sql_feature_with_lable = '''
SELECT 
  *
FROM 
  table(mv_fraud_all_features) as f
LEFT JOIN table(online_payments_label) as l ON f.id = l.id
LIMIT 100000
'''

# Model inputs and target.
# NOTE: avg_max_transaction_count_1d holds the average daily maximum AMOUNT
# (mv_fraud_all_features aliases avg_max_amount to this name). The name is kept
# because the saved model is trained on it.
FEATURE_COLUMNS = [
    'type', 'amount', 'previous_amount',
    'time_to_last_transaction', 'transaction_count_1m', 'max_transaction_amount_1m',
    'avg_transaction_amount_1m', 'distinct_transaction_target_count_5m', 'avg_transaction_count_1d',
    'avg_max_transaction_count_1d',
]
TARGET_COLUMN = 'is_fraud'

# With with_column_types=True the first item yielded is the column metadata
# (each entry carries the column name at index 0); the remaining items are rows.
rows = c.execute_iter(sql_feature_with_lable, with_column_types=True)
header = next(rows)
query_result = list(rows)

columns = [f[0] for f in header]
df = pd.DataFrame(query_result, columns=columns)
df_train = df[FEATURE_COLUMNS + [TARGET_COLUMN]]

df_train.head(5)

Unnamed: 0,type,amount,previous_amount,time_to_last_transaction,transaction_count_1m,max_transaction_amount_1m,avg_transaction_amount_1m,distinct_transaction_target_count_5m,avg_transaction_count_1d,avg_max_transaction_count_1d,is_fraud
0,TRANSFER,2.75,46.09,1360,1,46.09,46.09,2,0.0,0.0,False
1,CASH_OUT,160.84,0.0,0,0,0.0,0.0,0,0.0,0.0,False
2,TRANSFER,875.51,0.0,0,0,0.0,0.0,0,0.0,0.0,False
3,PAYMENT,2586.83,0.0,0,0,0.0,0.0,0,0.0,0.0,False
4,CASH_OUT,792.87,0.0,0,0,0.0,0.0,0,0.0,0.0,False


#### Set up the training environment using PyCaret

PyCaret is an open-source, low-code machine learning library in Python that automates machine learning workflows

In the following code, we use PyCaret to train a classification model that predicts whether a transaction is fraudulent.

In [62]:
# Explicit imports instead of `import *`: setup is used here; compare_models and
# save_model are used by the following training cells.
from pycaret.classification import compare_models, save_model, setup

# Initialise the PyCaret experiment with `is_fraud` as the target. session_id
# fixes the random seed so the split and the trained models are reproducible.
s = setup(data = df_train, target = 'is_fraud', session_id = 123)

Unnamed: 0,Description,Value
0,Session id,123
1,Target,is_fraud
2,Target type,Binary
3,Original data shape,"(40217, 11)"
4,Transformed data shape,"(40217, 13)"
5,Transformed train set shape,"(28151, 13)"
6,Transformed test set shape,"(12066, 13)"
7,Numeric features,9
8,Categorical features,1
9,Preprocess,True


In [64]:
# Cross-validate PyCaret's candidate classifiers and keep the top-ranked one.
# NOTE(review): with no `sort` argument the leaderboard is ranked by Accuracy
# (see the output ordering). The labels look imbalanced (e.g. high accuracy with
# low recall for lr), so consider sort='F1' or 'AUC'; confirm with the team.
best_model = compare_models()

Unnamed: 0,Model,Accuracy,AUC,Recall,Prec.,F1,Kappa,MCC,TT (Sec)
lightgbm,Light Gradient Boosting Machine,0.9925,0.9954,0.9033,0.9258,0.9141,0.9102,0.9104,1.267
gbc,Gradient Boosting Classifier,0.9917,0.9951,0.8777,0.9304,0.9032,0.8988,0.8993,0.606
rf,Random Forest Classifier,0.9903,0.9745,0.8889,0.8928,0.8906,0.8856,0.8857,0.36
et,Extra Trees Classifier,0.9899,0.9608,0.8825,0.8903,0.886,0.8808,0.881,0.243
dt,Decision Tree Classifier,0.9892,0.9387,0.8825,0.8759,0.8789,0.8732,0.8734,0.04
ada,Ada Boost Classifier,0.9879,0.9935,0.8105,0.9062,0.8553,0.849,0.8507,0.205
knn,K Neighbors Classifier,0.9871,0.9569,0.8265,0.8775,0.8509,0.8442,0.8448,0.072
lr,Logistic Regression,0.9795,0.8637,0.5619,0.9621,0.7091,0.6993,0.7267,0.34
ridge,Ridge Classifier,0.9793,0.0,0.5539,0.9636,0.7033,0.6934,0.7222,0.029
lda,Linear Discriminant Analysis,0.9784,0.7301,0.5603,0.9223,0.6969,0.6864,0.7095,0.041


In [65]:
# Display the selected model (a bare last expression renders as the cell output).
best_model

In [66]:
# Persist the preprocessing pipeline together with the model under the name
# 'fraud_model'; the prediction section reloads it with load_model.
save_model(best_model, 'fraud_model')

Transformation Pipeline and Model Successfully Saved


(Pipeline(memory=Memory(location=None),
          steps=[('numerical_imputer',
                  TransformerWrapper(exclude=None,
                                     include=['amount', 'previous_amount',
                                              'time_to_last_transaction',
                                              'transaction_count_1m',
                                              'max_transaction_amount_1m',
                                              'avg_transaction_amount_1m',
                                              'distinct_transaction_target_count_5m',
                                              'avg_transaction_count_1d',
                                              'avg_max_transaction_count_1d'],
                                     transformer=...
                  LGBMClassifier(boosting_type='gbdt', class_weight=None,
                                 colsample_bytree=1.0, importance_type='split',
                                 learning_rate=0.1, max

### Prediction

We can run a real-time query against Proton and, for each incoming record, run the model to predict whether the transaction is fraudulent.

#### Load model

In [67]:
# Reload the saved pipeline + model so inference does not rely on in-memory
# training state from the cells above.
from pycaret.classification import load_model
model = load_model('fraud_model')

Transformation Pipeline and Model Successfully Loaded


#### Real-time inference

By running a Proton streaming query and sending its features to the trained model, we can predict in real time whether a transaction is fraudulent.

You can change the number in the `LIMIT` clause or remove it entirely; without a `LIMIT`, the query runs forever until the user cancels it.

In [78]:
from pycaret.classification import predict_model

# Streaming query over the materialized view: feature rows are returned as new
# transactions are processed. Without LIMIT the query never ends.
query = '''
SELECT 
  *
FROM 
  mv_fraud_all_features
LIMIT 3
'''

# The feature columns the model was trained on, plus `id` so each prediction can
# be reported against its transaction.
INFERENCE_COLUMNS = [
    'id', 'type', 'amount', 'previous_amount',
    'time_to_last_transaction', 'transaction_count_1m', 'max_transaction_amount_1m',
    'avg_transaction_amount_1m', 'distinct_transaction_target_count_5m', 'avg_transaction_count_1d',
    'avg_max_transaction_count_1d',
]

rows = c.execute_iter(query, with_column_types=True)
header = next(rows)  # first item is column metadata (name at index 0)
columns = [f[0] for f in header]

for r in rows:
    # Distinct names so the training `df` and the builtin `id` are not clobbered.
    row_df = pd.DataFrame([r], columns=columns)
    df_infer = row_df[INFERENCE_COLUMNS]
    # verbose=False skips PyCaret's per-call display; only the print below shows.
    prediction = predict_model(model, data=df_infer, verbose=False)
    transaction_id = prediction['id'].iloc[0]
    prediction_label = prediction['prediction_label'].iloc[0]
    # The model was trained on a boolean is_fraud target; True == 1 in Python,
    # so this matches a True or 1 label.
    verdict = 'fraud' if prediction_label == 1 else 'not fraud'
    print(f"transaction {transaction_id} is {verdict}")
    


transaction 23a861db-aabd-424f-943e-d7748ea465a7 is not fraud


transaction 4cc484f4-1668-40c7-a23c-604e871684ab is not fraud


transaction e10e3090-3ffe-4a14-9085-7ceef933d8ff is not fraud
