In [1]:
# !pip install kfp --upgrade
# !which dsl-compile

## Amazon SageMaker Components for Kubeflow Pipelines Example - custom container
In this example we'll build a Kubeflow pipeline in which every component calls a different Amazon SageMaker feature.
Our simple pipeline will perform:

1. Hyperparameter optimization 
1. Select best hyperparameters and increase epochs
1. Train the model with the best hyperparameters
1. Create an Amazon SageMaker model
1. Deploy model

In [1]:
import kfp
from kfp import components
from kfp.components import func_to_container_op
from kfp import dsl
from kfp.aws import use_aws_secret
import time, os, json

In [2]:
# Every SageMaker component definition is loaded from the same kubeflow/pipelines
# location; the commit hash is part of the URL, so all four share one pinned revision.
_SAGEMAKER_COMPONENTS_ROOT = 'https://raw.githubusercontent.com/kubeflow/pipelines/cb36f87b727df0578f4c1e3fe9c24a30bb59e5a2/components/aws/sagemaker'


def _load_sagemaker_component(component_dir):
    """Load the `component.yaml` stored under `component_dir` of the pinned components root."""
    return components.load_component_from_url(f'{_SAGEMAKER_COMPONENTS_ROOT}/{component_dir}/component.yaml')


sagemaker_hpo_op = _load_sagemaker_component('hyperparameter_tuning')
sagemaker_train_op = _load_sagemaker_component('train')
sagemaker_model_op = _load_sagemaker_component('model')
sagemaker_deploy_op = _load_sagemaker_component('deploy')

In [3]:
import sagemaker
import boto3

# A single boto3 session backs both the low-level SageMaker client and the
# SageMaker SDK session.
sess = boto3.Session()
sagemaker_session = sagemaker.Session(boto_session=sess)
sm = sess.client('sagemaker')

# IAM role passed as `role=` to the SageMaker pipeline components.
# To use a fixed role instead, assign its ARN string directly, e.g.
# role = 'arn:aws:iam::<account-id>:role/service-role/<role-name>'
role = sagemaker.get_execution_role()

In [4]:
# One-time dataset preparation, kept for reference and not executed here:
# run the TFRecord generation script, then upload the local `cifar10` folder to S3.
# NOTE(review): `local_dataset` is not defined in the visible part of this notebook —
# set it before uncommenting the first command.
# !python generate_cifar10_tfrecords.py --data-dir {local_dataset}
# datasets = sagemaker_session.upload_data(path='cifar10', key_prefix='datasets/cifar10-dataset')
# S3 prefix of the dataset, hard-coded to one specific bucket.
datasets = "s3://sagemaker-us-west-2-453691756499/datasets/cifar10-dataset"

In [5]:
# Bucket that the pipeline's channel URIs, model_dir and output locations are built from.
bucket_name = sagemaker_session.default_bucket()

# Folder names inside the bucket.
job_folder, dataset_folder = 'jobs', 'datasets'

# One S3 prefix per dataset split, all rooted at `datasets`.
train_path, val_path, eval_path = (
    f'{datasets}/{split}' for split in ('train', 'validation', 'eval')
)

In [6]:
def update_best_model_hyperparams(hpo_results, best_model_epoch = "80") -> str:
    """Return the tuned hyperparameters as JSON text with `epochs` overridden.

    Args:
        hpo_results: JSON object text (any value whose `str()` is a JSON object).
        best_model_epoch: Value stored under the "epochs" key of the result.

    Returns:
        The hyperparameters re-encoded as a JSON string, including the "epochs" entry.
    """
    # json is imported inside the body, as in the original: this function is
    # later handed to func_to_container_op.
    import json
    hyperparams = dict(json.loads(str(hpo_results)))
    hyperparams["epochs"] = best_model_epoch
    return json.dumps(hyperparams)

# Wrap the Python function as a pipeline op so it can be called as a step
# between the HPO and training components.
get_best_hyp_op = func_to_container_op(update_best_model_hyperparams)

Hyperparameter optimization -> Select best hyperparameters and increase epochs -> Training model on the best hyperparameters -> Create an Amazon SageMaker model -> Deploy model

