<h1>Real-Time Fraud Detection in the Databricks Lakehouse with Tecton</h1>


This notebook walks through an example workflow for creating new features, for both offline training and online serving, to support a real-time fraud detection ML model. The steps are:

1. Create windowed aggregation features on a streaming data source
2. Build an on-demand feature that is computed in real time, at request time
3. Use Tecton to enrich labeled training events with our newly defined feature columns
4. Train an ML model with these new features
5. Apply new feature definitions to Tecton for online serving
6. Use Tecton online feature serving and Databricks Model Serving to perform fraud detection in real-time on an example transaction

In [0]:
import tecton
import pandas as pd
import requests, json

## Build a new streaming feature against a registered Stream Source

The code in this notebook runs on a Databricks cluster that has been configured with Tecton credentials. See the instructions for [configuring a Databricks cluster to communicate with Tecton](https://docs.tecton.ai/docs/setting-up-tecton/connecting-to-a-data-platform/tecton-on-databricks/connecting-databricks-notebooks).

In [0]:
ws = tecton.get_workspace('demo-ext')

In [0]:
from tecton import stream_feature_view, Aggregation, StreamProcessingMode, FilteredSource
from datetime import datetime, timedelta

transactions = ws.get_data_source('transactions')
user = ws.get_entity('user')

@stream_feature_view(
    source=FilteredSource(transactions),
    entities=[user],
    mode='spark_sql',
    stream_processing_mode=StreamProcessingMode.CONTINUOUS,
    aggregations=[
        Aggregation(function='mean', column='amt', time_window=timedelta(minutes=10)),
        Aggregation(function='mean', column='amt', time_window=timedelta(hours=1)),
        Aggregation(function='mean', column='amt', time_window=timedelta(days=3)),
    ]
)
def user_transaction_amount_averages(transactions):
    return f'''
        SELECT user_id, timestamp, amt
        FROM {transactions}
        ''' 

user_transaction_amount_averages.validate()

StreamFeatureView 'user_transaction_amount_averages': Validating 1 of 3 dependencies. (2 already validated)
    Transformation 'user_transaction_amount_averages': Successfully validated.
StreamFeatureView 'user_transaction_amount_averages': Successfully validated.


## Test the stream feature transformation against historical stream data

In [0]:
start = datetime(2022,1,1)
end = datetime(2022,1,14)

df = user_transaction_amount_averages.get_historical_features(start_time=start, end_time=end).to_pandas()

display(df)

user_id,timestamp,amt_mean_10m_continuous,amt_mean_1h_continuous,amt_mean_3d_continuous,_effective_timestamp
user_131340471060,2022-01-01T20:31:47.000+0000,144.45,144.45,110.41333333333334,2022-01-01T20:31:47.000+0000
user_131340471060,2022-01-02T07:28:50.000+0000,4.14,4.14,83.845,2022-01-02T07:28:50.000+0000
user_131340471060,2022-01-03T16:28:51.000+0000,15.1,15.1,54.563333333333325,2022-01-03T16:28:51.000+0000
user_131340471060,2022-01-06T03:06:12.000+0000,91.34,91.34,53.22,2022-01-06T03:06:12.000+0000
user_131340471060,2022-01-07T20:33:39.000+0000,5.66,5.66,48.5,2022-01-07T20:33:39.000+0000
user_131340471060,2022-01-08T11:18:01.000+0000,5.78,5.78,34.26,2022-01-08T11:18:01.000+0000
user_205125746682,2022-01-04T00:55:13.000+0000,46.02,46.02,46.02,2022-01-04T00:55:13.000+0000
user_205125746682,2022-01-04T10:17:03.000+0000,108.46,108.46,77.24,2022-01-04T10:17:03.000+0000
user_205125746682,2022-01-05T08:10:59.000+0000,74.75,74.75,76.41,2022-01-05T08:10:59.000+0000
user_205125746682,2022-01-05T18:11:29.000+0000,31.44,31.44,65.1675,2022-01-05T18:11:29.000+0000
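
To make the aggregation semantics concrete, the sketch below recomputes the `amt_mean_3d_continuous` column for `user_205125746682` in plain Python. It is not Tecton's implementation. `trailing_mean` is a hypothetical helper, the window is assumed to include the event at `as_of` and to exclude anything exactly one window old, and the per-transaction amounts are read off the 10-minute column on the assumption that each 10-minute window held a single transaction.

```python
from datetime import datetime, timedelta


def trailing_mean(events, as_of, window):
    """Mean of amounts with as_of - window < timestamp <= as_of, or None if empty.

    The boundary handling is an assumption made for this sketch.
    """
    amounts = [amt for ts, amt in events if as_of - window < ts <= as_of]
    return sum(amounts) / len(amounts) if amounts else None


# (timestamp, amt) pairs for user_205125746682, taken from the output above
events = [
    (datetime(2022, 1, 4, 0, 55, 13), 46.02),
    (datetime(2022, 1, 4, 10, 17, 3), 108.46),
    (datetime(2022, 1, 5, 8, 10, 59), 74.75),
    (datetime(2022, 1, 5, 18, 11, 29), 31.44),
]

# One trailing mean per event, evaluated at that event's own timestamp
three_day_means = [trailing_mean(events, ts, timedelta(days=3)) for ts, _ in events]
ten_minute_means = [trailing_mean(events, ts, timedelta(minutes=10)) for ts, _ in events]

for (ts, _), m10, m3d in zip(events, ten_minute_means, three_day_means):
    print(ts.isoformat(), round(m10, 4), round(m3d, 4))
```

Under these assumptions the 3-day means match the table (46.02, 77.24, 76.41, 65.1675), while the 10-minute means equal the transaction amount because no other transaction falls inside such a short window.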


## Build a new real-time feature that leverages request data

In [0]:
from tecton import RequestSource, on_demand_feature_view
from tecton.types import Field, Float64, Int64

transaction_request = RequestSource(schema=[Field('amt', Float64)])

@on_demand_feature_view(
    sources=[transaction_request, user_transaction_amount_averages],
    mode='python',
    schema=[Field('transaction_amount_is_higher_than_average', Int64)]
)
def transaction_amount_is_higher_than_average(transaction_request, user_transaction_amount_averages):
    amount_mean = 0 if user_transaction_amount_averages['amt_mean_3d_continuous'] is None else user_transaction_amount_averages['amt_mean_3d_continuous']
    return {'transaction_amount_is_higher_than_average': int(transaction_request['amt'] > amount_mean)}
  
transaction_amount_is_higher_than_average.validate()

OnDemandFeatureView 'transaction_amount_is_higher_than_average': Validating 1 of 2 dependencies. (1 already validated)
    Transformation 'transaction_amount_is_higher_than_average': Successfully validated.
OnDemandFeatureView 'transaction_amount_is_higher_than_average': Successfully validated.


## Validate new feature locally with mock data

In [0]:
# In 'python' mode, each mock input is a dict representing a single row
transaction_request_mock_data = {"amt": 100.0}
user_transaction_amount_averages_mock_data = {
    "amt_mean_10m_continuous": 20.0,
    "amt_mean_1h_continuous": 55.0,
    "amt_mean_3d_continuous": 200.0,
}

result = transaction_amount_is_higher_than_average.run(
    transaction_request=transaction_request_mock_data,
    user_transaction_amount_averages=user_transaction_amount_averages_mock_data,
)
print(result)

{'transaction_amount_is_higher_than_average': 0}
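
The comparison inside the on-demand feature view can also be checked without Tecton. The sketch below restates it as a plain function; `is_higher_than_average` is a hypothetical name used only for illustration and is not part of the Tecton SDK.

```python
def is_higher_than_average(amt, amt_mean_3d):
    """Return 1 if amt exceeds the 3-day mean, else 0.

    A missing mean (None) falls back to 0, mirroring the feature view above.
    """
    baseline = 0 if amt_mean_3d is None else amt_mean_3d
    return int(amt > baseline)


print(is_higher_than_average(100.0, 200.0))  # the mock-data case above
print(is_higher_than_average(250.0, 200.0))  # amount above the 3-day mean
print(is_higher_than_average(100.0, None))   # no 3-day history for the user
```

The last case is worth noticing: because a missing mean is replaced by 0, any positive amount from a user with no transactions in the last three days is flagged as higher than average.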


## Extend the feature set with our new features

In [0]:
features_list = ws.get_feature_service('fraud_detection_feature_service').features

In [0]:
from tecton import FeatureService

fraud_detection_feature_service_v2 = FeatureService(
    name="fraud_detection_feature_service:v2",
    features=features_list+[transaction_amount_is_higher_than_average, user_transaction_amount_averages]
)
fraud_detection_feature_service_v2.validate()

FeatureService 'fraud_detection_feature_service:v2': Successfully validated.


## Compute features for historical training events

In [0]:
training_events = spark.read.parquet("s3://tecton.ai.public/tutorials/fraud_demo/transactions/").select("user_id", "timestamp", "amt", "is_fraud").limit(1000)
display(training_events)

user_id,timestamp,amt,is_fraud
user_884240387242,2023-06-20T10:26:41.000+0000,68.23,0
user_268514844966,2023-06-20T12:57:20.000+0000,32.98,0
user_722584453020,2023-06-20T14:49:59.000+0000,4.5,0
user_337750317412,2023-06-20T14:50:13.000+0000,7.68,0
user_934384811883,2023-06-20T15:55:09.000+0000,68.97,1
user_402539845901,2023-06-20T16:57:43.000+0000,85.2,0
user_699668125818,2023-06-20T18:18:10.000+0000,133.29,0
user_574612776685,2023-06-20T18:30:02.000+0000,142.71,0
user_600003278485,2023-06-20T21:58:48.000+0000,9.29,0
user_222506789984,2023-06-21T00:24:08.000+0000,2.44,1


In [0]:
training_data = fraud_detection_feature_service_v2.get_historical_features(training_events).to_pandas()
display(training_data)

user_id,timestamp,amt,is_fraud,user_credit_card_issuer__credit_card_issuer,user_last_transaction_amount__amt,user_transaction_amount_averages__amt_mean_10m_continuous,user_transaction_amount_averages__amt_mean_1h_continuous,user_transaction_amount_averages__amt_mean_3d_continuous,transaction_amount_is_higher_than_average__transaction_amount_is_higher_than_average
user_131340471060,2023-06-21T13:57:50.000+0000,73.8,0,Visa,73.8,73.8,73.8,102.39333333333332,0
user_131340471060,2023-06-27T00:52:08.000+0000,46.48,0,Visa,46.48,46.48,46.48,46.48,0
user_131340471060,2023-07-05T10:42:35.000+0000,157.82,0,Visa,157.82,157.82,157.82,157.82,0
user_131340471060,2023-07-11T02:06:55.000+0000,3.53,0,Visa,3.53,3.53,3.53,3.53,0
user_131340471060,2023-07-16T01:04:41.000+0000,6.42,1,Visa,6.42,6.42,6.42,6.42,0
user_131340471060,2023-07-21T14:58:13.000+0000,48.58,0,Visa,48.58,48.58,48.58,48.58,0
user_131340471060,2023-07-22T00:22:34.000+0000,9.26,1,Visa,9.26,9.26,9.26,28.92,0
user_131340471060,2023-07-24T18:28:54.000+0000,65.62,0,Visa,65.62,65.62,65.62,37.440000000000005,1
user_131340471060,2023-07-26T13:22:38.000+0000,53.26,0,Visa,53.26,53.26,53.26,59.44,0
user_131340471060,2023-07-29T10:39:56.000+0000,186.87,0,Visa,186.87,186.87,186.87,120.065,1


## Train a model and register it with MLflow 

In [0]:
from sklearn.pipeline import make_pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression 
from sklearn.model_selection import train_test_split 
from sklearn import metrics

df = training_data.drop(['user_id', 'timestamp', 'amt'], axis=1)

X = df.drop('is_fraud', axis=1)
y = df['is_fraud']

feature_names = X.columns.tolist()
    
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

num_cols = X_train.select_dtypes(exclude=['object']).columns.tolist()
cat_cols = X_train.select_dtypes(include=['object']).columns.tolist()

num_pipe = make_pipeline(
    SimpleImputer(strategy='median'),
    StandardScaler()
)

cat_pipe = make_pipeline(
    SimpleImputer(strategy='constant', fill_value='N/A'),
    # Note: scikit-learn 1.2+ renames `sparse` to `sparse_output` (`sparse` was removed in 1.4)
    OneHotEncoder(handle_unknown='ignore', sparse=False)
)

full_pipe = ColumnTransformer([
    ('num', num_pipe, num_cols),
    ('cat', cat_pipe, cat_cols)
])

model = make_pipeline(full_pipe, LogisticRegression(max_iter=1000, random_state=42))

model.fit(X_train,y_train)

y_predict = model.predict(X_test)

print(metrics.classification_report(y_test, y_predict, zero_division=0))


              precision    recall  f1-score   support

           0       0.88      1.00      0.94       265
           1       0.00      0.00      0.00        35

    accuracy                           0.88       300
   macro avg       0.44      0.50      0.47       300
weighted avg       0.78      0.88      0.83       300
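
The report is easier to read once the arithmetic is spelled out. The figures above are consistent with a model that predicted class 0 for all 300 test rows; the sketch below assumes exactly that confusion matrix (an inference from the rounded numbers, not something the notebook prints) and recomputes the per-class metrics. `precision_recall_f1` is a hypothetical helper.

```python
def precision_recall_f1(tp, fp, fn):
    """Per-class precision, recall and F1, returning 0.0 where a ratio is undefined."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return precision, recall, f1


# Assumed confusion counts: every test row predicted as class 0 (not fraud)
not_fraud = precision_recall_f1(tp=265, fp=35, fn=0)
fraud = precision_recall_f1(tp=0, fp=0, fn=35)
accuracy = 265 / 300

print([round(v, 2) for v in not_fraud])
print([round(v, 2) for v in fraud])
print(round(accuracy, 2))
```

With these counts the 0.88 accuracy simply equals the share of non-fraud rows in the test set, while recall on the fraud class is zero.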



In [0]:
import mlflow
import mlflow.sklearn

with mlflow.start_run() as run:
    mlflow.sklearn.log_model(model, "fraud-detection")

## Apply your Tecton application to production

Once you are ready to productionize these features as a live feature pipeline with online serving, use the Tecton CLI to log in to your Tecton account. After updating your feature repository code, run `tecton apply` to submit the changes to production.

```bash
tecton login app.tecton.ai
tecton workspace select tecton-demo-ext
tecton apply
```

## Create a function to retrieve features from Tecton's REST API

In [0]:
def get_online_feature_data(user_id, amt):
    headers = {"Authorization": "Tecton-key " + dbutils.secrets.get(scope="tecton-demo", key="tecton-api-key")}

    # Build the payload as a dict so that requests handles JSON encoding and escaping
    request_data = {
        "params": {
            "feature_service_name": "fraud_detection_feature_service:v2",
            "join_key_map": {"user_id": user_id},
            "metadata_options": {"include_names": True},
            "request_context_map": {"amt": amt},
            "workspace_name": "tecton-demo-ext-clone",
        }
    }

    response = requests.post(
        "https://app.tecton.ai/api/v1/feature-service/get-features",
        headers=headers,
        json=request_data,
    )
    # Fail loudly on HTTP errors instead of returning an error payload
    response.raise_for_status()

    return response.json()

In [0]:
get_online_feature_data('user_502567604689', 72.46)['result']

Out[25]: {'features': ['1',
  'Visa',
  78.41,
  52.339999999999996,
  47.85903225806451,
  60.7954170854272]}
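
The raw feature vector is hard to read on its own. Because the request sets `include_names`, the response also carries the feature names under `metadata`, and the sketch below pairs them with the values. `features_to_dict` is a hypothetical helper, and the names in `sample_response` are assumptions chosen to match the training columns; only the values are taken from the output above.

```python
def features_to_dict(response):
    """Pair feature names from response metadata with the returned values."""
    names = [f["name"].replace(".", "__") for f in response["metadata"]["features"]]
    values = response["result"]["features"]
    return dict(zip(names, values))


# Response-shaped sample; the feature names are assumed for illustration
sample_response = {
    "result": {"features": ["1", "Visa", 78.41, 52.339999999999996,
                            47.85903225806451, 60.7954170854272]},
    "metadata": {"features": [
        {"name": "transaction_amount_is_higher_than_average.transaction_amount_is_higher_than_average"},
        {"name": "user_credit_card_issuer.credit_card_issuer"},
        {"name": "user_last_transaction_amount.amt"},
        {"name": "user_transaction_amount_averages.amt_mean_10m_continuous"},
        {"name": "user_transaction_amount_averages.amt_mean_1h_continuous"},
        {"name": "user_transaction_amount_averages.amt_mean_3d_continuous"},
    ]},
}

labeled = features_to_dict(sample_response)
for name, value in labeled.items():
    print(f"{name}: {value}")
```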

## Create a function to retrieve a model prediction from MLflow serving using online feature data from Tecton

The code below sends a feature vector from Tecton to a Databricks Model Serving endpoint.

In [0]:
def get_prediction_from_model(feature_data):
    # Format data for model API
    feature_data_builder = {}
    feature_data_builder["index"] = [0]
    feature_data_builder["data"] = []
    feature_data_builder["data"].append(feature_data["result"]["features"])
    feature_column_names = feature_data["metadata"]["features"]
    feature_data_builder["columns"] = []

    for feature_name in feature_column_names:
        feature_data_builder["columns"].append(feature_name["name"].replace(".", "__"))

    feature_data_model_format = {}
    feature_data_model_format["dataframe_split"] = feature_data_builder

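    # Reorder the features to match the training column order: in this example the
    # on-demand feature comes back first (index 0) but was the last training column
    feature_order = [1, 2, 3, 4, 5, 0]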
    feature_data_model_format["dataframe_split"]["columns"] = [feature_data_model_format["dataframe_split"]["columns"][i] for i in feature_order]
    feature_data_model_format["dataframe_split"]["data"][0] = [feature_data_model_format["dataframe_split"]["data"][0][i] for i in feature_order]

    # Request prediction from model serving endpoint
    headers = {"Authorization": "Bearer " + dbutils.secrets.get(scope="tecton-demo", key="databricks-api-key")}
    response = requests.request(
        method="POST",
        headers=headers,
        url="https://tecton-production.cloud.databricks.com/serving-endpoints/fraud-detection/invocations",
        json=feature_data_model_format,
    )
    if response.status_code != 200:
        raise Exception(f"Request failed with status {response.status_code}, {response.text}")

    prediction = response.json()
    
    return prediction

In [0]:
online_feature_data = get_online_feature_data('user_502567604689', 72.46)
get_prediction_from_model(online_feature_data)

Out[28]: {'predictions': [0]}
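
The function above relies on a fixed index order to line the online features up with the training columns. One alternative, sketched below under stated assumptions, is to reorder by column name so that a change in the response order cannot silently misalign the vector. `build_dataframe_split` is a hypothetical helper, `training_columns` is copied from the training data header, and the order of `names` is an assumption about how the features are returned.

```python
def build_dataframe_split(names, values, training_columns):
    """Build a dataframe_split payload whose columns follow the training order.

    Raises KeyError if a training column is missing from the online features.
    """
    by_name = dict(zip(names, values))
    return {
        "dataframe_split": {
            "index": [0],
            "columns": list(training_columns),
            "data": [[by_name[column] for column in training_columns]],
        }
    }


# Column order used at training time (from the training data header)
training_columns = [
    "user_credit_card_issuer__credit_card_issuer",
    "user_last_transaction_amount__amt",
    "user_transaction_amount_averages__amt_mean_10m_continuous",
    "user_transaction_amount_averages__amt_mean_1h_continuous",
    "user_transaction_amount_averages__amt_mean_3d_continuous",
    "transaction_amount_is_higher_than_average__transaction_amount_is_higher_than_average",
]

# Assumed online order: the on-demand feature first, then the training order
names = [training_columns[-1]] + training_columns[:-1]
values = ["1", "Visa", 78.41, 52.339999999999996, 47.85903225806451, 60.7954170854272]

payload = build_dataframe_split(names, values, training_columns)
print(payload["dataframe_split"]["data"][0])
```

A missing feature then surfaces as a `KeyError` at request-building time rather than as a shifted column at prediction time.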

## Create a function to evaluate a user transaction

In [0]:
def evaluate_transaction(user_id, amt):
    online_feature_data = get_online_feature_data(user_id, amt)
    is_predicted_fraud = get_prediction_from_model(online_feature_data)['predictions'][0]

    if is_predicted_fraud == 0:
        return 'Transaction accepted.'
    else:
        return 'Transaction denied.'

## Evaluate a transaction

In [0]:
evaluate_transaction('user_502567604689', 72.46)

Out[30]: 'Transaction accepted.'
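
The accept/deny logic can be exercised without calling either service by passing the two network calls in as arguments. The sketch below is one way to do that. `evaluate_transaction_with`, `fake_fetch_features`, and `fake_predict` are hypothetical names, and the stub "model" is a toy threshold rule, not the trained classifier.

```python
def evaluate_transaction_with(user_id, amt, fetch_features, predict):
    """Same decision logic as evaluate_transaction, with injectable dependencies."""
    feature_data = fetch_features(user_id, amt)
    is_predicted_fraud = predict(feature_data)["predictions"][0]
    return "Transaction accepted." if is_predicted_fraud == 0 else "Transaction denied."


def fake_fetch_features(user_id, amt):
    # Stub standing in for the Tecton call: returns only the request amount
    return {"result": {"features": [amt]}}


def fake_predict(feature_data):
    # Stub standing in for the model endpoint: flags amounts above 1000
    amt = feature_data["result"]["features"][0]
    return {"predictions": [1 if amt > 1000 else 0]}


print(evaluate_transaction_with("user_502567604689", 72.46, fake_fetch_features, fake_predict))
print(evaluate_transaction_with("user_502567604689", 5000.0, fake_fetch_features, fake_predict))
```

In the notebook itself, `get_online_feature_data` and `get_prediction_from_model` could be passed as the last two arguments.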