# Customized Training File

In [None]:
%%writefile train.py
import os
import sys
import pandas as pd
import re
import joblib
import json
from sklearn.ensemble import RandomForestClassifier

def load_dataset(path):
    """Read every CSV file found directly under *path* into features and labels.

    Args:
        path: Directory whose entries are header-less, comma-separated files.

    Returns:
        tuple: ``(X, y)`` where ``y`` is the first column of the concatenated
        data (the labels) and ``X`` holds all remaining columns.

    Raises:
        ValueError: If the directory contains no entries.
    """
    entries = os.listdir(path)
    if not entries:
        raise ValueError("Invalid # of files in dir: {}".format(path))

    # One frame per file, stacked in directory-listing order.
    frames = []
    for entry in entries:
        frames.append(pd.read_csv(os.path.join(path, entry), sep=",", header=None))
    combined = pd.concat(frames)

    # Column 0 carries the label; everything after it is a feature.
    labels = combined.iloc[:, 0]
    features = combined.iloc[:, 1:]
    return features, labels

def preprocess_data(df):
    """Standardize every column of *df* to zero mean and unit standard deviation.

    Args:
        df: pandas DataFrame whose columns are all numeric.

    Returns:
        A new DataFrame with the same index and columns in which each value is
        ``(value - column mean) / column std`` (pandas' default sample
        standard deviation). The frame passed in is not modified.
    """
    centered = df - df.mean()
    spread = df.std()
    # A constant column has std 0 and a single-row frame has std NaN; dividing
    # by either would fill the column with NaN. Such a column carries no
    # information, so it is left at its centered value of 0.
    spread = spread.where(spread > 0, 1.0)
    return centered / spread
    
    
def start(args):
    """Train the random-forest classifier and store it in ``args.model_dir``.

    Args:
        args: Parsed arguments providing ``train`` and ``validation`` (input
            directories), ``max_depth``, ``n_jobs``, ``n_estimators``
            (hyperparameters), ``model_dir`` (where ``iris_model.pkl`` is
            written) and ``output_dir`` (where a ``failure`` file is written
            when training fails).

    Raises:
        SystemExit: With exit code 255 when any step of the training fails.
    """
    # Imported locally: the failure handler needs it and this module's import
    # block does not provide it.
    import traceback

    print("Training mode")

    try:
        X_train, y_train = load_dataset(args.train)
        X_test, y_test = load_dataset(args.validation)

        hyperparameters = {
            "max_depth": args.max_depth,
            "verbose": 1, # show all logs
            "n_jobs": args.n_jobs,
            "n_estimators": args.n_estimators
        }

        # NOTE(review): the validation set is standardized with its own
        # statistics, not with those of the training set - confirm intended.
        X_train = preprocess_data(X_train)
        X_test = preprocess_data(X_test)

        print("Training the classifier")
        model = RandomForestClassifier()
        model.set_params(**hyperparameters)
        model.fit(X_train, y_train)
        print("Score: {}".format(model.score(X_test, y_test)))

        # Hand joblib the path so it opens and closes the file itself.
        joblib.dump(model, os.path.join(args.model_dir, "iris_model.pkl"))

    except Exception as e:
        trc = traceback.format_exc()
        message = "Exception during training: " + str(e) + "\n" + trc

        # Write out an error file. This will be returned as the failureReason in the
        # DescribeTrainingJob result.
        with open(os.path.join(args.output_dir, "failure"), "w") as s:
            s.write(message)

        # Printing this causes the exception to be in the training job logs, as well.
        print(message, file=sys.stderr)

        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)

# Customized Inference Handler

In [None]:
%%writefile handler.py
import os
import sys
import joblib
import pandas as pd
from sagemaker_inference.default_inference_handler import DefaultInferenceHandler
from sagemaker_inference.default_handler_service import DefaultHandlerService
from sagemaker_inference import content_types, errors, transformer, encoder, decoder


