# 03.SAI02: Feature Store (Note: the demo folder is under `/sai02/feature_repo`)

**Problem**: Suppose we have a machine learning model that ranks drivers using the driver stats dataset. Before we can train and serve this model in production, we need to extract and transform the relevant features.

With a feature store, we can store and manage these features in a centralized location. Feast provides an API to interact with this feature store and integrates with different data sources to ingest data into the store.

Here's an example of how we could set up a feature store system for this driver ranking model:

Ref: 

https://docs.feast.dev/<br>
https://github.com/feast-dev/feast-gcp-driver-ranking-tutorial

## Inspect Data

In [1]:
import pandas as pd
df = pd.read_parquet("feature_repo/data/driver_stats.parquet")
df

Unnamed: 0,event_timestamp,driver_id,conv_rate,acc_rate,avg_daily_trips,created
0,2023-04-14 01:45:08.683604+00:00,1001,1.000000,1.000000,1000,2023-04-14 01:45:08.683610
1,2023-04-13 20:37:11.840487+00:00,1001,1.000000,1.000000,1000,2023-04-13 20:37:11.840498
2,2023-03-29 00:00:00+00:00,1005,0.165560,0.670792,506,2023-04-13 00:53:40.636000
3,2023-03-29 01:00:00+00:00,1005,0.306372,0.517036,844,2023-04-13 00:53:40.636000
4,2023-03-29 02:00:00+00:00,1005,0.852396,0.342542,568,2023-04-13 00:53:40.636000
...,...,...,...,...,...,...
1804,2023-04-12 22:00:00+00:00,1001,0.834586,0.225937,788,2023-04-13 00:53:40.636000
1805,2023-04-12 23:00:00+00:00,1001,0.498457,0.872914,933,2023-04-13 00:53:40.636000
1806,2021-04-12 07:00:00+00:00,1001,0.848513,0.993941,484,2023-04-13 00:53:40.636000
1807,2023-04-05 12:00:00+00:00,1003,0.542288,0.344721,346,2023-04-13 00:53:40.636000


In [2]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1809 entries, 0 to 1808
Data columns (total 6 columns):
 #   Column           Non-Null Count  Dtype              
---  ------           --------------  -----              
 0   event_timestamp  1809 non-null   datetime64[ns, UTC]
 1   driver_id        1809 non-null   int64              
 2   conv_rate        1809 non-null   float32            
 3   acc_rate         1809 non-null   float32            
 4   avg_daily_trips  1809 non-null   int32              
 5   created          1809 non-null   datetime64[ns]     
dtypes: datetime64[ns, UTC](1), datetime64[ns](1), float32(2), int32(1), int64(1)
memory usage: 63.7 KB


## Step1: Apply feast 

We define a schema for the features that we want to extract and store in the feature store. For this dataset, those are each driver's `conv_rate`, `acc_rate`, and `avg_daily_trips`, keyed by `driver_id`.

First, import the libraries used throughout this notebook:

In [3]:
from datetime import datetime

import pandas as pd
from feast import FeatureStore
from feast.data_source import PushMode
from joblib import dump, load
from sklearn.linear_model import LinearRegression

From the `/sai02/feature_repo` folder, run the command below:

```bash
(venv)$ feast apply
```

This prints:

```
Created entity driver
Created feature view driver_hourly_stats
Created feature view driver_hourly_stats_fresh
Created on demand feature view transformed_conv_rate_fresh
Created on demand feature view transformed_conv_rate
Created feature service driver_activity_v1
Created feature service driver_activity_v2
Created feature service driver_activity_v3

Created sqlite table feast_demo_driver_hourly_stats_fresh
Created sqlite table feast_demo_driver_hourly_stats
```

In [4]:
store = FeatureStore(repo_path="feature_repo/")

## Step2: Get historical features for training from Offline Store

In [5]:
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
# for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
        ],
        # (optional) label name -> label values. Feast does not process these
        "label_driver_reported_satisfaction": [1, 5, 3],
        # values we're using for an on-demand transformation
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()
training_df

Unnamed: 0,driver_id,event_timestamp,label_driver_reported_satisfaction,val_to_add,val_to_add_2,conv_rate,acc_rate,avg_daily_trips,conv_rate_plus_val1,conv_rate_plus_val2
0,1001,2021-04-12 10:59:42+00:00,1,1,10,0.848513,0.993941,484,1.848513,10.848513
1,1002,2021-04-12 08:12:10+00:00,5,2,20,0.286453,0.845122,627,2.286453,20.286453
2,1003,2021-04-12 16:40:26+00:00,3,3,30,0.750681,0.794102,449,3.750681,30.750681
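
The two `conv_rate_plus_val*` columns in the output above look like `conv_rate` plus the request-time values `val_to_add` and `val_to_add_2`. Here is a minimal sketch of that arithmetic in plain Python, assuming this is what the `transformed_conv_rate` on-demand feature view computes. Its definition is not shown in this notebook, and the function name below is hypothetical:

```python
def add_request_values(conv_rate, val_to_add, val_to_add_2):
    """Hypothetical stand-in for the on-demand transformation (an assumption)."""
    return {
        "conv_rate_plus_val1": conv_rate + val_to_add,
        "conv_rate_plus_val2": conv_rate + val_to_add_2,
    }


# Inputs taken from the row for driver 1001 in the output above
row = add_request_values(conv_rate=0.848513, val_to_add=1, val_to_add_2=10)
print(row)
```

Because the added values come from the request rather than from stored data, they have to be supplied in `entity_df` (offline) or in the entity rows (online).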


In [6]:
df[df["avg_daily_trips"] == 484]

Unnamed: 0,event_timestamp,driver_id,conv_rate,acc_rate,avg_daily_trips,created
1806,2021-04-12 07:00:00+00:00,1001,0.848513,0.993941,484,2023-04-13 00:53:40.636


In [7]:
df[df["avg_daily_trips"] == 627]

Unnamed: 0,event_timestamp,driver_id,conv_rate,acc_rate,avg_daily_trips,created
303,2023-04-10 13:00:00+00:00,1005,0.14354,0.084671,627,2023-04-13 00:53:40.636
1382,2023-04-10 09:00:00+00:00,1002,0.37193,0.272448,627,2023-04-13 00:53:40.636
1445,2021-04-12 07:00:00+00:00,1002,0.286453,0.845122,627,2023-04-13 00:53:40.636


In [8]:
df[df["avg_daily_trips"] == 449]

Unnamed: 0,event_timestamp,driver_id,conv_rate,acc_rate,avg_daily_trips,created
85,2023-04-01 11:00:00+00:00,1005,0.986477,0.570027,449,2023-04-13 00:53:40.636
1084,2021-04-12 07:00:00+00:00,1003,0.750681,0.794102,449,2023-04-13 00:53:40.636
1145,2023-03-31 12:00:00+00:00,1002,0.121907,0.815285,449,2023-04-13 00:53:40.636
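
The three lookups above suggest that each entity row was matched to the latest feature row for that driver at or before the requested timestamp (the `2021-04-12 07:00:00` rows). The sketch below illustrates that point-in-time lookup in plain Python. It is not Feast's implementation: it ignores any TTL configured on the feature view, and the helper name is hypothetical.

```python
from datetime import datetime, timezone

# A few (driver_id, event_timestamp, conv_rate) rows copied from the dataset above
feature_rows = [
    (1001, datetime(2021, 4, 12, 7, 0, tzinfo=timezone.utc), 0.848513),
    (1001, datetime(2023, 4, 12, 23, 0, tzinfo=timezone.utc), 0.498457),
    (1002, datetime(2021, 4, 12, 7, 0, tzinfo=timezone.utc), 0.286453),
]


def point_in_time_lookup(rows, driver_id, as_of):
    """Return the latest row for driver_id with event_timestamp <= as_of, or None."""
    candidates = [r for r in rows if r[0] == driver_id and r[1] <= as_of]
    return max(candidates, key=lambda r: r[1]) if candidates else None


as_of = datetime(2021, 4, 12, 10, 59, 42, tzinfo=timezone.utc)
match = point_in_time_lookup(feature_rows, 1001, as_of)
print(match)
```

The 2023 row for driver 1001 is skipped because it lies after `as_of`. This keeps future data from leaking into training rows.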


## Step3: Get historical features for batch scoring

In [9]:
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve
# for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": pd.to_datetime("now", utc=True),
        # (optional) label name -> label values. Feast does not process these
        "label_driver_reported_satisfaction": [1, 5, 3],
        # values we're using for an on-demand transformation
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
).to_df()
training_df

Unnamed: 0,driver_id,event_timestamp,label_driver_reported_satisfaction,val_to_add,val_to_add_2,conv_rate,acc_rate,avg_daily_trips,conv_rate_plus_val1,conv_rate_plus_val2
0,1003,2023-05-12 04:39:20.884191+00:00,3,3,30,0.438662,0.882909,66,3.438662,30.438662
1,1002,2023-05-12 04:39:20.884191+00:00,5,2,20,0.271404,0.273073,381,2.271404,20.271404
2,1001,2023-05-12 04:39:20.884191+00:00,1,1,10,1.0,1.0,1000,2.0,11.0


## Step4: Load features into online store

In [10]:
store.materialize_incremental(end_date=datetime.now())

Materializing 2 feature views to 2023-05-12 11:39:21+07:00 into the sqlite online store.

driver_hourly_stats_fresh from 2023-05-12 18:30:37+07:00 to 2023-05-12 11:39:21+07:00:


100%|████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 101.05it/s]


driver_hourly_stats from 2023-05-12 18:30:37+07:00 to 2023-05-12 18:39:21+07:00:


100%|█████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 87.22it/s]
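
Conceptually, materialization copies the most recent feature values per entity from the offline store into the online store so they can be read with low latency. Here is a rough stdlib sketch of that idea. The function name, the inclusive time window, and the dict-based "online store" are all assumptions for illustration, not how Feast does it internally:

```python
from datetime import datetime

# A few offline rows copied from the dataset above
offline_rows = [
    {"driver_id": 1001, "event_timestamp": datetime(2023, 4, 12, 22, 0), "conv_rate": 0.834586},
    {"driver_id": 1001, "event_timestamp": datetime(2023, 4, 12, 23, 0), "conv_rate": 0.498457},
    {"driver_id": 1003, "event_timestamp": datetime(2023, 4, 5, 12, 0), "conv_rate": 0.542288},
]


def materialize(rows, start, end):
    """Keep only the newest row per driver_id whose timestamp falls in [start, end]."""
    online = {}
    for row in rows:
        ts = row["event_timestamp"]
        if not (start <= ts <= end):
            continue
        current = online.get(row["driver_id"])
        if current is None or ts > current["event_timestamp"]:
            online[row["driver_id"]] = row
    return online


online_store = materialize(offline_rows, datetime(2023, 4, 1), datetime(2023, 5, 12))
print(online_store[1001]["conv_rate"])
```

After this step the online store holds one row per driver, which is why online reads return a single value per entity.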


## Step5: Online features

In [11]:
entity_rows = [
    # {join_key: entity_value}
    {
        "driver_id": 1001,
        "val_to_add": 1000,
        "val_to_add_2": 2000,
    },
    {
        "driver_id": 1002,
        "val_to_add": 1001,
        "val_to_add_2": 2002,
    },
]

features_to_fetch = [
    "driver_hourly_stats:acc_rate",
    "transformed_conv_rate:conv_rate_plus_val1",
    "transformed_conv_rate:conv_rate_plus_val2",
]
returned_features = store.get_online_features(
    features=features_to_fetch,
    entity_rows=entity_rows,
).to_dict()
for key, value in sorted(returned_features.items()):
    print(key, " : ", value)

acc_rate  :  [1.0, 0.2730732560157776]
conv_rate_plus_val1  :  [1001.0, 1001.2714039981365]
conv_rate_plus_val2  :  [2001.0, 2002.2714039981365]
driver_id  :  [1001, 1002]


## Step6: Online features retrieved (instead) through a feature service

In [12]:
entity_rows = [
    # {join_key: entity_value}
    {
        "driver_id": 1001,
        "val_to_add": 1000,
        "val_to_add_2": 2000,
    },
    {
        "driver_id": 1002,
        "val_to_add": 1001,
        "val_to_add_2": 2002,
    },
]

features_to_fetch = store.get_feature_service("driver_activity_v1")

returned_features = store.get_online_features(
    features=features_to_fetch,
    entity_rows=entity_rows,
).to_dict()
for key, value in sorted(returned_features.items()):
    print(key, " : ", value)

conv_rate  :  [1.0, 0.2714039981365204]
conv_rate_plus_val1  :  [1001.0, 1001.2714039981365]
conv_rate_plus_val2  :  [2001.0, 2002.2714039981365]
driver_id  :  [1001, 1002]
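
A feature service can be thought of as a named, reusable list of feature references, so model code asks for a service instead of spelling out every feature. The sketch below mimics that lookup with a plain dict. The feature list for `driver_activity_v1` is inferred from the keys printed above (its definition is not shown in this notebook, and `conv_rate` is assumed to come from `driver_hourly_stats`), and `resolve_features` is a hypothetical helper:

```python
# Feature references inferred from the printed output above (an assumption)
feature_services = {
    "driver_activity_v1": [
        "driver_hourly_stats:conv_rate",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ],
}


def resolve_features(features):
    """Accept either a service name or an explicit list of feature references."""
    if isinstance(features, str):
        return feature_services[features]
    return list(features)


print(resolve_features("driver_activity_v1"))
```

Versioned names such as `driver_activity_v1` and `driver_activity_v3` let the feature list change without touching the callers that pin an older version.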


## Step7: Online features retrieved (using feature service v3, which uses a feature view with a push source)

In [13]:
entity_rows = [
    # {join_key: entity_value}
    {
        "driver_id": 1001,
        "val_to_add": 1000,
        "val_to_add_2": 2000,
    },
    {
        "driver_id": 1002,
        "val_to_add": 1001,
        "val_to_add_2": 2002,
    },
]

features_to_fetch = store.get_feature_service("driver_activity_v3")

returned_features = store.get_online_features(
    features=features_to_fetch,
    entity_rows=entity_rows,
).to_dict()
for key, value in sorted(returned_features.items()):
    print(key, " : ", value)

acc_rate  :  [1.0, 0.2730732560157776]
avg_daily_trips  :  [1000, 381]
conv_rate  :  [1.0, 0.2714039981365204]
conv_rate_plus_val1  :  [1001.0, 1001.2714039981365]
conv_rate_plus_val2  :  [2001.0, 2002.2714039981365]
driver_id  :  [1001, 1002]


## Step8: Simulate a stream event ingestion of the hourly stats df

In [14]:
event_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001],
        "event_timestamp": [
            datetime.now(),
        ],
        "created": [
            datetime.now(),
        ],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000],
    }
)
print(event_df)
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)

   driver_id            event_timestamp                    created  conv_rate  \
0       1001 2023-05-12 11:39:24.513666 2023-05-12 11:39:24.513668        1.0   

   acc_rate  avg_daily_trips  
0       1.0             1000  
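
With `PushMode.ONLINE_AND_OFFLINE`, the pushed event is meant to land in both stores. Here is a simplified sketch of that behaviour in plain Python. The in-memory stores, the `push` helper, and the "only overwrite if newer" rule are illustrative assumptions rather than Feast internals:

```python
from datetime import datetime

# Hypothetical in-memory stores
online_store = {
    1001: {"event_timestamp": datetime(2023, 4, 12, 23, 0), "conv_rate": 0.498457},
}
offline_log = []


def push(event, to_online=True, to_offline=True):
    """Append the event to the offline log and/or upsert the online row if it is newer."""
    if to_offline:
        offline_log.append(event)
    if to_online:
        current = online_store.get(event["driver_id"])
        if current is None or event["event_timestamp"] >= current["event_timestamp"]:
            online_store[event["driver_id"]] = {
                k: v for k, v in event.items() if k != "driver_id"
            }


push({"driver_id": 1001, "event_timestamp": datetime(2023, 5, 12, 11, 39, 24), "conv_rate": 1.0})
print(online_store[1001]["conv_rate"])
```

The offline copy keeps the event available for later training queries, while the online copy makes it visible to the next `get_online_features` call.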


###  Online features again with updated values from a stream push

In [15]:
entity_rows = [
    # {join_key: entity_value}
    {
        "driver_id": 1001,
        "val_to_add": 1000,
        "val_to_add_2": 2000,
    },
    {
        "driver_id": 1002,
        "val_to_add": 1001,
        "val_to_add_2": 2002,
    },
]

features_to_fetch = store.get_feature_service("driver_activity_v3")

returned_features = store.get_online_features(
    features=features_to_fetch,
    entity_rows=entity_rows,
).to_dict()
for key, value in sorted(returned_features.items()):
    print(key, " : ", value)

acc_rate  :  [1.0, 0.2730732560157776]
avg_daily_trips  :  [1000, 381]
conv_rate  :  [1.0, 0.2714039981365204]
conv_rate_plus_val1  :  [1001.0, 1001.2714039981365]
conv_rate_plus_val2  :  [2001.0, 2002.2714039981365]
driver_id  :  [1001, 1002]


## Step9: Train your model using Offline store

### Get data from Offline store

In [16]:
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003, 1001, 1002, 1003, 1001, 1002, 1003, 1004],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": [
            datetime(2023, 4, 1, 20, 29, 28),
            datetime(2023, 4, 2, 4, 29, 28),
            datetime(2023, 4, 2, 12, 29, 28),
            datetime(2023, 4, 2, 20, 29, 28),
            datetime(2023, 4, 3, 4, 29, 28),
            datetime(2023, 4, 3, 12, 29, 28),
            datetime(2023, 4, 3, 20, 29, 28),
            datetime(2023, 4, 4, 4, 29, 28),
            datetime(2023, 4, 4, 12, 29, 28),
            datetime(2023, 4, 4, 20, 29, 28),
        ],
        # (optional) label name -> label values. Feast does not process these
        "trip_completed": [1, 0, 0, 1, 0, 0, 1, 0, 0, 1]
    }
)
        
# Retrieve training data from the offline store
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
).to_df()

print("----- Feature schema -----\n")
print(training_df.info())

print()
print("----- Example features -----\n")
print(training_df.head())


----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 6 columns):
 #   Column           Non-Null Count  Dtype              
---  ------           --------------  -----              
 0   driver_id        10 non-null     int64              
 1   event_timestamp  10 non-null     datetime64[ns, UTC]
 2   trip_completed   10 non-null     int64              
 3   conv_rate        10 non-null     float32            
 4   acc_rate         10 non-null     float32            
 5   avg_daily_trips  10 non-null     int32              
dtypes: datetime64[ns, UTC](1), float32(2), int32(1), int64(2)
memory usage: 488.0 bytes
None

----- Example features -----

   driver_id           event_timestamp  trip_completed  conv_rate  acc_rate  \
0       1001 2023-04-01 20:29:28+00:00               1   0.112901  0.529678   
1       1002 2023-04-02 04:29:28+00:00               0   0.945234  0.826637   
2       1003 2023-04-02 12:29:28+00:00       

### Train Linear Regression model

In [17]:
# Train model
target = "trip_completed"

reg = LinearRegression()
train_X = training_df[training_df.columns.drop(target).drop("event_timestamp")]
train_Y = training_df.loc[:, target]
reg.fit(train_X[sorted(train_X)], train_Y)

# Save model
dump(reg, "feature_repo/driver_model.bin")

['feature_repo/driver_model.bin']
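
The training cell above passes `train_X[sorted(train_X)]` to the model, and the serving code uses `df[sorted(df)]` in the same way. Sorting the column names gives both sides the same feature order even if the two DataFrames list their columns differently. The small sketch below shows the idea with plain lists. The serving-time order is hypothetical:

```python
# Column order of train_X after dropping the target and event_timestamp
train_columns = ["driver_id", "conv_rate", "acc_rate", "avg_daily_trips"]
# Hypothetical column order of the DataFrame built at serving time
serve_columns = ["acc_rate", "driver_id", "avg_daily_trips", "conv_rate"]

feature_order = sorted(train_columns)
print(feature_order)
print(sorted(serve_columns) == feature_order)
```

Note that `driver_id` stays in `train_X`, so the model receives it as an input feature alongside the three statistics.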

## Step10: Serving your model using Online store

In [18]:
import pandas as pd
import feast

class DriverRankingModel:
    def __init__(self):
        # Load model
        self.model = load("feature_repo/driver_model.bin")

        # Set up feature store
        self.store = FeatureStore(repo_path="feature_repo/")

    def predict(self, driver_ids):
        # Read features from Feast
        driver_features = self.store.get_online_features(
            entity_rows=[{"driver_id": driver_id} for driver_id in driver_ids],
            features=[
                "driver_hourly_stats:conv_rate",
                "driver_hourly_stats:acc_rate",
                "driver_hourly_stats:avg_daily_trips",
            ],
        )
        df = pd.DataFrame.from_dict(driver_features.to_dict())

        # Make prediction
        df["prediction"] = self.model.predict(df[sorted(df)])

        # Choose best driver
        best_driver_id = df["driver_id"].iloc[df["prediction"].argmax()]

        # return best driver
        return best_driver_id, df["prediction"]
    
model = DriverRankingModel()

In [19]:
drivers = [1002, 1003, 1004, 1005]
    
best_driver, scores = model.predict(drivers)
print(f"Prediction for best driver id: {best_driver}")
print("Score:")
for dri, score in zip(drivers, scores):
    print(f"Driver {dri}: {score}")

Prediction for best driver id: 1002
Score:
Driver 1002: 0.4540042116577183
Driver 1003: 0.07213536439167001
Driver 1004: 0.1078804121036967
Driver 1005: -0.1617511067313444
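
`predict` returns only the single best driver, but the same scores can be turned into a full ranking. Here is a minimal sketch using the scores printed above, with a plain dict standing in for the returned pandas Series:

```python
# Scores copied from the output above
scores = {
    1002: 0.4540042116577183,
    1003: 0.07213536439167001,
    1004: 0.1078804121036967,
    1005: -0.1617511067313444,
}

# Sort driver ids by predicted score, highest first
ranking = sorted(scores, key=scores.get, reverse=True)
print(ranking)
```

The first element matches the `best_driver_id` reported by the model.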
