## Getting data

Create a pandas DataFrame from the iris dataset and add an event timestamp and an `iris_id` key column.

In [1]:
import pandas as pd
from sklearn.datasets import load_iris
from datetime import datetime
import numpy as np

chosen_datetime = '2023-02-20'

# Snake-case column names; a later cell reads them back from `data.feature_names`.
feature_columns = [
    'sepal_length_cm',
    'sepal_width_cm',
    'petal_length_cm',
    'petal_width_cm',
]

data = load_iris()
data.feature_names = feature_columns

# np.c_ appends the target as one more column of the same array,
# so it ends up with the same float dtype as the measurements.
iris_values = np.c_[data['data'], data['target']]
df = pd.DataFrame(data=iris_values, columns=data['feature_names'] + ['target'])

# Every row carries the same event timestamp.
event_ts = datetime.fromisoformat(chosen_datetime)
df['event_timestamp'] = [event_ts] * len(data.target)

# The row index doubles as the key column.
df['iris_id'] = df.index.values
df.head()

Unnamed: 0,sepal_length_cm,sepal_width_cm,petal_length_cm,petal_width_cm,target,event_timestamp,iris_id
0,5.1,3.5,1.4,0.2,0.0,2023-02-20,0
1,4.9,3.0,1.4,0.2,0.0,2023-02-20,1
2,4.7,3.2,1.3,0.2,0.0,2023-02-20,2
3,4.6,3.1,1.5,0.2,0.0,2023-02-20,3
4,5.0,3.6,1.4,0.2,0.0,2023-02-20,4


## Feature Store
Initialize Feature Store

In [2]:
# Scaffold a new Feast repository named `feature_repo` from the `local` template.
# https://docs.feast.dev/reference/feast-cli-commands#init
!feast init -t local feature_repo