## Preprocess input data
def preprocess_data(df):
    """Standardize a serialized request payload and return it as one CSV line.

    Args:
        df: Raw request body (``str`` or ``bytes``) holding a literal such as
            ``"4.6,3.1,1.5,0.2"``.

    Returns:
        str: The standardized values joined with commas. The values are read
        back from the text rendering of the frame, taking the last
        space-separated token of every row.

    Raises:
        ValueError, SyntaxError: If the payload is not a plain literal.
    """
    import ast  # local import: only this function needs it

    print(type(df), df)
    text = df.decode("utf-8") if isinstance(df, bytes) else df

    # The payload comes from the network. ast.literal_eval only accepts
    # literals (numbers, tuples, lists, ...), whereas eval would execute any
    # expression a client sends.
    df = pd.DataFrame(ast.literal_eval(text.strip()))

    # NOTE(review): for a flat CSV row this appears to build a single column,
    # so the row is standardized against its own values rather than against
    # the per-feature statistics used by train.py - confirm intended.
    for c in df.columns:
        mean = sum(df[c])/len(df)
        df[c] = (df[c] - mean)/df[c].std()

    result = df.to_string(index = False)
    print(result)

    # The first rendered line is the column header; every following line ends
    # with the value to forward.
    new = [line.split(' ')[-1] for line in result.splitlines()][1:]

    final = ','.join(new)
    print(type(final), final)
    return final


class HandlerService(DefaultHandlerService, DefaultInferenceHandler):
    """Handler service and inference handler for the iris classifier.

    The instance registers itself as the default inference handler of the
    Transformer it passes to the handler-service base class. Only ``text/csv``
    is accepted for both the request and the response.
    """

    def __init__(self):
        op = transformer.Transformer(default_inference_handler=self)
        super(HandlerService, self).__init__(transformer=op)

    ## Loads the model from the disk
    def default_model_fn(self, model_dir):
        """Return the model stored as ``iris_model.pkl`` inside *model_dir*."""
        model_filename = os.path.join(model_dir, "iris_model.pkl")
        # The with-block closes the artifact file once joblib has read it.
        with open(model_filename, "rb") as model_file:
            return joblib.load(model_file)

    ## Parse and check the format of the input data
    def default_input_fn(self, input_data, content_type):
        """Preprocess a ``text/csv`` request body and decode it into one row.

        Raises:
            Exception: If *content_type* is not ``text/csv``.
        """
        if content_type != "text/csv":
            raise Exception("Invalid content-type: %s" % content_type)
        input_data = preprocess_data(input_data)
        # reshape(1, -1): the decoded values are passed on as a single row.
        return decoder.decode(input_data, content_type).reshape(1,-1)

    ## Run our model and do the prediction
    def default_predict_fn(self, payload, model):
        """Return ``model.predict(payload)`` converted to a plain list."""
        return model.predict( payload ).tolist()

    ## Gets the prediction output and format it to be returned to the user
    def default_output_fn(self, prediction, accept):
        """Encode *prediction* as ``text/csv``.

        Raises:
            Exception: If *accept* is not ``text/csv``.
        """
        if accept != "text/csv":
            raise Exception("Invalid accept: %s" % accept)
        return encoder.encode(prediction, accept)

# Creating Container Entrypoint

In [None]:
%%writefile main.py
import train
import argparse
import sys
import os
import traceback
from sagemaker_inference import model_server
from sagemaker_training import environment

