#### Engineering

I'll start by installing the Creme library. 

``pip install git+https://github.com/creme-ml/creme --upgrade``

I import the packages I need to train my models:

In [2]:
import copy
import collections 
import datetime
import random
import tqdm

In [3]:
from creme import compose
from creme import feature_extraction
from creme import linear_model
from creme import metrics
from creme import neighbors
from creme import optim
from creme import preprocessing
from creme import stats
from creme import stream

I use this first function to parse the date and extract the day of the week.

In [5]:
def extract_date(x):
    """Extract features from the date."""
    import datetime
    if not isinstance(x['date'], datetime.datetime):
        x['date'] = datetime.datetime.strptime(x['date'], '%Y-%m-%d')
    x['wday'] = x['date'].weekday()
    return x

``get_metadata`` allows me to extract the identifier of the product.

In [None]:
def get_metadata(x):
    key = x['id'].split('_')
    x['item_id'] = f'{key[0]}_{key[1]}_{key[2]}'
    return x
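
To make the effect of these two functions concrete, here is a small sketch that applies them to one hand-written row. The row values are made up for illustration, and both functions are repeated so that the snippet runs on its own.

```python
import datetime


def extract_date(x):
    """Parse the date if needed and add the day of the week (Monday is 0)."""
    if not isinstance(x['date'], datetime.datetime):
        x['date'] = datetime.datetime.strptime(x['date'], '%Y-%m-%d')
    x['wday'] = x['date'].weekday()
    return x


def get_metadata(x):
    """Build the product identifier from the first three parts of the id."""
    key = x['id'].split('_')
    x['item_id'] = f'{key[0]}_{key[1]}_{key[2]}'
    return x


# Hypothetical row, shaped like the features sent to the API later on
row = {'id': 'HOBBIES_1_001_CA_1', 'date': '2016-05-23'}
row = extract_date(get_metadata(row))

print(row['item_id'])  # HOBBIES_1_001
print(row['wday'])     # 0, since 2016-05-23 is a Monday
```

Both functions modify the dictionary in place and return it, which is why they can be chained.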

Below I define the feature extraction pipeline. I use ``feature_extraction.TargetAgg`` to compute features from the target variable of the stream, grouped by product: its mean, its variance and rolling averages over several window sizes. The ``Shift`` statistic lets me compute lagged features easily; I use it to build features from the target variable with a lag of 26 to 30 days. I also compute rolling averages of the target variable grouped by day of the week.

In [7]:
extract_features = compose.TransformerUnion(
    compose.Select('wday'),
    
    feature_extraction.TargetAgg(by=['item_id'], how=stats.Mean()),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.Var()),

    feature_extraction.TargetAgg(by=['item_id'], how=stats.RollingMean(1)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.RollingMean(3)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.RollingMean(7)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.RollingMean(15)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.RollingMean(20)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.RollingMean(30)),
    
    feature_extraction.TargetAgg(by=['item_id'], how=stats.Shift(30) | stats.RollingMean(1)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.Shift(29) | stats.RollingMean(1)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.Shift(28) | stats.RollingMean(1)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.Shift(27) | stats.RollingMean(1)),
    feature_extraction.TargetAgg(by=['item_id'], how=stats.Shift(26) | stats.RollingMean(1)),
    
    feature_extraction.TargetAgg(by=['wday'], how=stats.RollingMean(1)),
    feature_extraction.TargetAgg(by=['wday'], how=stats.RollingMean(3)),
    feature_extraction.TargetAgg(by=['wday'], how=stats.RollingMean(7)),
    feature_extraction.TargetAgg(by=['wday'], how=stats.RollingMean(15)),
    feature_extraction.TargetAgg(by=['wday'], how=stats.RollingMean(20)),
    feature_extraction.TargetAgg(by=['wday'], how=stats.RollingMean(30)),
)
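
To illustrate what a lagged rolling average per product means, here is a minimal pure-Python sketch. It is not Creme's implementation: the class ``LaggedRollingMean`` and the toy sales are hypothetical, and the behavior at the boundaries (for example the value returned before enough data has been seen) is my own assumption.

```python
import collections


class LaggedRollingMean:
    """Mean of the last `window` values, delayed by `lag` updates."""

    def __init__(self, lag, window):
        self.lag = lag
        self.pending = collections.deque()              # values not yet old enough
        self.recent = collections.deque(maxlen=window)  # values inside the window

    def update(self, y):
        self.pending.append(y)
        # A value enters the window only once `lag` newer values have arrived
        if len(self.pending) > self.lag:
            self.recent.append(self.pending.popleft())

    def get(self):
        return sum(self.recent) / len(self.recent) if self.recent else 0.0


# One statistic per product, created on first use
by_item = collections.defaultdict(lambda: LaggedRollingMean(lag=2, window=3))

sales = [('A', 1), ('A', 2), ('A', 3), ('B', 10), ('A', 4), ('A', 5)]
for item_id, y in sales:
    by_item[item_id].update(y)

print(by_item['A'].get())  # 2.0, the mean of 1, 2 and 3
print(by_item['B'].get())  # 0.0, nothing is old enough yet
```

Each product keeps its own state, which is the role the ``by`` argument plays in the pipeline above.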

I will train two models per product, that is **30490 * 2 models**. The first model for every product is a ``KNeighborsRegressor``. The second is a linear model. I noticed that these two models are complementary. For each product, I will keep the better of the two as the model I deploy in production.

The code below allows me to declare my two pipelines, the first one dedicated to KNN and the second one to the linear model.

In [8]:
# Init pipeline dedicated to KNN
knn = (
    compose.FuncTransformer(get_metadata) |
    compose.FuncTransformer(extract_date) |
    extract_features |
    neighbors.KNeighborsRegressor(window_size=300, n_neighbors=30, p=2)
)


# Init pipeline dedicated to linear model
lm = (
    compose.FuncTransformer(get_metadata) |
    compose.FuncTransformer(extract_date) |
    extract_features |
    linear_model.LinearRegression(optimizer=optim.SGD(0.00005), clip_gradient=1, intercept_lr=0.001)
)

The code below creates a copy of both pipelines for every product and stores the copies in two dictionaries.

In [9]:
list_model = []

X_y = stream.iter_csv('./data/sample_submission.csv', target_name='F8')

for x, y in tqdm.tqdm(X_y, position=0):
    
    item_id = '_'.join(x['id'].split('_')[:5])

    if item_id not in list_model:

        list_model.append(item_id)
        
dict_knn = {item_id: copy.deepcopy(knn) for item_id in tqdm.tqdm(list_model, position=0)}
dict_lm  = {item_id: copy.deepcopy(lm) for item_id in tqdm.tqdm(list_model, position=0)}

60980it [00:16, 3649.20it/s]
100%|██████████| 30490/30490 [01:04<00:00, 475.75it/s]
100%|██████████| 30490/30490 [00:58<00:00, 524.95it/s]
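
The two ideas in this cell, removing duplicate identifiers and giving each product its own independent copy, can be sketched on toy data. The identifiers and the ``template`` dictionary below are made up; the dictionary only stands in for a pipeline with mutable state.

```python
import copy

# Made-up ids with duplicates
ids = ['A_1', 'B_1', 'A_1', 'B_1', 'C_1']

# dict.fromkeys keeps the first occurrence of each id, in order
unique_ids = list(dict.fromkeys(ids))

# Stand-in for a pipeline: any object with mutable state
template = {'weights': [0.0, 0.0]}
models = {item_id: copy.deepcopy(template) for item_id in unique_ids}

# Updating one copy leaves the other copies and the template untouched
models['A_1']['weights'][0] = 1.5

print(unique_ids)                # ['A_1', 'B_1', 'C_1']
print(models['B_1']['weights'])  # [0.0, 0.0]
```

Without ``copy.deepcopy``, every product would share the same underlying state and the models would overwrite each other.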


I warm up all the models on a subset of the training set. For this pre-training, I selected the last two months of the training set and saved them in CSV format. I use Creme's ``stream.iter_csv`` function to iterate over the training dataset. The pipeline below consumes very little RAM because the rows are loaded into memory one at a time.

In [10]:
random.seed(42)

params = dict(target_name='y', converters={'y': int, 'id': str}, parse_dates= {'date': '%Y-%m-%d'})

# Init streaming csv reader
X_y = stream.iter_csv('./data/train_preprocessed.csv', **params)

bar = tqdm.tqdm(X_y, position = 0)

# Init online metrics:
metric_knn = collections.defaultdict(lambda: metrics.MAE())
metric_lm  = collections.defaultdict(lambda: metrics.MAE())

for x, y in bar:

    # Extract item id
    item_id = '_'.join(x['id'].split('_')[:5])

    # KNN: predict first, then update the metric, then fit
    y_pred_knn = dict_knn[item_id].predict_one(x)
    metric_knn[item_id].update(y, y_pred_knn)
    dict_knn[item_id].fit_one(x=x, y=y)

    # Linear model: same order of operations
    y_pred_lm = dict_lm[item_id].predict_one(x)
    metric_lm[item_id].update(y, y_pred_lm)

    # Train linear model for 10 epochs on each training example
    for _ in range(10):
        dict_lm[item_id].fit_one(x=x, y=y)

1829400it [2:41:38, 188.64it/s]
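
The loop above predicts before it learns, so each score is computed on an example the model has not seen yet. Here is a minimal, self-contained sketch of that pattern. It does not use Creme: the ``RunningMean`` model and the in-memory CSV are hypothetical stand-ins, and ``csv.DictReader`` plays the role of a streaming reader.

```python
import collections
import csv
import io

# Tiny in-memory CSV standing in for the training file (made-up values)
data = io.StringIO(
    'id,y\n'
    'A,2\n'
    'B,10\n'
    'A,4\n'
    'B,10\n'
    'A,6\n'
)


class RunningMean:
    """Toy online model: predicts the mean of the targets seen so far."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0

    def predict_one(self, x):
        return self.mean

    def fit_one(self, x, y):
        self.n += 1
        self.mean += (y - self.mean) / self.n


models = collections.defaultdict(RunningMean)
abs_errors = collections.defaultdict(list)

# csv.DictReader yields one row at a time instead of loading the whole file
for row in csv.DictReader(data):
    item_id, y = row['id'], int(row['y'])
    y_pred = models[item_id].predict_one(row)    # 1. predict before seeing y
    abs_errors[item_id].append(abs(y - y_pred))  # 2. score the prediction
    models[item_id].fit_one(row, y)              # 3. only then learn from y

mae = {item_id: sum(errs) / len(errs) for item_id, errs in abs_errors.items()}
print(mae)
```

Because every prediction is made before the corresponding update, the resulting MAE can be read as a validation score without setting aside a separate validation set.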


After training, I prefer to save the metrics I have calculated for all products. My goal is to compare for each product the performance of the KNN and the linear model. I will select the better of the two for each product.

In [11]:
import json

scores_knn = {item_id: metric.get() for item_id, metric in metric_knn.items()}

scores_lm  = {item_id: metric.get() for item_id, metric in metric_lm.items()}

with open('scores_knn.json', 'w') as file:
    
    json.dump(scores_knn, file)

with open('scores_lm.json', 'w') as file:
    
    json.dump(scores_lm, file)

In [12]:
scores = {}

for item_id in tqdm.tqdm(scores_knn.keys()):
    
    score_knn = scores_knn[item_id]
    
    score_lm  = scores_lm[item_id]
    
    if score_knn < score_lm:
        
        scores[item_id] = score_knn
        
    else:
        
        scores[item_id] = score_lm

100%|██████████| 30490/30490 [00:00<00:00, 516387.22it/s]


I save both KNN and linear models:

In [13]:
import dill
# Per item name warning
with open('dict_knn_item.dill', 'wb') as file:
    
    dill.dump(dict_knn, file)
    
with open('dict_lm_item.dill', 'wb') as file:
    
    dill.dump(dict_lm, file)

I load the scores and the models:

In [16]:
import json

with open('scores_knn.json', 'rb') as file:
    
    scores_knn = json.load(file)

with open('scores_lm.json', 'rb') as file:
    
    scores_lm = json.load(file)

In [22]:
import dill

with open('dict_knn_item.dill', 'rb') as file:
    
    dict_knn = dill.load(file)
    
with open('dict_lm_item.dill', 'rb') as file:
    
    dict_lm = dill.load(file)

For each product, I choose the better model between the ``KNeighborsRegressor`` and the linear model according to the validation score:

In [23]:
dict_model = {}

for item_id in tqdm.tqdm(scores_knn.keys()):
    
    score_knn = scores_knn[item_id]
    
    score_lm  = scores_lm[item_id]
    
    if score_knn < score_lm:
        
        dict_model[item_id] = dict_knn[item_id]
        
    else:
        
        dict_model[item_id] = dict_lm[item_id]
        
# Save selected models:
with open('dict_model_item.dill', 'wb') as file:
    
    dill.dump(dict_model, file)

100%|██████████| 30490/30490 [00:00<00:00, 34529.31it/s]
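
A quick way to check how complementary the two models are is to count how many products each one wins. The sketch below applies the same selection rule to made-up scores; ``toy_knn`` and ``toy_lm`` are hypothetical and do not come from my results.

```python
import collections

# Made-up MAE values for three products
toy_knn = {'A': 0.8, 'B': 1.5, 'C': 0.4}
toy_lm = {'A': 1.1, 'B': 1.2, 'C': 0.4}

# Same rule as above: KNN wins only when its MAE is strictly lower
winners = {
    item_id: 'knn' if toy_knn[item_id] < toy_lm[item_id] else 'lm'
    for item_id in toy_knn
}

print(collections.Counter(winners.values()))
```

Note that ties go to the linear model, as in the selection loop above.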


#### Deployment of the model:

**Now that all the models are pre-trained, I will be able to deploy the pipelines behind an API in a production environment. I will use the [Chantilly](https://github.com/creme-ml/chantilly) library to do so.**

**[Chantilly](https://github.com/creme-ml/chantilly) is a project that aims to make it easier to train Creme models once they are deployed. Chantilly is a minimalist API based on the Flask framework.** Chantilly makes it possible to make predictions, train models and measure model performance in real time. It also provides a dashboard.

Chantilly is currently under development. For various reasons, I chose to extract only the files from Chantilly that I needed for this project. I chose to deploy my API with [Digital Ocean](https://www.digitalocean.com). You can find the whole architecture of my API [here](https://github.com/raphaelsty/M5-Forecasting-Accuracy). 


To deploy my API, I followed these steps:


- I selected the server on Digital Ocean with the smallest configuration


- I followed the tutorial [here](https://www.digitalocean.com/community/tutorials/initial-server-setup-with-ubuntu-16-04) to initialize my server and firewall


- I followed the tutorial [here](https://www.digitalocean.com/community/tutorials/how-to-install-the-anaconda-python-distribution-on-ubuntu-16-04) to install Anaconda on my server


- I opened port 8080 so that my API can receive requests ``sudo ufw allow 8080``


- Installation of git ``sudo apt install git``


- I cloned my API on the server ``git clone https://github.com/raphaelsty/M5-Forecasting-Accuracy.git``


- I installed the requirements of my repo ``pip install -r requirements.txt``.


- I went to the repository I cloned and ran the following command to start my API:
``waitress-serve --call 'app:create_app'``.

That's it.


I initialize my API with flavor regression (see Chantilly tutorial):

In [32]:
url = 'http://creme-ml.com'

In [33]:
import requests

requests.post(f'{url}/api/init', json= {'flavor': 'regression'})

<Response [201]>

After initializing the flavor of my API, I upload all the models I've pre-trained. Each model has a name. This name is the name of the product. I have used dill to serialize the model before uploading it to my API.

In [35]:
with open('dict_model_item.dill', 'rb') as file:
    dic_models = dill.load(file)

In [36]:
for model_name, model in tqdm.tqdm(dic_models.items(), position=0):
    r = requests.post(f'{url}/api/model/{model_name}', data=dill.dumps(model))

  0%|          | 0/3049 [00:00<?, ?it/s]

All the models are now deployed in production and available to make predictions. The models can also be updated on a daily basis. That's it.

![](static/online_learning.png)

**As you may have noticed, the philosophy of online learning reduces the complexity of deploying a machine learning algorithm in production. Moreover, to update a model, we only have to make calls to the API. We don't need to re-train the model from scratch.** To maintain my models on a daily basis, I recommend setting up a script that queries the database storing the sales made that day. This script would perform 30490 requests every day, one per product, to update the models. 

#### Make a prediction by calling the API:

In [39]:
r = requests.post(f'{url}/api/predict', json={
    'id': 1,
    'model': 'HOBBIES_1_001_CA_1',
    'features': {'date': '2016-05-23', 'id': 'HOBBIES_1_001_CA_1'}
})
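
To request predictions for several consecutive days, the request bodies can be generated in a loop. The sketch below only builds the payloads, using the same fields as the call above. The function ``make_predict_payloads``, the three-day horizon and the ``id`` scheme (product name plus date) are assumptions of mine for illustration.

```python
import datetime


def make_predict_payloads(item_id, start, horizon):
    """Build one request body per day, starting at `start` (YYYY-MM-DD)."""
    first = datetime.date.fromisoformat(start)
    payloads = []
    for offset in range(horizon):
        day = (first + datetime.timedelta(days=offset)).isoformat()
        payloads.append({
            'id': f'{item_id}_{day}',  # hypothetical id scheme, unique per product and day
            'model': item_id,
            'features': {'date': day, 'id': item_id},
        })
    return payloads


payloads = make_predict_payloads('HOBBIES_1_001_CA_1', '2016-05-23', horizon=3)
print([p['features']['date'] for p in payloads])
# ['2016-05-23', '2016-05-24', '2016-05-25']
```

Each payload could then be sent with ``requests.post(f'{url}/api/predict', json=payload)``, as in the cell above.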

#### Update models with new data:

In [42]:
r = requests.post(f'{url}/api/learn', json={
    'id': 1,
    'model': 'HOBBIES_1_001_CA_1',
    'ground_truth': 1,
})
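
The daily update script mentioned earlier could be sketched as follows. This is a sketch under assumptions, not the script I run: ``make_learn_payload``, the ``daily_sales`` dictionary and the ``id`` scheme are hypothetical, and I assume that the ``id`` sent to ``/api/learn`` must match the ``id`` used when the prediction was requested, as the two calls in this notebook suggest.

```python
def make_learn_payload(item_id, date, units_sold):
    """Body for a call to /api/learn, mirroring the fields used in this notebook."""
    return {
        'id': f'{item_id}_{date}',  # hypothetical scheme, assumed to match the prediction id
        'model': item_id,
        'ground_truth': units_sold,
    }


# Made-up sales for one day; in practice they would come from the sales database
daily_sales = {'HOBBIES_1_001_CA_1': 1, 'HOBBIES_1_002_CA_1': 0}

payloads = [
    make_learn_payload(item_id, '2016-05-23', units)
    for item_id, units in daily_sales.items()
]
print(len(payloads))  # 2

# Sending is left commented out because it needs a running API:
# import requests
# for payload in payloads:
#     requests.post(f'{url}/api/learn', json=payload)
```

Keeping the payload construction separate from the HTTP calls makes the script easy to test without a server.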