In [28]:
# Pipeline definition: hyperparameter tuning -> override epochs on the best
# hyperparameters -> training -> model creation -> endpoint deployment.
# All pipeline parameters are passed as strings. `bucket_name` and `role` are not
# parameters: they are read from the notebook's globals (`bucket_name` when the
# default for `channels` is built, `role` when the function body runs).
@dsl.pipeline(
    name='cifar10 hpo train deploy pipeline',
    description='cifar10 hpo train deploy pipeline using sagemaker'
)
def cifar10_hpo_train_deploy(region='us-west-2',
                           training_input_mode='File',
                           train_image='453691756499.dkr.ecr.us-west-2.amazonaws.com/sagemaker-kubernetes:latest',
                           serving_image='763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:1.15.2-cpu',
                           volume_size='50',
                           max_run_time='86400',
                           instance_type='ml.p3.2xlarge',
                           network_isolation='False',
                           traffic_encryption='False',
                           spot_instance='False',
                           # JSON list with three input channels (train / validation / eval).
                           # The S3 URIs are built from `bucket_name`, not from the `datasets`
                           # variable defined earlier in the notebook.
                           # NOTE(review): confirm both point at the same bucket.
                           channels='[ \
                    { \
                        "ChannelName": "train", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "s3://'+bucket_name+'/datasets/cifar10-dataset/train", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    }, \
                    { \
                        "ChannelName": "validation", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "s3://'+bucket_name+'/datasets/cifar10-dataset/validation", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    }, \
                    { \
                        "ChannelName": "eval", \
                        "DataSource": { \
                            "S3DataSource": { \
                                "S3DataType": "S3Prefix", \
                                "S3Uri": "s3://'+bucket_name+'/datasets/cifar10-dataset/eval", \
                                "S3DataDistributionType": "FullyReplicated" \
                            } \
                        }, \
                        "CompressionType": "None", \
                        "RecordWrapperType": "None" \
                    } \
                ]'
                          ):
    # Component 1: hyperparameter tuning job (Bayesian strategy, maximizing `val_acc`).
    hpo = sagemaker_hpo_op(
        region=region,
        image=train_image,
        training_input_mode=training_input_mode,
        strategy='Bayesian',
        metric_name='val_acc',
        # Regex associated with the `val_acc` metric; after Python and JSON unescaping
        # the character class is [0-9\.].
        metric_definitions='{"val_acc": "val_acc: ([0-9\\\\.]+)"}',
        metric_type='Maximize',
        # Hyperparameters held fixed for every tuning job.
        # NOTE(review): "sagemaker_region" is hard-coded to us-west-2 and does not
        # follow the `region` pipeline parameter.
        static_parameters='{ \
            "epochs": "1", \
            "momentum": "0.9", \
            "weight-decay": "0.0002", \
            "model_dir":"s3://'+bucket_name+'/jobs", \
            "sagemaker_region": "us-west-2" \
        }',
        # Search space: learning rate on a logarithmic scale, plus three categorical choices.
        continuous_parameters='[ \
            {"Name": "learning-rate", "MinValue": "0.0001", "MaxValue": "0.1", "ScalingType": "Logarithmic"} \
        ]',
        categorical_parameters='[ \
            {"Name": "optimizer", "Values": ["sgd", "adam"]}, \
            {"Name": "batch-size", "Values": ["32", "128", "256"]}, \
            {"Name": "model-type", "Values": ["resnet", "custom"]} \
        ]',
        channels=channels,
        output_location=f's3://{bucket_name}/jobs',
        instance_type=instance_type,
        instance_count='1',
        volume_size=volume_size,
        # NOTE(review): max_num_jobs='1' presumably limits the search to a single
        # job (demo setting) — raise it for a real search.
        max_num_jobs='1',
        max_parallel_jobs='1',
        max_run_time=max_run_time,
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=spot_instance,
        role=role
    # Each SageMaker step applies use_aws_secret with the 'aws-secret' secret name
    # and the two credential key names.
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))
    
    # Component 2: take the best hyperparameters from the tuning step and override
    # `epochs` (default "80") via update_best_model_hyperparams.
    training_hyp = get_best_hyp_op(hpo.outputs['best_hyperparameters'])
    
    # Component 3: training job using the adjusted hyperparameters, with the same
    # image, channels and instance settings as the tuning step.
    training = sagemaker_train_op(
        region=region,
        image=train_image,
        training_input_mode=training_input_mode,
        hyperparameters=training_hyp.output,
        channels=channels,
        instance_type=instance_type,
        instance_count='1',
        volume_size=volume_size,
        max_run_time=max_run_time,
        model_artifact_path=f's3://{bucket_name}/jobs',
        network_isolation=network_isolation,
        traffic_encryption=traffic_encryption,
        spot_instance=spot_instance,
        role=role,
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    # Component 4: create a SageMaker model from the training artifact; the training
    # job name is reused as the model name and the serving image is used for inference.
    create_model = sagemaker_model_op(
        region=region,
        model_name=training.outputs['job_name'],
        image=serving_image,
        model_artifact_url=training.outputs['model_artifact_url'],
        network_isolation=network_isolation,
        role=role
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    # Component 5: deploy the created model on an ml.m5.large instance.
    # `prediction` is not used afterwards in this function.
    prediction = sagemaker_deploy_op(
        region=region,
        model_name_1=create_model.output,
        instance_type_1='ml.m5.large'
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

In [29]:
# Compile the pipeline function into a local package file; the run submission
# step refers to this file by the same name.
kfp.compiler.Compiler().compile(cifar10_hpo_train_deploy,'sm-hpo-train-deploy-pipeline.zip')

In [30]:
# Connect to Kubeflow Pipelines and create the 'aws' experiment for the run.
client = kfp.Client()
aws_experiment = client.create_experiment(name='aws')

# The run name carries its UTC submission time.
run_timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
exp_name = f'cifar10-hpo-train-deploy-kfp-{run_timestamp}'

# Submit the compiled pipeline package under that name.
my_run = client.run_pipeline(
    aws_experiment.id,
    exp_name,
    'sm-hpo-train-deploy-pipeline.zip',
)

In [2]:
import json, boto3
# Runtime client used to call the deployed endpoint.
# Note: this rebinds `client`, which held the kfp client in the run submission cell.
client = boto3.client('runtime.sagemaker')

# Read the test image as raw bytes.
file_name = '1000_dog.png'
with open(file_name, 'rb') as image_file:
    payload = image_file.read()

# Send the image bytes to the endpoint (name hard-coded from one specific deployment)
# and print the raw response body.
response = client.invoke_endpoint(
    EndpointName='Endpoint-20200502070427-8KDX',
    ContentType='application/x-image',
    Body=payload,
)
response_body = response['Body'].read()
print(response_body)

# Class names; presumably index-aligned with the entries of the "predictions" vector — TODO confirm.
labels = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']

b'{\n    "predictions": [[9.40148e-05, 6.55521762e-06, 0.00156780728, 0.196098939, 0.000643287145, 0.794406056, 0.00635553245, 0.000790171616, 9.30105853e-06, 2.84284379e-05]\n    ]\n}'
