# Best Engineering Practices

![Status](https://img.shields.io/static/v1.svg?label=Status&message=Finished&color=brightgreen)
[![Source](https://img.shields.io/static/v1.svg?label=GitHub&message=Source&color=181717&logo=GitHub)](https://github.com/particle1331/inefficient-networks/blob/master/docs/notebooks/mlops/04-deployment)
[![Stars](https://img.shields.io/github/stars/particle1331/inefficient-networks?style=social)](https://github.com/particle1331/inefficient-networks)

```text
𝗔𝘁𝘁𝗿𝗶𝗯𝘂𝘁𝗶𝗼𝗻: Notes for Module 6 of the MLOps Zoomcamp (2022) by DataTalks.Club.
```

---

## Introduction

In this module, we will cover best practices for developing and deploying our code. We will take our example [streaming code](https://particle1331.github.io/inefficient-networks/notebooks/mlops/04-deployment/notes.html#streaming-deploying-models-with-kinesis-and-lambda) from a previous module, break it down into testable units, and generally just improve it with software engineering best practices. 

More precisely, we create and automate unit and integration tests and code quality checks, and add pre-commit hooks for all of these. We will also look at how to use `make`, which is a nice tool for abstracting and automating repetitive but involved tasks. 

## Testing Python code with pytest

Let us look at the [streaming module](https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/04-deployment/streaming/lambda_function.py) that we will work on:

```python
import os
import json
import boto3
import base64

import mlflow


# Load environmental variables
PREDICTIONS_STREAM_NAME = os.getenv('PREDICTIONS_STREAM_NAME', 'ride_predictions')
RUN_ID = os.getenv('RUN_ID')
TEST_RUN = os.getenv('TEST_RUN', 'False') == 'True'

# Load model from S3
logged_model = f's3://mlflow-models-ron/1/{RUN_ID}/artifacts/model'
model = mlflow.pyfunc.load_model(logged_model)


def prepare_features(ride):
    features = {}
    features['PU_DO'] = '%s_%s' % (ride['PULocationID'], ride['DOLocationID'])
    features['trip_distance'] = ride['trip_distance']
    return features


def predict(features):
    pred = model.predict(features)
    return float(pred[0])


def lambda_handler(event, context):
    
    predictions_events = []
    
    for record in event['Records']:
        encoded_data = record['kinesis']['data']
        decoded_data = base64.b64decode(encoded_data).decode('utf-8')
        ride_event = json.loads(decoded_data)

        ride = ride_event['ride']
        ride_id = ride_event['ride_id']
    
        features = prepare_features(ride)
        prediction = predict(features)
    
        prediction_event = {
            'model': 'ride_duration_prediction_model',
            'version': '123',
            'prediction': {
                'ride_duration': prediction,
                'ride_id': ride_id
            }
        }

        if not TEST_RUN:
            kinesis_client = boto3.client('kinesis')
            kinesis_client.put_record(
                StreamName=PREDICTIONS_STREAM_NAME,
                Data=json.dumps(prediction_event),
                PartitionKey=str(ride_id)
            )
        
        predictions_events.append(prediction_event)


    return {
        'predictions': predictions_events
    }
```

To review, this script first loads the environment variables and the model from S3. Then, it defines two helper functions for preprocessing and for making predictions with the model. The most important function in this script is `lambda_handler`, which takes in an `event` containing a batch of records from the input stream. This explains the outer loop over `event['Records']`. 

Inside this loop, the data is decoded and a prediction is made, which is then packaged as an event for the output stream. If the function is running in production, i.e. outside of a test run, a Kinesis client is instantiated and the prediction event is written to the configured predictions stream.
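To make the decoding step concrete, the following is a minimal sketch of the round trip for a single record. The helper `make_record` is hypothetical, and the event is stripped down to the one field the handler reads (`record['kinesis']['data']`); real Kinesis records carry more metadata.

```python
import base64
import json


def make_record(ride_event: dict) -> dict:
    """Hypothetical helper: wrap a ride event the way the handler expects it."""
    payload = json.dumps(ride_event).encode('utf-8')
    return {'kinesis': {'data': base64.b64encode(payload).decode('utf-8')}}


ride_event = {
    'ride': {'PULocationID': 130, 'DOLocationID': 205, 'trip_distance': 3.66},
    'ride_id': 123,
}
event = {'Records': [make_record(ride_event)]}

# Same decoding steps as in lambda_handler
decoded_events = []
for record in event['Records']:
    encoded_data = record['kinesis']['data']
    decoded_data = base64.b64decode(encoded_data).decode('utf-8')
    decoded_events.append(json.loads(decoded_data))

print(decoded_events[0] == ride_event)  # True
```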

### Adding unit tests

First, we will create a `tests/` folder where we will put all our tests. We will be using `pipenv` to manage our environment. See the [previous module](https://particle1331.github.io/inefficient-networks/notebooks/mlops/04-deployment/notes.html#setting-up-the-environment-with-pipenv) for details. We will start with the following Pipfile:

```ini
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
boto3 = "*"
mlflow = "*"
scikit-learn = "==1.0.2"

[dev-packages]
pytest = "*"

[requires]
python_version = "3.9"
```

Notice that this installs [pytest](https://docs.pytest.org/en/7.1.x/) as a dev dependency. Let us create one test:

```python
# tests/model_test.py
import lambda_function


def test_prepare_features():
    """Test preprocessing."""

    ride = {
        "PULocationID": 130,
        "DOLocationID": 205,
        "trip_distance": 3.66
    }

    actual_features = lambda_function.prepare_features(ride)
    
    expected_features = {
        'PU_DO': '130_205',
        'trip_distance': 3.66,
    }

    assert actual_features == expected_features
```

Before running this, note that this test does not involve the model, so we can comment out the block that loads the model from S3 to make the test run faster. Tests can be run either with `pytest` in the terminal:

```bash
$ pytest
======================== test session starts ========================
platform darwin -- Python 3.9.12, pytest-7.1.2, pluggy-1.0.0
rootdir: /Users/particle1331/code/inefficient-networks/docs/notebooks/mlops/06-best-practices
plugins: anyio-3.6.1
collected 1 item

tests/model_test.py .                                         [100%]

========================= 1 passed in 1.03s =========================
```

Or using the UI in VS Code after selecting pytest in the configuration:

```{figure} ../../../img/vscode-testing.png
---
width: 40em
---
Testing interface in VS Code. Really convenient for running and visualizing tests.
```

Another thing we should always try is to deliberately **break** the tests. This makes sure that the tests actually cover the changes being made. For example, we may forget to add `assert` statements, in which case the tests pass trivially.
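As a small sketch of why this matters, consider a deliberately broken copy of `prepare_features` (wrong separator) and two checks against it. The names `forgetful_test`, `proper_test`, and `passes` are made up for this illustration, and `passes` only mimics how a test runner treats an `AssertionError`.

```python
def prepare_features(ride):
    """Deliberately broken copy of the preprocessing: the separator is wrong."""
    features = {}
    features['PU_DO'] = '%s-%s' % (ride['PULocationID'], ride['DOLocationID'])
    features['trip_distance'] = ride['trip_distance']
    return features


RIDE = {'PULocationID': 130, 'DOLocationID': 205, 'trip_distance': 3.66}
EXPECTED = {'PU_DO': '130_205', 'trip_distance': 3.66}


def forgetful_test():
    # Bug in the test itself: the comparison is computed and thrown away.
    prepare_features(RIDE) == EXPECTED


def proper_test():
    assert prepare_features(RIDE) == EXPECTED


def passes(test_fn) -> bool:
    """Hypothetical helper: a test passes if it raises no AssertionError."""
    try:
        test_fn()
    except AssertionError:
        return False
    return True


print(passes(forgetful_test), passes(proper_test))  # True False
```

The check without `assert` passes even though the function is broken, so it would never catch a regression.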

Notice that unit tests act as invariants that must remain true even if the implementation details around them change. This is nice since the tests make sure that the important parts of the code keep functioning even as we change or refactor things around them. It also allows fast iteration, since we know that we are not making breaking changes to the code. In its extreme form, this practice is called [test-driven development](https://en.wikipedia.org/wiki/Test-driven_development) (TDD).

### Refactoring the lambda function

Note that we had to manually comment out parts of the code to run our tests. This is not really great. Also, the tests would fail if our dev environment cannot connect to S3. A way to fix this is to create a special class, which we call `ModelService`, containing all the logic of the original function, but with parts that are easier to test.


For this we modify `lambda_function.py` as follows:

```python
# lambda_function.py
import model
import os

# Load environmental variables
PREDICTIONS_STREAM_NAME = os.getenv('PREDICTIONS_STREAM_NAME', 'ride_predictions')
RUN_ID = os.getenv('RUN_ID')
TEST_RUN = os.getenv('TEST_RUN', 'False') == 'True'


model_service = model.init(
    predictions_stream_name=PREDICTIONS_STREAM_NAME,
    run_id=RUN_ID,
    test_run=TEST_RUN
)

def lambda_handler(event, context):
    return model_service.lambda_handler(event)
```

Here `predictions_stream_name` is specified as the stream where the function writes to. The stream where the function reads from need not be specified since this is configured in AWS Lambda. Then, we need to specify `run_id` to determine the model in S3 to use. Finally, `test_run` is simply a flag to indicate that we are in development mode (i.e. so we don't write on the output stream which may be already deployed during the development of this code). These are variables that are configured when the Docker container is run.
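One consequence of how `TEST_RUN` is parsed is worth spelling out: the flag is only true when the variable is exactly the string `'True'`. A short sketch, where `read_test_run_flag` is a hypothetical wrapper around the same expression used in `lambda_function.py`:

```python
import os


def read_test_run_flag() -> bool:
    """Hypothetical wrapper around the comparison used in lambda_function.py."""
    return os.getenv('TEST_RUN', 'False') == 'True'


os.environ.pop('TEST_RUN', None)
unset = read_test_run_flag()       # variable not set: falls back to 'False'

os.environ['TEST_RUN'] = 'True'
exact = read_test_run_flag()       # exact match

os.environ['TEST_RUN'] = 'true'
lowercase = read_test_run_flag()   # does not match 'True'

os.environ.pop('TEST_RUN', None)   # leave the environment as we found it

print(unset, exact, lowercase)  # False True False
```

So a value like `true` or `1` is silently treated as not being a test run.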

All of these variables determine a prediction service called `model_service` which abstracts away the process of predicting on an event. In particular, this means that we don't test directly on the actual `lambda_function` that is exposed by the container. (Although, we will see later that this is still covered with integration tests, since these make sure that everything is working together.) This is implemented in the following class:

```python
# model.py
import json
import boto3
import base64

import mlflow


def load_model(run_id: str):
    logged_model = f's3://mlflow-models-ron/1/{run_id}/artifacts/model'
    model = mlflow.pyfunc.load_model(logged_model)
    return model


def base64_decode(encoded_data):
    decoded_data = base64.b64decode(encoded_data).decode('utf-8')
    ride_event = json.loads(decoded_data)
    return ride_event


class ModelService:

    def __init__(self, model, model_version):
        self.model = model
        self.model_version = model_version

    def prepare_features(self, ride):
        features = {}
        features['PU_DO'] = '%s_%s' % (ride['PULocationID'], ride['DOLocationID'])
        features['trip_distance'] = ride['trip_distance']
        return features

    def predict(self, features):
        pred = self.model.predict(features)
        return float(pred[0])


    def lambda_handler(self, event):
    
        predictions_events = []
        
        for record in event['Records']:
            encoded_data = record['kinesis']['data']
            ride_event = base64_decode(encoded_data)

            ride = ride_event['ride']
            ride_id = ride_event['ride_id']
        
            features = self.prepare_features(ride)
            prediction = self.predict(features)
        
            prediction_event = {
                'model': 'ride_duration_prediction_model',
                'version': self.model_version,
                'prediction': {
                    'ride_duration': prediction,
                    'ride_id': ride_id
                }
            }

            # if not TEST_RUN:
            #     kinesis_client = boto3.client('kinesis')
            #     kinesis_client.put_record(
            #         StreamName=PREDICTIONS_STREAM_NAME,
            #         Data=json.dumps(prediction_event),
            #         PartitionKey=str(ride_id)
            #     )
            
            predictions_events.append(prediction_event)


        return {
            'predictions': predictions_events
        }


def init(predictions_stream_name: str, run_id: str, test_run: bool):
    model = load_model(run_id)
    model_service = ModelService(model=model, model_version=run_id)
    return model_service
```

Note that the attributes of the class are informed by the methods that we collect inside of it. For example, we also version the models with `run_id`, hence the `model_version` attribute. This information is packaged along with the prediction. Also notice that the functionality for writing outputs is commented out. For now, we focus on getting correct predictions. 

### Further unit tests

Note that the unit tests should now fail after refactoring. We modify the test code so that it uses the `ModelService` class:

```python
# tests/model_test.py
...

def test_prepare_features():
    """Test preprocessing."""

    ride = {
        "PULocationID": 130,
        "DOLocationID": 205,
        "trip_distance": 3.66
    }

    model_service = model.ModelService(model=None, model_version=None)
    
    actual_features = model_service.prepare_features(ride)
    
    expected_features = {
        'PU_DO': '130_205',
        'trip_distance': 3.66,
    }

    assert actual_features == expected_features

...
```

Next, we add more tests for other units of functionality. For example, we can test the decoding of base64 inputs:

```python
# tests/model_test.py
...

def test_base64_decode():

    base64_input = "eyAgICAgICAgICAicmlkZSI6IHsgICAgICAgICAgICAgICJQVUxvY2F0aW9uSUQiOiAxMzAsICAgICAgICAgICAgICAiRE9Mb2NhdGlvbklEIjogMjA1LCAgICAgICAgICAgICAgInRyaXBfZGlzdGFuY2UiOiAzLjY2ICAgICAgICAgIH0sICAgICAgICAgICJyaWRlX2lkIjogMTIzICAgICAgfQ=="
    
    actual_result = model.base64_decode(base64_input)
    
    expected_result = {
        "ride": {
            "PULocationID": 130,
            "DOLocationID": 205,
            "trip_distance": 3.66
        }, 
        "ride_id": 123
    }

    assert actual_result == expected_result

...
```

For now we are setting `model=None` since `prepare_features` does not use any model. Next, we want to test `predict`, so we need a model to test with. We don't really want to connect to S3 in our dev environment, i.e. we don't want to bother with things like credentials or delays when downloading the model. The S3 bucket might also not be up yet. So we will create a **mock model** which mimics all relevant attributes and methods of real models for prediction.

```python
# tests/model_test.py
...

class ModelMock:
    def __init__(self, value):
        self.value = value 

    def predict(self, X):
        # One value per entry of X; ModelService.predict only reads the first
        n = len(X)
        return [self.value] * n


def test_predict():
    
    features = {
        'PU_DO': '130_205',
        'trip_distance': 3.66,
    }

    model_mock = ModelMock(value=10.0)
    model_service = model.ModelService(model=model_mock, model_version=None)

    actual_result = model_service.predict(features)
    expected_result = 10.0

    assert actual_result == expected_result
```
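For comparison, the standard library's `unittest.mock.Mock` could play the same role as the hand-written `ModelMock`. This is a sketch of an alternative, not what these notes use; `ModelService` below is a trimmed copy with only what `predict` needs, so that the block runs on its own.

```python
from unittest.mock import Mock


class ModelService:
    """Trimmed copy of the class in model.py, just enough for predict()."""

    def __init__(self, model, model_version):
        self.model = model
        self.model_version = model_version

    def predict(self, features):
        pred = self.model.predict(features)
        return float(pred[0])


features = {'PU_DO': '130_205', 'trip_distance': 3.66}

# Any attribute of a Mock is itself a Mock, so we only configure the return value
model_mock = Mock()
model_mock.predict.return_value = [10.0]

model_service = ModelService(model=model_mock, model_version=None)
actual_result = model_service.predict(features)

# The mock also records how it was called
model_mock.predict.assert_called_once_with(features)
print(actual_result)  # 10.0
```

The hand-written class is arguably easier to read, while `Mock` additionally lets a test check that the model was called with the expected features.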

Finally, we would like to test the `ModelService.lambda_handler` function:

```python
# tests/model_test.py
...

def test_lambda_handler():
    
    event = {
        "Records": [
            {
                "kinesis": {
                    "data": "eyAgICAgICAgICAicmlkZSI6IHsgICAgICAgICAgICAgICJQVUxvY2F0aW9uSUQiOiAxMzAsICAgICAgICAgICAgICAiRE9Mb2NhdGlvbklEIjogMjA1LCAgICAgICAgICAgICAgInRyaXBfZGlzdGFuY2UiOiAzLjY2ICAgICAgICAgIH0sICAgICAgICAgICJyaWRlX2lkIjogMTIzICAgICAgfQ==",
                },
            }
        ]
    }

    model_mock = ModelMock(value=10.0)
    model_service = model.ModelService(model=model_mock, model_version="Test123")

    actual_result = model_service.lambda_handler(event)
    expected_result = {
        'predictions': [
            {
                'model': 'ride_duration_prediction_model', 
                'version': 'Test123', 
                'prediction': {
                    'ride_duration': 10.0,
                    'ride_id': 123
                }
            }
        ]
    }

    assert actual_result == expected_result
```

### Adding callbacks

We covered pretty much everything except the writing on the output stream. Note that this part of the `lambda_handler` code seems out of place. All other functionalities are geared towards prediction and now, if we include writing on an output stream, the model service has to know about the Kinesis client, the stream name, etc.

Instead, what we can do is define a separate unit that handles what happens **after** the prediction is done, i.e. a **callback** on prediction events. This is called every time the model service completes a prediction. Putting something on the Kinesis stream would be one of the callbacks.

```python
# model.py
...

class ModelService:

    def __init__(self, model, model_version, callbacks=None):
        self.model = model
        self.model_version = model_version
        self.callbacks = callbacks or []

    ...

    def lambda_handler(self, event):
    
        predictions_events = []
        
        for record in event['Records']:
            
            ...
        
            prediction_event = {
                'model': 'ride_duration_prediction_model',
                'version': self.model_version,
                'prediction': {
                    'ride_duration': prediction,
                    'ride_id': ride_id
                }
            }

            for callback in self.callbacks:     # !
                callback(prediction_event)
            
            predictions_events.append(prediction_event)

        return {
            'predictions': predictions_events
        }

...
```

Notice that callbacks act on prediction events, e.g. by writing them to Kinesis streams. Below we modify the `init` function to include the Kinesis callback, which we package into a class since we need a callable. Here we simply add a reference to the `put_record` method, which acts on prediction events, to the `callbacks` list.

```python
# model.py
...

class KinesisCallback:
    
    def __init__(self, kinesis_client, predictions_stream_name):
        self.kinesis_client = kinesis_client
        self.predictions_stream_name = predictions_stream_name

    def put_record(self, prediction_event):
        ride_id = prediction_event['prediction']['ride_id']
        self.kinesis_client.put_record(
            StreamName=self.predictions_stream_name,
            Data=json.dumps(prediction_event),
            PartitionKey=str(ride_id)
        )


def init(predictions_stream_name: str, run_id: str, test_run: bool):
    """Initialize model_service for lambda_function module."""

    model = load_model(run_id)
    callbacks = []
    
    if not test_run:
        kinesis_client = boto3.client('kinesis')
        kinesis_callback = KinesisCallback(kinesis_client, predictions_stream_name)
        callbacks.append(kinesis_callback.put_record)

    model_service = ModelService(model=model, model_version=run_id, callbacks=callbacks)
    return model_service
```

This looks really really nice. We will test this later with a local cloud setup! 
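In the meantime, since `KinesisCallback` receives its client as an argument, its logic can already be checked without AWS by passing in a stand-in. The sketch below assumes a hypothetical `FakeKinesisClient` that only records calls in memory; `KinesisCallback` is copied from above so that the block runs on its own.

```python
import json


class KinesisCallback:
    """Copied from model.py above so that this block runs on its own."""

    def __init__(self, kinesis_client, predictions_stream_name):
        self.kinesis_client = kinesis_client
        self.predictions_stream_name = predictions_stream_name

    def put_record(self, prediction_event):
        ride_id = prediction_event['prediction']['ride_id']
        self.kinesis_client.put_record(
            StreamName=self.predictions_stream_name,
            Data=json.dumps(prediction_event),
            PartitionKey=str(ride_id)
        )


class FakeKinesisClient:
    """Hypothetical stand-in for boto3.client('kinesis'); keeps records in memory."""

    def __init__(self):
        self.records = []

    def put_record(self, StreamName, Data, PartitionKey):
        self.records.append(
            {'StreamName': StreamName, 'Data': Data, 'PartitionKey': PartitionKey}
        )


prediction_event = {
    'model': 'ride_duration_prediction_model',
    'version': 'Test123',
    'prediction': {'ride_duration': 10.0, 'ride_id': 123},
}

fake_client = FakeKinesisClient()
callback = KinesisCallback(fake_client, 'ride_predictions')
callback.put_record(prediction_event)

sent = fake_client.records[0]
print(sent['StreamName'], sent['PartitionKey'])  # ride_predictions 123
```

This only checks that the callback forwards the right arguments; whether the record actually lands on a stream is still a job for the integration test.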

### Appendix: Source code

See here for the [finished code](https://github.com/particle1331/inefficient-networks/tree/2a5cad64632e098cc305bfe0dfb8bd1242135939/docs/notebooks/mlops/06-best-practices/code) for this section. The final directory structure should look like:

```text
.
├── Dockerfile
├── Pipfile
├── Pipfile.lock
├── lambda_function.py
├── model.py
└── tests
    ├── __init__.py
    └── model_test.py
```

## Integration tests

In the last section, we refactored the original code for our lambda function and created tests. But these tests are quite limited: they only test individual functions. They don't test that the entire thing still works. Recall that we have `test_docker.py`, which sort of checks that predictions still work for the container as a whole. At this point, we can build and run the container to make sure that the code still works as a whole. 

**Remark.** Note that in practice the following test should be done along with writing the unit tests, or after refactoring, so that no drastic change is made without making sure everything is still integrated well. But here we separate the two processes for the sake of presentation.

Dockerfile:

```Dockerfile
FROM public.ecr.aws/lambda/python:3.9

RUN pip install -U pip
RUN pip install pipenv

COPY [ "Pipfile", "Pipfile.lock", "./" ]

RUN pipenv install --system --deploy

COPY [ "lambda_function.py", "model.py", "./" ]

CMD [ "lambda_function.lambda_handler" ]
```

Building:

```bash
docker build -t stream-model-duration:v2 .
```

```text
[+] Building 1.2s (11/11) FINISHED
 => [internal] load build definition from Dockerfile            0.0s
 => => transferring dockerfile: 37B                             0.0s
 => [internal] load .dockerignore                               0.0s
 => => transferring context: 2B                                 0.0s
 => [internal] load metadata for public.ecr.aws/lambda/python:  1.0s
 => [1/6] FROM public.ecr.aws/lambda/python:3.9@sha256:3dda276  0.0s
 => [internal] load build context                               0.0s
 => => transferring context: 2.64kB                             0.0s
 => CACHED [2/6] RUN pip install -U pip                         0.0s
 => CACHED [3/6] RUN pip install pipenv                         0.0s
 => CACHED [4/6] COPY [ Pipfile, Pipfile.lock, ./ ]             0.0s
 => CACHED [5/6] RUN pipenv install --system --deploy           0.0s
 => [6/6] COPY [ lambda_function.py, model.py, ./ ]             0.0s
 => exporting to image                                          0.0s
 => => exporting layers                                         0.0s
 => => writing image sha256:26e91ec8b6a57ba80437dc8ee278c92dfd  0.0s
 => => naming to docker.io/library/stream-model-duration:v2     0.0s
```

Running:

```bash
docker run -it --rm -p 8080:8080 --env-file .env stream-model-duration:v2
```

```text
24 Jul 2022 02:31:10,656 [INFO] (rapid) exec '/var/runtime/bootstrap' (cwd=/var/task, handler=)
24 Jul 2022 02:31:14,705 [INFO] (rapid) extensionsDisabledByLayer(/opt/disable-extensions-jwigqn8j) -> stat /opt/disable-extensions-jwigqn8j: no such file or directory
24 Jul 2022 02:31:14,707 [WARNING] (rapid) Cannot list external agents error=open /opt/extensions: no such file or directory
START RequestId: aa21ad77-9ae1-49bc-a7d2-a7bfa3e42266 Version: $LATEST
```

Note that this uses a `.env` file for defining environment variables:

```bash
# .env
RUN_ID=f4e2242a53a3410d89c061d1958ae70a
TEST_RUN=True
PREDICTIONS_STREAM_NAME=ride_predictions

AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
AWS_DEFAULT_REGION=us-east-1
```

Testing prediction using the following script:

```python
# test_docker.py
import requests

event = {
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49630706038424016596026506533782471779140474214180454402",
                "data": "eyAgICAgICAgICAicmlkZSI6IHsgICAgICAgICAgICAgICJQVUxvY2F0aW9uSUQiOiAxMzAsICAgICAgICAgICAgICAiRE9Mb2NhdGlvbklEIjogMjA1LCAgICAgICAgICAgICAgInRyaXBfZGlzdGFuY2UiOiAzLjY2ICAgICAgICAgIH0sICAgICAgICAgICJyaWRlX2lkIjogMTIzICAgICAgfQ==",
                "approximateArrivalTimestamp": 1655944485.718
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49630706038424016596026506533782471779140474214180454402",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::241297376613:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:241297376613:stream/ride_events"
        }
    ]
}


if __name__ == "__main__":    
    url = 'http://localhost:8080/2015-03-31/functions/function/invocations'
    response = requests.post(url, json=event)
    print(response.json())
```

Next, we run `python test_docker.py` in the terminal. It looks like the lambda function is still working inside the container. That is, it can download the model from S3, decode the input data, and make a prediction that is exposed by the lambda container. 

```bash
$ pipenv run python test_docker.py
{
    'predictions': [
        {
            'model': 'ride_duration_prediction_model', 
            'version': 'f4e2242a53a3410d89c061d1958ae70a', 
            'prediction': {
                'ride_duration': 18.210770674183355, 
                'ride_id': 123
            }
        }
    ]
}
```

Note that this isn't yet a proper test, i.e. we just printed the response and checked by eye that it looked correct. In the next section, we change this into something that can be automated with `pytest`. 

### DeepDiff

To better process differences in JSON outputs, we install a dev dependency called [DeepDiff](https://deepdiff.readthedocs.io/en/latest/). 

```bash
pipenv install --dev deepdiff
```