# Building your own Docker image for BYOA

We will start our MLOps journey here by creating a generic Docker image that can train and serve scikit-learn algorithms/models. As explained in https://docs.aws.amazon.com/sagemaker/latest/dg/amazon-sagemaker-containers.html, SageMaker defines a common interface for its Docker containers.
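The following summary lists the `/opt/ml` paths and the HTTP contract that the code in this notebook relies on. It is drawn from this notebook's own code (the `training` channel, port 8080) and does not describe everything SageMaker provides. The helper name `sagemaker_paths` is hypothetical.

```python
import os

# Paths under the SageMaker prefix that the training and serving code below rely on.
# Hypothetical helper; the layout mirrors the constants used in model.py.
def sagemaker_paths(prefix="/opt/ml", channel="training"):
    return {
        # hyperparameters passed to the Estimator, stored as a JSON file
        "hyperparameters": os.path.join(prefix, "input", "config", "hyperparameters.json"),
        # files copied from S3 for the given input channel (File mode)
        "channel_data": os.path.join(prefix, "input", "data", channel),
        # anything written here is packed into model.tar.gz and uploaded to S3
        "model": os.path.join(prefix, "model"),
        # a file named 'failure' here becomes the training job's failure reason
        "failure": os.path.join(prefix, "output", "failure"),
    }

# Serving contract: the container is started with the argument 'serve' and must
# answer GET /ping and POST /invocations on port 8080.
SERVING_PORT = 8080
SERVING_ROUTES = {"GET": "/ping", "POST": "/invocations"}

for name, path in sagemaker_paths().items():
    print(f"{name:16} {path}")
```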

## Let's create the Dockerfile

In [None]:
%%writefile Dockerfile
FROM python:3.6-jessie

RUN apt-get update -y && apt-get install -y libev-dev
RUN pip install bottle bjoern opencv-python pandas==0.25.1 numpy scipy scikit-learn

RUN mkdir -p /opt/program
RUN mkdir -p /opt/ml

ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"

COPY app.py /opt/program
COPY model.py /opt/program
WORKDIR /opt/program

EXPOSE 8080
ENTRYPOINT ["python", "app.py"]

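Note the two directories: `/opt/program` holds our application code, and `/opt/ml` is where SageMaker places the input data and hyperparameters and picks up the model artifacts.

The `ENTRYPOINT` runs `python app.py` from the `/opt/program` directory. Any arguments given to `docker run` are appended to it. SageMaker starts the container with `train` for training jobs and `serve` for endpoints. Without an argument, `app.py` exits with an error.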

## Let's create our application code

Notice that we're creating a web service with two routes: `GET /ping` and `POST /invocations`. `/ping` is the health check and `/invocations` calls your model. In production, don't rely on Bottle's built-in development server; use a proper WSGI server instead. We will combine Bottle, which defines the web API, with bjoern, a fast WSGI server written in C.
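Bottle and bjoern hide the plumbing, but the contract between them is plain WSGI. The server calls the application with an `environ` dict and a `start_response` callback. The stdlib-only sketch below implements the same two routes as a raw WSGI callable and exercises it in-process with `wsgiref.util.setup_testing_defaults`, which fills in the missing environ keys. `fake_predict` is a hypothetical stand-in for `model.predict`. The sketch shows the interface only and is not meant to replace Bottle.

```python
import io
import json
from wsgiref.util import setup_testing_defaults

def fake_predict(rows, algo):
    # Hypothetical stand-in for model.predict: one label per input row
    return {"iris_id": [0 for _ in rows], "algo": algo}

def app(environ, start_response):
    method = environ["REQUEST_METHOD"]
    path = environ.get("PATH_INFO", "")
    if method == "GET" and path == "/ping":
        # Health check: an empty 200 response means "healthy"
        start_response("200 OK", [("Content-Type", "text/plain")])
        return [b""]
    if method == "POST" and path == "/invocations":
        length = int(environ.get("CONTENT_LENGTH") or 0)
        rows = json.loads(environ["wsgi.input"].read(length))
        # WSGI exposes the X-Amzn-SageMaker-Custom-Attributes header under this key
        algo = environ.get("HTTP_X_AMZN_SAGEMAKER_CUSTOM_ATTRIBUTES", "logistic")
        body = json.dumps(fake_predict(rows, algo)).encode("utf-8")
        start_response("200 OK", [("Content-Type", "application/json"),
                                  ("Content-Length", str(len(body)))])
        return [body]
    start_response("404 Not Found", [("Content-Type", "text/plain")])
    return [b"not found"]

def call(method, path, body=b"", headers=None):
    # Build a minimal WSGI environ and invoke the app without a network socket
    environ = {"REQUEST_METHOD": method, "PATH_INFO": path,
               "CONTENT_LENGTH": str(len(body)), "wsgi.input": io.BytesIO(body)}
    environ.update(headers or {})
    setup_testing_defaults(environ)
    status = []
    chunks = app(environ, lambda s, h: status.append(s))
    return status[0], b"".join(chunks)

print(call("GET", "/ping"))
print(call("POST", "/invocations", json.dumps([[6.7, 3.1, 5.6, 2.4]]).encode()))
```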

In [None]:
%%writefile app.py
import json
import os
import sys

import bjoern
import bottle
from bottle import request, response, post, get

# Put /opt/ml/model first on sys.path: a model.py found there takes
# precedence over the copy in /opt/program
prefix = '/opt/ml'
model_path = os.path.join(prefix, 'model')
sys.path.insert(0, model_path)

# Log the contents of the model directory (useful when debugging)
print(os.listdir(model_path))
import model

@get('/ping')
def ping():
    # Health check: an empty 200 response tells SageMaker the container is healthy
    return ""

@post('/invocations')
def invoke():
    # Parse the JSON request body (a list of feature rows)
    req = json.loads(request.body.read())
    # Pick the algorithm from the SageMaker custom attributes header, defaulting to logistic
    algo = request.get_header('X-Amzn-SageMaker-Custom-Attributes', 'logistic')
    response.content_type = 'application/json'
    return json.dumps(model.predict(req, algo))

if __name__ == '__main__':
    if len(sys.argv) < 2 or sys.argv[1] not in ["serve", "train", "test"]:
        raise Exception("Invalid argument: use 'train' for training mode, 'serve' for prediction mode "
                        "or 'test <algo> <json payload>' for a local prediction")

    mode = sys.argv[1]

    if mode == "train":
        model.train()

    elif mode == "test":
        algo = sys.argv[2]
        # Parse the payload as JSON instead of eval() so no arbitrary code is executed
        req = json.loads(sys.argv[3])
        print(model.predict(req, algo))

    else:
        bjoern.run(bottle.app(), "0.0.0.0", 8080)

## Let's create the model training and inference code using scikit-learn

We now create the `model.py` module used by `app.py` above

In [None]:
%%writefile model.py
import numpy as np
import json
import os
import pandas as pd
import re
import sys
import traceback

from sklearn import model_selection
import joblib

from sklearn.linear_model import LogisticRegression

# This directory is the communication channel between SageMaker and your container
prefix = '/opt/ml'

# Here, SageMaker will store the dataset copied from S3
input_path = os.path.join(prefix, 'input/data')
# If something bad happens, write a failure file with the error messages here
output_path = os.path.join(prefix, 'output')
# Everything you store here will be packed into a .tar.gz by SageMaker and stored in S3
model_path = os.path.join(prefix, 'model')
# These are the hyperparameters you send to your algorithms through the Estimator
param_path = os.path.join(prefix, 'input/config/hyperparameters.json')

model_cache = {}

def train():
    print("Training mode")
    
    try:
        # This algorithm has a single channel of input data called 'training'. Since we run in
        # File mode, the input files are copied to the directory specified here.
        channel_name='training'
        training_path = os.path.join(input_path, channel_name)

        hyper_logistic = {}
        # Read in any hyperparameters that the user passed with the training job
        with open(param_path, 'r') as tc:
            is_float = re.compile(r'^\d+\.\d+$')
            is_integer = re.compile(r'^\d+$')
            for key, value in json.load(tc).items():
                # SageMaker passes every value as a string: convert plain
                # (unsigned) integers and decimals back to numbers
                if is_float.match(value) is not None:
                    value = float(value)
                elif is_integer.match(value) is not None:
                    value = int(value)
                
                if key.startswith('logistic'):
                    key = key.replace('logistic_', '')
                    hyper_logistic[key] = value

        # Take the set of files and read them all into a single pandas dataframe
        input_files = [ os.path.join(training_path, file) for file in os.listdir(training_path) ]
        if len(input_files) == 0:
            raise ValueError(('There are no files in {}.\n' +
                              'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                              'the data specification in S3 was incorrectly specified or the role specified\n' +
                              'does not have permission to access the data.').format(training_path, channel_name))
        raw_data = [ pd.read_csv(file, sep=',', header=None ) for file in input_files ]
        train_data = pd.concat(raw_data)
        
        # labels are in the first column
        Y = train_data.iloc[:,0]
        X = train_data.iloc[:,1:]
        
        X_train, X_test, Y_train, Y_test = model_selection.train_test_split(X, Y, test_size=0.33, random_state=7)

        algo = "logistic"
        print("Training: %s" % algo)
        model = LogisticRegression()
        model.set_params(**hyper_logistic)
        model.fit(X_train, Y_train)
        print("{}: {}".format( algo, model.score(X_test, Y_test)) )
        # Passing a path lets joblib open and close the file itself
        joblib.dump(model, os.path.join(model_path, '%s_model.pkl' % algo))
    
    except Exception as e:
        # Write out an error file. This will be returned as the failureReason in the
        # DescribeTrainingJob result.
        trc = traceback.format_exc()
        # /opt/ml/output may not exist when running locally without that mount
        os.makedirs(output_path, exist_ok=True)
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write('Exception during training: ' + str(e) + '\n' + trc)

        # Printing this causes the exception to be in the training job logs, as well.
        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)
        
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)

def predict(payload, algo):
    if algo is None or payload is None:
        raise ValueError( "You need to inform the algorithm and the payload" )
    
    if model_cache.get(algo) is None:
        model_filename = os.path.join(model_path, '%s_model.pkl' % algo)
        model_cache[algo] = joblib.load(model_filename)
    
    return {"iris_id": model_cache[algo].predict( payload ).tolist() }
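
The number-conversion workaround in `train()` only recognizes unsigned integers and decimals such as `0.5`. Values like `1e-4` or `-1` stay strings, and `LogisticRegression` would most likely fail on them. One alternative is to try `json.loads` on each value and fall back to the raw string. This assumes every value arrives as a string. The function names below are hypothetical, and the `<algo>_` prefix routing mirrors what `train()` does for `logistic_`.

```python
import json

def parse_value(raw):
    # Try to decode the string as a JSON literal (int, float, bool, null, list);
    # keep the original value when it is not valid JSON (e.g. "lbfgs")
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return raw

def split_by_algorithm(hyperparameters, algorithms):
    # Route "<algo>_<param>" keys to per-algorithm dicts, stripping the prefix
    per_algo = {algo: {} for algo in algorithms}
    for key, raw in hyperparameters.items():
        for algo in algorithms:
            prefix = algo + "_"
            if key.startswith(prefix):
                per_algo[algo][key[len(prefix):]] = parse_value(raw)
    return per_algo

raw = {"logistic_max_iter": "100", "logistic_solver": "lbfgs", "logistic_tol": "1e-4"}
print(split_by_algorithm(raw, ["logistic"]))
```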


Let's test building the image locally

In [None]:
!docker build -f Dockerfile -t iris-model:1.0 .

## Let's do some tests with our model image, locally

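First, let's define some hyperparameters and store them as a JSON file. Our container only trains a logistic regression model, so every key carries the `logistic_` prefix. `model.py` strips this prefix before passing the values to `LogisticRegression`: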

In [None]:
hyperparameters = {
    "logistic_max_iter": 100,
    "logistic_solver": "lbfgs"
}

In [None]:
# Now we store the hyperparameters as a JSON file in a local input folder, which will be
# mapped to the folder where SageMaker expects to find them
import json
!mkdir -p input/config

# SageMaker passes every hyperparameter value as a string, so we do the same here
hyperparameters = {key: str(value) for key, value in hyperparameters.items()}
with open('input/config/hyperparameters.json', 'w') as f:
    json.dump(hyperparameters, f)

## Now, let's copy the Iris training data file to the training channel

In [None]:
!mkdir -p input/data/training
!cp iris_train.csv input/data/training/

## And we test the training process

We will map the local notebook folders to the folders SageMaker uses to sync data and models between S3 and the container. This simulates the run as it would happen on SageMaker.
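The cells around this step create `input` and `model` folders next to the notebook and mount them with `-v`. The sketch below collects that preparation into one function and returns the matching `-v` arguments for `docker run`, using a temporary directory as the base. The function name `prepare_local_opt_ml` is hypothetical. The extra `output` mount is an assumption that is not in the original commands; it lets you read the `failure` file that `model.py` writes when training crashes.

```python
import json
import os
import tempfile

def prepare_local_opt_ml(base_dir, hyperparameters, channel="training"):
    # Mirror the /opt/ml layout on the host
    layout = {
        "config": os.path.join(base_dir, "input", "config"),
        "data": os.path.join(base_dir, "input", "data", channel),
        "model": os.path.join(base_dir, "model"),
        "output": os.path.join(base_dir, "output"),
    }
    for path in layout.values():
        os.makedirs(path, exist_ok=True)
    # Store hyperparameter values as strings, as SageMaker does
    with open(os.path.join(layout["config"], "hyperparameters.json"), "w") as f:
        json.dump({k: str(v) for k, v in hyperparameters.items()}, f)
    # One -v flag per top-level folder, mapped onto the container's /opt/ml
    volume_args = []
    for name in ("input", "model", "output"):
        volume_args += ["-v", "{}:/opt/ml/{}".format(os.path.join(base_dir, name), name)]
    return layout, volume_args

base = tempfile.mkdtemp()
layout, args = prepare_local_opt_ml(base, {"logistic_max_iter": 100, "logistic_solver": "lbfgs"})
print(" ".join(["docker", "run", "--rm"] + args + ["iris-model:1.0", "train"]))
```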

In [None]:
!mkdir -p model

In [None]:
print( "Training ...")
!docker run --rm --name 'my_model' \
    -v "$PWD/model:/opt/ml/model" \
    -v "$PWD/input:/opt/ml/input" iris-model:1.0 train

## Check your model folder

You should see the trained model, `logistic_model.pkl`, in the local `model` folder.
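To check this programmatically, a small helper can look for the `<algo>_model.pkl` file that `train()` saves. If the file is missing, the helper shows the `failure` file instead. The `docker run` command above mounts only `model` and `input`. The `failure` file written to `/opt/ml/output` is therefore visible on the host only if you also mount an `output` folder, which this sketch assumes. Without that mount, the error appears in the container's stderr output. The helper name is hypothetical.

```python
import os

def check_training_result(base_dir, algo="logistic"):
    # train() saves the model as /opt/ml/model/<algo>_model.pkl
    model_file = os.path.join(base_dir, "model", "%s_model.pkl" % algo)
    if os.path.isfile(model_file):
        return model_file
    # On failure, train() writes the exception and traceback to /opt/ml/output/failure
    failure_file = os.path.join(base_dir, "output", "failure")
    if os.path.isfile(failure_file):
        with open(failure_file) as f:
            raise RuntimeError("Training failed:\n" + f.read())
    raise FileNotFoundError("No model or failure file under " + base_dir)

# In this notebook the mounted folders live next to it, in the current directory
try:
    print("Model saved at", check_training_result(os.getcwd()))
except (RuntimeError, FileNotFoundError) as err:
    print(err)
```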

## Let's test inference with the container

In [None]:
print( "Testing with logistic")
!docker run --rm --name 'my_model' \
    -v "$PWD/model:/opt/ml/model" \
    -v "$PWD/input:/opt/ml/input" iris-model:1.0 test logistic "[[6.7, 3.1, 5.6, 2.4]]"
print("Result for iris type above should be category 2")

## Now let's simulate an endpoint exposed by SageMaker

After you execute the next cell, it will keep running and block this notebook until you interrupt it. The web service will be exposed on port 8080.

In [None]:
!docker run --rm --name 'my_model' \
    -p 8080:8080 \
    -v "$PWD/model:/opt/ml/model" \
    -v "$PWD/input:/opt/ml/input" iris-model:1.0 serve

While the cell above is running, open a terminal or a separate notebook to send POST inference requests to port 8080, or to call the `/ping` health check.

For example, in Python (note that no authorization is used at this point):
```
import json, requests

payload = json.dumps([[4.6, 3.1, 1.5, 0.2]]).encode('utf-8')
headers={"Content-type": "application/json", "X-Amzn-SageMaker-Custom-Attributes": "logistic"}

resp = requests.post('http://localhost:8080/invocations', data = payload, headers = headers)
print("Response code: %d, Payload: [%s]" % (resp.status_code, resp.text))
```
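SageMaker uses `GET /ping` to decide whether the container is healthy. When testing locally, it therefore helps to wait for that route before sending predictions. Below is a stdlib-only sketch using `urllib.request`. The function name `wait_for_ping` and its default timeout values are assumptions.

```python
import time
import urllib.error
import urllib.request

def wait_for_ping(url="http://localhost:8080/ping", timeout=30.0, interval=1.0):
    # Poll the health-check route until it answers 200 or the deadline passes
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except (urllib.error.URLError, OSError):
            pass  # not ready yet (connection refused, timeout or HTTP error)
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

Running `wait_for_ping()` from a second notebook while the `serve` cell is active should return `True` once bjoern is listening on port 8080.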

## We can now BYOA to SageMaker

The container should now be ready to work as a new custom SageMaker training and inference algorithm. We just need to publish it to Amazon ECR first. Before continuing, stop the previous cell (for example, with the kernel's interrupt button)!

In [None]:
import os
import boto3

region = boto3.Session().region_name

os.environ['IMAGE_REPO_NAME']="iris-model"
os.environ['IMAGE_TAG']="1.0"
os.environ['AWS_DEFAULT_REGION']=region
os.environ['AWS_ACCOUNT_ID']=boto3.client('sts').get_caller_identity()["Account"]

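# Create the ECR repository if it doesn't exist yet
!aws ecr describe-repositories --repository-names $IMAGE_REPO_NAME > /dev/null 2>&1 || aws ecr create-repository --repository-name $IMAGE_REPO_NAME
# Log Docker in to ECR in the current region (`aws ecr get-login` is not available in AWS CLI v2)
!aws ecr get-login-password --region $AWS_DEFAULT_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com
!docker tag $IMAGE_REPO_NAME:$IMAGE_TAG $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG
!docker push $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG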
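
The registry path appears in both `docker tag` and `docker push`, and again when the Estimator is created. A small hypothetical helper keeps the pattern `<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>` in one place. The account ID below is a placeholder, not a real account. Regions in the China partition use an `.amazonaws.com.cn` suffix, which this sketch does not handle.

```python
def ecr_image_uri(account_id, region, repository, tag="latest"):
    # Amazon ECR registry hostname for a private repository in a standard AWS region
    return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account_id, region, repository, tag)

# Placeholder values; in the notebook these come from boto3 and the IMAGE_* variables
print(ecr_image_uri("123456789012", "us-east-1", "iris-model", "1.0"))
# 123456789012.dkr.ecr.us-east-1.amazonaws.com/iris-model:1.0
```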

### Now that we have our training and inference container in ECR, we can use it for SageMaker training and predictions

In [None]:
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()
role = get_execution_role()
training_image = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(os.environ['AWS_ACCOUNT_ID'], region, os.environ['IMAGE_REPO_NAME'], os.environ['IMAGE_TAG'])
bucket = sagemaker_session.default_bucket()

estimator = sagemaker.estimator.Estimator(
    training_image,
    role,
    train_instance_count=1,
    train_instance_type='ml.m5.large',
    output_path='s3://{}/iris/output_byoa'.format(bucket),
    sagemaker_session=sagemaker_session
)



# set the hyperparameters
estimator.set_hyperparameters(
                        logistic_max_iter=100,
                        logistic_solver='lbfgs'
)

In [None]:
%%time

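# Run training. This assumes iris_train.csv and iris_test.csv were already uploaded to s3://<bucket>/iris/data/
# model.py only reads the 'training' channel; the 'validation' channel is copied to the container but not used
# Follow the job's progress in the SageMaker training console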
estimator.fit({
    'training': sagemaker.session.s3_input(s3_data='s3://{}/iris/data/iris_train.csv'.format(bucket), content_type="csv"),
    'validation': sagemaker.session.s3_input(s3_data='s3://{}/iris/data/iris_test.csv'.format(bucket), content_type="csv")
})

In [None]:
%%time
predictor = estimator.deploy(initial_instance_count=1, instance_type='ml.t2.medium')

In [None]:
# Invoke the endpoint directly with boto3
import json
import boto3

sm = boto3.client('sagemaker-runtime')

# app.py parses the request body as JSON, so send JSON with a matching content type
exampledata = json.dumps([[6.7, 3.1, 5.6, 2.4]])

resp = sm.invoke_endpoint(
    EndpointName=predictor.endpoint,
    ContentType='application/json',
    Body=exampledata,
    CustomAttributes="logistic"
)

prediction = json.loads(resp['Body'].read().decode('utf-8'))
print("Classified as {} - Should be: 2".format(prediction["iris_id"]))
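
`app.py` always parses the request body as JSON. To send real CSV with `ContentType='text/csv'` instead, the container has to branch on the `Content-Type` header. Below is a minimal stdlib sketch of that branching. It assumes one sample per CSV line and numeric features only. `parse_payload` is a hypothetical helper and is not part of the code above.

```python
import csv
import io
import json

def parse_payload(body, content_type):
    # Return a list of feature rows from either a JSON or a CSV request body
    text = body.decode("utf-8") if isinstance(body, bytes) else body
    media_type = (content_type or "").split(";")[0].strip().lower()
    if media_type == "application/json":
        return json.loads(text)
    if media_type == "text/csv":
        # One sample per line; blank lines are skipped
        return [[float(v) for v in row] for row in csv.reader(io.StringIO(text)) if row]
    raise ValueError("Unsupported content type: %r" % content_type)

print(parse_payload(b"6.7,3.1,5.6,2.4\n4.6,3.1,1.5,0.2", "text/csv"))
print(parse_payload(b"[[6.7, 3.1, 5.6, 2.4]]", "application/json"))
```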

**Note**: the logs of the running SageMaker containers can be accessed in CloudWatch Logs for debugging! All print statements, errors and stack traces will be visible there.

In [None]:
# Cleanup
predictor.delete_endpoint()