# Overview
In this tutorial, we build a model that predicts whether a driver will complete a trip, based on a number of features ingested into Feast.

The basic local mode gives you the ability to try Feast quickly, while the advanced mode shows how you can use Feast in a production setting.

This tutorial uses Feast with scikit-learn to:

* Train a model locally using data from BigQuery
* Test the model for online inference using SQLite (for fast iteration)
* Test the model for online inference using Firestore (to represent production)
 

# Step 1: Initialize Feast

In [None]:
!pip install feast

In [1]:
!feast version 

Feast SDK Version: "feast 0.12.1"


In [112]:
import pandas as pd
import os

In [113]:
# Take a look at the training data, which contains the features and their timestamps
training_data = pd.read_parquet('./driver_ranking/data/driver_stats.parquet')

In [114]:
training_data.head(5)

| | event_timestamp | driver_id | conv_rate | acc_rate | avg_daily_trips | created |
|---|---|---|---|---|---|---|
| 0 | 2021-08-16 17:00:00+00:00 | 1005 | 0.864365 | 0.37732 | 642 | 2021-08-31 17:46:34.640 |
| 1 | 2021-08-16 18:00:00+00:00 | 1005 | 0.493151 | 0.302525 | 833 | 2021-08-31 17:46:34.640 |
| 2 | 2021-08-16 19:00:00+00:00 | 1005 | 0.267367 | 0.537665 | 431 | 2021-08-31 17:46:34.640 |
| 3 | 2021-08-16 20:00:00+00:00 | 1005 | 0.347099 | 0.674669 | 94 | 2021-08-31 17:46:34.640 |
| 4 | 2021-08-16 21:00:00+00:00 | 1005 | 0.89993 | 0.011923 | 882 | 2021-08-31 17:46:34.640 |


In [116]:
training_data.event_timestamp.describe(datetime_is_numeric=True)

count                                   1807
mean     2021-08-23 19:36:31.477587200+00:00
min                2021-04-12 07:00:00+00:00
25%                2021-08-20 10:00:00+00:00
50%                2021-08-24 04:00:00+00:00
75%                2021-08-27 22:00:00+00:00
max                2021-08-31 16:00:00+00:00
Name: event_timestamp, dtype: object

In [119]:
training_data[training_data.driver_id==1001].head(5)

| | event_timestamp | driver_id | conv_rate | acc_rate | avg_daily_trips | created |
|---|---|---|---|---|---|---|
| 1444 | 2021-08-16 17:00:00+00:00 | 1001 | 0.93772 | 0.799825 | 897 | 2021-08-31 17:46:34.640 |
| 1445 | 2021-08-16 18:00:00+00:00 | 1001 | 0.022976 | 0.403809 | 317 | 2021-08-31 17:46:34.640 |
| 1446 | 2021-08-16 19:00:00+00:00 | 1001 | 0.882327 | 0.387132 | 477 | 2021-08-31 17:46:34.640 |
| 1447 | 2021-08-16 20:00:00+00:00 | 1001 | 0.846462 | 0.313911 | 612 | 2021-08-31 17:46:34.640 |
| 1448 | 2021-08-16 21:00:00+00:00 | 1001 | 0.892155 | 0.636727 | 15 | 2021-08-31 17:46:34.640 |


In [121]:
!pwd

/Users/pengju.pan/gitcode/feast-driver-ranking-tutorial


# Step 2: Initialize the configuration and deploy
`feast apply` scans the Python files in the current directory for feature definitions and deploys infrastructure according to `feature_store.yaml`.

Use `feast init` with care: do not run `feast init` repeatedly, otherwise it generates duplicate `registry.db` files and initialization fails.

In [123]:
!cd ./driver_ranking/ && feast teardown && rm -rf .ipynb_checkpoints/


Creating a new Feast repository in /Users/pengju.pan/gitcode/feast-driver-ranking-tutorial/driver_ranking/good_dolphin.



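Notes:
1. Use `feast teardown` with great care. If you have run `feast init` and want to update the configuration, first run `feast teardown` to remove the old settings.
2. After `feast teardown`, run `rm -rf .ipynb_checkpoints/` to delete the notebook's intermediate checkpoint files.
3. TODO: can `registry.db` be reused? If so, how?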

In [125]:
!cd ./driver_ranking/ && feast apply


Creating a new Feast repository in /Users/pengju.pan/gitcode/feast-driver-ranking-tutorial/driver_ranking/unique_heron.

Traceback (most recent call last):
  File "/Users/pengju.pan/opt/anaconda3/bin/feast", line 8, in <module>
    sys.exit(cli())
  File "/Users/pengju.pan/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/Users/pengju.pan/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/Users/pengju.pan/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/pengju.pan/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/pengju.pan/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  Fi

Inspect the generated files:

In [60]:
!cd ./driver_ranking/ && ls -l

total 16
drwxr-xr-x  6 pengju.pan  FAREAST\Domain Users   192  8 31 20:02 data
-rw-r--r--@ 1 pengju.pan  FAREAST\Domain Users  1162  8 31 19:53 driver_repo.py
-rw-r--r--@ 1 pengju.pan  FAREAST\Domain Users   112  8 31 17:46 feature_store.yaml


In [61]:
# Inspect the order data: driver_id is the entity key and event_timestamp is the join condition
orders = pd.read_csv("./driver_orders.csv", sep="\t")
orders["event_timestamp"] = pd.to_datetime(orders["event_timestamp"])
orders[orders['driver_id']==1001].head(5)

| | event_timestamp | driver_id | trip_completed |
|---|---|---|---|
| 0 | 2021-04-16 20:29:28+00:00 | 1001 | 1 |
| 3 | 2021-04-17 20:29:28+00:00 | 1001 | 1 |
| 6 | 2021-04-18 20:29:28+00:00 | 1001 | 1 |


In [62]:
orders.event_timestamp

0   2021-04-16 20:29:28+00:00
1   2021-04-17 04:29:28+00:00
2   2021-04-17 12:29:28+00:00
3   2021-04-17 20:29:28+00:00
4   2021-04-18 04:29:28+00:00
5   2021-04-18 12:29:28+00:00
6   2021-04-18 20:29:28+00:00
7   2021-04-19 04:29:28+00:00
8   2021-04-19 12:29:28+00:00
9   2021-04-19 20:29:28+00:00
Name: event_timestamp, dtype: datetime64[ns, UTC]

# Step 3: Train the model

In [130]:
import feast
from joblib import dump
import pandas as pd
from sklearn.linear_model import LinearRegression

# Load driver order data
orders = pd.read_csv("./driver_orders.csv", sep="\t")
orders["event_timestamp"] = pd.to_datetime(orders["event_timestamp"])
orders.head(5)

| | event_timestamp | driver_id | trip_completed |
|---|---|---|---|
| 0 | 2021-04-16 20:29:28+00:00 | 1001 | 1 |
| 1 | 2021-04-17 04:29:28+00:00 | 1002 | 0 |
| 2 | 2021-04-17 12:29:28+00:00 | 1003 | 0 |
| 3 | 2021-04-17 20:29:28+00:00 | 1001 | 1 |
| 4 | 2021-04-18 04:29:28+00:00 | 1002 | 0 |


In [None]:
# Connect to your feature store provider
fs = feast.FeatureStore(repo_path="./driver_ranking/")

# Retrieve training data from the local files (offline store)
training_df = fs.get_historical_features(
    entity_df=orders,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
).to_df()
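
Conceptually, `get_historical_features` performs a point-in-time join: for each order row it takes the latest feature row for the same `driver_id` whose `event_timestamp` is not later than the order's timestamp, and only if that row is within the feature view's TTL. Rows with no such match come back as NaN, which the next cell replaces with `fillna(0)`. The sketch below approximates this with `pandas.merge_asof`; it assumes a single feature view and skips Feast's deduplication on the `created` column, so it is an illustration rather than Feast's implementation. The helper `point_in_time_join`, the 1-day TTL, and the toy data are hypothetical.

```python
import pandas as pd


def point_in_time_join(entity_df, feature_df, key, ttl):
    """Hypothetical helper: attach to each entity row the latest feature row for
    the same key with timestamp <= the entity timestamp and no older than `ttl`."""
    # merge_asof requires both frames to be sorted by the "on" column
    left = entity_df.sort_values("event_timestamp")
    right = feature_df.sort_values("event_timestamp")
    return pd.merge_asof(
        left,
        right,
        on="event_timestamp",
        by=key,
        direction="backward",  # only use feature rows at or before the order time
        tolerance=ttl,  # ignore feature rows older than the TTL (result is NaN)
    )


# Toy data (hypothetical, not the tutorial's files)
toy_orders = pd.DataFrame({
    "event_timestamp": pd.to_datetime(
        ["2021-04-16 20:00", "2021-04-17 04:00", "2021-04-20 00:00"], utc=True
    ),
    "driver_id": [1001, 1002, 1001],
    "trip_completed": [1, 0, 1],
})
toy_stats = pd.DataFrame({
    "event_timestamp": pd.to_datetime(
        ["2021-04-16 19:00", "2021-04-16 21:00", "2021-04-17 03:00"], utc=True
    ),
    "driver_id": [1001, 1001, 1002],
    "conv_rate": [0.30, 0.90, 0.50],
})

joined = point_in_time_join(toy_orders, toy_stats, key="driver_id", ttl=pd.Timedelta(days=1))
print(joined)
```

The first order picks up the 19:00 value (0.30) rather than the later 21:00 value, which is what prevents feature leakage from the future; the third order's only candidate is more than a day old, so its `conv_rate` is NaN.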

In [64]:
training_df = training_df.fillna(0)

In [131]:
training_df.head(5)

| | event_timestamp | driver_id | trip_completed | conv_rate | acc_rate | avg_daily_trips |
|---|---|---|---|---|---|---|
| 0 | 2021-04-16 20:29:28+00:00 | 1001 | 1 | 0.376489 | 0.433404 | 830 |
| 1 | 2021-04-17 04:29:28+00:00 | 1002 | 0 | 0.462278 | 0.299526 | 222 |
| 2 | 2021-04-17 12:29:28+00:00 | 1003 | 0 | 0.882432 | 0.208917 | 710 |
| 3 | 2021-04-17 20:29:28+00:00 | 1001 | 1 | 0.376489 | 0.433404 | 830 |
| 4 | 2021-04-18 04:29:28+00:00 | 1002 | 0 | 0.462278 | 0.299526 | 222 |


In [82]:
print("----- Feature schema -----\n")
print(training_df.info())

print()
print("----- Example features -----\n")
print(training_df.head())

# Train model
target = "trip_completed"

reg = LinearRegression()
train_X = training_df[training_df.columns.drop(target).drop("event_timestamp")]
train_Y = training_df.loc[:, target]
reg.fit(train_X[sorted(train_X)], train_Y)

# Save model
dump(reg, "driver_model.bin")

----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
Int64Index: 10 entries, 0 to 9
Data columns (total 6 columns):
 #   Column           Non-Null Count  Dtype              
---  ------           --------------  -----              
 0   event_timestamp  10 non-null     datetime64[ns, UTC]
 1   driver_id        10 non-null     int64              
 2   trip_completed   10 non-null     int64              
 3   conv_rate        10 non-null     float32            
 4   acc_rate         10 non-null     float32            
 5   avg_daily_trips  10 non-null     int32              
dtypes: datetime64[ns, UTC](1), float32(2), int32(1), int64(2)
memory usage: 440.0 bytes
None

----- Example features -----

            event_timestamp  driver_id  trip_completed  conv_rate  acc_rate  \
0 2021-04-16 20:29:28+00:00       1001               1   0.376489  0.433404   
1 2021-04-17 04:29:28+00:00       1002               0   0.462278  0.299526   
2 2021-04-17 12:29:28+00:00       1003       

['driver_model.bin']
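
Both the training cell above and `DriverRankingModel.predict` in Step 5 pass the feature columns through `sorted(...)` (iterating over a DataFrame yields its column labels). This fixes the column order alphabetically, so the model sees the same layout whichever order the offline or online store returns the columns in. A minimal sketch of that convention, with hypothetical helper names `fit_sorted` / `predict_sorted` and hypothetical toy data:

```python
import pandas as pd
from sklearn.linear_model import LinearRegression


def fit_sorted(features: pd.DataFrame, target: pd.Series) -> LinearRegression:
    # Fix the column order alphabetically before fitting
    model = LinearRegression()
    model.fit(features[sorted(features)], target)
    return model


def predict_sorted(model: LinearRegression, features: pd.DataFrame):
    # Apply the same alphabetical ordering at prediction time
    return model.predict(features[sorted(features)])


# Hypothetical toy data, not the tutorial's training_df
toy_X = pd.DataFrame({
    "conv_rate": [0.1, 0.5, 0.9, 0.3, 0.7],
    "acc_rate": [0.2, 0.4, 0.6, 0.8, 0.1],
    "avg_daily_trips": [10, 80, 40, 30, 60],
})
toy_y = pd.Series([0, 1, 1, 0, 1])
toy_model = fit_sorted(toy_X, toy_y)

# Same rows, columns in a different order (as an online lookup might return them)
shuffled = toy_X[["avg_daily_trips", "acc_rate", "conv_rate"]]
print(sorted(shuffled))  # ['acc_rate', 'avg_daily_trips', 'conv_rate']
print(predict_sorted(toy_model, shuffled))
```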

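# Step 4: Incrementally update the feature map
To move to the production setup, change the `provider` field in `driver_ranking/feature_store.yaml` from `local` to `gcp`.
Keep the local file format identical to the online one; then switching from local results to online results only requires this configuration change. (The run below still uses the `local` provider, so the features are materialized into the SQLite online store.)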

In [83]:
!cd ./driver_ranking/ && feast materialize-incremental 2022-01-01T00:00:00

Materializing 1 feature views to 2022-01-01 08:00:00+08:00 into the sqlite online store.

driver_hourly_stats from 2020-09-01 12:10:50+08:00 to 2022-01-01 08:00:00+08:00:
100%|███████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 1333.73it/s]
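
Roughly speaking, materialization reads the offline source for the time window shown in the log and writes the latest value of each feature per entity key into the online store (SQLite here). `materialize-incremental` starts each run where the previous one ended, so repeated runs only load new data. The pandas sketch below illustrates only the "latest row per key within a window" part under that simplification; the helper `latest_feature_values` and the toy data are hypothetical, and this is not Feast's code.

```python
import pandas as pd


def latest_feature_values(offline_df, start, end, key="driver_id"):
    """Hypothetical helper: return {key: {feature: value}} holding the most recent
    row per key with start <= event_timestamp < end."""
    in_window = offline_df[
        (offline_df["event_timestamp"] >= start) & (offline_df["event_timestamp"] < end)
    ]
    # Sort by time, then keep the last (newest) row of each key
    latest = in_window.sort_values("event_timestamp").groupby(key).tail(1)
    return latest.set_index(key).drop(columns="event_timestamp").to_dict(orient="index")


# Hypothetical offline rows, not the tutorial's parquet file
toy_offline = pd.DataFrame({
    "event_timestamp": pd.to_datetime(
        ["2021-08-31 14:00", "2021-08-31 15:00", "2021-08-31 15:00", "2021-08-31 16:00"],
        utc=True,
    ),
    "driver_id": [1001, 1001, 1002, 1002],
    "conv_rate": [0.2, 0.4, 0.6, 0.8],
    "avg_daily_trips": [100, 200, 300, 400],
})

online_store = latest_feature_values(
    toy_offline,
    start=pd.Timestamp("2021-08-31 00:00", tz="UTC"),
    end=pd.Timestamp("2022-01-01 00:00", tz="UTC"),
)
print(online_store)
```

An online lookup such as `get_online_features` in Step 5 then only needs a key-value read per `driver_id`, which is why the online store can be a simple store like SQLite or Firestore.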


# Step 5: Make predictions

In [97]:
import pandas as pd
import feast
from joblib import load
import os

os.chdir('/Users/pengju.pan/gitcode/feast-driver-ranking-tutorial')

class DriverRankingModel:
    def __init__(self):
        # Load model
        self.model = load("./driver_model.bin")

        # Set up feature store
        self.fs = feast.FeatureStore(repo_path="./driver_ranking/")

    def predict(self, driver_ids):
        # Read features from Feast
        driver_features = self.fs.get_online_features(
            entity_rows=[{"driver_id": driver_id} for driver_id in driver_ids],
            features=[
                "driver_hourly_stats:conv_rate",
                "driver_hourly_stats:acc_rate",
                "driver_hourly_stats:avg_daily_trips",
            ],
        )
        df = pd.DataFrame.from_dict(driver_features.to_dict())

        # Make prediction
        df["prediction"] = self.model.predict(df[sorted(df)])

        # Choose best driver
        best_driver_id = df["driver_id"].iloc[df["prediction"].argmax()]

        # Return the best driver ID together with the scored DataFrame
        return best_driver_id, df

In [101]:
def make_drivers_prediction():
    drivers = [1001, 1002, 1003, 1004]
    model = DriverRankingModel()
    best_driver, result = model.predict(drivers)
    print(f"Prediction for best driver id: {best_driver} \n Final result: \n {result}")

In [102]:
make_drivers_prediction()

Prediction for best driver id: 1001 
 Final result: 
    driver_id  avg_daily_trips  conv_rate  acc_rate  prediction
0       1001              728   0.392329  0.528932    1.186537
1       1002               97   0.926635  0.664744    0.945375
2       1003              877   0.392009  0.547194    1.041685
3       1004              528   0.045298  0.028153   -0.852128


# Step 6: Inspect the registry contents

In [144]:
import sqlite3

In [145]:
reg_conn = sqlite3.connect('./driver_ranking/data/registry.db')

In [152]:
csr = reg_conn.cursor()
csr.execute("""SELECT name FROM sqlite_master WHERE type='table';""")
print(csr.fetchall())

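DatabaseError: file is not a database

This fails because `registry.db` is not a SQLite file: in this Feast version the file-based registry is a serialized `RegistryProto` protobuf message. Feast's local registry store reads it back with logic like the following:
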
In [None]:
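from pathlib import Path
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto

def get_registry_proto(filepath: Path):
    # registry.db holds a serialized RegistryProto message, not a SQLite database
    registry_proto = RegistryProto()
    if filepath.exists():
        registry_proto.ParseFromString(filepath.read_bytes())
        return registry_proto
    raise FileNotFoundError(f'Registry not found at path "{filepath}"')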
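
A quick way to tell the two formats apart before calling `sqlite3.connect`: every SQLite 3 database file begins with the 16-byte header string `SQLite format 3\0`, and a serialized protobuf does not. The helper `looks_like_sqlite` below is a hypothetical sketch of that check using only the standard library; the demo uses temporary files as stand-ins for `data/registry.db`.

```python
import sqlite3
import tempfile
from pathlib import Path

SQLITE_MAGIC = b"SQLite format 3\x00"  # first 16 bytes of every SQLite 3 database file


def looks_like_sqlite(path) -> bool:
    """Hypothetical helper: True if the file at `path` starts with the SQLite header."""
    with open(path, "rb") as f:
        return f.read(len(SQLITE_MAGIC)) == SQLITE_MAGIC


with tempfile.TemporaryDirectory() as tmp:
    db_path = Path(tmp) / "real.sqlite"
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE t (x INTEGER)")  # forces SQLite to write the header
    conn.commit()
    conn.close()

    blob_path = Path(tmp) / "registry.db"
    blob_path.write_bytes(b"\n\x0bnot sqlite")  # arbitrary non-SQLite bytes

    is_db = looks_like_sqlite(db_path)
    is_blob = looks_like_sqlite(blob_path)

print(is_db)    # True
print(is_blob)  # False
```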