Creating a new Feast repository in [1m[32m/home/sbalawajder/projects/train/feast-mlflow/feast-mlflow-project/feature_repo[0m.



Save iris dataframe in feature repository in parquet format

In [3]:
# Write the frame to the location that the FileSource in example_repo.py reads from.
df.to_parquet('feature_repo/feature_repo/data/iris_stats.parquet')

Overwrite example feast elements definition

In [4]:
# Overwrite the example definitions created by `feast init` with the iris ones.
import os

# Absolute location of the parquet file that holds the iris rows.
iris_parquet_path = os.path.join(
    os.getcwd(), "feature_repo", "feature_repo", "data", "iris_stats.parquet"
)

with open("feature_repo/feature_repo/example_repo.py", "w") as my_frepo:
    # `!r` embeds the path as a correctly quoted and escaped Python string literal, so the
    # generated module stays valid even if the path contains backslashes or quotes.
    my_frepo.write(f"""from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, ValueType, FeatureService
from feast.types import Float32, Int64

iris_stats_source = FileSource(
    name = "iris_stats_source",
    path={iris_parquet_path!r},
    timestamp_field="event_timestamp",
)

iris = Entity(name="iris", join_keys=["iris_id"])

# The parquet file holds one row per iris flower: an iris_id join key, an event_timestamp,
# four measurement columns and the target class. This Feature View exposes those columns
# for offline retrieval and online serving.
iris_stats_fv = FeatureView(
    name="iris_stats_fv",
    entities=[iris],  # the Entity object defined above
    ttl=timedelta(days=1),
    schema=[
        Field(name="sepal_length_cm", dtype=Float32),
        Field(name="sepal_width_cm", dtype=Float32),
        Field(name="petal_length_cm", dtype=Float32),
        Field(name="petal_width_cm", dtype=Float32),
        Field(name="target", dtype=Int64),
    ],
    online=True,
    source=iris_stats_source
)

iris_stats_fs = FeatureService(
    name="iris_stats_fs",
    features=[iris_stats_fv]
)

""")

Create/update feature store deployment

In [5]:
# Register the entity, feature view and feature service defined in feature_repo/feature_repo.
# https://docs.feast.dev/reference/feast-cli-commands#apply
!feast -c feature_repo/feature_repo apply

  schema = ParquetDataset(path).schema.to_arrow_schema()
Created entity [1m[32miris[0m
Created feature view [1m[32miris_stats_fv[0m
Created feature service [1m[32miris_stats_fs[0m

Created sqlite table [1m[32mfeature_repo_iris_stats_fv[0m



Get Feature View

In [8]:
from feast import FeatureStore

# Open the feature store that lives in the repository directory used by `feast apply`.
store = FeatureStore(repo_path="feature_repo/feature_repo/.")

# Fetch the registered feature view; a later cell uses `fv` to build online feature references.
fv = store.get_feature_view("iris_stats_fv")

fv

<FeatureView(name = iris_stats_fv, entities = ['iris'], ttl = 1 day, 0:00:00, stream_source = None, batch_source = {
  "type": "BATCH_FILE",
  "timestampField": "event_timestamp",
  "fileOptions": {
    "uri": "/home/sbalawajder/projects/train/feast-mlflow/feast-mlflow-project/feature_repo/feature_repo/data/iris_stats.parquet"
  },
  "name": "iris_stats_source"
}, entity_columns = [iris_id-Int64], features = [sepal_length_cm-Float32, sepal_width_cm-Float32, petal_length_cm-Float32, petal_width_cm-Float32, target-Int64], description = , tags = {}, owner = , projection = FeatureViewProjection(name='iris_stats_fv', name_alias=None, desired_features=[], features=[sepal_length_cm-Float32, sepal_width_cm-Float32, petal_length_cm-Float32, petal_width_cm-Float32, target-Int64], join_key_map={}), created_timestamp = 2023-04-07 11:33:13.278712, last_updated_timestamp = 2023-04-07 11:33:13.278712, online = True, materialization_intervals = [])>

Get historical features (from Offline Store) for ML model training 

In [9]:
from feast_extra_functions import get_entity_df

# Entity rows (ids + timestamp) that the offline store is joined against.
historical_entity_df = get_entity_df(no_ids=150, dt=chosen_datetime, join_key='iris_id')

# "<feature view>:<feature>" references for everything the model needs, target included.
historical_feature_refs = [
    "iris_stats_fv:sepal_length_cm",
    "iris_stats_fv:sepal_width_cm",
    "iris_stats_fv:petal_length_cm",
    "iris_stats_fv:petal_width_cm",
    "iris_stats_fv:target",
]

retrieval_job = store.get_historical_features(
    entity_df=historical_entity_df,
    features=historical_feature_refs,
    full_feature_names=True,
)
training_df = retrieval_job.to_df()

training_df.head()

Unnamed: 0,iris_id,event_timestamp,iris_stats_fv__sepal_length_cm,iris_stats_fv__sepal_width_cm,iris_stats_fv__petal_length_cm,iris_stats_fv__petal_width_cm,iris_stats_fv__target
0,0,2023-02-20 00:00:00+00:00,5.1,3.5,1.4,0.2,0.0
1,96,2023-02-20 00:00:00+00:00,5.7,2.9,4.2,1.3,1.0
2,97,2023-02-20 00:00:00+00:00,6.2,2.9,4.3,1.3,1.0
3,98,2023-02-20 00:00:00+00:00,5.1,2.5,3.0,1.1,1.0
4,99,2023-02-20 00:00:00+00:00,5.7,2.8,4.1,1.3,1.0


Prepare data from Offline Store for ML model training

In [10]:
import importlib
import mlflow 
importlib.reload(mlflow)
import mlflow.sklearn
from sklearn.metrics import precision_score, accuracy_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier

Split data

In [11]:
# Columns of the offline-store frame used as model inputs and as label.
model_input_columns = [
    'iris_stats_fv__sepal_length_cm',
    'iris_stats_fv__sepal_width_cm',
    'iris_stats_fv__petal_length_cm',
    'iris_stats_fv__petal_width_cm',
]
label_column = 'iris_stats_fv__target'

train_X = training_df[model_input_columns]
train_Y = training_df[label_column]

# Hold out 20% of the rows for evaluation; the fixed seed keeps the split repeatable.
x_train, x_test, y_train, y_test = train_test_split(
    train_X, train_Y, test_size=0.2, random_state=1234
)


Define the hyperparameter grid for each ML algorithm and run the grid search

In [12]:
# Seed for the estimators that use randomness, so a re-run reproduces the same search result.
RANDOM_STATE = 1234

# One entry per algorithm: the estimator to tune and the hyperparameter grid to search.
model_params = {
    'svm': {
        'model': svm.SVC(gamma='auto'),
        'params' : {
            'C': [1,10,20],
            'kernel': ['rbf','linear']
        }
    },
    'random_forest': {
        'model': RandomForestClassifier(random_state=RANDOM_STATE),
        'params' : {
            'n_estimators': [1,5,10],
            'max_depth':[6,8,10],
            'min_samples_split':[2,3,4,5],
            'min_samples_leaf':[2,3,4,5],
            'max_features': [2,3]
        }
    },
    'logistic_regression' : {
        'model': LogisticRegression(solver='liblinear',multi_class='auto', random_state=RANDOM_STATE),
        'params': {
            'C': [1,5,10]
        }
    }
}

# Grid search: one 5-fold cross-validated search per algorithm.
scores = []
class_grid_fit_list = []

for model_name, mp in model_params.items():
    class_grid =  GridSearchCV(mp['model'], mp['params'], cv=5, return_train_score=False)
    # fit() returns the fitted search object itself.
    class_grid_fit = class_grid.fit(x_train, y_train)
    scores.append({
        'model': model_name,
        'best_score': class_grid.best_score_,
        # Store a copy so later changes to best_params_ cannot alter this results table.
        'best_params': dict(class_grid.best_params_),
    })
    # Same order as model_params, so both can be walked together later.
    class_grid_fit_list.append(class_grid_fit)

# Summary of the best cross-validation score and parameters per algorithm.
df_training_results = pd.DataFrame(scores,columns=['model','best_score','best_params'])
df_training_results

Unnamed: 0,model,best_score,best_params
0,svm,0.975,"{'C': 1, 'kernel': 'rbf'}"
1,random_forest,0.983333,"{'max_depth': 6, 'max_features': 2, 'min_sampl..."
2,logistic_regression,0.991667,{'C': 10}


Log training results to MLflow for the best estimator of each ML algorithm

In [13]:
# Maps each MLflow run id to the algorithm whose model was logged in that run.
model_run_id_algorithm_dict = {}

# model_params and class_grid_fit_list were filled in the same order, so zip pairs
# every fitted search with the name of its algorithm.
for model_algorithm, cgf in zip(model_params, class_grid_fit_list):

    with mlflow.start_run():

        # Build a new dict instead of updating best_params_ in place, so the fitted
        # search object is not modified by logging.
        run_params = {**cgf.best_params_, 'algorithm': model_algorithm}

        # Log the best-fitting parameters together with the algorithm name
        mlflow.log_params(run_params)

        # Predict on the held-out test split
        y_pred = cgf.predict(x_test)

        # Compute accuracy, precision and recall
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        print(f'ML algorithm: {model_algorithm}')

        # Log the evaluation metrics
        metrics ={
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall
            }

        mlflow.log_metrics(metrics)

        # Remember which run belongs to which algorithm, then log the fitted model.
        # `rid` and `model_algorithm` keep the values of the last iteration after the loop.
        rid = mlflow.active_run().info.run_id
        model_run_id_algorithm_dict[rid] = model_algorithm
        print(rid)
        mlflow.sklearn.log_model(cgf, f"model_{model_algorithm}")
           

ML algorithm: svm
c43b597209a541c7b1f0073ad330b111
ML algorithm: random_forest
21ec6291f496457d8b1be1444db331ec
ML algorithm: logistic_regression
04ae6eafb32c45c3936218bdcc0b6790


Run MLflow UI (optional)

In [14]:
# Starts the MLflow UI server in the foreground (the recorded run listened on
# http://127.0.0.1:5000). The recorded run was stopped with an interrupt (^C in the
# output) before the following cells were executed.
!mlflow ui

[2023-04-07 13:35:31 +0200] [1810] [INFO] Starting gunicorn 20.1.0
[2023-04-07 13:35:31 +0200] [1810] [INFO] Listening at: http://127.0.0.1:5000 (1810)
[2023-04-07 13:35:31 +0200] [1810] [INFO] Using worker: sync
[2023-04-07 13:35:31 +0200] [1811] [INFO] Booting worker with pid: 1811
[2023-04-07 13:35:31 +0200] [1812] [INFO] Booting worker with pid: 1812
[2023-04-07 13:35:31 +0200] [1813] [INFO] Booting worker with pid: 1813
[2023-04-07 13:35:31 +0200] [1814] [INFO] Booting worker with pid: 1814
^C
[2023-04-07 13:35:47 +0200] [1810] [INFO] Handling signal: int
[2023-04-07 13:35:47 +0200] [1812] [INFO] Worker exiting (pid: 1812)
[2023-04-07 13:35:47 +0200] [1814] [INFO] Worker exiting (pid: 1814)
[2023-04-07 13:35:47 +0200] [1813] [INFO] Worker exiting (pid: 1813)
[2023-04-07 13:35:47 +0200] [1811] [INFO] Worker exiting (pid: 1811)


Materialize data from Offline Store to Online Store

In [15]:
from datetime import timedelta
from datetime import date

# End of the materialization window: the calendar day after `chosen_datetime`.
chosen_day = datetime.strptime(chosen_datetime, "%Y-%m-%d").date()
chosen_datetime_plus_1_day = chosen_day + timedelta(days=1)
chosen_datetime_plus_1_day

datetime.date(2023, 2, 21)

In [16]:
# IPython substitutes $chosen_datetime and $chosen_datetime_plus_1_day with the Python
# variables' values; they are the start and end of the window loaded into the online store.
!feast -c feature_repo/feature_repo materialize $chosen_datetime $chosen_datetime_plus_1_day

Materializing [1m[32m1[0m feature views from [1m[32m2023-02-20 01:00:00+01:00[0m to [1m[32m2023-02-21 01:00:00+01:00[0m into the [1m[32msqlite[0m online store.

[1m[32miris_stats_fv[0m:
100%|███████████████████████████████████████████████████████████| 150/150 [00:00<00:00, 8432.57it/s]


Retrieve data from Online Store

In [27]:
# "<feature view>:<feature>" references for the four measurement columns.
online_feature_refs = [f"{fv.name}:{fname}" for fname in data.feature_names]

# One entity row per requested id, keyed by the join key built from the view's entity name.
online_join_key = f"{fv.entities[0]}_id"
online_entity_rows = [{online_join_key: row_id} for row_id in range(145,150)]

# Columns kept from the response, in the order the model was trained on.
online_columns = [
    'iris_stats_fv__sepal_length_cm',
    'iris_stats_fv__sepal_width_cm',
    'iris_stats_fv__petal_length_cm',
    'iris_stats_fv__petal_width_cm',
]

online_response = store.get_online_features(
    features=online_feature_refs,
    entity_rows=online_entity_rows,
    full_feature_names=True,
)
online_features = online_response.to_df()[online_columns]

online_features

Unnamed: 0,iris_stats_fv__sepal_length_cm,iris_stats_fv__sepal_width_cm,iris_stats_fv__petal_length_cm,iris_stats_fv__petal_width_cm
0,6.7,3.0,5.2,2.3
1,6.3,2.5,5.0,1.9
2,6.5,3.0,5.2,2.0
3,6.2,3.4,5.4,2.3
4,5.9,3.0,5.1,1.8


Use trained ML model for getting predictions

In [25]:
import pickle

# Load the logged model through its MLflow run URI instead of a hand-built path under
# mlruns/0, which only works for the default experiment with a local file store.
# `rid` and `model_algorithm` still hold the values from the last iteration of the
# logging loop (logistic_regression in the recorded run).
model = mlflow.sklearn.load_model(f"runs:/{rid}/model_{model_algorithm}")

# Predict the class of the rows retrieved from the online store.
prediction = model.predict(online_features)
prediction


array([2., 2., 2., 2., 2.])