In [None]:
import pandas as pd
import numpy as np
import random

In [None]:
from fakefeaturestore import DataTable, FeatureStore, FeatureTable

# One shared store instance. Later cells register UDFs on it (fs.add_function /
# fs.replace_function) and look them up through fs.functions.
fs = FeatureStore()

# Example: Fraud Detection 
Suppose we want to train a model that detects whether a transaction is fraudulent, using features derived from transaction, user, and fraud-report data.

In [3]:
# Batch sources: each DataTable gets a logical name and the location it is loaded from.
transactions = DataTable(name="transactions", source="transactions.csv")
fraud_reports = DataTable(name="fraud_reports", source="fraud_reports.csv")
# NOTE(review): unlike the two tables above, this source has no ".csv" suffix, and the table is
# registered as "user_info" rather than "users" — confirm both are intended.
users = DataTable(name="user_info", source="user_info")

In [4]:
# Display the raw transactions table (transaction_id, user_id, timestamp, amount).
transactions.data

Unnamed: 0,transaction_id,user_id,timestamp,amount
0,0,673,2020-09-20 13:00:20,15
1,1,119,2020-09-20 13:00:52,14
2,2,578,2020-09-20 13:01:08,21
3,3,460,2020-09-20 13:01:38,1
4,4,568,2020-09-20 13:02:05,41
...,...,...,...,...
9995,9995,307,2020-09-23 23:47:25,42
9996,9996,781,2020-09-23 23:48:15,8
9997,9997,357,2020-09-23 23:48:37,3
9998,9998,247,2020-09-23 23:49:10,35


In [5]:
# Display the users table (user_id, location, age).
users.data

Unnamed: 0,user_id,location,age
0,0,Boston,53
1,1,Boston,61
2,2,San Francisco,28
3,3,New York,79
4,4,Tucson,39
...,...,...,...
995,995,Tucson,67
996,996,Tucson,78
997,997,Tucson,26
998,998,Tucson,19


In [6]:
# Display the fraud reports table (report_id, transaction_id, timestamp); each report points at a transaction.
fraud_reports.data

Unnamed: 0,report_id,transaction_id,timestamp
0,0,797,2020-09-20 13:00:24
1,1,8185,2020-09-20 13:01:23
2,2,5532,2020-09-20 13:01:39
3,3,8963,2020-09-20 13:02:03
4,4,2811,2020-09-20 13:02:06
...,...,...,...
95,95,2023,2020-09-20 13:43:49
96,96,3843,2020-09-20 13:44:04
97,97,2096,2020-09-20 13:44:29
98,98,9837,2020-09-20 13:44:50


## Generating Features - Offline/Batch
We use the `featuretools` library (which automatically generates features) to show what kind of features a data scientist might want for this data. 

In [7]:
import featuretools as ft

# featuretools entity set: entity name -> (dataframe, index column[, time index column]).
# `users` has no time index; transactions and fraud reports are timestamped.
entities = {
    "users": (users.data, "user_id"),
    "transactions": (transactions.data, "transaction_id", "timestamp"),
    "fraud_reports": (fraud_reports.data, "report_id", "timestamp"),
}

# Parent/child links as (parent entity, parent key, child entity, child key):
#   users.user_id               -> transactions.user_id
#   transactions.transaction_id -> fraud_reports.transaction_id
relationships = [
    ("users", "user_id", "transactions", "user_id"),
    ("transactions", "transaction_id", "fraud_reports", "transaction_id"),
]


In [14]:
# Deep Feature Synthesis with transactions as the target entity: one output row per
# transaction_id, stacking primitives across the relationships up to depth 4.
# NOTE(review): with no cutoff times, features built from fraud_reports (e.g. COUNT(fraud_reports))
# directly describe the fraud label we want to predict — target leakage in a real training setup.
feature_matrix, feature_defs = ft.dfs(
    entities=entities,
    relationships=relationships,
    target_entity="transactions",
    max_depth=4,
)

In [15]:
# One row per transaction (indexed by transaction_id); raw columns plus generated ones such as COUNT(fraud_reports) and users.age.
feature_matrix

Unnamed: 0_level_0,user_id,amount,COUNT(fraud_reports),DAY(timestamp),MONTH(timestamp),WEEKDAY(timestamp),YEAR(timestamp),users.location,users.age,MODE(fraud_reports.DAY(timestamp)),...,users.SKEW(transactions.NUM_UNIQUE(fraud_reports.WEEKDAY(timestamp))),users.SKEW(transactions.NUM_UNIQUE(fraud_reports.YEAR(timestamp))),users.STD(transactions.NUM_UNIQUE(fraud_reports.DAY(timestamp))),users.STD(transactions.NUM_UNIQUE(fraud_reports.MONTH(timestamp))),users.STD(transactions.NUM_UNIQUE(fraud_reports.WEEKDAY(timestamp))),users.STD(transactions.NUM_UNIQUE(fraud_reports.YEAR(timestamp))),users.SUM(transactions.NUM_UNIQUE(fraud_reports.DAY(timestamp))),users.SUM(transactions.NUM_UNIQUE(fraud_reports.MONTH(timestamp))),users.SUM(transactions.NUM_UNIQUE(fraud_reports.WEEKDAY(timestamp))),users.SUM(transactions.NUM_UNIQUE(fraud_reports.YEAR(timestamp)))
transaction_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
0,673,15,0.0,20,9,6,2020,Boston,90.0,,...,,,,,,,0.0,0.0,0.0,0.0
1,119,14,0.0,20,9,6,2020,Boston,27.0,,...,,,,,,,0.0,0.0,0.0,0.0
2,578,21,0.0,20,9,6,2020,Boston,59.0,,...,,,,,,,0.0,0.0,0.0,0.0
3,460,1,0.0,20,9,6,2020,San Francisco,64.0,,...,,,,,,,0.0,0.0,0.0,0.0
4,568,41,0.0,20,9,6,2020,San Francisco,65.0,,...,,,,,,,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,307,42,0.0,23,9,2,2020,Boston,90.0,,...,,,,,,,0.0,0.0,0.0,0.0
9996,781,8,0.0,23,9,2,2020,Tucson,23.0,,...,,,,,,,0.0,0.0,0.0,0.0
9997,357,3,0.0,23,9,2,2020,Boston,20.0,,...,,,,,,,0.0,0.0,0.0,0.0
9998,247,35,0.0,23,9,2,2020,San Francisco,43.0,,...,,,,,,,0.0,0.0,0.0,0.0


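To train a model, we first encode `feature_matrix` (e.g. one-hot encoding categorical columns such as `users.location`) to get our input `X`, and then train with labels `Y`:
```
X, feature_defs_encoded = ft.encode_features(feature_matrix, feature_defs)
model.train(X, Y)
```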

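(Note: in a real setup you would define cutoff times/training windows, so that features only use data available before each transaction — otherwise features derived from fraud reports leak the label. They are omitted here for simplicity.)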

## Generating Features - Online/Streaming
In practice, the data behind the features in `feature_matrix` keeps changing (e.g. it arrives as streams), so we want the features to stay up to date with the most recent data. Doing so requires maintaining the features online.

### Streaming Input 
Let's be more realistic and treat our data as streams.

In [16]:
# streaming input 
transactions = DataTable(name = "transactions", source ="kafka://") 
fraud_reports = DataTable(name = "fraud_reports", source ="kafka://") 
users = DataTable(name = "user_info", source="kafka://") 

From the streaming input, we then define our features so that they can be maintained by Flink.
![Diagram of the Flink streaming feature pipeline](img/flink-diagram.png "Streaming feature pipeline")

### Defining Features
We now define features over the streams as `FeatureTable`s. Some features are intermediate: they don't need to be externally queryable, and exist only to share computation between downstream features.

In [17]:
# Intermediate (non-queryable) table: transactions grouped by user. It is never queried
# directly; it only lets the per-user features below share a single grouping.
user_transactions = FeatureTable(
    name="user_transaction",
    key="user_id",
    data=transactions.data.groupby("user_id"),
    queryable=False,
)

# Mean transaction amount per user, over all available history.
# A realistic pipeline would compute this over a time window instead.
average_transaction = FeatureTable(
    name="average_transaction",
    key="user_id",
    data=user_transactions.data["amount"].mean(),
    queryable=True,
)

# Timestamp of each user's most recent transaction.
last_transaction_timestamp = FeatureTable(
    name="last_transaction_timestamp",
    key="user_id",
    data=user_transactions.data["timestamp"].max(),
    queryable=True,
)

last_transaction_timestamp.data

user_id
0      2020-09-23 20:15:13
1      2020-09-23 15:27:34
2      2020-09-23 19:43:23
3      2020-09-23 18:57:50
4      2020-09-23 11:39:09
               ...        
996    2020-09-23 22:14:56
997    2020-09-23 08:21:36
998    2020-09-23 19:18:01
999    2020-09-23 21:40:30
1000   2020-09-23 15:12:48
Name: timestamp, Length: 1001, dtype: datetime64[ns]

### UDF Support 
Ideally, features can be defined with arbitrary UDFs (user-defined functions). These could be implemented as Ray actors for scalability and GPU support.

In [18]:
from models import PredictIncomeModel

# UDF should be a class - needs intialization 
class PredictIncome: 
    
    def __init__(self, name, model_path):
        self.name = name 
        self.model = PredictIncomeModel(model_path) 
        
    def process(self, data):
        return self.model.predict(data)     

# Register the income UDF on the store, backed by the model at model.pt.
fs.add_function(
    PredictIncome(
        name='predict_income',
        model_path='model.pt',
    )
)

In [19]:
# Queryable per-user income feature, produced by applying the registered UDF to the
# per-user transaction grouping.
# NOTE(review): the object from fs.functions is handed straight to .apply, but PredictIncome defines
# `process` and no `__call__` — presumably the store returns a callable wrapper; confirm.
income = FeatureTable(
    name="income",
    key="user_id",
    data=user_transactions.data.apply(fs.functions["predict_income"]),
    queryable=True,
)

income.data

user_id
0        17000
1        90000
2       140000
3        70000
4       140000
         ...  
996      35000
997      35000
998      35000
999      35000
1000     90000
Length: 1001, dtype: int64

# Querying Features
Once we have our features, we want to query them in both an offline and an online setting, to run model training and inference respectively. With a feature store, streaming data, batch data, and derived features are all accessed through the same interface.

In [20]:
from fakefeaturestore import PointQuery, BatchQuery

# Tables are referenced by the names they were registered under. The users DataTable is
# registered as "user_info" (not "users"); the feature tables use their FeatureTable names.
QUERY_TABLES = ['user_info', 'average_transaction', 'income']

# Offline/training access: every row in a time range. The sample data spans 2020-09-20 to
# 2020-09-23, so use unambiguous ISO dates covering it. The end bound is set one day past the
# last sample timestamp, so the final day is included whether the bound is inclusive or exclusive.
batch_query = BatchQuery(tables=QUERY_TABLES, time_range=('2020-09-20', '2020-09-24'))

# Online/inference access: single-key lookups under a latency budget.
# NOTE(review): the unit of `latency` is not shown here (presumably seconds) — confirm.
point_query = PointQuery(key='user_id', tables=QUERY_TABLES, latency=0.1)

Batch queries could look like a dataloader: per-query latency is unimportant, as long as data throughput doesn't slow down model training.

In [22]:
# Dataloader-style consumption of the batch query, 1000 at a time.
# NOTE(review): `model` is not defined anywhere in this notebook, so this cell raises NameError on a
# fresh kernel — it stands in for a real training loop.
for training_batch in batch_query.execute(batch_size=1000):
    predictions = model(training_batch)
    # more training code...

Point queries only need to access one key, but must meet the provided latency SLO.

In [23]:
# Online lookup of a single key.
# NOTE(review): user_id values in the tables above are integers (0, 1, 2, ...); "sarah" is presumably illustrative — confirm the key type the store expects.
point_query.execute(key="sarah")

{'user_id': 'sarah', 'average_transaction': 430.5, 'income': 35000}

# Changing Feature Operators 
Pipelines are constantly being changed, and operators that depend on the data should change as the data changes.

In [24]:
# Re-register 'predict_income' backed by a new model file.
# NOTE(review): whether the already-built `income` table is recomputed with the new model is not shown here.
fs.replace_function(
    PredictIncome(name='predict_income', model_path='new_model.pt')
)

We could also define operators directly in terms of data, with a `refresh()` method that rebuilds them from the latest data. However, you would then need a way to deal with the resulting feature inconsistencies: features computed before and after a refresh may not agree.

In [None]:
class tfidf:
    """Sketch of a data-dependent feature operator that can be rebuilt from newer data.

    The operator is built from ``table.data`` at construction time, and rebuilt from the table's
    current data by ``refresh()``. Features computed before and after a refresh can therefore
    disagree; callers must handle that inconsistency.

    Args:
        table: Object exposing a ``data`` attribute used to build the vectorizer.
        vectorizer_factory: Optional callable ``factory(data) -> vectorizer`` where the returned
            vectorizer is called as ``vectorizer(data)``. When omitted, ``TfidfVectorizor`` is used.
            NOTE(review): that name is not defined or imported anywhere in this notebook.
            Presumably scikit-learn's ``TfidfVectorizer`` is meant, which is fitted with ``fit``
            and applied with ``transform`` rather than called directly — confirm before running.
    """

    def __init__(self, table, vectorizer_factory=None):
        # Keep the table itself so refresh() can rebuild from its *current* data.
        self.table = table
        self.vectorizer_factory = vectorizer_factory
        self.tfid = self._build()

    def _build(self):
        """Build a vectorizer from the table's current data."""
        factory = self.vectorizer_factory
        if factory is None:
            factory = TfidfVectorizor
        return factory(self.table.data)

    def process(self, data):
        """Apply the current vectorizer to ``data``."""
        return self.tfid(data)

    def refresh(self):
        """Rebuild the vectorizer from the table's latest data."""
        self.tfid = self._build()