if __name__ == "__main__":
    # The first command-line argument selects the container mode.
    if len(sys.argv) < 2 or sys.argv[1] not in ("serve", "train"):
        raise Exception("Invalid argument: you must inform 'train' for training mode or 'serve' predicting mode")

    if sys.argv[1] == "train":

        env = environment.Environment()

        # The container is started with "train" as its only argument, so the
        # job's hyperparameters never appear on the command line. Use the
        # values exposed by the training toolkit as defaults; a flag given
        # explicitly on the command line still takes precedence.
        job_hps = env.hyperparameters or {}

        parser = argparse.ArgumentParser()
        # https://github.com/aws/sagemaker-training-toolkit/blob/master/ENVIRONMENT_VARIABLES.md
        parser.add_argument("--max-depth", type=int, default=int(job_hps.get("max_depth", 10)))
        parser.add_argument("--n-jobs", type=int, default=int(job_hps.get("n_jobs", env.num_cpus)))
        parser.add_argument("--n-estimators", type=int, default=int(job_hps.get("n_estimators", 120)))

        # reads input channels training and testing from the environment variables
        parser.add_argument("--train", type=str, default=env.channel_input_dirs["train"])
        parser.add_argument("--validation", type=str, default=env.channel_input_dirs["validation"])

        parser.add_argument("--model-dir", type=str, default=env.model_dir)
        parser.add_argument("--output-dir", type=str, default=env.output_dir)

        # parse_known_args: the leading "train" token is not a declared
        # argument and must not make parsing fail.
        args,unknown = parser.parse_known_args()
        train.start(args)
    else:
        model_server.start_model_server(handler_service="serving.handler")

# Creating Docker File

In [None]:
%%writefile Dockerfile
FROM python:3.7-buster

# Set a docker label to advertise multi-model support on the container
LABEL com.amazonaws.sagemaker.capabilities.multi-models=false
# Set a docker label to enable container to use SAGEMAKER_BIND_TO_PORT environment variable if present
LABEL com.amazonaws.sagemaker.capabilities.accept-bind-to-port=true

