In [42]:
#!pip install kfp --upgrade
#!which dsl-compile

## Amazon SageMaker Components for Kubeflow Pipelines - script mode
In this example we'll build a Kubeflow pipeline using the SageMaker components. Every component calls a different SageMaker feature to perform the following steps:

1. Hyperparameter optimization 
1. Select best hyperparameters and increase epochs
1. Train the model with the best hyperparameters
1. Create an Amazon SageMaker model
1. Deploy model

In [2]:
import kfp
from kfp import components
from kfp.components import func_to_container_op
from kfp import dsl
import time, os, json

The full list of supported components can be found below, along with additional information like runtime arguments for each component:
https://github.com/kubeflow/pipelines/tree/master/components/aws/sagemaker      
Now we load the components for this example. Loading a component creates a task factory function, which we use in our pipeline definition to define the pipeline tasks, i.e. container operators.

In [3]:
sagemaker_hpo_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/hyperparameter_tuning/component.yaml')
sagemaker_train_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/train/component.yaml')
sagemaker_model_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/model/component.yaml')
sagemaker_deploy_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker/deploy/component.yaml')
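
The four URLs above differ only in the component directory and are all pinned to the same commit. As a minimal sketch, they can be built from a single constant so that changing the pinned commit is a one-line edit. The helper `component_url` and the names `COMPONENTS_COMMIT`, `BASE_URL` and `component_urls` are hypothetical and not part of the `kfp` SDK.

```python
# Commit SHA and directory names are copied from the URLs above.
COMPONENTS_COMMIT = "cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2"
BASE_URL = "https://raw.githubusercontent.com/kubeflow/pipelines"


def component_url(name: str, commit: str = COMPONENTS_COMMIT) -> str:
    """Hypothetical helper: URL of one SageMaker component definition at a pinned commit."""
    return f"{BASE_URL}/{commit}/components/aws/sagemaker/{name}/component.yaml"


component_urls = {
    name: component_url(name)
    for name in ("hyperparameter_tuning", "train", "model", "deploy")
}
for name, url in component_urls.items():
    print(name, url)
```

Each entry could then be passed to `components.load_component_from_url`, for example `components.load_component_from_url(component_urls['train'])`.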

In [46]:
import sagemaker
import boto3

sess = boto3.Session()
sm   = sess.client('sagemaker')
sagemaker_session = sagemaker.Session(boto_session=sess)

#### Prepare training datasets and upload to Amazon S3
We use `generate_cifar10_tfrecords.py` to generate the training, validation and evaluation datasets and upload them to the SageMaker default bucket. We recommend running the pipeline first and then diving into the code while the pipeline is executing.

In [5]:
bucket_name = sagemaker_session.default_bucket()
job_folder      = 'jobs'
dataset_folder  = 'datasets'
local_dataset = 'cifar10'

!python generate_cifar10_tfrecords.py --data-dir {local_dataset}
datasets = sagemaker_session.upload_data(path=local_dataset, key_prefix=f'{dataset_folder}/cifar10-dataset')

# If dataset is already in S3 use the dataset's path:
# datasets = f's3://{bucket_name}/{dataset_folder}/cifar10-dataset'




Download from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz and extract.
Successfully downloaded cifar-10-python.tar.gz 170498071 bytes.
Generating cifar10/train/train.tfrecords
Generating cifar10/validation/validation.tfrecords
Generating cifar10/eval/eval.tfrecords
Done!


#### Upload training scripts to Amazon S3
We package our training and serving code and upload it to S3. We will pass the `source_s3` parameter to the HPO Tuner and Training Job in our pipeline definition. At the end of the training job the inference code is packaged together with the resulting model stored on S3. 

In [6]:
!tar cvfz sourcedir.tar.gz --exclude=".ipynb*" -C code .
source_s3 = sagemaker_session.upload_data(path='sourcedir.tar.gz', key_prefix='training-scripts')
print('\nUploaded to S3 location:')
print(source_s3)

./
./requirements.txt
./model_def.py
./cifar10-training-sagemaker.py
./inference.py

Uploaded to S3 location:
s3://sagemaker-eu-west-1-487575676995/training-scripts/sourcedir.tar.gz
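
The `tar` shell command above can also be expressed with Python's standard `tarfile` module, which may help where shell escapes are unavailable. This is a sketch under that assumption, not the notebook's own method: `package_source` is a hypothetical helper that mirrors `tar cvfz sourcedir.tar.gz --exclude=".ipynb*" -C code .`.

```python
import tarfile
from fnmatch import fnmatch
from pathlib import Path


def package_source(source_dir: str, archive_path: str = "sourcedir.tar.gz") -> list:
    """Hypothetical helper: gzip-tar the contents of source_dir, skipping '.ipynb*' entries."""

    def skip_notebook_files(tarinfo):
        # Returning None drops the entry and, for a directory, everything below it.
        if fnmatch(Path(tarinfo.name).name, ".ipynb*"):
            return None
        return tarinfo

    with tarfile.open(archive_path, "w:gz") as archive:
        # arcname="." stores paths relative to source_dir, like `tar -C code .`
        archive.add(source_dir, arcname=".", filter=skip_notebook_files)

    # Reopen the archive to report what was actually written.
    with tarfile.open(archive_path, "r:gz") as archive:
        return archive.getnames()
```

Calling `package_source('code')` would write `sourcedir.tar.gz` to the working directory, which can then be uploaded with `sagemaker_session.upload_data` as in the cell above.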


#### Create a custom pipeline op
We convert a Python function to a component using the `func_to_container_op` method, which returns a task factory function similar to the one returned when loading components from a URL. The function will run in a basic Python container. The operator takes the best parameters from a hyperparameter tuning job and increases the number of epochs for the next training job. We still keep the number of epochs relatively low to reduce training time.

In [33]:
def update_best_model_hyperparams(hpo_results, best_model_epoch = "20") -> str:
    import json
    r = json.loads(str(hpo_results))
    return json.dumps(dict(r,epochs=best_model_epoch))

get_best_hyp_op = func_to_container_op(update_best_model_hyperparams)
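
Before wrapping the function in a container op, it can be checked locally as plain Python. The sketch below repeats the function so it runs on its own. The sample input is hypothetical: in the pipeline the value comes from `hpo.outputs['best_hyperparameters']`, which is assumed here to be a JSON object of string values.

```python
import json


def update_best_model_hyperparams(hpo_results, best_model_epoch="20") -> str:
    # Same logic as the cell above, repeated so this sketch runs on its own.
    r = json.loads(str(hpo_results))
    return json.dumps(dict(r, epochs=best_model_epoch))


# Hypothetical tuner output, for illustration only.
sample_hpo_results = json.dumps({
    "epochs": "10",
    "learning-rate": "0.01",
    "optimizer": "adam",
})

updated = json.loads(update_best_model_hyperparams(sample_hpo_results))
print(updated)
```

Only the `epochs` key is overwritten; every other hyperparameter passes through unchanged.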

#### Create a pipeline

The following cell contains the pipeline definition. You will have to set the `role_arn` argument and change the `train_image` and `serving_image` arguments at pipeline execution in Kubeflow, or set correct default values below. The default images point to ECR registries in us-west-2 and must be changed to the corresponding registries in your pipeline's region (Frankfurt, eu-central-1, by default). See the available deep learning container images here:
https://github.com/aws/deep-learning-containers/blob/master/available_images.md
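
The `channels` default in the pipeline definition is a JSON string assembled by string concatenation with line continuations. As an alternative sketch, the same structure can be generated with `json.dumps`, which avoids quoting mistakes. `build_channels` is a hypothetical helper and the S3 URI is a placeholder; in the notebook the `datasets` variable would be passed instead.

```python
import json


def build_channels(datasets_uri, names=("train", "validation", "eval")):
    """Hypothetical helper: JSON channel list with one S3 prefix per dataset split."""
    channels = [
        {
            "ChannelName": name,
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": f"{datasets_uri}/{name}",
                    "S3DataDistributionType": "FullyReplicated",
                }
            },
            "CompressionType": "None",
            "RecordWrapperType": "None",
        }
        for name in names
    ]
    return json.dumps(channels)


# Placeholder URI for illustration only.
channels_json = build_channels("s3://example-bucket/datasets/cifar10-dataset")
print(channels_json)
```

The resulting string could serve as the default value of the `channels` pipeline parameter, e.g. `channels=build_channels(datasets)`.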

In [43]:
@dsl.pipeline(
    name='cifar10 hpo train deploy pipeline',
    description='cifar10 hpo train deploy pipeline using sagemaker'
)
def cifar10_hpo_train_deploy(region='eu-central-1',
                           role_arn='',
                           training_input_mode='File',
                           train_image='763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-training:1.15.2-gpu-py36-cu100-ubuntu18.04',
                           serving_image='763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:1.15.2-cpu',
                           volume_size='50',
                           max_run_time='86400',
                           instance_type='ml.p3.2xlarge',
                           network_isolation='False',
                           traffic_encryption='False',
                           spot_instance='False',
                           channels='[ \
                    { \
                        "ChannelName": "train", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "'+datasets+'/train", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    }, \
                    { \
                        "ChannelName": "validation", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "'+datasets+'/validation", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    }, \
                    { \
                        "ChannelName": "eval", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "'+datasets+'/eval", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    } \
                ]'
                          ):
    # Component 1
    hpo = sagemaker_hpo_op(
        region=region,
        image=train_image,
        training_input_mode=training_input_mode,
        strategy='Bayesian',
        metric_name='val_acc',
        metric_definitions='{"val_acc": "val_acc: ([0-9\\\\.]+)"}',
        metric_type='Maximize',
        static_parameters='{ \
            "epochs": "10", \
            "momentum": "0.9", \
            "weight-decay": "0.0002", \
            "model_dir":"s3://'+bucket_name+'/jobs", \
            "sagemaker_program": "cifar10-training-sagemaker.py", \
            "sagemaker_region": "eu-central-1", \
            "sagemaker_submit_directory": "'+source_s3+'" \
        }',
        continuous_parameters='[ \
            {"Name": "learning-rate", "MinValue": "0.0001", "MaxValue": "0.1", "ScalingType": "Logarithmic"} \
        ]',
        categorical_parameters='[ \
            {"Name": "optimizer", "Values": ["sgd", "adam"]}, \
            {"Name": "batch-size", "Values": ["32", "128", "256"]}, \
            {"Name": "model-type", "Values": ["resnet", "custom"]} \
        ]',
        channels=channels,
        output_location=f's3://{bucket_name}/jobs',
        instance_type=instance_type,
        instance_count='1',
        volume_size=volume_size,
        max_num_jobs='4',
        max_parallel_jobs='2',
        max_run_time=max_run_time,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=spot_instance,
        role=role_arn
    )
    
    # Component 2
    training_hyp = get_best_hyp_op(hpo.outputs['best_hyperparameters'])
    
    # Component 3
    training = sagemaker_train_op(
        region=region,
        image=train_image,
        training_input_mode=training_input_mode,
        hyperparameters=training_hyp.output,
        channels=channels,
        instance_type=instance_type,
        instance_count='1',
        volume_size=volume_size,
        max_run_time=max_run_time,
        model_artifact_path=f's3://{bucket_name}/jobs',
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=spot_instance,
        role=role_arn,
    )

    # Component 4
    create_model = sagemaker_model_op(
        region=region,
        model_name=training.outputs['job_name'],
        image=serving_image,
        model_artifact_url=training.outputs['model_artifact_url'],
        network_isolation=network_isolation,
        role=role_arn
    )

    # Component 5
    prediction = sagemaker_deploy_op(
        region=region,
        model_name_1=create_model.output,
        instance_type_1='ml.m5.large'
    )
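
The `metric_definitions` argument of the tuning component carries a regular expression that extracts the objective metric from the training logs, and the stacked backslashes make it hard to read. The sketch below decodes the same literal and applies it to a log line, assuming the value is parsed as JSON. The log line is hypothetical; the real format depends on what the training script prints.

```python
import json
import re

# Same Python literal as in the pipeline definition above.
metric_definitions = '{"val_acc": "val_acc: ([0-9\\\\.]+)"}'
pattern = json.loads(metric_definitions)["val_acc"]

# Hypothetical log line, for illustration only.
log_line = "Epoch 10/10 - loss: 0.4312 - acc: 0.8604 - val_loss: 0.6120 - val_acc: 0.8123"

match = re.search(pattern, log_line)
val_acc = float(match.group(1))
print(pattern, val_acc)
```

After Python string parsing and JSON decoding, the pattern contains a single backslash before the dot, so the capture group matches digits and literal dots.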

#### Compile the pipeline definition

In this step we compile the DSL pipeline definition into a zipped workflow YAML and store it locally.

In [41]:
kfp.compiler.Compiler().compile(cifar10_hpo_train_deploy,'sm-hpo-train-deploy-pipeline.zip')

After a brief delay you can download the file `sm-hpo-train-deploy-pipeline.zip` from the Jupyter notebook to your machine: right-click it and select Download. Then go to the Kubeflow dashboard, upload it as a new pipeline definition and execute it.

### Test the deployed endpoint
The pipeline we ran in Kubeflow deployed the model trained with the best hyperparameter set to a SageMaker endpoint. Let's test this endpoint with an example image:

In [28]:
import json, boto3

# 'sagemaker-runtime' is the boto3 service name for invoking endpoints
client = boto3.client('sagemaker-runtime')

# Read the test image as raw bytes
file_name = '1000_dog.png'
with open(file_name, 'rb') as f:
    payload = f.read()

# Replace EndpointName with the name of the endpoint created by your pipeline run
response = client.invoke_endpoint(EndpointName='Endpoint-20201207144110-H98G',
                                  ContentType='application/x-image',
                                  Body=payload)
pred = json.loads(response['Body'].read())['predictions']
labels = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']
# Print each class probability as a percentage
for label, p in zip(labels, pred[0]):
    print(label, "{:.4f}".format(p*100))

airplane 0.0000
automobile 0.0000
bird 0.0116
cat 0.6094
deer 0.0010
dog 99.1679
frog 0.2065
horse 0.0035
ship 0.0000
truck 0.0000
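
To turn the per-class scores into a single answer, the labels can be ranked by probability. This minimal sketch does not call the endpoint: the `probabilities` list is reconstructed from the percentages printed above (value / 100) and stands in for `pred[0]`.

```python
labels = ['airplane', 'automobile', 'bird', 'cat', 'deer',
          'dog', 'frog', 'horse', 'ship', 'truck']

# Probabilities reconstructed from the percentages printed above (value / 100).
probabilities = [0.0, 0.0, 0.000116, 0.006094, 0.00001,
                 0.991679, 0.002065, 0.000035, 0.0, 0.0]

# Sort (label, probability) pairs from most to least likely.
ranked = sorted(zip(labels, probabilities), key=lambda pair: pair[1], reverse=True)
top_label, top_probability = ranked[0]
print(f"{top_label}: {top_probability:.2%}")
for label, probability in ranked[1:3]:
    print(f"{label}: {probability:.2%}")
```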
