In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# Parameters for the experiment.
#   num_data:    how many synthetic records are generated.
#   num_minutes: how many minutes each timed request loop runs.
num_data = 100000
num_minutes = 2

In [None]:
# Write a .feastignore file listing these patterns, one per line:
# feast_speed_testing/**, data/**, .*, *.ipynb and registry.db.
!printf feast_speed_testing/**\\ndata/**\\n.*\\n*.ipynb\\nregistry.db > .feastignore

In [None]:
%pip install 'feast[postgres, redis]' psycopg2 kagglehub

In [None]:
!feast apply

## Download the data and move it to a suitable (non-hidden) location.

In [None]:
import kagglehub
# https://www.kaggle.com/datasets/rashikrahmanpritom/heart-attack-analysis-prediction-dataset
path = kagglehub.dataset_download("rashikrahmanpritom/heart-attack-analysis-prediction-dataset")

In [None]:
print("Path to dataset files:", path)

In [None]:
!mv /opt/app-root/src/.cache/kagglehub/datasets/rashikrahmanpritom/heart-attack-analysis-prediction-dataset/versions/2 /opt/app-root/src/data

## Load data

In [None]:
from pandas import read_csv
from numpy import arange

# Read the heart-attack records that were moved into the data directory.
data = read_csv("../data/heart.csv")
# TODO: Drop extra feature columns here.

# Number the records 0..N-1 in a patient_id column; the ID is there to demonstrate repeatability.
data = data.assign(patient_id=arange(len(data)))

data

In [None]:
from pandas import DataFrame
from scipy.stats import describe, truncnorm
from numpy import sqrt, round, int64
from numpy.random import SeedSequence

def generate_data(source_data: DataFrame, target_column: str = "output", num_samples: int = 100):
    """Generate synthetic feature columns that mimic ``source_data``.

    Every column except ``target_column`` is sampled independently from a
    normal distribution with the column's mean and variance, truncated to the
    column's observed minimum and maximum. Sampling is seeded from a fixed
    ``SeedSequence(123)``, so repeated calls with the same input return the
    same values.

    Parameters
    ----------
    source_data
        Numeric data whose per-column statistics drive the sampling.
    target_column
        Name of the column to leave out of the generated data.
    num_samples
        Number of rows to generate.

    Returns
    -------
    DataFrame
        ``num_samples`` rows with the non-target columns of ``source_data``,
        in the same order and with the same dtypes.
    """
    # Local imports keep this cell self-contained without touching the notebook's import cell.
    from numpy import full
    from pandas.api.types import is_integer_dtype

    sequence = SeedSequence(123)
    generated_columns = {}
    for column in source_data.columns:
        if column == target_column:
            continue

        # Spawn one child seed per feature column, even for columns that end up
        # not being sampled, so every column's seed depends only on its position.
        next_seed = sequence.spawn(1)[0]
        column_stats = describe(source_data[column])
        mean = column_stats.mean
        std = sqrt(column_stats.variance)

        if std > 0:
            lower, upper = column_stats.minmax
            # truncnorm takes its bounds in units of standard deviations from the mean.
            a, b = (lower - mean) / std, (upper - mean) / std
            column_data = truncnorm.rvs(a=a, b=b, loc=mean, scale=std, size=num_samples, random_state=next_seed.generate_state(1)[0])
        else:
            # Zero (or undefined) variance: the truncated normal is degenerate and
            # scipy rejects the parameters, so repeat the single observed value.
            column_data = full(num_samples, mean, dtype=float)

        column_type = source_data[column].dtype
        if is_integer_dtype(column_type):
            # Round before casting so every integer width gets nearest-integer
            # values instead of being truncated towards zero.
            column_data = round(column_data)
        generated_columns[column] = column_data.astype(column_type)

    return DataFrame(generated_columns)

In [None]:
from datetime import datetime
from numpy import arange

# One timestamp per generated record, taken from the clock while this cell runs.
timestamps = [datetime.now() for _ in range(num_data)]

# Generate the synthetic features, then attach the sequential patient_id and the
# two timestamp columns (both filled from the same list).
generated_data = generate_data(data, num_samples=num_data).assign(
    patient_id=arange(num_data),
    event_timestamp=timestamps,
    created=timestamps,
)

## Save the generated data off to the offline feature store

In [None]:
from os import getenv
from sqlalchemy import create_engine, engine

# Build the PostgreSQL connection URL from environment variables
# (DB_USERNAME, DB_PASSWORD, DB_HOST, DB_NAME) rather than literals in the notebook.
connection_string = engine.URL.create(
    drivername="postgresql",
    username=getenv('DB_USERNAME'),
    password=getenv('DB_PASSWORD'),
    host=getenv('DB_HOST'),
    database=getenv('DB_NAME'),
)

# Append the generated records to the heart_values table in the "feast" schema.
# NOTE(review): if_exists="append" means re-running this cell inserts the same
# patient_ids again — confirm that is intended.
this_engine = create_engine(connection_string)
generated_data.to_sql('heart_values', schema="feast", con=this_engine, if_exists="append", index=False)

In [None]:
from contextlib import closing

from psycopg2 import OperationalError, connect

# Sanity check: count the stored rows with patient_id below 1000.
try:
    conn = connect(dbname=getenv('DB_NAME'), user=getenv('DB_USERNAME'), host=getenv('DB_HOST'), password=getenv('DB_PASSWORD'))
except OperationalError:
    print("I am unable to connect to the database")
    # Re-raise so the cell stops here instead of failing below with a
    # confusing NameError on `conn`.
    raise

# closing() guarantees the connection is released even if the query raises.
with closing(conn):
    with conn.cursor() as curs:
        curs.execute("SELECT count(1) from feast.heart_values where patient_id < 1000")
        single_row = curs.fetchone()
        print(single_row)

### Materialize the data into the online feature store

In [None]:
!feast materialize 2024-12-01T00:00:00 2026-01-01T00:00:00

### Train the model

In [None]:
from sklearn.model_selection import train_test_split
from numpy.random import seed

# Seed NumPy's global random state for this section.
seed(151123)

# Separate the label column from everything else.
y = data['output']
X = data.drop(columns=['output'])

# Hold out 33% of the records for testing; the split uses its own fixed random_state.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

# Fun fact, if you don't drop the ID column, you get perfect accuracy.
X_train, X_test = (split.drop(columns=["patient_id"]) for split in (X_train, X_test))

### Make sure that heart_repo.py's data in transformed_data() matches the values you get here!

In [None]:
from sklearn.preprocessing import StandardScaler
# Fit the scaler on the training split only, then apply that same scaling to the test split.
scaler = StandardScaler()
X_rescaled_train = scaler.fit_transform(X_train)
X_rescaled_test = scaler.transform(X_test)
# Display the fitted per-feature means and scales so they can be compared with the
# hard-coded constants used for the feature transformation.
scaler.mean_, scaler.scale_

### Model is trained here

In [None]:
from sklearn.linear_model import LogisticRegression

# Fit a logistic regression (max_iter set to 10000) on the rescaled training features.
model = LogisticRegression(max_iter=10000)
model.fit(X_rescaled_train, y_train)
# Display the fitted coefficients.
model.coef_

In [None]:
from sklearn.metrics import confusion_matrix
# The confusion matrix when including all the features should match this:
# array([[33,  9],
#        [10, 48]])
pred = model.predict(X_rescaled_test)
confusion_matrix(y_test, pred)

## Connect to the feature store to prepare for tests

In [None]:
from feast import FeatureStore

# Open the Feast feature store whose repository is the current working directory.
store = FeatureStore(repo_path=".")

In [None]:
from heart_repo import heart_v1
from numpy import random, array
from pandas import DataFrame

def run():
    """Predict for one randomly chosen patient using features read from the online store.

    Draws a patient_id in ``[0, num_data)``, requests the ``heart_v1`` features
    for it from ``store``, drops the ``patient_id`` column and returns the
    result of ``model.predict`` on the remaining values.
    """
    patient_id = random.randint(0, num_data, 1)[0]

    online_response = store.get_online_features(
        features=heart_v1,
        entity_rows=[{"patient_id": patient_id}],
    )
    feature_frame = online_response.to_df()
    model_inputs = feature_frame.drop(columns=["patient_id"]).to_numpy()
    return model.predict(model_inputs)

# Because of how Python decorators work, you can't just import the function. This function matches heart_repo.transformed_data exactly, however.
# TODO: Drop feature columns here, and remove the corresponding records from the 2 arrays in the `features` definition.
def run_in_memory(this_row: DataFrame):
    """Standardize raw feature values in memory and return the model's prediction.

    Parameters
    ----------
    this_row
        Raw feature values containing the thirteen columns selected below.
        The notebook passes a single record (a row taken with ``.iloc``);
        a DataFrame holding one or more records is accepted as well.

    Returns
    -------
    The result of ``model.predict`` on the standardized features, one
    prediction per record.
    """
    raw_features = this_row[['age', 'sex', 'cp', 'trtbps', 'chol', 'fbs', 'restecg', 'thalachh',
       'exng', 'oldpeak', 'slp', 'caa', 'thall']]
    # Per-feature offsets and divisors, in the same order as the columns selected above.
    feature_means = array([5.46502463e+01, 6.60098522e-01, 1.00985222e+00, 1.30812808e+02,
        2.48448276e+02, 1.28078818e-01, 5.27093596e-01, 1.49655172e+02,
        3.30049261e-01, 1.03300493e+00, 1.40394089e+00, 6.35467980e-01,
        2.33004926e+00])
    feature_scales = array([ 8.99758246,  0.47367548,  1.04098356, 17.02951461, 54.87667351,
         0.33417755,  0.52803654, 22.61651409,  0.47023052,  1.09919108,
         0.61538905,  0.91787029,  0.59920596])
    features = (raw_features - feature_means) / feature_scales

    feature_matrix = features.to_numpy()
    if feature_matrix.ndim == 1:
        # A single record arrives as a 1-D vector; predict() needs (records, features).
        # A DataFrame is already 2-D and must not be wrapped again.
        feature_matrix = feature_matrix.reshape(1, -1)
    return model.predict(feature_matrix)

In [None]:
# We print the number of minutes elapsed so Jupyter doesn't timeout its connection.
from time import monotonic, time
from numpy.random import seed


def _count_calls(task, minutes):
    """Call ``task()`` repeatedly for ``minutes`` minutes and return how many calls completed.

    Durations are measured with ``monotonic()`` so a wall-clock adjustment
    during the run cannot shorten or lengthen the measurement window. A
    progress line is printed once for each whole minute elapsed.
    """
    start_time = monotonic()
    end_time = start_time + 60 * minutes
    printed_time = False
    calls = 0

    while monotonic() < end_time:
        time_elapsed = int(monotonic() - start_time)
        if time_elapsed % 60 == 0:
            # Print only on the first iteration that lands on each whole minute.
            if not printed_time:
                print(f"{time_elapsed / 60} minutes elapsed")
                printed_time = True
        else:
            printed_time = False
        task()
        calls += 1
    return calls


# Both loops start from the same seed so the sequence of random patient draws is the same.
seed(12351)
counts = _count_calls(run, num_minutes)

seed(12351)
counts_in_memory = _count_calls(
    lambda: run_in_memory(generated_data.iloc[random.randint(0, num_data, 1)[0]]),
    num_minutes,
)
counts_in_memory

### These are the final iteration counts when run for the number of minutes provided

This is the textbook setting for a Poisson distribution (independent events counted over a fixed interval of time), so the variance equals the mean and the standard deviation of a count is its square root.

In [None]:
from numpy import sqrt

# Report each measured count with its uncertainty, taken as the square root of the count.
print(f"The number of calls in {num_minutes} minutes is:")
print(f"{counts} +- {sqrt(counts)} using Feast")
print(f"{counts_in_memory} +- {sqrt(counts_in_memory)}  calculating on the fly entirely in memory")

In [None]:
# Scale the measured counts up to a 24-hour window. Multiplying a count by a constant
# multiplies its standard deviation by that same constant, so the uncertainty is
# sqrt(count) * scale, not sqrt(count * scale).
print("The number of interpolated calls in 24 hours is:")
print(f"{counts * 24 * 60 / num_minutes} +- {sqrt(counts) * 24 * 60 / num_minutes} using Feast")
print(f"{counts_in_memory * 24 * 60 / num_minutes} +- {sqrt(counts_in_memory) * 24 * 60 / num_minutes}  calculating on the fly entirely in memory")