# Install the JDK and remove the apt package lists in the same layer, so the
# lists are not kept in an earlier image layer.
RUN apt-get update -y \
    && apt-get -y install --no-install-recommends default-jdk \
    && rm -rf /var/lib/apt/lists/*

RUN pip --no-cache-dir install multi-model-server sagemaker-inference sagemaker-training
RUN pip --no-cache-dir install pandas numpy scipy scikit-learn

ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
# Module search path for main.py, train.py and the serving package.
# PATH (the executable search path) must not be appended here.
ENV PYTHONPATH="/opt/ml/code"

COPY main.py /opt/ml/code/main.py
COPY train.py /opt/ml/code/train.py
COPY handler.py /opt/ml/code/serving/handler.py

# main.py receives the mode ("train" or "serve") as its first argument.
ENTRYPOINT ["python", "/opt/ml/code/main.py"]

# Creating Buildspec

In [None]:
%%writefile buildspec.yml
# CodeBuild spec: builds the model image from the Dockerfile in the source
# directory, pushes it to ECR and publishes the image URL as an artifact.
# Expects AWS_DEFAULT_REGION, AWS_ACCOUNT_ID, IMAGE_REPO_NAME and IMAGE_TAG
# in the build environment.
version: 0.2

phases:
  install:
    runtime-versions:
      docker: 18

  pre_build:
    commands:
      - echo Logging in to Amazon ECR...
      # Executes the login command printed by the AWS CLI.
      # NOTE(review): `aws ecr get-login` is only available in AWS CLI v1 -
      # confirm the build image still ships v1.
      - $(aws ecr get-login --no-include-email --region $AWS_DEFAULT_REGION)
  build:
    commands:
      - echo Build started on `date`
      - echo Building the Docker image...
      - docker build -t $IMAGE_REPO_NAME:$IMAGE_TAG .
      # Tag the local image with its fully qualified ECR name.
      - docker tag $IMAGE_REPO_NAME:$IMAGE_TAG $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG

  post_build:
    commands:
      - echo Build completed on `date`
      - echo Pushing the Docker image...
      # The next line only logs the push command; the push itself follows it.
      - echo docker push $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG
      - docker push $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG
      # Record the pushed image URL in the file exported as the build artifact.
      - echo $AWS_ACCOUNT_ID.dkr.ecr.$AWS_DEFAULT_REGION.amazonaws.com/$IMAGE_REPO_NAME:$IMAGE_TAG > image.url
      - echo Done
artifacts:
  files:
    - image.url
  name: image_url
  discard-paths: yes

# Building the image locally and doing some tests

In [None]:
# Build the image from the Dockerfile in the current directory and tag it
# iris_model:1.0, the name used by the local docker run cells.
!docker build -f Dockerfile -t iris_model:1.0 .

In [None]:
!rm -rf input
!mkdir -p input/data/train
!mkdir -p input/data/validation

import pandas as pd
import numpy as np

from sklearn import datasets
from sklearn.model_selection import train_test_split

# Load the iris dataset that ships with scikit-learn.
iris = datasets.load_iris()

# Put the target in column 0: train.py's load_dataset reads the label from
# the first column of every CSV file.
dataset = np.insert(iris.data, 0, iris.target,axis=1)

df = pd.DataFrame(data=dataset, columns=["iris_id"] + iris.feature_names)
X = df.iloc[:,1:]
y = df.iloc[:,0]
# Hold out 33% of the rows for validation; random_state fixes the split.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

# Re-attach the label as the first column and write the training channel
# without header and without index.
train_df = X_train.copy()
train_df.insert(0, "iris_id", y_train)
train_df.to_csv("input/data/train/training.csv", sep=",", header=None, index=None)

# Same layout for the validation channel.
test_df = X_test.copy()
test_df.insert(0, "iris_id", y_test)
test_df.to_csv("input/data/validation/testing.csv", sep=",", header=None, index=None)

# Show the first rows of the full dataset as the cell output.
df.head()

In [None]:
# Local testing using in-built Docker daemon

In [None]:
# Recreate an empty input/config directory for the JSON files written by the next cells.
!rm -rf input/config && mkdir -p input/config

In [None]:
%%writefile input/config/hyperparameters.json
{"max_depth": 20, "n_jobs": 4, "n_estimators": 120}

In [None]:
%%writefile input/config/resourceconfig.json
{"current_host": "localhost", "hosts": ["algo-1-kipw9"]}

In [None]:
%%writefile input/config/inputdataconfig.json
{"train": {"TrainingInputMode": "File"}, "validation": {"TrainingInputMode": "File"}}

In [None]:
%%time
# Start from an empty model directory so it only holds this run's artifact.
!rm -rf model/
!mkdir -p model

print( "Training...")
# Mount the local model, output and input folders at the matching /opt/ml
# paths of the container and run the image in "train" mode.
!docker run --rm --name "my_model" \
    -v "$PWD/model:/opt/ml/model" \
    -v "$PWD/output:/opt/ml/output" \
    -v "$PWD/input:/opt/ml/input" iris_model:1.0 train

In [None]:
# Serving the model. Perform tests after running the next cell. Stop kernel on completion.

In [None]:
# Run the image in "serve" mode with the trained model mounted and port 8080
# published on the host. This cell keeps running while the server is up.
!docker run --rm --name "my_model" \
    -p 8080:8080 \
    -v "$PWD/model:/opt/ml/model" \
    -v "$PWD/input:/opt/ml/input" iris_model:1.0 serve

In [None]:
# Tests. Ignore these tests for now.

In [None]:
import json
from urllib import request

# Address of the locally served container (port 8080 is published by the serve cell).
base_url='http://localhost:8080'

In [None]:
# Health check of the local endpoint; the with-block closes the HTTP
# response once the status code has been printed.
with request.urlopen("%s/ping" % base_url) as resp:
    print("Response code: %d" % resp.getcode() )

In [None]:
%%time
from sagemaker.serializers import CSVSerializer
# Serializer used to turn a list of features into a CSV request body.
csv_serializer = CSVSerializer()
# Sample feature rows; the trailing comment on each row is presumably the
# class the model is expected to return.
payloads = [
    [4.6, 3.1, 1.5, 0.2], # 0
    [7.7, 2.6, 6.9, 2.3], # 2
    [6.1, 2.8, 4.7, 1.2]  # 1
]

def predict(payload):
    """POST one feature row to the local ``/invocations`` route and print the reply.

    Args:
        payload: Feature values accepted by ``csv_serializer.serialize``.

    Prints the response code, the raw response body and every response header.
    """
    headers = {
        'Content-type': 'text/csv',
        'Accept': 'text/csv'
    }

    body = csv_serializer.serialize(payload).encode('utf-8')
    req = request.Request("%s/invocations" % base_url, data=body, headers=headers)
    # The with-block closes the HTTP response after it has been read.
    with request.urlopen(req) as resp:
        print("Response code: %d, Prediction: %s\n" % (resp.getcode(), resp.read()))
        for i in resp.headers:
            print(i, resp.headers[i])

# Send every sample row to the local endpoint.
for p in payloads:
    predict(p)

# Integration Testing

In [None]:
import boto3

# Resolve the AWS account, region and credentials of the current session.
sts_client = boto3.client("sts")
session = boto3.session.Session()

account_id = sts_client.get_caller_identity()["Account"]
region = session.region_name
credentials = session.get_credentials()
# Frozen credentials expose access_key, secret_key and token as plain attributes.
credentials = credentials.get_frozen_credentials()

# Repository name and tag of the image built by the local CodeBuild test.
repo_name="iris-model"
image_tag="test"

In [None]:
!sudo rm -rf tests && mkdir -p tests
!cp handler.py main.py train.py Dockerfile buildspec.yml tests/
with open("tests/vars.env", "w") as f:
    f.write("AWS_ACCOUNT_ID=%s\n" % account_id)
    f.write("IMAGE_TAG=%s\n" % image_tag)
    f.write("IMAGE_REPO_NAME=%s\n" % repo_name)
    f.write("AWS_DEFAULT_REGION=%s\n" % region)
    f.write("AWS_ACCESS_KEY_ID=%s\n" % credentials.access_key)
    f.write("AWS_SECRET_ACCESS_KEY=%s\n" % credentials.secret_key)
    f.write("AWS_SESSION_TOKEN=%s\n" % credentials.token )
    f.close()

!cat tests/vars.env

In [None]:
%%time

# Run the build locally with the CodeBuild helper script, pointing it at the
# tests/ folder prepared above, the vars.env file and an output folder.
# NOTE(review): see the script's usage text for the exact meaning of each flag.
!/tmp/aws-codebuild/local_builds/codebuild_build.sh \
    -a "$PWD/tests/output" \
    -s "$PWD/tests" \
    -i "samirsouza/aws-codebuild-standard:3.0" \
    -e "$PWD/tests/vars.env" \
    -c

In [None]:
# Running the tests using SageMaker Estimator

In [None]:
import sagemaker
import json
from sagemaker import get_execution_role

# Execution role, session and default bucket used by the Estimator test below.
role = get_execution_role()
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
# Key prefix of the Estimator's output path inside the bucket.
prefix='mlops/iris'

In [None]:
# Upload both local channel folders; upload_data's return values are the
# locations passed to fit() as the train and validation channels.
train_path = sagemaker_session.upload_data(path='input/data/train', key_prefix='iris-model/input/train')
test_path = sagemaker_session.upload_data(path='input/data/validation', key_prefix='iris-model/input/validation')
print("Train: %s\nValidation: %s" % (train_path, test_path) )

In [None]:
# Create the estimator.
# 'iris-model:test' is the image built by the local CodeBuild test above
# (repo_name "iris-model", image_tag "test").
# instance_type='local' makes the SDK run the job in a local container.
iris = sagemaker.estimator.Estimator('iris-model:test',
                                    role,
                                    instance_count=1, 
                                    instance_type='local',
                                    output_path='s3://{}/{}/output'.format(bucket, prefix))
# Hyperparameters handed to the training container.
hyperparameters = {
    'max_depth': 20,
    'n_jobs': 4,
    'n_estimators': 120
}

print(hyperparameters)
iris.set_hyperparameters(**hyperparameters)

In [None]:
# Train with the two uploaded channels; the keys are the channel names read by main.py.
iris.fit({'train': train_path, 'validation': test_path })

In [None]:
# Deploy the trained model to a local endpoint and keep the predictor for the test cell.
iris_predictor = iris.deploy(initial_instance_count=1, instance_type='local')

In [None]:
import pandas as pd
import random
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

# Let the predictor serialize requests to CSV and parse CSV responses.
iris_predictor.serializer = CSVSerializer()
iris_predictor.deserializer = CSVDeserializer()

# load the testing data from the validation csv
validation = pd.read_csv('input/data/validation/testing.csv', header=None)
idx = random.randint(0,len(validation)-5)
req = validation.iloc[idx:idx+5].values

# cut a sample with 5 lines from our dataset and then split the label from the features.
X = req[:,1:].tolist()
y = req[:,0].tolist()

# call the local endpoint
for features,label in zip(X,y):
    prediction = iris_predictor.predict(features)

    # The CSV deserializer is expected to return rows of strings, so the
    # predicted class is the first cell of the first row. Comparing the raw
    # response with the numeric label would always be False.
    predicted_label = float(prediction[0][0])

    # compare the results
    print("RESULT: {} == {} ? {}".format( label, predicted_label, label == predicted_label ) )

In [None]:
# Tear down the local endpoint created by deploy().
iris_predictor.delete_endpoint()

# Pushing assets to CodeCommit Repo

This next step pushes the assets to a CodeCommit repo. A CodePipeline pipeline is listening on this repo and will start
a new build process using CodeBuild. When the pipeline finishes, check for an image with the name iris-model:latest in ECR.

In [None]:
%%bash
# Switch to the local clone of the mlops repository and its iris_model branch.
cd ../../../mlops
git checkout iris_model
# $OLDPWD is the notebook directory we just left; copy the build assets from it.
cp $OLDPWD/buildspec.yml $OLDPWD/handler.py $OLDPWD/train.py $OLDPWD/main.py $OLDPWD/Dockerfile .

# Commit everything and push.
git add --all
git commit -a -m " - files for building an iris model image"
git push

# Starting an automated ML pipeline in MLOps environment 

This next pipeline listens to an S3 bucket. We will add a .zip file to this bucket that contains the description of the training job and the instructions for preparing and deploying the endpoints; uploading that file is what starts the pipeline.

In [None]:
import sagemaker
import boto3

# Switch between the built-in XGBoost image and the custom iris image.
use_xgboost_builtin=False

sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity()["Account"]
region = boto3.session.Session().region_name
# Also used as the ECR repository name, the S3 key prefix and the job name prefix.
model_prefix='iris-model'
training_image = None
hyperparameters = None
if use_xgboost_builtin: 
    # Built-in algorithm: look up the XGBoost 1.0-1 image for this region.
    training_image = sagemaker.image_uris.retrieve('xgboost', boto3.Session().region_name, version='1.0-1')
    hyperparameters = {
        "alpha": 0.42495142279951414,
        "eta": 0.4307531922567607,
        "gamma": 1.8028358018081714,
        "max_depth": 10,
        "min_child_weight": 5.925133573560345,
        "num_class": 3,
        "num_round": 30,
        "objective": "multi:softmax",
        "reg_lambda": 10,
        "silent": 0,
    }
else:
    # Custom image: the ":latest" tag of the iris-model repository in this account's ECR.
    training_image = '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account_id, region, model_prefix)
    hyperparameters = {
        "max_depth": 11,
        "n_jobs": 5,
        "n_estimators": 120
    }
print(training_image)

In [None]:
# Creating a training job descriptor

In [None]:
import time
import sagemaker
import boto3

# Role assumed by the training job, and a timestamped job name.
roleArn = "arn:aws:iam::{}:role/MLOps".format(account_id)
timestamp = time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
job_name = model_prefix + timestamp
sagemaker_session = sagemaker.Session()

# Training job descriptor; it is serialized to trainingjob.json further below.
training_params = {}

# Training image selected above (built-in XGBoost or the custom iris image in ECR)
training_params["AlgorithmSpecification"] = {
    "TrainingImage": training_image,
    "TrainingInputMode": "File"
}

# The IAM role with all the permissions given to Sagemaker
training_params["RoleArn"] = roleArn

# Here Sagemaker will store the final trained model
training_params["OutputDataConfig"] = {
    "S3OutputPath": 's3://{}/{}'.format(sagemaker_session.default_bucket(), model_prefix)
}

# This is the config of the instance that will execute the training
training_params["ResourceConfig"] = {
    "InstanceCount": 1,
    "InstanceType": "ml.m4.xlarge",
    "VolumeSizeInGB": 30
}

# The job name. You'll see this name in the Jobs section of the Sagemaker's console
training_params["TrainingJobName"] = job_name

# Convert every hyperparameter value to a string before it goes into the descriptor.
for i in hyperparameters:
    hyperparameters[i] = str(hyperparameters[i])
    
# Here you will configure the hyperparameters used for training your model.
training_params["HyperParameters"] = hyperparameters

# Training timeout (360000 seconds = 100 hours)
training_params["StoppingCondition"] = {
    "MaxRuntimeInSeconds": 360000
}

# Input channels. Both use the FullyReplicated distribution type.
training_params["InputDataConfig"] = []

# Both channels are declared as uncompressed text/csv and point at the
# input/train and input/validation prefixes of the default bucket.

# Here we set training dataset
training_params["InputDataConfig"].append({
    "ChannelName": "train",
    "DataSource": {
        "S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": 's3://{}/{}/input/train'.format(sagemaker_session.default_bucket(), model_prefix),
            "S3DataDistributionType": "FullyReplicated"
        }
    },
    "ContentType": "text/csv",
    "CompressionType": "None"
})
# Here we set the validation dataset
training_params["InputDataConfig"].append({
    "ChannelName": "validation",
    "DataSource": {
        "S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": 's3://{}/{}/input/validation'.format(sagemaker_session.default_bucket(), model_prefix),
            "S3DataDistributionType": "FullyReplicated"
        }
    },
    "ContentType": "text/csv",
    "CompressionType": "None"
})
training_params["Tags"] = []

In [None]:
# Setting up the deployment config

In [None]:
# Deployment descriptor for the development and production endpoints; it is
# serialized to deployment.json further below.
deployment_params = {
    "EndpointPrefix": model_prefix,
    "DevelopmentEndpoint": {
        # we want to enable the endpoint monitoring
        "InferenceMonitoring": True,
        # we will collect 100% of all the requests/predictions
        "InferenceMonitoringSampling": 100,
        "InferenceMonitoringOutputBucket": 's3://{}/{}/monitoring/dev'.format(sagemaker_session.default_bucket(), model_prefix),
        # we don't want to enable A/B tests in development
        "ABTests": False,
        # we'll use a basic instance for testing purposes
        "InstanceType": "ml.t2.large",
        "InitialInstanceCount": 1,
        # we don't want high availability/scalability for development
        "AutoScaling": None
    },
    "ProductionEndpoint": {
        # we want to enable the endpoint monitoring
        "InferenceMonitoring": True,
        # we will collect 100% of all the requests/predictions
        "InferenceMonitoringSampling": 100,
        "InferenceMonitoringOutputBucket": 's3://{}/{}/monitoring/prd'.format(sagemaker_session.default_bucket(), model_prefix),
        # we want to do A/B tests in production
        "ABTests": True,
        # we'll use a better instance for production. CPU optimized
        "InstanceType": "ml.c5.large",
        "InitialInstanceCount": 2,
        "InitialVariantWeight": 0.1,
        # we want elasticity: at minimum 2 instances to support the endpoint and at maximum 10.
        # TargetValue is the target for the invocations-per-instance metric named below
        # (200.0 here) that drives adding or removing instances.
        "AutoScaling": {
            "MinCapacity": 2,
            "MaxCapacity": 10,
            "TargetValue": 200.0,
            "ScaleInCooldown": 30,
            "ScaleOutCooldown": 60,
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
        }
    }
}

In [None]:
# Prepping and uploading the dataset

In [None]:
import numpy as np
import sagemaker
from sklearn import datasets
from sklearn.model_selection import train_test_split

sagemaker_session = sagemaker.Session()
iris = datasets.load_iris()

# Stratified 67/33 split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.33, random_state=42, stratify=iris.target)
# Write the label as the first column, followed by the features, with three decimals.
np.savetxt("iris_train.csv", np.column_stack((y_train, X_train)), delimiter=",", fmt='%0.3f')
np.savetxt("iris_test.csv", np.column_stack((y_test, X_test)), delimiter=",", fmt='%0.3f')

# Upload the dataset to an S3 bucket, under the prefixes referenced by the training job descriptor
input_train = sagemaker_session.upload_data(path='iris_train.csv', key_prefix='%s/input/train' % model_prefix)
input_test = sagemaker_session.upload_data(path='iris_test.csv', key_prefix='%s/input/validation' % model_prefix)

# And finally, starting the pipeline...

You can check the status of your pipeline from the CodePipeline console.

In [None]:
import boto3
import io
import zipfile
import json

s3 = boto3.client('s3')
sts_client = boto3.client("sts")

session = boto3.session.Session()

account_id = sts_client.get_caller_identity()["Account"]
region = session.region_name

# Bucket and key the zip file is uploaded to.
bucket_name = "mlops-%s-%s" % (region, account_id)
key_name = "training_jobs/%s/trainingjob.zip" % model_prefix

# Build the zip archive in memory with the two JSON descriptors.
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'a') as zf:
    zf.writestr('trainingjob.json', json.dumps(training_params))
    zf.writestr('deployment.json', json.dumps(deployment_params))
# Rewind so that read() returns the archive from its first byte.
zip_buffer.seek(0)

s3.put_object(Bucket=bucket_name, Key=key_name, Body=bytearray(zip_buffer.read()))

# If everything went well, you can test the endpoint with some dummy payload.

In [None]:
from sagemaker.serializers import CSVSerializer
csv_serializer = CSVSerializer()
def test_endpoint(endpoint_name, payload):
    """Invoke an endpoint with one CSV row and report what answered.

    Args:
        endpoint_name: Name of the endpoint to invoke.
        payload: Feature values accepted by ``csv_serializer.serialize``.

    Returns:
        tuple: ``(prediction, variant_name)`` - the response body parsed as a
        float, and the value of the ``x-amzn-invoked-production-variant``
        response header.
    """
    response = sm.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='text/csv',
        Accept='text/csv',
        Body=csv_serializer.serialize(payload),
    )
    http_headers = response['ResponseMetadata']['HTTPHeaders']
    variant_name = http_headers['x-amzn-invoked-production-variant']
    body_text = response['Body'].read().decode('utf-8')
    return float(body_text.strip()), variant_name

In [None]:
codepipeline = boto3.client('codepipeline')
# Runtime client used by test_endpoint to invoke the endpoints.
sm = boto3.client('sagemaker-runtime')

model_prefix='iris-model'
pipeline_name = 'iris-model-pipeline'
# Yields 'iris-model-%s'; the remaining %s is filled with the environment name.
endpoint_name_mask='{}-%s'.format(model_prefix)

In [None]:
%%time
# One sample feature row sent to both endpoints.
payload = [4.6, 3.1, 1.5, 0.2]

# Development endpoint: '<model_prefix>-development'
print( "DSV")
print( "Classifier: %s, Variant Name: %s" % test_endpoint( endpoint_name_mask % ('development'), payload ) )

# Production endpoint: '<model_prefix>-production'
print( "\nPRD")
print( "Classifier: %s, Variant Name: %s" % test_endpoint( endpoint_name_mask % ('production'), payload ) )