# Model Deployment

![Status](https://img.shields.io/static/v1.svg?label=Status&message=Ongoing&color=orange)



In this module, we deploy the ride duration model that has been our working example in the previous modules. Deploying a model means making its predictions available to other applications. We will look at three modes of deployment: **online** deployment, **offline** or batch deployment, and **streaming**.

In online mode, our service must be up all the time. To do this, we implement a web service which takes in HTTP requests and sends out predictions. In offline or batch mode, the service runs regularly, but not necessarily all the time: it makes predictions for a batch of examples on a schedule using workflow orchestration. Finally, we look at how to implement a streaming service, i.e. a machine learning service that listens to a stream of events and reacts to them, using AWS Kinesis and AWS Lambda.

```{margin}
⚠️ **Attribution:** These are notes for [Module 4: Model Deployment](https://github.com/DataTalksClub/mlops-zoomcamp/blob/main/04-deployment) of the [MLOps Zoomcamp](https://github.com/DataTalksClub/mlops-zoomcamp). The MLOps Zoomcamp is a free course from [DataTalks.Club](https://github.com/DataTalksClub).
```


## Deploying models with Flask and Docker

In this section, we develop a web server using Flask for serving model predictions. The model is obtained from an S3 artifacts store and predicts on data sent to the service by the backend. We will containerize the application using Docker. This container can be deployed anywhere Docker is supported, such as Kubernetes and Elastic Beanstalk.

### Packaging modeling code

We will collect code around our models into a [`ride_duration`](https://pypi.org/project/ride-duration-prediction/) package that will be uploaded to PyPI (hence can be installed using `pip`). Having a model package ensures smooth integration with the Flask API since we have the same code for training, testing, and inference. Ideally, we should also use the same environment for each of these phases and this can be done by using [Pipenv](https://pipenv.pypa.io/en/latest/). 

Also, packaging makes imports just work. For consistency, we will also use this package for our batch scoring workflow. The directory structure of our project would look like the following. Notice the nice separation between model code, application code, and deployment code.

```
deployment/
├── app/
│   └── main.py
├── ride_duration/
│   ├── __init__.py
│   ├── predict.py
│   ├── utils.py
│   └── VERSION
├── .env
├── Dockerfile
├── Pipfile
├── MANIFEST.in
├── Pipfile.lock
├── setup.py
├── test.py
├── train.py
└── pyproject.toml
```

First we create `setup.py` and `pyproject.toml` for packaging. For the `setup` module, we only have to change the package metadata and perhaps the license and [Trove classifiers](https://pypi.org/classifiers/). Note that most of these can be left blank, except perhaps the package name. Also, `install_requires=[]` is currently empty, but will be populated later using `pipenv-setup sync`.

```{margin}
[`setup.py`](https://github.com/particle1331/inefficient-networks/blob/12c6bb13254c4984aa8cad3bb36b2860ce2d1187/docs/notebooks/mlops/04-deployment/setup.py)
```
```python
from pathlib import Path
from setuptools import find_packages, setup


# Package meta-data.
NAME = "ride-duration-prediction"
DESCRIPTION = "Predicting ride duration for TLC Trip Record Data."
URL = "https://particle1331.github.io/inefficient-networks/notebooks/mlops/04-deployment/notes.html"
EMAIL = "particle1331@gmail.com"
AUTHOR = "Ron Medina"
REQUIRES_PYTHON = ">=3.9.0"


# The rest you shouldn't have to touch too much. :)
# Except perhaps the License and Trove Classifiers
# ------------------------------------------------

...

# Where the magic happens:
setup(
    ...
    install_requires=[],
    ...
    license="MIT",
    classifiers=[
        # Trove classifiers
        # Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3.9",
    ],
)
```

The other one just specifies the build system to use:

```{margin}
[`pyproject.toml`](https://github.com/particle1331/inefficient-networks/blob/12c6bb13254c4984aa8cad3bb36b2860ce2d1187/docs/notebooks/mlops/04-deployment/pyproject.toml)
```
```toml
[build-system]
requires = ["setuptools>=42.0", "wheel"]
build-backend = "setuptools.build_meta"
```

Additionally, we include a [`MANIFEST.in`](https://github.com/particle1331/inefficient-networks/blob/217134c84bb323452bf0dc3e8b6a6a04fea8f06b/docs/notebooks/mlops/04-deployment/MANIFEST.in) file to specify which files are included in the source distribution of the package. The full list can be viewed in the `SOURCES.txt` file of the `egg-info` folder generated when the package is built.

```{margin}
[`MANIFEST.in`](https://github.com/particle1331/inefficient-networks/blob/12c6bb13254c4984aa8cad3bb36b2860ce2d1187/docs/notebooks/mlops/04-deployment/MANIFEST.in)
```
```
include ride_duration/*.py
include ride_duration/VERSION

recursive-exclude * __pycache__
recursive-exclude * *.py[co]
```

### Setting up the environment with Pipenv

We will use [Pipenv](https://pipenv.pypa.io/en/latest/) to manage our project's package dependencies. To create a Pipenv environment, we have to specify the path to the Python interpreter that it will use. Here we get the path to a Python interpreter of the desired version, installed using `conda`:

```bash
(base) ubuntu@ip-172-31-24-183:~$ conda activate py39
(py39) ubuntu@ip-172-31-24-183:~$ which python
/home/ubuntu/anaconda3/envs/py39/bin/python
```

Close this terminal and open a new one. Navigate to the project root folder and create a new Pipenv environment as follows. Here we also install the dependencies.

```bash
pipenv shell --python=/home/ubuntu/anaconda3/envs/py39/bin/python
pipenv install scikit-learn==1.0.2 flask pandas mlflow boto3 
```

Notice that we get a `Pipfile`, which supersedes the usual requirements file, and a `Pipfile.lock` containing hashes of the downloaded packages, which ensure reproducible builds. Note that `pipenv shell` activates the environment where these files are located. Next we install the model package locally, along with other packages, in development mode:

```bash
pipenv install --dev -e .
pipenv install --dev requests
pipenv install --dev pipenv-setup
```

To make this available as a kernel in Jupyter notebook or VS Code:

```bash
pipenv install --dev jupyter notebook
pipenv run python -m ipykernel install --user --name=`basename $VIRTUAL_ENV`
```

Our `Pipfile` should now look as follows. Note that `ride-duration-prediction` is installed in editable mode since the package code is still in development. This was only installed so that imports work everywhere.

```{margin}
[`Pipfile`](https://github.com/particle1331/inefficient-networks/blob/383314b4c5e01fe9cc9d65b9ce1b9b90abb04001/docs/notebooks/mlops/04-deployment/Pipfile)
```
```ini
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
scikit-learn = "==1.0.2"
flask = "*"
pandas = "*"
mlflow = "*"
boto3 = "*"

[dev-packages]
ride-duration-prediction = {editable = true, path = "."}
jupyter = "*"
notebook = "*"
requests = "*"
pipenv-setup = "*"

[requires]
python_version = "3.9"
```

AWS credentials and other environment variables that we will use later are saved in a `.env` file in the same directory as the `Pipfile`. These are automatically detected and loaded by Pipenv when calling `pipenv shell`. However, the shell must be restarted whenever the `.env` file is modified.

```bash
# .env
EXPERIMENT_ID=1
RUN_ID=f4e2242a53a3410d89c061d1958ae70a
AWS_ACCESS_KEY_ID=A*************LI
AWS_SECRET_ACCESS_KEY=N*********************+9
```
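A missing variable tends to surface only later, e.g. as a failed S3 download or a `None` run ID, so it can help to check the environment up front. The following is a minimal sketch and not part of the project: `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, and the variable list simply mirrors the `.env` file above.

```python
import os

# Variables the service expects, mirroring the .env file above (assumption).
REQUIRED_VARS = (
    "EXPERIMENT_ID",
    "RUN_ID",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def missing_env_vars(required=REQUIRED_VARS, environ=None):
    """Return the names in `required` that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]
```

Calling `missing_env_vars()` inside `pipenv shell` should return an empty list once the `.env` file has been picked up.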

### Package modules

In the following module, we define helper functions for model training and inference. This includes the usual `load_training_dataframe` function, which creates the target column (ride duration in minutes) and filters it to some range, i.e. `[1, 60]`. This function is used for creating training and validation datasets. The other function, `prepare_features`, is used for feature engineering.

```{margin}
[`utils.py`](https://github.com/particle1331/inefficient-networks/blob/383314b4c5e01fe9cc9d65b9ce1b9b90abb04001/docs/notebooks/mlops/04-deployment/ride_duration/utils.py#L16-L37)
```
```python
def load_training_dataframe(file_path, y_min=1, y_max=60):
    """Load data from disk and preprocess for training."""
    
    # Load data from disk
    data = pd.read_parquet(file_path)

    # Create target column and filter outliers
    data['duration'] = data.lpep_dropoff_datetime - data.lpep_pickup_datetime
    data['duration'] = data.duration.dt.total_seconds() / 60
    data = data[(data.duration >= y_min) & (data.duration <= y_max)]

    return data


def prepare_features(input_data: Union[list[dict], pd.DataFrame]):
    """Prepare features for dict vectorizer."""

    X = pd.DataFrame(input_data)
    X['PU_DO'] = X['PULocationID'].astype(str) + '_' + X['DOLocationID'].astype(str)
    X = X[['PU_DO', 'trip_distance']].to_dict(orient='records')
    
    return X
```

Next we look at the `predict` module of our package. This contains two functions: one for loading the model and another for making predictions with the model. The `load_model` function loads a model directly from the S3 artifacts store:

```{margin}
[`predict.py`](https://github.com/particle1331/inefficient-networks/blob/383314b4c5e01fe9cc9d65b9ce1b9b90abb04001/docs/notebooks/mlops/04-deployment/ride_duration/predict.py#L10-L16)
```
```python
def load_model(experiment_id, run_id):
    """Get model from our S3 artifacts store."""

    source = f"s3://mlflow-models-ron/{experiment_id}/{run_id}/artifacts/model"
    model = mlflow.pyfunc.load_model(source)

    return model
```

To avoid having to load the preprocessor separately from the artifacts store, we train models that are pipelines of the following form (see **Appendix** below):

```python
pipeline = make_pipeline(
    DictVectorizer(), 
    RandomForestRegressor(**params, n_jobs=-1)
)
```

Our models expect as input `prepare_features(data)` where `data` can be a `DataFrame` whose rows contain ride features, or a list of ride feature dictionaries (e.g. obtained from a JSON payload). So we define the following function to help with inference:

```{margin}
[`predict.py`](https://github.com/particle1331/inefficient-networks/blob/383314b4c5e01fe9cc9d65b9ce1b9b90abb04001/docs/notebooks/mlops/04-deployment/ride_duration/predict.py#L19-L25)
```
```python
def make_prediction(model, input_data: Union[list[dict], pd.DataFrame]):
    """Make prediction from features dict or DataFrame."""
    
    X = prepare_features(input_data)
    preds = model.predict(X)

    return preds
```

Testing out the `load_model()` function:

```bash
❯ python
Python 3.9.13 | packaged by conda-forge | (main, May 27 2022, 17:01:00)
[Clang 13.0.1 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> from ride_duration.predict import load_model
>>> model = load_model(os.getenv("EXPERIMENT_ID"), os.getenv("RUN_ID"))
>>> model
mlflow.pyfunc.loaded_model:
  artifact_path: model
  flavor: mlflow.sklearn
  run_id: f4e2242a53a3410d89c061d1958ae70a
```

<br>

```{figure} ../../../img/s3-artifacts-ss.png
---
---
Artifacts store for model runs of experiment 1.
```

As an alternative, we can also load the latest **production version** directly from the model registry (no need to specify a run and experiment ID) using the tracking server. One issue is that the Flask server can fail to start whenever the tracking server is down, since the request for the model fails.

```python
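import mlflow
from mlflow.tracking import MlflowClient

# TRACKING_SERVER_HOST is assumed to hold the hostname of the tracking server.
TRACKING_URI = f"http://{TRACKING_SERVER_HOST}:5000"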

# Fetch production model from client
mlflow.set_tracking_uri(TRACKING_URI)
client = MlflowClient(tracking_uri=TRACKING_URI)
prod_model = client.get_latest_versions(name='NYCRideDurationModel', stages=['Production'])[0]

# Load model from S3 artifacts store
run_id = prod_model.run_id
source = prod_model.source
model = mlflow.pyfunc.load_model(source)
```
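One way to soften this failure mode is to fall back to the run ID pinned in the environment when the registry lookup fails. The sketch below is an assumption about how this could be wired, not code from the project: `load_with_fallback` is a hypothetical helper, and the two loaders are passed in as plain callables so that the logic can be tried without MLflow or a tracking server.

```python
import logging

logger = logging.getLogger(__name__)


def load_with_fallback(primary, fallback):
    """Try `primary()`; if it raises, log the error and return `fallback()`.

    Both arguments are zero-argument callables that return a loaded model.
    """
    try:
        return primary()
    except Exception as exc:  # e.g. the tracking server cannot be reached
        logger.warning("Primary model source failed (%s); using fallback.", exc)
        return fallback()
```

In the Flask app, `primary` could wrap the registry lookup above, and `fallback` could be `lambda: load_model(EXPERIMENT_ID, RUN_ID)`.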

### Build and upload

Once we are satisfied with the model package, we can now upload it to PyPI. First we use `pipenv-setup sync` to update `install_requires` in the `setup` script with packages installed using Pipenv. This makes sure there are no dependency conflicts when using the package.

```bash
$ pipenv-setup sync

setup.py was successfully updated
51 default packages from Pipfile.lock synced to setup.py
```

Building the package and uploading to PyPI:

```bash
pipenv install --dev build
pipenv install --dev twine
pipenv run python -m build
export USERNAME=particle1331
export PASSWORD=************
pipenv run python -m twine upload -u $USERNAME -p $PASSWORD dist/*
```

The uploaded package can be viewed [here](https://pypi.org/project/ride-duration-prediction/0.1.0/). We now replace the editable install with the published package as a proper dependency:

```bash
pipenv uninstall --dev -e .
pipenv install ride-duration-prediction==0.1.0
```

<br>

```{figure} ../../../img/pypi.png
---
width: 40em
---
Our model package in the Python package index. 🐍
```
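To confirm that an environment has picked up the published release rather than the editable install, the installed version can be queried from Python. This is a small sketch using the standard library's `importlib.metadata`; `installed_version` is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(distribution_name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return version(distribution_name)
    except PackageNotFoundError:
        return None
```

Inside the Pipenv environment, `installed_version("ride-duration-prediction")` should match the version pinned in the install command above.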

### Serving predictions using Flask

The model loads when the Flask server starts. The server exposes a single endpoint defined by `predict_endpoint`, which expects a JSON payload from the backend containing the features of a single ride. For observability, each prediction is returned along with the `run_id` of the model.

```{margin}
[`app/main.py`](https://github.com/particle1331/inefficient-networks/blob/d61298717898ebdc43dc35e45845f3acd49ba607/docs/notebooks/mlops/04-deployment/app/main.py)
```
```python
import os
from ride_duration.predict import load_model, make_prediction
from flask import Flask, request, jsonify


# Load model with run ID and experiment ID defined in the env.
RUN_ID = os.getenv("RUN_ID")
EXPERIMENT_ID = os.getenv("EXPERIMENT_ID")
model = load_model(run_id=RUN_ID, experiment_id=EXPERIMENT_ID)

app = Flask('duration-prediction')


@app.route('/predict', methods=['POST'])
def predict_endpoint():
    """Predict duration of a single ride using NYCRideDurationModel."""
    
    ride = request.get_json()
    preds = make_prediction(model, [ride])

    return jsonify({
        'duration': float(preds[0]),
        'model_version': RUN_ID,
    })


if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=9696)
```

For testing the prediction endpoint, we have the following script. This can be used without modification to test remote hosts using port forwarding.

```{margin}
[`test.py`](https://github.com/particle1331/inefficient-networks/blob/d61298717898ebdc43dc35e45845f3acd49ba607/docs/notebooks/mlops/04-deployment/test.py)
```
```python
import json
import requests


ride = {
    "PULocationID": 130,
    "DOLocationID": 205,
    "trip_distance": 3.66,
}


if __name__ == "__main__":
    
    host = "http://0.0.0.0:9696"
    url = f"{host}/predict"
    response = requests.post(url, json=ride)
    result = response.json()
    
    print(result)
```
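The endpoint above passes the payload straight to the model, so a malformed request surfaces as a server error. A hedged sketch of a validation step follows; `validate_ride` and `REQUIRED_FIELDS` are hypothetical names, and the required fields are taken from what `prepare_features` reads.

```python
# Fields read by prepare_features (assumption: no other fields are required).
REQUIRED_FIELDS = ("PULocationID", "DOLocationID", "trip_distance")


def validate_ride(payload):
    """Return a list of problems found in a ride payload (empty if valid)."""
    if not isinstance(payload, dict):
        return ["payload must be a JSON object"]

    errors = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in payload]

    # bool is a subclass of int in Python, so it is rejected explicitly.
    distance = payload.get("trip_distance")
    if "trip_distance" in payload and (
        isinstance(distance, bool) or not isinstance(distance, (int, float))
    ):
        errors.append("trip_distance must be a number")

    return errors
```

Inside `predict_endpoint`, a non-empty result could be returned to the caller with a 400 status, e.g. `return jsonify({'errors': errors}), 400`.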

### Containerizing the application with Docker

For our `Dockerfile`, we start by installing Pipenv. Then, we copy the files needed for running the application: the Pipenv requirements files and the Flask app. The model package itself is not copied, since it is installed from PyPI as a dependency listed in the `Pipfile`. Next, we install everything using Pipenv, expose port `9696`, and configure the entrypoint which serves the main app on `0.0.0.0:9696`.

```{margin}
[`Dockerfile`](https://github.com/particle1331/inefficient-networks/blob/d61298717898ebdc43dc35e45845f3acd49ba607/docs/notebooks/mlops/04-deployment/Dockerfile)
```
```Dockerfile
FROM python:3.9.13-slim

RUN pip install -U pip
RUN pip install pipenv

WORKDIR /app

COPY [ "Pipfile", "Pipfile.lock",  "./"]
COPY [ "app",  "./app"]

RUN pipenv install --system --deploy

EXPOSE 9696

# https://stackoverflow.com/a/71092624/1091950
ENTRYPOINT [ "gunicorn", "--bind=0.0.0.0:9696", "--timeout=600", "app.main:app" ]
```

Building the image:

```bash
docker build -t ride-duration-prediction-service:v1 .
```
```bash
[+] Building 177.1s (12/12) FINISHED
 => [internal] load build definition from Docke  0.1s
 => => transferring dockerfile: 376B             0.0s
 => [internal] load .dockerignore                0.1s
 => => transferring context: 2B                  0.0s
 => [internal] load metadata for docker.io/libr  4.9s
 => [1/7] FROM docker.io/library/python:3.9.13  22.0s
 => => resolve docker.io/library/python:3.9.13-  0.0s
 => => sha256:278d211701f68 858.90kB / 858.90kB  3.1s
 => => sha256:94b65918aca5f4b 11.59MB / 11.59MB  7.6s
 => => sha256:c01a2db78654c1923 1.86kB / 1.86kB  0.0s
 => => sha256:ef78109875ac0ac47 1.37kB / 1.37kB  0.0s
 => => sha256:ddaaa6b80b0541f48 7.50kB / 7.50kB  0.0s
 => => sha256:3b157c852f2736 30.07MB / 30.07MB  15.8s
 => => sha256:0f660d3f04731206fb1e8 234B / 234B  3.9s
 => => sha256:2b5c279a2d3443d3a 2.95MB / 2.95MB  8.6s
 => => extracting sha256:3b157c852f2736e12f0904  3.4s
 => => extracting sha256:278d211701f6882403e47f  0.3s
 => => extracting sha256:94b65918aca5f4bc0654e6  0.9s
 => => extracting sha256:0f660d3f04731206fb1e8b  0.0s
 => => extracting sha256:2b5c279a2d3443d3ab6534  0.5s
 => [internal] load build context                0.1s
 => => transferring context: 96.05kB             0.0s
 => [2/7] RUN pip install -U pip                 5.7s
 => [3/7] RUN pip install pipenv                14.1s
 => [4/7] WORKDIR /app                           0.1s
 => [5/7] COPY [ Pipfile, Pipfile.lock,  ./]     0.0s
 => [6/7] COPY [ app,  ./app]                    0.0s
 => [7/7] RUN pipenv install --system --deplo  106.1s
 => exporting to image                          23.7s
 => => exporting layers                         23.4s
 => => writing image sha256:f92279e7eb9164ba32e  0.0s
 => => naming to docker.io/library/ride-duratio  0.0s
```

Running the container:

```bash
docker run --env-file .env -it --rm -p 9696:9696 ride-duration-prediction-service:v1
```
```bash
[2022-06-25 23:51:36 +0000] [1] [INFO] Starting gunicorn 20.1.0
[2022-06-25 23:51:36 +0000] [1] [INFO] Listening at: http://0.0.0.0:9696 (1)
[2022-06-25 23:51:36 +0000] [1] [INFO] Using worker: sync
[2022-06-25 23:51:36 +0000] [7] [INFO] Booting worker with pid: 7
2022/06/25 23:52:04 WARNING mlflow.pyfunc: Detected one or more mismatches between the model's dependencies and the current Python environment:
 - psutil (current: uninstalled, required: psutil==5.9.1)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.
```

Note that `--env-file .env` loads the environment variables into the container. From the logs, we see that there is a mismatch between the training environment and the current one (`psutil` is not installed). In another terminal, let us test if prediction works:

```bash
❯ python test.py
{'duration': 18.210770674183355, 'model_version': 'f4e2242a53a3410d89c061d1958ae70a'}
```

After the initial loading time, the next predictions are returned instantaneously. This confirms that the model is loaded only once when the server starts. Or we can log model loading to be really sure.
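Logging the load explicitly could look like the following sketch. `load_and_log` is a hypothetical wrapper, not part of the package; it takes the loader as an argument so that it works with `load_model` or any other loading function.

```python
import logging
import time

logger = logging.getLogger("duration-prediction")


def load_and_log(loader, *args, **kwargs):
    """Call `loader`, log how long it took, and return what it returned."""
    start = time.perf_counter()
    model = loader(*args, **kwargs)
    elapsed = time.perf_counter() - start
    logger.info("Model loaded in %.2f s", elapsed)
    return model
```

In `app/main.py`, `model = load_and_log(load_model, run_id=RUN_ID, experiment_id=EXPERIMENT_ID)` would then emit one log line when a worker starts and none per request, provided logging is configured to show `INFO` messages.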

## Streaming: Deploying models with Kinesis and Lambda

A streaming service consists of **producers** and **consumers**. Producers push events to an event stream, and consuming services react to the events on this stream. Recall that a web service exhibits a 1-1 relationship, so there is an explicit connection between user and service. On the other hand, the relationship between producing and consuming services can be 1-many or many-many. The connection is only implicit, since a producer doesn't know which consumers will react, or how many. This allows a streaming setup to scale to many services or models.

For example, when a user uses our ride hailing app, the backend can send an event to the stream containing all information about this ride. Then, services react to this event, e.g. one consuming service predicts the tip and sends a push notification to the user asking for it. Another consuming service, which makes a better ride duration prediction but takes more time to do so, can update the prediction that was initially given to the user by the online web service.
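The one-to-many relationship can be illustrated with a toy in-memory stream. This is only a conceptual sketch: `EventStream`, `tip_consumer`, and `duration_consumer` are made-up names, and a real stream such as Kinesis delivers events asynchronously and durably, which this does not model.

```python
class EventStream:
    """Toy stream: every subscribed consumer sees every published event."""

    def __init__(self):
        self._consumers = []

    def subscribe(self, consumer):
        self._consumers.append(consumer)

    def publish(self, event):
        # The producer does not know who consumes the event, or how many.
        return [consumer(event) for consumer in self._consumers]


def tip_consumer(event):
    return {"ride_id": event["ride_id"], "action": "ask_for_tip"}


def duration_consumer(event):
    # Placeholder for a slower, more accurate duration model.
    return {"ride_id": event["ride_id"], "action": "update_duration"}


stream = EventStream()
stream.subscribe(tip_consumer)
stream.subscribe(duration_consumer)
results = stream.publish({"ride_id": 123, "trip_distance": 3.66})
```

Adding a third consumer requires no change to the producer, which is the property that the Kinesis setup in the following sections relies on.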

### Creating an IAM Role

To create a serverless function for serving our model, we will use [AWS Lambda](https://aws.amazon.com/lambda/). The advantage of this is that we do not have to worry about owning a server that runs our function. We just know that the function is being executed somewhere in AWS. For the sake of demonstration, we will pretend that we are serving better model predictions, although we are actually deploying the same Random Forest model.

**Steps**

1. Open the roles page in the IAM console and choose **Create role**. Choose Lambda as trusted entity type.
    ```{figure} ../../../img/create-role.png
    ---
    ---
    ```

2. Add `AWSLambdaKinesisExecutionRole` to permissions. This managed policy allows reading from Kinesis streams and writing logs. Resource `"*"` means that this applies to *any* Kinesis stream or log group. So this takes care of reading from input streams for our functions.
    ```{figure} ../../../img/create-role-2.png
    ---
    ---
    ```

3. Set name to `lambda-kinesis-role` and proceed to create role.
    ```{figure} ../../../img/create-role-3.png
    ---
    ---
    ```

4. Next we will give read permissions to S3. This is because our Lambda function will get its machine learning model from the MLflow artifacts store in S3. Select the `lambda-kinesis-role`, then select **Add permissions** > **Attach policies** > **Create policy**. Switch to the editor's JSON tab and paste:
    ```json
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "s3:Get*",
                    "s3:List*"
                ],
                "Resource": [
                    "arn:aws:s3:::mlflow-models-ron",
                    "arn:aws:s3:::mlflow-models-ron/*"
                ]
            }
        ]
    }
    ```
    Here `mlflow-models-ron` is the name of the artifacts store in S3. From this JSON, we can read that this policy allows all `Get` and all `List` actions on our `mlflow-models-ron` S3 bucket and its subdirectories. Skip tags, set name to be `read_premission_mlflow-models-ron`, put in the appropriate description, then select **Create policy**. Make sure to attach this policy to `lambda-kinesis-role`.
    <br> <br>

5. Finally, our Lambda functions have to write to output data streams. Do the same as above and create and attach a policy to `lambda-kinesis-role`. In this case, paste the following JSON, and name the policy `lambda_kinesis_write_to_ride_predictions`.
    ```json
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kinesis:PutRecord",
                    "kinesis:PutRecords"
                ],
                "Resource": "arn:aws:kinesis:us-east-1:241297376613:stream/ride_predictions"
            }
        ]
    }
    ```

    This policy allows writing to the Kinesis data stream `ride_predictions`, which doesn't exist yet, so the stream must be given exactly this name when it is created later on. Here `us-east-1` is the region and `241297376613` is the Account ID. Note that all streams and functions must live in the same region for them to work together. The allowed actions are `PutRecord` for an API call that puts a single record on the stream, and `PutRecords` for an API call that puts a batch of records on the stream.

<br>

```{figure} ../../../img/lambda-kinesis-policies.png
---
---
The `lambda-kinesis-role` for our Lambda function with policies for reading from all Kinesis data streams, writing on `ride_predictions` Kinesis data stream, and getting models from the `mlflow-models-ron` bucket in S3. 
```
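Since the stream ARN must match the region, account ID, and stream name exactly, it can be convenient to generate the write policy instead of editing it by hand. The helper below is a sketch with a hypothetical name (`kinesis_write_policy`); it reproduces the write policy above, minus the console-generated `Sid`.

```python
import json


def kinesis_write_policy(region, account_id, stream_name):
    """Build an IAM policy document allowing writes to one Kinesis stream."""
    arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
                "Resource": arn,
            }
        ],
    }


policy = kinesis_write_policy("us-east-1", "241297376613", "ride_predictions")
policy_json = json.dumps(policy, indent=4)  # text to paste into the JSON tab
```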


### Creating a Lambda function

Go to Lambda and create `ride-duration-prediction-test` for testing. Later we will create the actual function. In the permissions, choose the role that we have just created.

```{figure} ../../../img/create-function.png
---
---
```

```{figure} ../../../img/create-function-2.png
---
---
The `ride-duration-prediction-test` function.
```

Let's modify this to look more like our machine learning function. Note that unlike for a web service, there is no 1-1 relationship between inputs and outputs. We have to include a `ride_id` to tie an input event to its corresponding event in the output stream. In the code section write and deploy:

```python
# lambda_function.py
import json

def prepare_features(ride):
    features = {}
    features['PU_DO'] = f"{ride['PULocationID']}_{ride['DOLocationID']}"
    features['trip_distance'] = ride['trip_distance']
    return features

def predict(features):
    return 10.0

def lambda_handler(event, context):
    ride = event['ride']
    ride_id = event['ride_id']

    features = prepare_features(ride)
    prediction = predict(features)

    return {
        'ride_duration': prediction,
        'ride_id': ride_id
    }
```

Configure the **test event** to:

```{margin}
**Toy test event**
```
```JSON
{
    "ride": {
        "PULocationID": 130,
        "DOLocationID": 205,
        "trip_distance": 3.66
    }, 
    "ride_id": 123
}
```

Running the test. The model predicts a constant `10` minutes for each ride:

```{figure} ../../../img/lambda-2.png
---
---
```

### Reading input from a Kinesis data stream

In this section, we attach a Kinesis data stream to our function. Go to Kinesis to create a data stream. We will call it `ride_events` and set its capacity mode to **Provisioned** with 1 shard. AWS provides the write and read capacity for the chosen number of shards. Note that we have to pay per hour for each shard.

```{figure} ../../../img/kinesis-stream-1.png
---
width: 30em
---
```
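To get a feel for how many shards a workload needs, a back-of-the-envelope estimate can be made from the per-shard write limits. The constants below (1 MB/s and 1,000 records/s per shard) are the provisioned-mode write limits as documented by AWS at the time of writing; treat them as assumptions and check the current quotas. `shards_needed` is a hypothetical helper.

```python
import math

# Per-shard write limits for provisioned mode (assumed; check current quotas).
MAX_WRITE_MB_PER_SEC = 1.0
MAX_WRITE_RECORDS_PER_SEC = 1000


def shards_needed(records_per_sec, avg_record_kb):
    """Estimate the number of shards needed to absorb a given write load."""
    mb_per_sec = records_per_sec * avg_record_kb / 1024
    by_throughput = math.ceil(mb_per_sec / MAX_WRITE_MB_PER_SEC)
    by_count = math.ceil(records_per_sec / MAX_WRITE_RECORDS_PER_SEC)
    return max(1, by_throughput, by_count)
```

For the small JSON events in this module, a single shard is far more than enough, which is why we provision only one.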

Going back to Lambda, add `ride_events` as trigger to the function:

```{figure} ../../../img/kinesis-stream-2.png
---
width: 30em
---
```


Once the trigger is enabled, the function is invoked whenever an event is put on the stream, and whatever it prints shows up in the logs. To see what a Kinesis event looks like, we comment out our code and only print the incoming event:

```python
# lambda_function.py
import json

def prepare_features(ride):
    features = {}
    features['PU_DO'] = f"{ride['PULocationID']}_{ride['DOLocationID']}"
    features['trip_distance'] = ride['trip_distance']
    return features

def predict(features):
    return 10.0

def lambda_handler(event, context):  
    event_json = json.dumps(event)
    print(event_json)

    # ride = event['ride']
    # ride_id = event['ride_id']

    # features = prepare_features(ride)
    # prediction = predict(features)

    # return {
    #     'ride_duration': prediction,
    #     'ride_id': ride_id
    # }
```

Putting the above test event into the input stream:

```{margin}
**Put record**
<br>
v. `aws-cli/1.22.34`
```
```bash
aws kinesis put-record \
    --stream-name ride_events \
    --partition-key 1 \
    --data '{
        "ride": {
            "PULocationID": 130,
            "DOLocationID": 205,
            "trip_distance": 3.66
        },
        "ride_id": 123
    }'
```
```{margin}
`[out]`
```
```bash
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49630706038424016596026506533783680704960320015674900482"
}
```

Note the `SequenceNumber` of this event. This event can be found on the logs by looking at the "Monitor" tab of the lambda function and clicking on "View logs in CloudWatch". We update the test event in the lambda function with this record. In particular, this means we don't have to push events to the input stream when testing.

```{margin}
**Actual test event**
```

```json
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49630706038424016596026506533782471779140474214180454402",
                "data": "eyAgICAgICAgICAicmlkZSI6IHsgICAgICAgICAgICAgICJQVUxvY2F0aW9uSUQiOiAxMzAsICAgICAgICAgICAgICAiRE9Mb2NhdGlvbklEIjogMjA1LCAgICAgICAgICAgICAgInRyaXBfZGlzdGFuY2UiOiAzLjY2ICAgICAgICAgIH0sICAgICAgICAgICJyaWRlX2lkIjogMTIzICAgICAgfQ==",
                "approximateArrivalTimestamp": 1655944485.718
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000000:49630706038424016596026506533782471779140474214180454402",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::241297376613:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:241297376613:stream/ride_events"
        }
    ]
}
```

<br>

```{figure} ../../../img/kinesis-stream-5.png
---
---
Record with sequence number `496...402` printed in the logs.
```

It turns out that Kinesis encodes event data in `base64`, i.e. as `"data": "eyAgI..."`. So we have to decode this to be able to use it. Since the batch size is set to 100, we need to iterate over records to access each of them. Our function is modified as follows:

```python
# lambda_function.py
import base64
import json

...

def lambda_handler(event, context):
    for record in event['Records']:
        encoded_data = record['kinesis']['data']
        decoded_data = base64.b64decode(encoded_data).decode('utf-8')
        ride_event = json.loads(decoded_data)

        ride = ride_event['ride']
        ride_id = ride_event['ride_id']
    
        features = prepare_features(ride)
        prediction = predict(features)
    
        # Returns after the first record, which is enough for the
        # single-record test event used here.
        return {
            'ride_duration': prediction,
            'ride_id': ride_id
        }
```

Executing on the Kinesis test event. Observe that the data has been properly decoded:

```{figure} ../../../img/kinesis-stream-6.png
---
---
```
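For local testing, it is handy to build Kinesis-shaped events without going through the stream. The sketch below assumes only the fields that the handler reads (`Records[].kinesis.data`); `make_kinesis_event` and `decode_record` are hypothetical helpers, and real events carry more metadata, as in the test event above.

```python
import base64
import json


def make_kinesis_event(payloads):
    """Wrap JSON-serializable payloads in a minimal Kinesis-style event."""
    records = []
    for payload in payloads:
        raw = json.dumps(payload).encode("utf-8")
        encoded = base64.b64encode(raw).decode("utf-8")
        records.append({"kinesis": {"data": encoded}})
    return {"Records": records}


def decode_record(record):
    """Inverse step: base64-decode one record and parse its JSON body."""
    decoded = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
    return json.loads(decoded)


ride_event = {
    "ride": {"PULocationID": 130, "DOLocationID": 205, "trip_distance": 3.66},
    "ride_id": 123,
}
event = make_kinesis_event([ride_event])
```

Passing `event` to `lambda_handler(event, None)` exercises the same decoding path as the test in the Lambda console.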

### Writing predictions to a Kinesis data stream

In the previous section, we created an input data stream in Kinesis. Repeat the same process in Kinesis to create an output data stream called `ride_predictions`. Similarly, set its capacity mode to **Provisioned** with 1 shard. Note that we already created the write permission for this stream before the stream itself existed, so the stream name must match the one in the policy exactly.

To write to `ride_predictions`, we modify our Lambda function as follows. Basically, we use the client version of the CLI command for putting records onto a data stream. So we create a Kinesis client using `boto3` and specify `PREDICTIONS_STREAM_NAME`, which defaults to the `ride_predictions` stream that has just been created. Paste the following in the code editor:

```python
# lambda_function.py
import json
import base64
import boto3
import os

kinesis_client = boto3.client('kinesis')
PREDICTIONS_STREAM_NAME = os.getenv('PREDICTIONS_STREAM_NAME', 'ride_predictions')


def prepare_features(ride):
    features = {}
    features['PU_DO'] = f"{ride['PULocationID']}_{ride['DOLocationID']}"
    features['trip_distance'] = ride['trip_distance']
    return features
    
    
def predict(features):
    return 10.0


def lambda_handler(event, context):
    
    prediction_events = []

    for record in event['Records']:
        encoded_data = record['kinesis']['data']
        decoded_data = base64.b64decode(encoded_data).decode('utf-8')
        
        ride_event = json.loads(decoded_data)
        ride = ride_event['ride']
        ride_id = ride_event['ride_id']
    
        features = prepare_features(ride)
        prediction = predict(features)
        
        prediction_event = {
            'model': 'ride_duration_prediction_model',
            'version': 123,
            'prediction': {
                'ride_duration': prediction,
                'ride_id': ride_id
            }
        }

        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_record
        kinesis_client.put_record(
            StreamName=PREDICTIONS_STREAM_NAME,
            Data=json.dumps(prediction_event),
            PartitionKey=str(ride_id)
        )
        
        prediction_events.append(prediction_event)

    return {
        'predictions': prediction_events
    } 
```

Note that in real applications multiple producers can push data to this output stream, so we include `model` and `version` in the prediction event to make it traceable to this model. Also, `put_records`, which supports up to 500 records per request, is more efficient since it needs fewer API calls. But to keep the code simple, we use `put_record`.
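For reference, batching with `put_records` could look roughly like the sketch below. This is not the code used in these notes: `to_entry` and `put_prediction_events` are hypothetical helpers, and `client` is assumed to be a `boto3` Kinesis client such as the `kinesis_client` defined above.

```python
import json

MAX_BATCH_SIZE = 500  # PutRecords accepts at most 500 records per request


def to_entry(prediction_event):
    """Convert a prediction event into a PutRecords entry (hypothetical helper)."""
    return {
        'Data': json.dumps(prediction_event).encode('utf-8'),
        'PartitionKey': str(prediction_event['prediction']['ride_id']),
    }


def put_prediction_events(client, stream_name, prediction_events):
    """Write events in batches and return the number of failed records.

    `client` is assumed to be a boto3 Kinesis client.
    """
    failed = 0
    for start in range(0, len(prediction_events), MAX_BATCH_SIZE):
        batch = prediction_events[start:start + MAX_BATCH_SIZE]
        response = client.put_records(
            StreamName=stream_name,
            Records=[to_entry(event) for event in batch],
        )
        # PutRecords is not all-or-nothing: individual records can fail.
        failed += response.get('FailedRecordCount', 0)
    return failed
```

In the handler, this would replace the per-record `put_record` call with a single call after the loop. A nonzero return value means that some records were rejected and may need to be retried.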

Test this in the Lambda UI. If the write permission was not created correctly, you'll get the following error during testing:

```
"errorMessage": "An error occurred (AccessDeniedException) when calling the PutRecord operation: User: arn:aws:sts::241297376613:assumed-role/lambda-kinesis-role/ride-duration-prediction-test is not authorized to perform: kinesis:PutRecord on resource: arn:aws:kinesis:us-east-1:241297376613:stream/ride_predictions because no identity-based policy allows the kinesis:PutRecord action"...
```

```{figure} ../../../img/kinesis-out-stream-1.png
---
---
Test passed.
```

Reading from output stream:

```{margin}
**Get records**
<br>
v. `aws-cli/1.22.34`
```
```bash
KINESIS_STREAM_OUTPUT='ride_predictions'
SHARD='shardId-000000000000'

SHARD_ITERATOR=$(aws kinesis \
    get-shard-iterator \
        --shard-id ${SHARD} \
        --shard-iterator-type TRIM_HORIZON \
        --stream-name ${KINESIS_STREAM_OUTPUT} \
        --query 'ShardIterator' \
)

RESULT=$(aws kinesis get-records --shard-iterator $SHARD_ITERATOR)

echo ${RESULT} | jq -r '.Records[-1].Data' | base64 --decode | jq
```

```{margin}
`[out]`
```

```bash
{
  "model": "ride_duration_prediction_model",
  "version": 123,
  "prediction": {
    "ride_duration": 10,
    "ride_id": 123
  }
}
```

Here we only have one shard. If we have multiple shards, we have to know which shard to read from. The `get-shard-iterator` command gives us the ID of an iterator, which we then pass to `get-records` to retrieve records from the stream. This CLI command is a bit complex, since we are reading without Lambda. Using Lambda as the consumer hides all of these details, so we only need to do this here for checking the output.
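The same two-step read can be done from Python. The following is a minimal sketch assuming `client` is a `boto3` Kinesis client; `read_last_record` is a hypothetical helper that mirrors the `.Records[-1]` selection in the CLI command above.

```python
import json


def read_last_record(client, stream_name, shard_id='shardId-000000000000'):
    """Return the last record in the first batch read from the start of a shard.

    `client` is assumed to be a boto3 Kinesis client.
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='TRIM_HORIZON',  # start from the oldest available record
    )['ShardIterator']

    records = client.get_records(ShardIterator=iterator)['Records']
    if not records:
        return None

    # boto3 returns `Data` as bytes that are already base64-decoded.
    return json.loads(records[-1]['Data'])
```

Note that a single `get_records` call returns only one batch. For a longer stream, the `NextShardIterator` in the response has to be followed to reach the most recent records.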

### Serving the model as a container with ECR

Note that while we are able to write to the output stream, we only write fixed predictions of `10.0` minutes from a model with version `123`. Now we finally get to the most important part: adding our model. As before, assume that this is a larger, more accurate model that reads from the stream and updates the prediction.

#### Function definition

We further improve our `lambda_function.py` by reading environment variables and downloading the model from S3. The `predict` function now uses the model instead of returning a constant. Here we return a Python `float` instead of a singleton NumPy array since the output must be JSON serializable. For testing, a `TEST_RUN` flag is defined so that the function can be called without writing to the `ride_predictions` stream.

```{margin}
[`lambda_function.py`](https://github.com/particle1331/inefficient-networks/blob/7d841f80865c9773a21b2c89c4ffb30fa1c94c2b/docs/notebooks/mlops/04-deployment/streaming/lambda_function.py)
```
```python
# lambda_function.py
import json
import base64
import boto3
import os
import mlflow

from ride_duration.predict import load_model
from ride_duration.utils import prepare_features


# Load environmental variables
PREDICTIONS_STREAM_NAME = os.getenv('PREDICTIONS_STREAM_NAME', 'ride_predictions')
TEST_RUN = os.getenv('TEST_RUN', 'False')
RUN_ID = os.getenv('RUN_ID')
EXPERIMENT_ID = os.getenv('EXPERIMENT_ID')

# Load the model from S3
model = load_model(experiment_id=EXPERIMENT_ID, run_id=RUN_ID)


def predict(features):
    prediction = model.predict(features)
    return float(prediction[0])


def lambda_handler(event, context):
    
    prediction_events = []

    for record in event['Records']:
        encoded_data = record['kinesis']['data']
        decoded_data = base64.b64decode(encoded_data).decode('utf-8')
        
        ride_event = json.loads(decoded_data)
        ride_data = ride_event['ride']
        ride_id = ride_event['ride_id']
    
        features = prepare_features([ride_data])
        prediction = predict(features)
        
        prediction_event = {
            'model': 'ride_duration_prediction_model',
            'version': RUN_ID,
            'prediction': {
                'ride_duration': prediction,
                'ride_id': ride_id
            }
        }

        if TEST_RUN == 'False':
            kinesis_client = boto3.client('kinesis')

            # This is just the Python client version of the `aws kinesis put-record` CLI command.
            # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_record
            kinesis_client.put_record(
                StreamName=PREDICTIONS_STREAM_NAME,
                Data=json.dumps(prediction_event),
                PartitionKey=str(ride_id)
            )
        
        prediction_events.append(prediction_event)

    return {
        'predictions': prediction_events
    }
```
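The check `TEST_RUN == 'False'` is an exact string comparison, so a value such as `false` or `0` is treated as a test run and nothing is written to the stream. A more forgiving way to read the flag is sketched below. Here `env_flag` is a hypothetical helper and not part of the `ride_duration` package.

```python
import os


def env_flag(name, default=False):
    """Interpret an environment variable as a boolean (hypothetical helper)."""
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in ('1', 'true', 'yes', 'on')
```

With this helper, the flag would be read once with `TEST_RUN = env_flag('TEST_RUN')` and the condition in the handler would become `if not TEST_RUN:`.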

#### Dockerfile

AWS Lambda provides a [base image](https://gallery.ecr.aws/lambda/python) which contains all the required components for running the container on that platform. Note that there is no need to set a working directory since the base image already defines one (`/var/task`), so we can simply copy files to `./`.


```{margin}
[`Dockerfile`](https://github.com/particle1331/inefficient-networks/blob/7d841f80865c9773a21b2c89c4ffb30fa1c94c2b/docs/notebooks/mlops/04-deployment/streaming/Dockerfile)
```
```Dockerfile
FROM public.ecr.aws/lambda/python:3.9

RUN pip install -U pip
RUN pip install pipenv

COPY [ "Pipfile", "Pipfile.lock", "./" ]

RUN pipenv install --system --deploy

COPY [ "lambda_function.py", "./" ]

CMD [ "lambda_function.lambda_handler" ]
```

Building the image:

```bash
docker build -t stream-model-duration:v1 .
```

Running the image:
```bash
docker run -it --rm -p 8080:8080 --env-file .env stream-model-duration:v1
```

```{figure} ../../../img/ecr-img.png
---
width: 40em
---
AWS Lambda base image for Python.
```

#### Environmental variables

```bash
# .env
TEST_RUN=True
PREDICTIONS_STREAM_NAME=ride_predictions
EXPERIMENT_ID=1
RUN_ID=f4e2242a53a3410d89c061d1958ae70a
AWS_ACCESS_KEY_ID=*************
AWS_SECRET_ACCESS_KEY=*************
AWS_DEFAULT_REGION=us-east-1
```

#### Testing

AWS Lambda is a cloud service for calling functions. So to check correctness, we simply call `lambda_handler` on the test event and examine the output. This is implemented in [`test.py`](https://github.com/particle1331/inefficient-networks/blob/e0312f57ce4693f6ffe58bd2dbab4ac95ead08f5/docs/notebooks/mlops/04-deployment/streaming/test.py) which is executed below. Note that the version is now the MLflow `run_id` and the predicted ride duration is different from `10.0`.

```bash
$ export $(cat .env | xargs)
$ python test.py

{
    'predictions': [
        {
            'model': 'ride_duration_prediction_model', 
            'version': 'f4e2242a53a3410d89c061d1958ae70a', 
            'prediction': {
                'ride_duration': 18.21077067418335, 
                'ride_id': 123
            }
        }
    ]
}
```
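A test event of this form can also be assembled locally. The following is a minimal sketch and not the contents of `test.py`: `make_kinesis_event` is a hypothetical helper that includes only the fields read by `lambda_handler`, whereas a real Kinesis event carries additional metadata for each record.

```python
import base64
import json


def make_kinesis_event(ride_events):
    """Wrap ride events in the `Records` structure read by `lambda_handler`."""
    records = []
    for ride_event in ride_events:
        payload = json.dumps(ride_event).encode('utf-8')
        records.append({
            'kinesis': {'data': base64.b64encode(payload).decode('utf-8')}
        })
    return {'Records': records}


event = make_kinesis_event([{
    'ride': {'PULocationID': 130, 'DOLocationID': 205, 'trip_distance': 3.66},
    'ride_id': 123,
}])

# Round trip using the same decoding steps as the handler
encoded = event['Records'][0]['kinesis']['data']
decoded = json.loads(base64.b64decode(encoded).decode('utf-8'))
print(decoded['ride_id'])
```

The resulting `event` can be passed to `lambda_handler(event, None)`, since the handler does not use its `context` argument.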

Since the function definition looks to be correct, we proceed to testing whether the Docker container is able to run the function. This is implemented in [`test_docker.py`](https://github.com/particle1331/inefficient-networks/blob/e0312f57ce4693f6ffe58bd2dbab4ac95ead08f5/docs/notebooks/mlops/04-deployment/streaming/test_docker.py) that executes a POST request on the following URL:

```python
import requests

# `event` is the same Kinesis test event used in test.py
url = 'http://localhost:8080/2015-03-31/functions/function/invocations'
response = requests.post(url, json=event)
print(response.json())
```


The `functions/function/invocations` endpoint (API version `2015-03-31`) is exposed by the base image on port `8080`, which we did not modify. This is why we had to forward port `8080` when running the container. In AWS Lambda, the service itself invokes the function with records from the input stream, so no further configuration of this endpoint is required on our part. Running the test gives the same result as above.

#### Publishing to ECR

Now that the container is working, we push the image to the registry. First, we create a repository for our images.

```{margin}
`aws-cli` <br>
v. `1.22.34`
```
```bash
aws ecr create-repository --repository-name duration-model
```
```json
{
    "repository": {
        "repositoryArn": "arn:aws:ecr:us-east-1:241297376613:repository/duration-model",
        "registryId": "241297376613",
        "repositoryName": "duration-model",
        "repositoryUri": "241297376613.dkr.ecr.us-east-1.amazonaws.com/duration-model",
        "createdAt": 1656099951.0,
        "imageTagMutability": "MUTABLE",
        "imageScanningConfiguration": {
            "scanOnPush": false
        },
        "encryptionConfiguration": {
            "encryptionType": "AES256"
        }
    }
}
```

<br>

```{figure} ../../../img/ecr-repo.png
---
width: 40em
---
The `duration-model` repository in ECR.
```

Logging in (note that `get-login` is only available in AWS CLI v1):
```{margin}
`aws-cli` <br>
v. `1.22.34`
```
```bash
$(aws ecr get-login --no-include-email)
```

Pushing container to the repository:

```{margin}
`docker` <br>
v. `20.10.12` 
```
```bash
REMOTE_URI=241297376613.dkr.ecr.us-east-1.amazonaws.com/duration-model
REMOTE_TAG=v1
REMOTE_IMAGE_URI=${REMOTE_URI}:${REMOTE_TAG}
LOCAL_IMAGE=stream-model-duration:v1

docker tag ${LOCAL_IMAGE} ${REMOTE_IMAGE_URI}
docker push ${REMOTE_IMAGE_URI}
```

```bash
The push refers to repository [241297376613.dkr.ecr.us-east-1.amazonaws.com/duration-model]
d633cfbf6042: Pushed
1e16d3c3a5e4: Pushing  124.5MB/657MB
593e5b91fe04: Pushed
ed5e98d9c477: Pushed
a5a2488932a6: Pushed
8071867dc313: Pushed
39978c3cb375: Pushing  223.5MB
6ea38db36806: Pushed
f92fb29958b6: Pushed
f1c31f6b2603: Pushed
fe1bfb0e592a: Pushing  115.7MB/333.9MB
```

<br>

```{figure} ../../../img/ecr-repo-v1.png
---
width: 40em
---
The `v1` container has been pushed to the `duration-model` repository in ECR.
```

#### Creating the lambda function

Now that we have our container image in ECR, we deploy it in AWS Lambda. Then, we attach the `ride_events` stream as input or trigger, and set the function's environment variables. Recall that we ran our container with a `.env` file during testing. We also have to increase the memory allocated to the function and its timeout.


```{figure} ../../../img/ecr-create-function.png
---
width: 40em
---
Creating a Lambda function based on a container. Here we need the remote image URI which we copied using the **Copy URI** button in the `v1` image in ECR.
```

```{figure} ../../../img/ecr-memory-settings.png
---
width: 40em
---
Update memory to 512 MB and timeout to 15 seconds.
```

```{figure} ../../../img/ecr-env-variables.png
---
width: 40em
---
Assigning environment variables. Here we just transfer the variables from our `.env` file.
```

Configure the test in the UI with the above test event[^ref] and execute the test. Here we are checking permissions, execution time, and memory usage. Notice that the first call takes a while, but subsequent calls are faster. This is why we initially set the timeout to 15 seconds. Further tests only require around 0.1 seconds.

[^ref]: Press CTRL+F **Actual test event** to find the code cell where this is defined.

```{figure} ../../../img/ecr-test-results.png
---
width: 40em
---
Execution results for testing in the UI.
```

<br>

We run further tests with a modified `ride_id` for better visibility. Here we will try to retrieve the results from the output stream. Putting an event on the input stream:

```bash
aws kinesis put-record \
    --stream-name ride_events \
    --partition-key 1 \
    --data '{
        "ride": {
            "PULocationID": 130,
            "DOLocationID": 205,
            "trip_distance": 3.66
        },
        "ride_id": "container_test_event"
    }'
```

Fetching test results from output stream:

```bash
KINESIS_STREAM_OUTPUT='ride_predictions'
SHARD='shardId-000000000000'

SHARD_ITERATOR=$(aws kinesis \
    get-shard-iterator \
        --shard-id ${SHARD} \
        --shard-iterator-type TRIM_HORIZON \
        --stream-name ${KINESIS_STREAM_OUTPUT} \
        --query 'ShardIterator' \
)

RESULT=$(aws kinesis get-records --shard-iterator $SHARD_ITERATOR)
echo ${RESULT} | jq -r '.Records[-1].Data' | base64 --decode | jq
```

Note that the result you get might be from our `test` function, which also reads from and writes to the same data streams. After waiting for a few seconds, execute this again and check the output. It looks like the output is from our container. Nice!

```bash
...
$ echo ${RESULT} | jq -r '.Records[-1].Data' | base64 --decode | jq
{
  "model": "ride_duration_prediction_model",
  "version": "f4e2242a53a3410d89c061d1958ae70a",
  "prediction": {
    "ride_duration": 18.210770674183355,
    "ride_id": "container_test_event"
  }
}
```

## Deploying batch predictions

For use cases that do not require the responsiveness of a web service, we can implement an offline service that makes batch predictions. Typically, offline services run at fixed intervals, e.g. daily, weekly, or monthly. A critical element of this is **workflow orchestration**, where we regularly pull data from a database, make predictions on that data, and then write the predictions to a database, to a file that is uploaded to S3, or to an analytics dashboard, thereby refreshing it.

### Batch scoring

To do batch scoring, we simply load the model and use it to make predictions on a list of examples instead of a single example as in the web service. This is easily done using the `ride_duration` package. Note that the input file has no natural `id` column, so we randomly generate a [`uuid`](https://datatracker.ietf.org/doc/html/rfc4122.html) for each example, i.e. a string that is expected to be unique across space and time. Our general workflow looks like this:

```python
from ride_duration.utils import load_training_dataframe
from ride_duration.predict import load_model, make_prediction
from uuid import uuid4

import pandas as pd
import os

from dotenv import load_dotenv 
load_dotenv() # Load variables in .env

input_file = f'https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-01.parquet'
df = load_training_dataframe(input_file)

model = load_model(os.getenv("EXPERIMENT_ID"), os.getenv("RUN_ID"))
preds = make_prediction(model, df)
uuids = [str(uuid4()) for _ in range(len(df))]
```

Our output file includes all information relevant to modelling (features, targets, and predictions) that can be useful for analytics and monitoring. For observability, we include the generated `uuid` for each row and also include the `run_id` of the model used to make predictions.

```python
out = df[['lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance', 'duration']].copy()

out['model_version']= os.getenv("RUN_ID")
out['actual_duration'] = df.duration
out['predicted_duration'] = preds
out['diff'] = out.actual_duration - out.predicted_duration
out['ride_id'] = uuids

out = out[['ride_id', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance', 'actual_duration', 'predicted_duration', 'diff', 'model_version']]
out.head(5)
```

|   | ride_id | lpep_pickup_datetime | PULocationID | DOLocationID | trip_distance | actual_duration | predicted_duration | diff | model_version |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 84f54b9f-ac47-419c-a16e-765a6513dd17 | 2021-01-01 00:15:56 | 43 | 151 | 1.01 | 3.933333 | 6.674317 | -2.740983 | f4e2242a53a3410d89c061d1958ae70a |
| 1 | f2a9413f-4130-434e-8a00-c3a45bdcf56a | 2021-01-01 00:25:59 | 166 | 239 | 2.53 | 8.75 | 13.79195 | -5.04195 | f4e2242a53a3410d89c061d1958ae70a |
| 2 | c572886d-afda-45ac-844c-37da663a1819 | 2021-01-01 00:45:57 | 41 | 42 | 1.12 | 5.966667 | 6.965782 | -0.999115 | f4e2242a53a3410d89c061d1958ae70a |
| 3 | 0133a7d3-3344-45e3-a7c5-a1e667da8cce | 2020-12-31 23:57:51 | 168 | 75 | 1.99 | 7.083333 | 11.486757 | -4.403424 | f4e2242a53a3410d89c061d1958ae70a |
| 7 | 37f16f1b-405f-4eab-a772-5eef2d32434c | 2021-01-01 00:26:31 | 75 | 75 | 0.45 | 2.316667 | 3.498446 | -1.18178 | f4e2242a53a3410d89c061d1958ae70a |
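To run this scoring periodically, the input and output locations have to be derived from the period being scored. The following is a minimal sketch that assumes a monthly job which scores the previous month. The file naming follows the input URL used above, while `previous_month`, `batch_paths`, and the `output/` directory are hypothetical.

```python
from datetime import date

SOURCE_URL = 'https://s3.amazonaws.com/nyc-tlc/trip+data'


def previous_month(run_date):
    """Return (year, month) of the month before `run_date`."""
    if run_date.month == 1:
        return run_date.year - 1, 12
    return run_date.year, run_date.month - 1


def batch_paths(run_date, taxi_type='green'):
    """Input URL and (hypothetical) local output path for the batch to score."""
    year, month = previous_month(run_date)
    period = f'{year:04d}-{month:02d}'
    input_file = f'{SOURCE_URL}/{taxi_type}_tripdata_{period}.parquet'
    output_file = f'output/{taxi_type}/{period}.parquet'
    return input_file, output_file


# A run in February 2021 scores the January 2021 file used above.
input_file, output_file = batch_paths(date(2021, 2, 15))
print(input_file)
print(output_file)
```

Deriving both paths from the run date keeps each scheduled run tied to one period, so a run can be repeated for a past date without changing the code.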


### Deploying a batch scoring workflow in Prefect

```text
=== TODO (waiting for new video with orchestration :) ===
```

<!-- ### Scoring script

```{margin}
[`score.py`](https://github.com/particle1331/inefficient-networks/blob/217134c84bb323452bf0dc3e8b6a6a04fea8f06b/docs/notebooks/mlops/04-deployment/score.py)
```
```python
from ride_duration.utils import load_training_dataframe
from ride_duration.predict import load_model, make_prediction


def generate_uuids(n):
    ride_ids = []
    for i in range(n):
        ride_ids.append(str(uuid.uuid4()))
    return ride_ids


def apply_model(
    input_file: str, 
    run_id: str, 
    output_file: str
) -> None:
    
    print(f'Reading the data from {input_file}...')
    df = load_training_dataframe(input_file)
    df['ride_id'] = generate_uuids(len(df))

    print(f'Loading the model with RUN_ID={run_id}...')
    model = load_model()

    print(f'Applying the model...')
    preds = make_prediction(model, df)

    print(f'Saving the result to {output_file}...')
    df_result = pd.DataFrame()
    df_result['ride_id'] = df['ride_id']
    df_result['lpep_pickup_datetime'] = df['lpep_pickup_datetime']
    df_result['PULocationID'] = df['PULocationID']
    df_result['DOLocationID'] = df['DOLocationID']
    df_result['actual_duration'] = df['duration']
    df_result['predicted_duration'] = preds
    df_result['diff'] = df_result['actual_duration'] - df_result['predicted_duration']
    df_result['model_version'] = run_id
    df_result.to_parquet(output_file, index=False)


def run(taxi_type: str, year: int, month: int, run_id: str) -> None:

    source_url = 'https://s3.amazonaws.com/nyc-tlc/trip+data'
    input_file = f'{source_url}/{taxi_type}_tripdata_{year:04d}-{month:02d}.parquet'
    output_file = f'output/{taxi_type}/{year:04d}-{month:02d}.parquet'

    apply_model(
        input_file=input_file,
        run_id=run_id,
        output_file=output_file
    )


if __name__ == '__main__':

    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--taxi_type", default='green', type=str)
    parser.add_argument("--year", default=2021, type=int)
    parser.add_argument("--month", default=1, type=int)
    parser.add_argument("--run_id", type=str)
    parser.add_argument("--experiment_id", type=int)
    args = parser.parse_args()
    
    run(
        taxi_type=args.taxi_type,
        year=args.year,
        month=args.month,
        run_id=args.run_id
    )
``` -->

<!-- ```bash
pipenv install --dev python-dotenv
python score.py
``` -->

## Appendix: Model train script

For training models that we use to serve predictions in our API, we use the following script. This trains a model using the `ride_duration` package (which ensures smooth integration with the Flask API) and logs the trained model to a remote MLflow tracking server. The tracking server host is provided as a command line argument.

```{margin}
[`train.py`](https://github.com/particle1331/inefficient-networks/blob/217134c84bb323452bf0dc3e8b6a6a04fea8f06b/docs/notebooks/mlops/04-deployment/train.py)
```
```python
import mlflow 
import joblib

from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.feature_extraction import DictVectorizer
from sklearn.pipeline import make_pipeline

from ride_duration.utils import load_training_dataframe, prepare_features


def setup(tracking_server_host):
    TRACKING_URI = f"http://{tracking_server_host}:5000"
    mlflow.set_tracking_uri(TRACKING_URI)
    mlflow.set_experiment("nyc-taxi-experiment")


def run_training(X_train, y_train, X_valid, y_valid):
    with mlflow.start_run():
        params = {
            'n_estimators': 100,
            'max_depth': 20
        }
        
        pipeline = make_pipeline(
            DictVectorizer(), 
            RandomForestRegressor(**params, n_jobs=-1)
        )
        
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_valid)
        rmse = mean_squared_error(y_valid, y_pred, squared=False)
        
        mlflow.log_params(params)
        mlflow.log_metric("rmse_valid", rmse)
        mlflow.sklearn.log_model(pipeline, artifact_path='model')


if __name__ == "__main__":

    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--tracking-server-host", type=str)
    parser.add_argument("--train_path", type=str)
    parser.add_argument("--valid_path", type=str)
    args = parser.parse_args()

    # Getting data from disk
    train_data = load_training_dataframe(args.train_path)
    valid_data = load_training_dataframe(args.valid_path)

    # Preprocessing dataset
    X_train = prepare_features(train_data.drop(['duration'], axis=1))
    X_valid = prepare_features(valid_data.drop(['duration'], axis=1))
    y_train = train_data.duration.values
    y_valid = valid_data.duration.values

    # Push training to server
    setup(args.tracking_server_host)
    run_training(X_train, y_train, X_valid, y_valid)
```