# Simple Components of an ML System

**Disclaimer**: Nothing in this notebook represents the setup of an _actual_ production system. Instead, it is meant as an educational tool that demonstrates some of the lightweight capabilities of these tools.

That being said, each code snippet below demonstrates something you might see in an ML system that implements MLOps - using _only_ Python! Cool, right?

But first - some data! Below is some synthetic user data with two input features and a target. Not really indicative of a real scenario, but good for demonstration purposes.

In [1]:
import pandas as pd 

user_data = pd.read_csv("data/user_data.csv", index_col=0)
user_data.head()



|   | feature_1 | feature_2 | target | user_id |
|---|---|---|---|---|
| 0 | 0.493749 | -0.157663 | 1 | a27b0912-0cf8-11ed-899e-b29c4abd48f4 |
| 1 | -0.123346 | 0.365275 | 1 | a27b09ee-0cf8-11ed-899e-b29c4abd48f4 |
| 2 | 1.095964 | -1.21264 | 0 | a27b0a2a-0cf8-11ed-899e-b29c4abd48f4 |
| 3 | 0.038876 | -0.154388 | 0 | a27b0a52-0cf8-11ed-899e-b29c4abd48f4 |
| 4 | 1.364063 | -0.956556 | 0 | a27b0a7a-0cf8-11ed-899e-b29c4abd48f4 |


We're going to use this data to create features, train a model, record our experiments, "deploy" the latest model and monitor its performance!

## Feature Store
Feature stores (and feature systems) aim to make it easy to define, manage, and productize features for machine learning systems. They can typically be used both for model training and for low-latency serving. Some technologies to take a look at: Tecton, Feast.

The solution below converts our user data into a dictionary keyed by `user_id`, so we can quickly retrieve features on the fly for any user we want to make a prediction for. It still lets us read all of the data at once during training.

In [2]:
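from typing import Callable, Dict, Optional

class SuperSimpleFeatureStore:

    def __init__(self, dataframe: pd.DataFrame):
        self.dataframe = dataframe
        # {user_id: {"feature_1": ..., "feature_2": ..., "target": ...}} for fast per-user lookups
        self.feature_dict = dataframe.set_index("user_id").T.to_dict()

    def register_feature(self, name: str, feature_definition: Callable[[Dict], float]) -> None:
        # compute the new feature from each user's existing features and store it alongside them
        for key in self.feature_dict:
            self.feature_dict[key][name] = feature_definition(self.feature_dict[key])

    def get_user_feature(self, user_id: str) -> Optional[Dict]:
        # serving path: one user's features (None if the user_id is unknown)
        return self.feature_dict.get(user_id)

    def get_all_data(self) -> pd.DataFrame:
        # training path: every user's features as one DataFrame, one row per user_id
        return pd.DataFrame(self.feature_dict).T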

We can initialize this class with our data and then define new features as functions! Each registered function is automatically applied to every user's record to create the new feature.

In [3]:
feature_store = SuperSimpleFeatureStore(user_data)

def new_feature(feature_dict: Dict) -> float:
    # feature_3 is simply feature_1 squared
    return feature_dict["feature_1"] ** 2

feature_store.register_feature("feature_3", new_feature)
feature_store.get_user_feature("a27b09ee-0cf8-11ed-899e-b29c4abd48f4")

{'feature_1': -0.1233462294856719,
 'feature_2': 0.365275089687713,
 'target': 1.0,
 'feature_3': 0.015214292328332034}

## ML Metadata Store (Experiment tracking) and Model Registry
ML metadata tracking (experiment tracking) is essentially your lab notebook for data science projects: the idea is to capture all the metadata and information about each experiment run so that results are reproducible. On top of that sits a model registry, a more central place to manage and version models. Some tools to look at: MLflow, Weights & Biases, Comet, SageMaker.

In this case, our design is simpler: we'll capture all the information about our experiments in a CSV file that tracks the results of each run.

In [None]:
import os
import sys

# Makes sure joblib and scikit-learn are installed
!{sys.executable} -m pip install joblib
!{sys.executable} -m pip install scikit-learn

# Route `!` shell commands through os.system, so that a command ending in `&`
# (used later to launch main.py) can run in the background
get_ipython().system = os.system

In [4]:
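import csv
import os
import joblib
import sklearn.pipeline
from datetime import datetime
from typing import Optional

## helper function to save a model/pipeline and return the path it was saved to
def save_pipeline(pipeline: sklearn.pipeline.Pipeline, ts: Optional[datetime] = None) -> str:
    ts = ts or datetime.now()  # avoid writing "pipeline-None.pkl" when no timestamp is given
    model_path = f"data/pipeline-{ts}.pkl".replace(" ", "-")
    joblib.dump(pipeline, model_path)
    return model_path

## helper function to write a new line to the csv
def record_model(pipeline: sklearn.pipeline.Pipeline, score: float, parameters: dict) -> None:
    model = str(pipeline)
    ts = datetime.now()
    model_path = save_pipeline(pipeline, ts)
    # write a header row the first time, so pd.read_csv can find the "timestamp" column later
    write_header = not os.path.exists("metadata_store.csv")
    with open("metadata_store.csv", "a", newline="") as f:
        csv_writer = csv.writer(f)
        if write_header:
            csv_writer.writerow(["timestamp", "model", "parameters", "score", "model_path"])
        csv_writer.writerow([ts, model, parameters, score, model_path])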



Now that we have our helper functions, we can do different training runs and record the results.

In [6]:
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

## logistic regression parameter config
parameters = {
    "penalty":"l2",
    "C":1.0,
    "max_iter": 100
}

## use a standard scaler and logistic regression
scaler = StandardScaler()
logistic_regression = LogisticRegression(
    penalty=parameters["penalty"],
    C=parameters["C"],
    max_iter=parameters["max_iter"],
)

## make a pipeline out of them
pipeline = make_pipeline(scaler, logistic_regression)

## get our data from the feature store and create a train/test split
data = feature_store.get_all_data()
X_train, X_test, y_train, y_test = train_test_split(data[["feature_1","feature_2","feature_3"]], data["target"])

## fit the model
pipeline.fit(X_train, y_train)

## get the test score (mean accuracy on the held-out test split)
score = pipeline.score(X_test, y_test)

## record it
record_model(pipeline, score, parameters)

## view the output
pd.read_csv("metadata_store.csv")

|   | timestamp | model | parameters | score | model_path |
|---|---|---|---|---|---|
| 0 | 2022-07-26 19:51:56.176934 | Pipeline(steps=[('standardscaler', StandardSca... | {'penalty': 'l2', 'C': 1.0, 'max_iter': 100} | 0.88 | data/pipeline-2022-07-26-19:51:56.176934.pkl |
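Since every run lands in the same CSV, that file can double as a (very) small model registry. As a hedged, stdlib-only sketch, the hypothetical `best_model_path` helper below returns the pipeline path of the highest-scoring run rather than the most recent one, assuming the CSV has the `score` and `model_path` header shown above. Because each run uses a fresh random train/test split, comparing scores across runs is only a rough guide.

```python
import csv
from typing import Optional


def best_model_path(metadata_path: str = "metadata_store.csv") -> Optional[str]:
    """Return the model_path of the highest-scoring run, or None if no runs are recorded.

    Hypothetical helper: assumes a header row containing "score" and "model_path".
    """
    with open(metadata_path, newline="") as f:
        rows = list(csv.DictReader(f))  # one dict per recorded run
    if not rows:
        return None
    best = max(rows, key=lambda row: float(row["score"]))
    return best["model_path"]
```

A registry built this way could then load `joblib.load(best_model_path())` instead of always picking the latest timestamp.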


## Automated Training Pipeline
Once the model code, data, and parameters have been tuned, the code can be moved into a source-controlled (git) repository, and training can be scheduled and automated. In cases like e-commerce, new data is constantly coming in and models frequently need to be retrained, so you want to automate training and deploying models whose parameters are already fairly settled. Some tools to look at: Airflow, Kubeflow.

In perhaps the biggest oversimplification thus far, I've wrapped the training code above in an infinite loop and put it in its own Python script, `main.py`. That way, we can run the script in the background to continuously train new models (with a 60-second sleep between runs).

In [9]:
!python3 main.py &
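The contents of `main.py` aren't shown in this notebook. As a rough, hypothetical sketch of its shape, the loop below calls a training function, then sleeps before the next run. The names `train_forever`, `train_once`, and the `max_runs` argument are illustrative assumptions (`max_runs` exists only so the loop can be stopped in tests).

```python
import time
from typing import Callable, Optional


def train_forever(
    train_once: Callable[[], None],
    interval_seconds: float = 60.0,
    max_runs: Optional[int] = None,
) -> int:
    """Call train_once, sleep, and repeat; return how many runs completed.

    max_runs=None loops until the process is killed (the behaviour described above);
    a positive max_runs stops after that many runs.
    """
    runs = 0
    while True:
        train_once()  # e.g. fit the pipeline, score it, and call record_model(...)
        runs += 1
        if max_runs is not None and runs >= max_runs:
            return runs
        time.sleep(interval_seconds)
```

Inside a script like `main.py`, the entry point would then be something like `train_forever(train_once)`, where `train_once` wraps the training cell above.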

## Continuous Integration
Continuous integration is the practice of frequently committing changes to a central repository, and it also covers automated tests and builds. Most of these actions are triggered by git commits and pushes to remote repositories like GitHub. In this case, I've added a single test that can be run with `pytest`; it is not automated here. Tools to look at: Jenkins, Buildkite.

In [10]:
!pytest

platform linux -- Python 3.10.5, pytest-7.1.2, pluggy-1.0.0
rootdir: /workspace
collected 1 item

test_example.py .                                                        [100%]



**ACTION**: practice CI on your own by adding the `pytest` [command as a git pre-commit hook](https://pre-commit.com/) (i.e., so it runs whenever you try to commit code).

## Continuous Delivery/Deployment & Model Server
Continuous delivery is the practice of releasing small, iterative changes to software so that it is always in a state that can be reliably shipped. Continuous deployment goes one step further and automatically deploys those changes. In the case of ML, this would be part of the automated process, where a model training pipeline automatically ships a newly trained model to a model server. Tools to look at: Jenkins, Buildkite.

A model server is typically an HTTP server that accepts prediction inputs (features) and returns predictions. Tools to look at: TensorFlow Serving, Seldon Core.
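As a rough illustration of what a model server does (not how TensorFlow Serving or Seldon Core are implemented), here is a minimal sketch built only on Python's standard-library `http.server`. The `/predict` route, the `{"features": [...]}` JSON payload, and the `make_handler`/`serve` helpers are assumptions for this example; `predict` can be any callable that maps a list of feature values to a number, such as a wrapper around a loaded pipeline.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Callable, List

Predictor = Callable[[List[float]], float]


def make_handler(predict: Predictor) -> type:
    """Build a request handler class that answers POST /predict using `predict`."""

    class PredictionHandler(BaseHTTPRequestHandler):
        def do_POST(self) -> None:
            if self.path != "/predict":
                self.send_error(404, "only POST /predict is supported")
                return
            # read the JSON body, e.g. {"features": [0.49, -0.16, 0.24]}
            length = int(self.headers.get("Content-Length", 0))
            payload = json.loads(self.rfile.read(length))
            body = json.dumps({"prediction": predict(payload["features"])}).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format: str, *args) -> None:
            pass  # silence per-request logging

    return PredictionHandler


def serve(predict: Predictor, host: str = "127.0.0.1", port: int = 8000) -> HTTPServer:
    """Start the server on a background thread and return it (call .shutdown() to stop)."""
    server = HTTPServer((host, port), make_handler(predict))
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


if __name__ == "__main__":
    # toy predictor: 1.0 if the features sum to a positive number, else 0.0
    server = serve(lambda features: float(sum(features) > 0), port=0)  # port 0 = any free port
    request = urllib.request.Request(
        f"http://127.0.0.1:{server.server_address[1]}/predict",
        data=json.dumps({"features": [0.5, -0.2]}).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        print(json.loads(response.read()))  # {'prediction': 1.0}
    server.shutdown()
    server.server_close()
```

`HTTPServer` handles one request at a time; `http.server.ThreadingHTTPServer` (Python 3.7+) is a drop-in alternative if concurrent requests matter.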

In this case, instead of doing "CD", we simply load the latest trained model on every prediction (remember, new models are being trained in the background). Then, instead of a model server, we use a `model_predict` function that fetches the features for a given user ID and makes a prediction.

In [11]:
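import numpy as np

def load_latest_model() -> sklearn.pipeline.Pipeline:
    # find the most recently recorded run in the metadata store and load its pipeline
    metadata = pd.read_csv("metadata_store.csv")
    latest_model_path = metadata.query("timestamp == timestamp.max()")["model_path"].values[0]
    return joblib.load(latest_model_path)

def model_predict(user_id: str) -> np.ndarray:
    model = load_latest_model()
    features = feature_store.get_user_feature(user_id)
    if features is None:
        raise KeyError(f"unknown user_id: {user_id}")
    # select the training columns by name (this also drops "target")
    return model.predict(pd.DataFrame([features])[["feature_1", "feature_2", "feature_3"]])

model_predict("a27b0912-0cf8-11ed-899e-b29c4abd48f4")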



array([0.])

## Performance Monitoring
Monitoring and observability are among the most critical components of any production system, and real systems also have alerting to notify engineers of production issues. In this case, we'll create a simple monitor that records the latency of each prediction and reports the mean. Some tools to look into: Prometheus, Grafana.

In [12]:
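import time
import numpy as np

class PerformanceMonitor:
    """Collects per-prediction latencies, in seconds."""

    def __init__(self):
        self.inference_times = []

    def record(self, prediction_time: float) -> None:
        self.inference_times.append(prediction_time)

    def mean(self) -> float:
        if not self.inference_times:
            raise ValueError("no predictions have been recorded yet")
        return sum(self.inference_times) / len(self.inference_times)


def model_predict(user_id: str) -> np.ndarray:
    # perf_counter is a monotonic clock meant for timing short durations;
    # the measured latency includes loading the latest model from disk
    start = time.perf_counter()
    model = load_latest_model()
    features = feature_store.get_user_feature(user_id)
    prediction = model.predict(pd.DataFrame([features])[["feature_1", "feature_2", "feature_3"]])
    monitor.record(time.perf_counter() - start)
    return prediction

monitor = PerformanceMonitor()
model_predict("a27b0912-0cf8-11ed-899e-b29c4abd48f4")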



array([0.])

In [13]:
monitor.mean()

0.033744096755981445
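The mean can hide slow outliers, and the paragraph above notes that real systems also alert on problems. As a hedged, stdlib-only sketch of both ideas, the hypothetical `LatencyMonitor` below reports a latency percentile via `statistics.quantiles` and flags when it exceeds a threshold. The 95th percentile and the 100 ms threshold are arbitrary illustrative choices; a real setup would export these numbers to a system like Prometheus rather than checking them by hand.

```python
import statistics
from typing import List


class LatencyMonitor:
    """Records prediction latencies (in seconds) and checks a percentile against a threshold."""

    def __init__(self, threshold_seconds: float = 0.1):
        self.threshold_seconds = threshold_seconds
        self.inference_times: List[float] = []

    def record(self, prediction_time: float) -> None:
        self.inference_times.append(prediction_time)

    def percentile(self, p: int = 95) -> float:
        """Return the p-th percentile latency, for p in 1..99.

        statistics.quantiles(n=100) returns the 99 cut points between percentiles,
        so index p - 1 is the p-th percentile. It raises statistics.StatisticsError
        if too few latencies have been recorded.
        """
        return statistics.quantiles(self.inference_times, n=100)[p - 1]

    def should_alert(self, p: int = 95) -> bool:
        """True when the p-th percentile latency is above the alert threshold."""
        return self.percentile(p) > self.threshold_seconds
```

For example, `model_predict` could record each measured duration into a `LatencyMonitor` and a periodic job could check `should_alert()`; a few slow model loads can push the 95th percentile over the threshold even while the mean still looks healthy.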