# MACHINE LEARNING IN PRODUCTION MADRID - MLFLOW DEPLOYMENT

In previous lessons we saw how to put a simple Scikit-Learn model into production. In the real world, however, models tend to be more complicated: they may not have an Sklearn flavor, and the input data often needs substantial feature engineering.

You can also handle that with MLFlow. We'll see how to do it in the following cells.

## Custom Model to Production

The first thing we need to do is define the paths to the pickle files we saved in previous lessons, so that the prediction pipeline can be reproduced.

In [1]:
pickle_data_path = '../output/pickle_data'

artifacts = {
    'encoder_path': f'{pickle_data_path}/encoder.pickle',
    'umap_path': f'{pickle_data_path}/umap.pickle',
    'hdbscan_path': f'{pickle_data_path}/hdbscan.pickle',
}
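
Since `save_model` copies these files into the model directory, it can help to confirm that they exist before going further. The following is a minimal sketch of such a check; `missing_artifacts` and `example_artifacts` are hypothetical names introduced here for illustration, and the paths simply mirror the ones above.

```python
from pathlib import Path


def missing_artifacts(artifacts):
    """Return the names of the artifacts whose file does not exist on disk."""
    return [name for name, path in artifacts.items() if not Path(path).is_file()]


# Hypothetical dictionary with the same layout as `artifacts` above
example_artifacts = {
    'encoder_path': '../output/pickle_data/encoder.pickle',
    'umap_path': '../output/pickle_data/umap.pickle',
    'hdbscan_path': '../output/pickle_data/hdbscan.pickle',
}

missing = missing_artifacts(example_artifacts)
if missing:
    print(f'Missing artifacts: {missing}')
```

In the notebook, the same function could be called directly on the `artifacts` dictionary.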

To put a model into production with MLFlow it is necessary to define a wrapper for it. With a Scikit-Learn model (KMeans from previous lessons) the process is straightforward, since the Sklearn wrapper has already been defined by the MLFlow developers.

For a custom model we have to write the wrapper ourselves. The only thing we need to do is extend the mlflow.pyfunc.PythonModel class and override the predict method:

```python
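import mlflow.pyfunc

class ModelWrapper(mlflow.pyfunc.PythonModel):

    def predict(self, context, model_input):
        # your prediction code goes here; return one prediction per input row
        raise NotImplementedError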
```

The cell below defines a custom mlflow.pyfunc.PythonModel. It is more complex than the template above because it also includes the feature engineering of the input data.

In [2]:
import mlflow.pyfunc

import numpy as np
import pandas as pd
import pickle
import hdbscan

class ModelWrapper(mlflow.pyfunc.PythonModel):

    # define the lists of columns used in the transformations
    def __init__(self):

        self.columns_to_encode = ['origin', 'destination', 'train_type', 'train_class', 'fare']
        self.columns_to_remove = ['insert_date', 'start_date', 'end_date']

    # at the time of loading the MLFlow model, the pickle data from the baseline
    # pipeline has to be loaded
    def load_context(self, context):
        
        with open(context.artifacts['encoder_path'], 'rb') as f:
            self.encoder_m = pickle.load(f)
            
        with open(context.artifacts['umap_path'], 'rb') as f:
            self.umap_m = pickle.load(f)
        
        with open(context.artifacts['hdbscan_path'], 'rb') as f:
            self.hdbscan_m = pickle.load(f)
            
    # the datetime columns could arrive in the integer form, in that case convert to
    # datetime type
    def check_dt_type(self, model_input):
        
        if model_input[self.columns_to_remove[0]].dtype == 'int64':
            for col in self.columns_to_remove:
                model_input[col] = pd.to_datetime(model_input[col])
        
        return model_input

    # the baseline transformations are done here
    def transform(self, model_input):
        
        model_input.dropna(inplace=True)
        
        model_input = self.check_dt_type(model_input)
        
        model_input.loc[:, self.columns_to_encode] = \
            self.encoder_m.transform(model_input[self.columns_to_encode])
        
        model_input['duration'] = (model_input['end_date'] - model_input['start_date']).dt.seconds / 3600

        model_input['time_to_departure'] = (model_input['start_date'].dt.tz_localize('Europe/Madrid').dt.tz_convert('UTC') \
                                   - model_input['insert_date'].dt.tz_localize('UTC')).dt.days

        model_input['hour'] = model_input['start_date'].dt.hour

        model_input['weekday'] = model_input['start_date'].dt.dayofweek

        model_input = model_input[[x for x in model_input.columns if x not in self.columns_to_remove]]
        
        return model_input

    # main method to override, the OrdinalEncoder and UMAP transformations are done along
    # with the HDBSCAN prediction over this embedding
    def predict(self, context, model_input):
        
        # allocate the payload, using -1 as the return value for records with nulls
        payload = np.ones(len(model_input)) * -1
        
        preprocessed = self.transform(model_input.reset_index(drop=True))
        embedding = self.umap_m.transform(preprocessed)
        clusters, _ = hdbscan.approximate_predict(self.hdbscan_m, embedding)
        
        # fill not null records with their cluster
        payload[preprocessed.index] = clusters
        
        return payload
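
The way `predict` handles records with nulls can be easier to follow on a toy example. The sketch below reproduces only that part of the method: the `toy_input` DataFrame and the `clusters` values are made up for illustration and stand in for the real input and for the output of `hdbscan.approximate_predict`.

```python
import numpy as np
import pandas as pd

# Toy input: the row at position 1 has a missing value
toy_input = pd.DataFrame({'price': [47.3, None, 39.25]}).reset_index(drop=True)

# Start with -1 for every row, the value returned for records with nulls
payload = np.ones(len(toy_input)) * -1

# dropna keeps the original positional index of the surviving rows
preprocessed = toy_input.dropna()

# Stand-in for the clusters returned by hdbscan.approximate_predict (made-up values)
clusters = np.array([8, 4])

# Write each cluster back to the position of the row it belongs to
payload[preprocessed.index] = clusters

print(payload)
```

This only works because the index is reset before dropping rows, so the surviving index values are valid positions in `payload`.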
        




Once the custom model has been defined, everything has to be packed together: the model, its artifacts and the conda environment.

In [3]:
mlflow_pyfunc_model_path = '../output/custom_model'

# remove all models if already there
!rm -rf $mlflow_pyfunc_model_path

# conda environment definition
conda_env = {
    'channels': ['defaults'],
    'dependencies': [
        'python',
        {'pip': [
            'mlflow',
            'umap-learn',
            'hdbscan',
          ]
        },
    ],
    'name': 'custom_env',
}

# finally save the model in MLFlow's pyfunc format in the output directory
mlflow.pyfunc.save_model(path=mlflow_pyfunc_model_path, 
                         python_model=ModelWrapper(),
                         conda_env=conda_env,
                         artifacts=artifacts)
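
The environment above does not pin any version, so the serving environment may end up with different releases than the ones used to create the pickle files. One possible way to reduce that risk is to pin the versions installed in the current environment. The sketch below assumes Python 3.8 or later (for `importlib.metadata`); `pinned` and `pinned_conda_env` are hypothetical names, not part of MLFlow.

```python
from importlib.metadata import PackageNotFoundError, version
import platform


def pinned(package):
    """Return 'package==x.y.z' for an installed package, or the bare name otherwise."""
    try:
        return f'{package}=={version(package)}'
    except PackageNotFoundError:
        return package


# Same structure as conda_env above, with versions taken from the current environment
pinned_conda_env = {
    'channels': ['defaults'],
    'dependencies': [
        f'python={platform.python_version()}',
        {'pip': [pinned(p) for p in ('mlflow', 'umap-learn', 'hdbscan')]},
    ],
    'name': 'custom_env',
}

print(pinned_conda_env)
```

The resulting dictionary could then be passed as `conda_env` to `mlflow.pyfunc.save_model` in place of the unpinned one.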

## Setup Endpoint

In previous lessons we saw how to create an endpoint with MLFlow and the command line:

```bash
mlflow models serve -m path_to_your_model -h host -p port
```

However, it is desirable to keep this endpoint alive at all times. This can be done with systemd and a unit file such as the following, which restarts the server 30 seconds after a failure:

```
[Unit]
Description=MLFlow model in production
After=network.target

[Service]
Restart=on-failure
RestartSec=30
StandardOutput=file:/var/log/mlflow/production/stdout.log
StandardError=file:/var/log/mlflow/production/stderr.log
Environment=MLFLOW_TRACKING_URI=http://host_ts:port_ts
Environment=MLFLOW_CONDA_HOME=/home/ubuntu/miniconda3
ExecStart=/bin/bash -c 'PATH=/home/ubuntu/miniconda3/envs/mlinproduction_env/bin/:$PATH exec mlflow models serve -m path_to_your_model -h host -p port'

[Install]
WantedBy=multi-user.target
```
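
Once the service has been started, a quick way to see whether the server came up is to try to open a TCP connection to it. The following is a minimal sketch with the standard library; `is_listening` is a hypothetical helper, and the host and port are placeholders for the values used in `ExecStart`. It only confirms that something accepts connections on that port, not that the model answers correctly.

```python
import socket


def is_listening(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Placeholder values; use the host and port given to `mlflow models serve`
print(is_listening('localhost', 8001))
```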



## Test Endpoint

Before testing the endpoint it is necessary to load some test data.

### Load Test Data

In [4]:
import pandas as pd

df = pd.read_parquet('../data/raw/renfe.parquet')

test_data = df.sample(10)

test_data

| | insert_date | origin | destination | start_date | end_date | train_type | price | train_class | fare |
|---|---|---|---|---|---|---|---|---|---|
| 4266349 | 2019-04-19 11:02:07 | SEVILLA | MADRID | 2019-05-15 07:40:00 | 2019-05-15 10:05:00 | AVE | 47.3 | Turista | Promo |
| 6261496 | 2019-05-15 01:20:47 | BARCELONA | MADRID | 2019-05-29 11:00:00 | 2019-05-29 13:45:00 | AVE | 90.5 | Turista Plus | Promo |
| 9583320 | 2019-06-20 17:25:44 | MADRID | VALENCIA | 2019-07-28 16:55:00 | 2019-07-28 21:58:00 | AVE-LD | 39.25 | Turista con enlace | Promo |
| 7197242 | 2019-05-25 07:48:22 | MADRID | SEVILLA | 2019-06-12 10:00:00 | 2019-06-12 12:32:00 | AVE | 53.4 | Turista | Promo |
| 10340229 | 2019-08-15 03:08:13 | MADRID | VALENCIA | 2019-08-28 16:55:00 | 2019-08-28 19:14:00 | ALVIA | 46.15 | Preferente | Promo |
| 7436516 | 2019-05-27 17:40:21 | VALENCIA | MADRID | 2019-07-08 12:40:00 | 2019-07-08 14:20:00 | AVE | 45.3 | Turista | Promo |
| 3724117 | 2019-04-15 14:08:30 | BARCELONA | MADRID | 2019-04-25 12:00:00 | 2019-04-25 15:10:00 | AVE | 58.15 | Turista | Promo |
| 7481454 | 2019-05-28 05:43:10 | MADRID | SEVILLA | 2019-07-15 08:30:00 | 2019-07-15 11:14:00 | ALVIA | 87.4 | Preferente | Flexible |
| 488848 | 2019-08-26 11:29:18 | MADRID | VALENCIA | 2019-10-21 15:10:00 | 2019-10-21 16:52:00 | AVE | 21.95 | Turista | Promo |
| 8780694 | 2019-06-11 14:03:46 | MADRID | BARCELONA | 2019-06-27 13:25:00 | 2019-06-27 16:24:00 | AVE-TGV | 107.7 | Turista | Flexible |
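
The three datetime columns in this sample are the ones that `check_dt_type` in the wrapper converts when they arrive as integers. One detail worth keeping in mind is that `pd.to_datetime` reads plain integers as nanoseconds unless a unit is given. The sketch below assumes the integers are milliseconds since the Unix epoch, which is an assumption for illustration: what actually arrives depends on how the client serializes the data and on the pandas and MLFlow versions in use.

```python
import pandas as pd

# 2019-05-15 07:40:00 expressed as milliseconds since the Unix epoch
epoch_ms = pd.Series([1557906000000])

# Without a unit, integers are read as nanoseconds and land in 1970
as_ns = pd.to_datetime(epoch_ms)

# With the unit stated, the original timestamp is recovered
as_ms = pd.to_datetime(epoch_ms, unit='ms')

print(as_ns.iloc[0], as_ms.iloc[0])
```

If the predictions from the endpoint ever differ from the ones obtained in the notebook, the unit of these columns is one of the first things to check.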


### Debug Model

If the endpoint is not working as expected, the model can be loaded into the Jupyter notebook with the MLFlow API and debugged with the following cell.

In [5]:
loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)

print(f'Predictions: {loaded_model.predict(test_data)}')

The keyword argument 'parallel=True' was specified but no transformation for parallel execution was possible.

To find out why, try turning on parallel diagnostics, see http://numba.pydata.org/numba-doc/latest/user/parallel.html#diagnostics for help.

File "../../../../../../miniconda3/envs/mlinproduction_env/lib/python3.7/site-packages/umap/nndescent.py", line 124:
    @numba.njit(parallel=True)
    def init_from_random(n_neighbors, data, query_points, heap, rng_state):
    ^

  self.func_ir.loc))
The keyword argument 'parallel=True' was specified but no transformation for parallel execution was possible.

To find out why, try turning on parallel diagnostics, see http://numba.pydata.org/numba-doc/latest/user/parallel.html#diagnostics for help.

File "../../../../../../miniconda3/envs/mlinproduction_env/lib/python3.7/site-packages/umap/nndescent.py", line 135:
    @numba.njit(parallel=True)
    def init_from_tree(tree, data, query_points, heap, rng_state):
    ^

  self.func_ir.loc))


Predictions: [8. 4. 8. 8. 8. 8. 7. 4. 9. 5.]


### Query Endpoint

Here the query is sent with the Python requests library, but it can also be done with cURL or any other HTTP client.

In [6]:
import requests

host = 'localhost'
port = 8001

url = f'http://{host}:{port}/invocations'

headers = {
    'Content-Type': 'application/json',
}

r = requests.post(url=url, headers=headers, data=test_data.to_json(orient='split'))

print(f'Predictions: {r.text}')


Predictions: [8.0, 4.0, 8.0, 8.0, 8.0, 8.0, 7.0, 4.0, 9.0, 5.0]
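
The response body is JSON text, so it is usually converted back into Python values before being used. The following is a minimal sketch of that step; `parse_predictions` is a hypothetical helper, and the branch for a `{"predictions": [...]}` wrapper is an assumption about other MLFlow versions rather than something shown in this notebook.

```python
import json


def parse_predictions(body):
    """Turn the response text into a list of integer cluster labels."""
    parsed = json.loads(body)
    # Assumption: some MLFlow versions wrap the list in {"predictions": [...]}
    if isinstance(parsed, dict):
        parsed = parsed['predictions']
    return [int(label) for label in parsed]


# Response text copied from the output above
print(parse_predictions('[8.0, 4.0, 8.0, 8.0, 8.0, 8.0, 7.0, 4.0, 9.0, 5.0]'))
```

With the cell above, this would be called as `parse_predictions(r.text)`, ideally after `r.raise_for_status()` so that HTTP errors are not parsed as predictions.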
