![logo](../imgs/MLU_Logo.png)

---

# End-to-End MLOps Pipeline

Now it is time to automate the ML pipeline using the MLOps environment.

First, we will prepare the dataset, the training parameters, and the deployment parameters, and then zip the parameter files. This zip file has the following structure:
 - trainingjob.json (SageMaker training job descriptor)
 - deployment.json (instructions telling the environment how to deploy and prepare the endpoints)
 
Then, by calling `s3.put_object`, we upload the zipped file to an S3 bucket. [CodePipeline](https://us-west-2.console.aws.amazon.com/codesuite/codepipeline/pipelines/iris-model-pipeline/view?region=us-west-2) monitors that bucket and starts a job as soon as it detects the zipped file.


In [7]:
import sagemaker
import boto3
import time
import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split
import io
import zipfile
import json

## Understanding the MLOps Pipeline

Once the S3 bucket receives the new zip file, the MLOps pipeline starts automatically. The pipeline contains the following stages:
- `Source`: the action ("putting a `.zip` file in S3") that you want to capture in order to automatically start this CodePipeline;
- `ProcessRequest`: an AWS Lambda function, "[mlops-op-process-request](https://us-west-2.console.aws.amazon.com/lambda/home?region=us-west-2#/functions/mlops-op-process-request?tab=code)", that processes the request and prepares the CloudFormation templates for training;
- `Train`: an AWS **CloudFormation** stack that trains the model (see the picture below);
- `DeployDev`: an AWS **CloudFormation** stack that deploys the trained model to the **Development** environment (see the picture below);
- `DeployApproval`: an **action you need** to take manually, either by clicking `DeployApproval` in CodePipeline or by approving with code (shown in the next notebook);
- `DeployProd`: an AWS **CloudFormation** stack that deploys the trained model to the **Production** environment (see the picture below).

The overall pipeline will take around **20 minutes** to build. The finished pipeline is shown below (screenshot from CodePipeline):

<img src="../imgs/codepipeline.png" alt="Drawing" style="width: 400px;"/>


 
## UserName
 
The only parameter that you need to fill in in this notebook is `your_alias`. It is the "UserName" you entered as a parameter when you set up the CloudFormation stacks.

In [8]:
your_alias = "..." # the name you used when creating the CF stacks, e.g. "mia"

## Configuring the Container Image

Let's start by defining the training image, the hyperparameters, and other attributes. The hyperparameters here were randomly generated for demo purposes; you can use SageMaker Autopilot for hyperparameter optimization (HPO). Here is a quick [demo](https://sagemaker-examples.readthedocs.io/en/latest/autopilot/custom-feature-selection/Feature_selection_autopilot.html#Set-up-and-kick-off-autopilot-job) on how to do that.


In [9]:
use_xgboost_builtin=True

sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity()["Account"]
region = boto3.session.Session().region_name
model_prefix='iris-model'
training_image = None
hyperparameters = None

if use_xgboost_builtin: 
    training_image = sagemaker.image_uris.retrieve('xgboost', boto3.Session().region_name, version='1.0-1')
    hyperparameters = {
        "alpha": 0.42495142279951414,
        "eta": 0.4307531922567607,
        "gamma": 1.8028358018081714,
        "max_depth": 10,
        "min_child_weight": 5.925133573560345,
        "num_class": 3,
        "num_round": 30,
        "objective": "multi:softmax",
        "reg_lambda": 10,
        "silent": 0,
    }
else:
    training_image = '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account_id, region, model_prefix)
    hyperparameters = {
        "max_depth": 11,
        "n_jobs": 5,
        "n_estimators": 120
    }
print(training_image)

246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3
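
The hyperparameter values above are described as randomly generated. As a rough illustration of how such values could be drawn, here is a minimal sketch. The function name `sample_xgboost_hyperparameters` and every sampling range in it are assumptions chosen for this example; they are not the procedure that produced the values above.

```python
import random


def sample_xgboost_hyperparameters(seed=None):
    """Draw one random XGBoost configuration (all ranges are illustrative assumptions)."""
    rng = random.Random(seed)
    return {
        "alpha": rng.uniform(0.0, 2.0),
        "eta": rng.uniform(0.01, 0.5),
        "gamma": rng.uniform(0.0, 5.0),
        "max_depth": rng.randint(3, 10),
        "min_child_weight": rng.uniform(1.0, 10.0),
        # Fixed by the task: three iris classes and a multiclass objective.
        "num_class": 3,
        "num_round": 30,
        "objective": "multi:softmax",
    }


# A fixed seed makes the draw reproducible between notebook runs.
sampled = sample_xgboost_hyperparameters(seed=42)
print(sampled)
```

Passing a seed keeps the demo repeatable; dropping it gives a fresh configuration on every call.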


### Defining Parameters

Here, we define the parameters consumed by the training and deployment Lambda functions, `mlops-op-training` and `mlops-op-deployment`. You can check the details of these functions in the [**Lambda**](https://console.aws.amazon.com/lambda/home?region=us-west-2#/functions) console.

#### Defining `training_params`

In [10]:
roleArn = "arn:aws:iam::{}:role/{}".format(account_id, your_alias)
timestamp = time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())

# Specify SageMaker Training job
job_name = model_prefix + timestamp
sagemaker_session = sagemaker.Session()

training_params = {}

# Here we set the reference to the training Docker image (built-in XGBoost or a custom image), stored on ECR (https://aws.amazon.com/pt/ecr/)
training_params["AlgorithmSpecification"] = {
    "TrainingImage": training_image,
    "TrainingInputMode": "File"
}

# The IAM role with all the permissions given to Sagemaker
training_params["RoleArn"] = roleArn

# Here Sagemaker will store the final trained model
training_params["OutputDataConfig"] = {
    "S3OutputPath": 's3://{}/{}'.format(sagemaker_session.default_bucket(), model_prefix)
}

# This is the config of the instance that will execute the training
training_params["ResourceConfig"] = {
    "InstanceCount": 1,
    "InstanceType": "ml.m4.xlarge",
    "VolumeSizeInGB": 30
}

# The job name. You'll see this name in the Jobs section of the Sagemaker's console
training_params["TrainingJobName"] = job_name

for i in hyperparameters:
    hyperparameters[i] = str(hyperparameters[i])
    
# Here you will configure the hyperparameters used for training your model.
training_params["HyperParameters"] = hyperparameters

# Training timeout (360000 seconds = 100 hours)
training_params["StoppingCondition"] = {
    "MaxRuntimeInSeconds": 360000
}

# Input channels are added in the next cell. Each one uses fully replicated distribution (the data is copied onto each machine)
training_params["InputDataConfig"] = []

Here we set the training and validation datasets.

In [11]:
# Training channel
training_params["InputDataConfig"].append({
    "ChannelName": "train",
    "DataSource": {
        "S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": 's3://{}/{}/input/train'.format(
                sagemaker_session.default_bucket(), 
                model_prefix),
            "S3DataDistributionType": "FullyReplicated"
        }
    },
    "ContentType": "text/csv",
    "CompressionType": "None"
})

# Validation channel
training_params["InputDataConfig"].append({
    "ChannelName": "validation",
    "DataSource": {
        "S3DataSource": {
            "S3DataType": "S3Prefix",
            "S3Uri": 's3://{}/{}/input/validation'.format(
                sagemaker_session.default_bucket(), 
                model_prefix),
            "S3DataDistributionType": "FullyReplicated"
        }
    },
    "ContentType": "text/csv",
    "CompressionType": "None"
})

# Tags
training_params["Tags"] = []
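
Because the descriptor is serialized to JSON and consumed later by the pipeline, a mistake here only surfaces minutes after the upload. A minimal sketch of a local sanity check follows. The helper `check_training_params` is hypothetical, treating every listed key as mandatory is an assumption of this sketch, and the job-name pattern reflects our understanding of the SageMaker naming rule (alphanumerics and hyphens, at most 63 characters), so check it against the API reference before relying on it.

```python
import json
import re

# Top-level keys that this notebook sets; assumed here to be mandatory.
EXPECTED_KEYS = (
    "TrainingJobName",
    "AlgorithmSpecification",
    "RoleArn",
    "OutputDataConfig",
    "ResourceConfig",
    "StoppingCondition",
)
# Assumed naming rule: alphanumerics separated by hyphens, at most 63 characters.
JOB_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")


def check_training_params(params):
    """Return a list of problems found in a training job descriptor."""
    problems = [f"missing key: {key}" for key in EXPECTED_KEYS if key not in params]
    name = params.get("TrainingJobName", "")
    if name and not JOB_NAME_PATTERN.match(name):
        problems.append(f"invalid job name: {name}")
    # Hyperparameter values are passed as strings, as the cell above does with str().
    for key, value in params.get("HyperParameters", {}).items():
        if not isinstance(value, str):
            problems.append(f"hyperparameter {key} is not a string")
    json.dumps(params)  # raises TypeError if a value cannot be serialized
    return problems


example = {
    "TrainingJobName": "iris-model-2022-04-04-02-08-21",
    "HyperParameters": {"max_depth": 10},
}
for problem in check_training_params(example):
    print(problem)
```

In the notebook, calling `check_training_params(training_params)` before building the zip file should return an empty list.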

#### Defining `deployment_params`

In [12]:
deployment_params = {
    "EndpointPrefix": model_prefix,
    "DevelopmentEndpoint": {
        # we want to enable the endpoint monitoring
        "InferenceMonitoring": True,
        # we will collect 100% of all the requests/predictions
        "InferenceMonitoringSampling": 100,
        "InferenceMonitoringOutputBucket": 's3://{}/{}/monitoring/dev'.format(
            sagemaker_session.default_bucket(), model_prefix),
        # we don't want to enable A/B tests in development
        "ABTests": False,
        # we'll use a basic instance for testing purposes
        "InstanceType": "ml.m4.xlarge", 
        "InitialInstanceCount": 1,
        # we don't want high availability/scalability for development
        "AutoScaling": None
    },
    "ProductionEndpoint": {
        # we want to enable the endpoint monitoring
        "InferenceMonitoring": True,
        # we will collect 100% of all the requests/predictions
        "InferenceMonitoringSampling": 100,
        "InferenceMonitoringOutputBucket": 's3://{}/{}/monitoring/prd'.format(
            sagemaker_session.default_bucket(), model_prefix),
        # we want to do A/B tests in production
        "ABTests": True,
        # for this demo we use the same instance type as development; pick a larger one for real production traffic
        "InstanceType": "ml.m4.xlarge",
        "InitialInstanceCount": 1,
        "InitialVariantWeight": 0.1,
        # we want elasticity: at minimum 1 instance to support the endpoint and at maximum 10
        # we'll use a target of 200 invocations per instance per minute to start adding new instances or remove them
        "AutoScaling": {
            "MinCapacity": 1,
            "MaxCapacity": 10,
            "TargetValue": 200.0,
            "ScaleInCooldown": 30,
            "ScaleOutCooldown": 60,
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
        }
    }
}
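
To make the `AutoScaling` block more concrete, the sketch below shows how its fields could map onto the keyword arguments of the Application Auto Scaling calls `register_scalable_target` and `put_scaling_policy`. This is an illustration only: the pipeline's own templates may wire these values differently, and the helper name, the policy name, and the endpoint and variant names used in the example are all hypothetical.

```python
def to_autoscaling_requests(endpoint_name, variant_name, autoscaling):
    """Translate an "AutoScaling" block into Application Auto Scaling keyword arguments."""
    common = {
        "ServiceNamespace": "sagemaker",
        "ResourceId": f"endpoint/{endpoint_name}/variant/{variant_name}",
        "ScalableDimension": "sagemaker:variant:DesiredInstanceCount",
    }
    # Arguments for register_scalable_target: the instance count bounds.
    target = dict(
        common,
        MinCapacity=autoscaling["MinCapacity"],
        MaxCapacity=autoscaling["MaxCapacity"],
    )
    # Arguments for put_scaling_policy: a target-tracking policy on the chosen metric.
    policy = dict(
        common,
        PolicyName=f"{endpoint_name}-{variant_name}-target-tracking",  # hypothetical name
        PolicyType="TargetTrackingScaling",
        TargetTrackingScalingPolicyConfiguration={
            "TargetValue": autoscaling["TargetValue"],
            "PredefinedMetricSpecification": {
                "PredefinedMetricType": autoscaling["PredefinedMetricType"]
            },
            "ScaleInCooldown": autoscaling["ScaleInCooldown"],
            "ScaleOutCooldown": autoscaling["ScaleOutCooldown"],
        },
    )
    return target, policy


# Same values as the ProductionEndpoint block; endpoint and variant names are made up.
target, policy = to_autoscaling_requests(
    "iris-model-production",
    "variant-a",
    {
        "MinCapacity": 1,
        "MaxCapacity": 10,
        "TargetValue": 200.0,
        "ScaleInCooldown": 30,
        "ScaleOutCooldown": 60,
        "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance",
    },
)
print(target)
print(policy)
```

With a `boto3.client("application-autoscaling")` client, the two dictionaries could be passed as `client.register_scalable_target(**target)` and `client.put_scaling_policy(**policy)`.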

#### Preparing and uploading the dataset

In [13]:
%%time
sagemaker_session = sagemaker.Session()
iris = datasets.load_iris()
prefix='mlops/iris'

X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.33, random_state=42, stratify=iris.target)

np.savetxt("iris_train.csv", np.column_stack((y_train, X_train)), delimiter=",", fmt='%0.3f')
np.savetxt("iris_test.csv", np.column_stack((y_test, X_test)), delimiter=",", fmt='%0.3f')

# Upload the dataset to an S3 bucket
input_train = sagemaker_session.upload_data(path='iris_train.csv', key_prefix='%s/input/train' % model_prefix)
input_test = sagemaker_session.upload_data(path='iris_test.csv', key_prefix='%s/input/validation' % model_prefix)

CPU times: user 174 ms, sys: 8.56 ms, total: 182 ms
Wall time: 574 ms
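
The built-in SageMaker XGBoost algorithm expects CSV input with the label in the first column and no header row, which is why the cell above stacks `y` before `X`. A minimal sketch of a check for that layout follows; the helper `check_label_first_csv` is hypothetical, and the demo runs on a tiny stand-in file rather than on the real `iris_train.csv`.

```python
import csv
import os
import tempfile


def check_label_first_csv(path, valid_labels=(0, 1, 2)):
    """Return the row count after checking that column 0 holds a known class label."""
    rows = 0
    with open(path, newline="") as handle:
        for row in csv.reader(handle):
            if not row:
                continue  # skip blank lines
            if float(row[0]) not in valid_labels:
                raise ValueError(f"row {rows}: unexpected label {row[0]}")
            rows += 1
    return rows


# Demo on a small file formatted like the output of np.savetxt(..., fmt='%0.3f').
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "sample.csv")
    with open(sample_path, "w", newline="") as handle:
        handle.write("0.000,5.100,3.500,1.400,0.200\n")
        handle.write("2.000,6.300,3.300,6.000,2.500\n")
    print(check_label_first_csv(sample_path))
```

In the notebook, the same check could be run on `iris_train.csv` and `iris_test.csv` before uploading them.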


## Activating the Pipeline 

Alright! Now it's time to start the end-to-end training/deployment process.

In [14]:
s3 = boto3.client('s3')
sts_client = boto3.client("sts")
account_id = sts_client.get_caller_identity()["Account"]

session = boto3.session.Session()
region = session.region_name

bucket_name = "mlops-%s-%s" % (region, account_id)
print("bucket_name : {}".format(bucket_name))
key_name = "training_jobs/%s/trainingjob.zip" % model_prefix
print("key_name : {}".format(key_name))

bucket_name : mlops-us-west-2-578778777738
key_name : training_jobs/iris-model/trainingjob.zip


Let's activate our pipeline by putting the zipped job definition into the [**S3 bucket**](https://s3.console.aws.amazon.com/s3/buckets/?region=us-west-2).

In [16]:
# Build the zip archive in memory
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w') as zf:
    zf.writestr('trainingjob.json', json.dumps(training_params))
    zf.writestr('deployment.json', json.dumps(deployment_params))

# Uploading the archive is the event that the pipeline's Source stage watches for
s3.put_object(Bucket=bucket_name, Key=key_name, Body=zip_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '31E33RXX7F3K7H1J',
  'HostId': '/G13IywwFoXidK0u7SSd79j/k0Z0mveTYaBiEvzKcMznE+5/h8M1jbXPMIjhuvdBJD/+1li6nnI=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': '/G13IywwFoXidK0u7SSd79j/k0Z0mveTYaBiEvzKcMznE+5/h8M1jbXPMIjhuvdBJD/+1li6nnI=',
   'x-amz-request-id': '31E33RXX7F3K7H1J',
   'date': 'Mon, 04 Apr 2022 02:08:21 GMT',
   'x-amz-version-id': 'Z8xzRS_ut1pwCufZXLqBrKkeIB3b4tmf',
   'etag': '"3cf990ef87c5d23472ac63a4cf3435e7"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"3cf990ef87c5d23472ac63a4cf3435e7"',
 'VersionId': 'Z8xzRS_ut1pwCufZXLqBrKkeIB3b4tmf'}
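
Before uploading, it can be reassuring to confirm that the archive really contains the two JSON files and that they parse back to the original dictionaries. The sketch below does this round trip entirely in memory; `build_job_archive` and `read_job_archive` are hypothetical helper names, and the small dictionaries stand in for the real `training_params` and `deployment_params`.

```python
import io
import json
import zipfile


def build_job_archive(training_params, deployment_params):
    """Zip the two descriptors in memory and return the archive as bytes."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as zf:
        zf.writestr("trainingjob.json", json.dumps(training_params))
        zf.writestr("deployment.json", json.dumps(deployment_params))
    return buffer.getvalue()


def read_job_archive(payload):
    """Parse every JSON member of the archive back into a dictionary."""
    with zipfile.ZipFile(io.BytesIO(payload)) as zf:
        return {name: json.loads(zf.read(name)) for name in zf.namelist()}


# Stand-in descriptors; the notebook would pass training_params and deployment_params.
payload = build_job_archive(
    {"TrainingJobName": "iris-model-example"},
    {"EndpointPrefix": "iris-model"},
)
for name, content in read_job_archive(payload).items():
    print(name, content)
```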

### Building ...

While the pipeline runs automatically, open the [CodePipeline](https://us-west-2.console.aws.amazon.com/codesuite/codepipeline/pipelines/iris-model-pipeline/view?region=us-west-2) console to follow its status. It executes the `ProcessRequest`, `Train`, and `DeployDev` stages in order.

After around **20 minutes**, the pipeline is finished.
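
If you prefer to follow the progress from the notebook instead of the console, the stage statuses can also be read with boto3. The sketch below is a minimal example under a few assumptions: the pipeline name `iris-model-pipeline` is taken from the console link above, the helper names are hypothetical, and `"NotStarted"` is a placeholder label of this sketch for stages that have no execution yet. The sample dictionary only mimics the relevant part of a `get_pipeline_state` response, so the summary helper can be tried without AWS access.

```python
def summarize_pipeline_state(state):
    """Map each stage name to the status of its latest execution."""
    return {
        stage["stageName"]: stage.get("latestExecution", {}).get("status", "NotStarted")
        for stage in state.get("stageStates", [])
    }


def fetch_pipeline_state(pipeline_name):
    """Call CodePipeline for the current state (requires AWS credentials)."""
    import boto3  # imported lazily so the summary helper works without AWS access

    client = boto3.client("codepipeline")
    return client.get_pipeline_state(name=pipeline_name)


# Hand-written sample that imitates the shape of a get_pipeline_state response.
sample_state = {
    "pipelineName": "iris-model-pipeline",
    "stageStates": [
        {"stageName": "Source", "latestExecution": {"status": "Succeeded"}},
        {"stageName": "ProcessRequest", "latestExecution": {"status": "Succeeded"}},
        {"stageName": "Train", "latestExecution": {"status": "InProgress"}},
        {"stageName": "DeployDev"},
    ],
}
for stage_name, status in summarize_pipeline_state(sample_state).items():
    print(stage_name, status)
```

In the notebook, `summarize_pipeline_state(fetch_pipeline_state("iris-model-pipeline"))` could be re-run every few minutes until `DeployDev` reports `Succeeded`.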

# Next Lab

Once all steps (`ProcessRequest`, `Train` and `DeployDev`) turn green, go to the next notebook to see how to approve the deployment to production and monitor your endpoint.