Add SageMaker HPO component and sample usage in a pipeline (#1628)
* add HPO component and sample pipeline usage

* Update Dockerfile to include HPO component

* Update docker image used in hpo component

* Update HPO readme, make HPO job name required, allow empty string for int params, reintro some default values

* Resolve a couple todos

* Add Dockerfile for HPO and update docker image used in HPO component

* Add Dockerfile for HPO
carolynwang authored and k8s-ci-robot committed Jul 22, 2019
1 parent 5838e2c commit 2778632
Showing 8 changed files with 674 additions and 5 deletions.
3 changes: 2 additions & 1 deletion components/aws/sagemaker/Dockerfile
@@ -17,8 +17,9 @@ RUN apt-get update -y && apt-get install --no-install-recommends -y -q ca-certif

RUN easy_install pip

-RUN pip install boto3==1.9.130 sagemaker pathlib2
+RUN pip install boto3==1.9.169 sagemaker pathlib2 pyyaml==3.12

COPY hyperparameter_tuning/src/hyperparameter_tuning.py .
COPY train/src/train.py .
COPY deploy/src/deploy.py .
COPY model/src/create_model.py .
180 changes: 176 additions & 4 deletions components/aws/sagemaker/common/_utils.py
@@ -19,11 +19,16 @@
import string
import random
import json
import yaml
from urlparse import urlparse

import boto3
from botocore.exceptions import ClientError
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri

import logging
logging.getLogger().setLevel(logging.INFO)

def get_client(region=None):
"""Builds a client to the AWS SageMaker API."""
@@ -53,10 +58,10 @@ def create_training_job(client, image, instance_type, instance_count, volume_siz
"VolumeSizeInGB": volume_size
},
"TrainingJobName": job_name,
"HyperParameters": {
"k": "10",
"feature_dim": "784",
"mini_batch_size": "500"
"HyperParameters": {
"k": "10",
"feature_dim": "784",
"mini_batch_size": "500"
},
"StoppingCondition": {
"MaxRuntimeInSeconds": 60 * 60
@@ -221,5 +226,172 @@ def print_tranformation_job_result(output_location):
results = f.readlines()
print("Sample transform result: {}".format(results[0]))


def create_hyperparameter_tuning_job_request(args):
with open('/app/common/hpo.template.yaml', 'r') as f:
request = yaml.safe_load(f)

built_in_algos = {
'blazingtext': 'blazingtext',
'deepar forecasting': 'forecasting-deepar',
'factorization machines': 'factorization-machines',
'image classification': 'image-classification',
'ip insights': 'ipinsights',
'k-means': 'kmeans',
'k-nearest neighbors': 'knn',
'k-nn': 'knn',
'lda': 'lda',
'linear learner': 'linear-learner',
'neural topic model': 'ntm',
'object2vec': 'object2vec',
'object detection': 'object-detection',
'pca': 'pca',
'random cut forest': 'randomcutforest',
'semantic segmentation': 'semantic-segmentation',
'sequence to sequence': 'seq2seq',
'seq2seq modeling': 'seq2seq',
'xgboost': 'xgboost'
}

### Create a hyperparameter tuning job
request['HyperParameterTuningJobName'] = args['job_name']

request['HyperParameterTuningJobConfig']['Strategy'] = args['strategy']
request['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['Type'] = args['metric_type']
request['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['MetricName'] = args['metric_name']
request['HyperParameterTuningJobConfig']['ResourceLimits']['MaxNumberOfTrainingJobs'] = args['max_num_jobs']
request['HyperParameterTuningJobConfig']['ResourceLimits']['MaxParallelTrainingJobs'] = args['max_parallel_jobs']
request['HyperParameterTuningJobConfig']['ParameterRanges']['IntegerParameterRanges'] = args['integer_parameters']
request['HyperParameterTuningJobConfig']['ParameterRanges']['ContinuousParameterRanges'] = args['continuous_parameters']
request['HyperParameterTuningJobConfig']['ParameterRanges']['CategoricalParameterRanges'] = args['categorical_parameters']
request['HyperParameterTuningJobConfig']['TrainingJobEarlyStoppingType'] = args['early_stopping_type']

request['TrainingJobDefinition']['StaticHyperParameters'] = args['static_parameters']
request['TrainingJobDefinition']['AlgorithmSpecification']['TrainingInputMode'] = args['training_input_mode']

### Update training image (for BYOC) or algorithm resource name
if not args['image'] and not args['algorithm_name']:
logging.error('Please specify training image or algorithm name.')
raise Exception('Could not create job request')
if args['image'] and args['algorithm_name']:
logging.error('Both image and algorithm name were specified; only one should be provided. Proceeding with the image.')

if args['image']:
request['TrainingJobDefinition']['AlgorithmSpecification']['TrainingImage'] = args['image']
request['TrainingJobDefinition']['AlgorithmSpecification'].pop('AlgorithmName')
else:
# TODO: determine if users can make custom algorithm resources that have the same name as built-in algorithm names
algo_name = args['algorithm_name'].lower().strip()
if algo_name in built_in_algos.keys():
request['TrainingJobDefinition']['AlgorithmSpecification']['TrainingImage'] = get_image_uri(args['region'], built_in_algos[algo_name])
request['TrainingJobDefinition']['AlgorithmSpecification'].pop('AlgorithmName')
# Just to give the user more leeway for built-in algorithm name inputs
elif algo_name in built_in_algos.values():
request['TrainingJobDefinition']['AlgorithmSpecification']['TrainingImage'] = get_image_uri(args['region'], algo_name)
request['TrainingJobDefinition']['AlgorithmSpecification'].pop('AlgorithmName')
else:
request['TrainingJobDefinition']['AlgorithmSpecification']['AlgorithmName'] = args['algorithm_name']
request['TrainingJobDefinition']['AlgorithmSpecification'].pop('TrainingImage')

### Update metric definitions
if args['metric_definitions']:
for key, val in args['metric_definitions'].items():
request['TrainingJobDefinition']['AlgorithmSpecification']['MetricDefinitions'].append({'Name': key, 'Regex': val})
else:
request['TrainingJobDefinition']['AlgorithmSpecification'].pop('MetricDefinitions')

### Update or pop VPC configs
if args['vpc_security_group_ids'] and args['vpc_subnets']:
request['TrainingJobDefinition']['VpcConfig']['SecurityGroupIds'] = [args['vpc_security_group_ids']]
request['TrainingJobDefinition']['VpcConfig']['Subnets'] = [args['vpc_subnets']]
else:
request['TrainingJobDefinition'].pop('VpcConfig')

### Update input channels, must have at least one specified
if len(args['channels']) > 0:
request['TrainingJobDefinition']['InputDataConfig'] = args['channels']
else:
logging.error("Must specify at least one input channel.")
raise Exception('Could not make job request')

request['TrainingJobDefinition']['OutputDataConfig']['S3OutputPath'] = args['output_location']
request['TrainingJobDefinition']['OutputDataConfig']['KmsKeyId'] = args['output_encryption_key']
request['TrainingJobDefinition']['ResourceConfig']['InstanceType'] = args['instance_type']
request['TrainingJobDefinition']['ResourceConfig']['VolumeKmsKeyId'] = args['resource_encryption_key']
request['TrainingJobDefinition']['EnableNetworkIsolation'] = args['network_isolation']
request['TrainingJobDefinition']['EnableInterContainerTrafficEncryption'] = args['traffic_encryption']
request['TrainingJobDefinition']['RoleArn'] = args['role']

### Update InstanceCount, VolumeSizeInGB, and MaxRuntimeInSeconds if input is non-empty and > 0, otherwise use default values
if args['instance_count']:
request['TrainingJobDefinition']['ResourceConfig']['InstanceCount'] = args['instance_count']

if args['volume_size']:
request['TrainingJobDefinition']['ResourceConfig']['VolumeSizeInGB'] = args['volume_size']

if args['max_run_time']:
request['TrainingJobDefinition']['StoppingCondition']['MaxRuntimeInSeconds'] = args['max_run_time']

### Update or pop warm start configs
if args['warm_start_type'] and args['parent_hpo_jobs']:
request['WarmStartConfig']['WarmStartType'] = args['warm_start_type']
parent_jobs = [n.strip() for n in args['parent_hpo_jobs'].split(',')]
for i in range(len(parent_jobs)):
request['WarmStartConfig']['ParentHyperParameterTuningJobs'].append({'HyperParameterTuningJobName': parent_jobs[i]})
else:
if args['warm_start_type'] or args['parent_hpo_jobs']:
if not args['warm_start_type']:
logging.error('Must specify warm start type as either "IdenticalDataAndAlgorithm" or "TransferLearning".')
if not args['parent_hpo_jobs']:
logging.error("Must specify at least one parent hyperparameter tuning job")
raise Exception('Could not make job request')
request.pop('WarmStartConfig')

### Update tags
for key, val in args['tags'].items():
request['Tags'].append({'Key': key, 'Value': val})

return request


def create_hyperparameter_tuning_job(client, args):
"""Create a Sagemaker HPO job"""
request = create_hyperparameter_tuning_job_request(args)
try:
job_arn = client.create_hyper_parameter_tuning_job(**request)
hpo_job_name = request['HyperParameterTuningJobName']
logging.info("Created Hyperparameter Training Job with name: " + hpo_job_name)
logging.info("URL: https://{}.console.aws.amazon.com/sagemaker/home?region={}#/hyper-tuning-jobs/{}".format(args['region'], args['region'], hpo_job_name))
return hpo_job_name
except ClientError as e:
raise Exception(e.response['Error']['Message'])


def wait_for_hyperparameter_training_job(client, hpo_job_name):
### Wait until the job finishes
while(True):
response = client.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=hpo_job_name)
status = response['HyperParameterTuningJobStatus']
if status == 'Completed':
logging.info("Hyperparameter tuning job ended with status: " + status)
break
if status == 'Failed':
message = response['FailureReason']
logging.error('Hyperparameter tuning failed with the following error: {}'.format(message))
raise Exception('Hyperparameter tuning job failed')
logging.info("Hyperparameter tuning job is still in status: " + status)
time.sleep(30)


def get_best_training_job_and_hyperparameters(client, hpo_job_name):
### Get and return best training job and its hyperparameters, without the objective metric
info = client.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=hpo_job_name)
best_job = info['BestTrainingJob']['TrainingJobName']
training_info = client.describe_training_job(TrainingJobName=best_job)
train_hyperparameters = training_info['HyperParameters']
train_hyperparameters.pop('_tuning_objective_metric')
return best_job, train_hyperparameters


def id_generator(size=4, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
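For orientation, the following is a minimal sketch of how the new helpers are presumably chained together by the component entrypoint (`hyperparameter_tuning/src/hyperparameter_tuning.py`, added in this commit but not shown in this excerpt); the `args` dict stands in for the parsed CLI arguments described in the component README:

```python
# Sketch only: assumed wiring of the helpers added above. The real entrypoint
# builds `args` from command-line flags; here it is simply passed in.
# `common` is importable because the Dockerfile copies it to /app/common and
# sets PYTHONPATH=/app.
from common import _utils

def run_hpo(args):
    client = _utils.get_client(args['region'])
    hpo_job_name = _utils.create_hyperparameter_tuning_job(client, args)
    _utils.wait_for_hyperparameter_training_job(client, hpo_job_name)
    best_job, best_hyperparameters = _utils.get_best_training_job_and_hyperparameters(
        client, hpo_job_name)
    return hpo_job_name, best_job, best_hyperparameters
```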
42 changes: 42 additions & 0 deletions components/aws/sagemaker/common/hpo.template.yaml
@@ -0,0 +1,42 @@
HyperParameterTuningJobName: ''
HyperParameterTuningJobConfig:
Strategy: ''
HyperParameterTuningJobObjective:
Type: ''
MetricName: ''
ResourceLimits:
MaxNumberOfTrainingJobs: 0
MaxParallelTrainingJobs: 0
ParameterRanges:
IntegerParameterRanges: []
ContinuousParameterRanges: []
CategoricalParameterRanges: []
TrainingJobEarlyStoppingType: ''
TrainingJobDefinition:
StaticHyperParameters: {}
AlgorithmSpecification:
TrainingImage: ''
TrainingInputMode: ''
AlgorithmName: ''
MetricDefinitions: []
RoleArn: ''
InputDataConfig: []
VpcConfig:
SecurityGroupIds: []
Subnets: []
OutputDataConfig:
KmsKeyId: ''
S3OutputPath: ''
ResourceConfig:
InstanceType: ''
InstanceCount: 1
VolumeSizeInGB: 1
VolumeKmsKeyId: ''
StoppingCondition:
MaxRuntimeInSeconds: 86400
EnableNetworkIsolation: True
EnableInterContainerTrafficEncryption: False
WarmStartConfig:
ParentHyperParameterTuningJobs: []
WarmStartType: ''
Tags: []
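This template is the skeleton request that `create_hyperparameter_tuning_job_request` (above) loads, fills in from the pipeline arguments, and passes to the SageMaker API. A minimal sketch of that flow, with an assumed local path and example values:

```python
# Illustrative only: the component loads this YAML, overwrites fields from the
# pipeline arguments (or pops unused optional sections), and hands the
# resulting dict to boto3 unchanged.
import boto3
import yaml

with open('components/aws/sagemaker/common/hpo.template.yaml') as f:  # path assumed
    request = yaml.safe_load(f)

request['HyperParameterTuningJobName'] = 'example-hpo-job'  # example value
# ... the remaining fields are filled or removed by create_hyperparameter_tuning_job_request ...

client = boto3.client('sagemaker', region_name='us-east-1')  # region assumed
# client.create_hyper_parameter_tuning_job(**request)  # called once all required fields are set
```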
28 changes: 28 additions & 0 deletions components/aws/sagemaker/hyperparameter_tuning/Dockerfile
@@ -0,0 +1,28 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


FROM ubuntu:16.04

RUN apt-get update -y && apt-get install --no-install-recommends -y -q ca-certificates python-dev python-setuptools wget unzip

RUN easy_install pip

RUN pip install boto3==1.9.169 sagemaker pathlib2 pyyaml==3.12

COPY hyperparameter_tuning/src/hyperparameter_tuning.py .

COPY common /app/common/

ENV PYTHONPATH /app

ENTRYPOINT [ "bash" ]
77 changes: 77 additions & 0 deletions components/aws/sagemaker/hyperparameter_tuning/README.md
@@ -0,0 +1,77 @@
# SageMaker hyperparameter optimization Kubeflow Pipeline component
## Summary
Component to submit hyperparameter tuning jobs to SageMaker directly from a Kubeflow Pipelines workflow.

# Details

## Intended Use
For hyperparameter tuning jobs using AWS SageMaker.

## Runtime Arguments
Argument | Description | Optional | Data type | Accepted values | Default |
:--- | :---------- | :----------| :----------| :---------- | :----------|
region | The AWS region in which the tuning job runs | No | String | | |
job_name | The name of the tuning job. Must be unique within the same AWS account and AWS region | Yes | String | | |
image | The registry path of the Docker image that contains the training algorithm; specify either this or algorithm_name | Yes | String | | |
role | The Amazon Resource Name (ARN) that Amazon SageMaker assumes to perform tasks on your behalf | No | String | | |
algorithm_name | The name of the algorithm resource to use for the hyperparameter tuning job; only specify this parameter if training image is not specified | Yes | String | | |
training_input_mode | The input mode that the algorithm supports | No | String | File, Pipe | File |
metric_definitions | The dictionary of name-regex pairs that specify the metrics the algorithm emits | Yes | Dict | | {} |
strategy | How hyperparameter tuning chooses the combinations of hyperparameter values to use for the training job it launches | No | String | Bayesian, Random | Bayesian |
metric_name | The name of the metric to use for the objective metric | No | String | | |
metric_type | Whether to minimize or maximize the objective metric | No | String | Maximize, Minimize | |
early_stopping_type | Whether to use early stopping for the training jobs that the tuning job launches | No | String | Off, Auto | Off |
static_parameters | The values of hyperparameters that do not change for the tuning job | Yes | Dict | | {} |
integer_parameters | The array of IntegerParameterRange objects that specify ranges of integer hyperparameters that you want to search | Yes | List of Dicts | | [] |
continuous_parameters | The array of ContinuousParameterRange objects that specify ranges of continuous hyperparameters that you want to search | Yes | List of Dicts | | [] |
categorical_parameters | The array of CategoricalParameterRange objects that specify ranges of categorical hyperparameters that you want to search | Yes | List of Dicts | | [] |
channels | A list of dicts specifying the input channels (at least one); see the example after this table and the SageMaker documentation for all available fields | No | List of Dicts | | [{}] |
output_location | The Amazon S3 path where you want Amazon SageMaker to store the model artifacts of the training jobs | No | String | | |
output_encryption_key | The AWS KMS key that Amazon SageMaker uses to encrypt the model artifacts | Yes | String | | |
instance_type | The ML compute instance type | No | String | ml.m4.xlarge, ml.m4.2xlarge, ml.m4.4xlarge, ml.m4.10xlarge, ml.m4.16xlarge, ml.m5.large, ml.m5.xlarge, ml.m5.2xlarge, ml.m5.4xlarge, ml.m5.12xlarge, ml.m5.24xlarge, ml.c4.xlarge, ml.c4.2xlarge, ml.c4.4xlarge, ml.c4.8xlarge, ml.p2.xlarge, ml.p2.8xlarge, ml.p2.16xlarge, ml.p3.2xlarge, ml.p3.8xlarge, ml.p3.16xlarge, ml.c5.xlarge, ml.c5.2xlarge, ml.c5.4xlarge, ml.c5.9xlarge, ml.c5.18xlarge | ml.m4.xlarge |
instance_count | The number of ML compute instances to use in each training job | No | Int | ≥ 1 | 1 |
volume_size | The size of the ML storage volume that you want to provision in GB | No | Int | ≥ 1 | 1 |
max_num_jobs | The maximum number of training jobs that a hyperparameter tuning job can launch | No | Int | [1, 500] | |
max_parallel_jobs | The maximum number of concurrent training jobs that a hyperparameter tuning job can launch | No | Int | [1, 10] | |
max_run_time | The maximum run time in seconds per training job | No | Int | ≤ 432000 (5 days) | 86400 (1 day) |
resource_encryption_key | The AWS KMS key that Amazon SageMaker uses to encrypt data on the storage volume attached to the ML compute instance(s) | Yes | String | | |
vpc_security_group_ids | The VPC security group IDs, in the form sg-xxxxxxxx | Yes | String | | |
vpc_subnets | The ID of the subnets in the VPC to which you want to connect your hpo job | Yes | String | | |
network_isolation | Isolates the training container if true | Yes | Boolean | False, True | True |
traffic_encryption | Encrypts all communications between ML compute instances in distributed training if true | Yes | Boolean | False, True | False |
warm_start_type | Specifies the type of warm start used | Yes | String | IdenticalDataAndAlgorithm, TransferLearning | |
parent_hpo_jobs | Comma-separated list of previously completed or stopped hyperparameter tuning jobs to be used as a starting point | Yes | String | | |
tags | Key-value pairs to categorize AWS resources | Yes | Dict | | {} |
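
The dict- and list-typed arguments above are passed through to the SageMaker CreateHyperParameterTuningJob API unchanged, so they must already be in API shape. The following sketch is illustrative only; the hyperparameter names and S3 paths are examples, not defaults:

```python
# Example argument shapes; the field names follow the SageMaker
# CreateHyperParameterTuningJob API, the values are illustrative.
static_parameters = {'k': '10', 'feature_dim': '784'}
integer_parameters = [
    {'Name': 'mini_batch_size', 'MinValue': '450', 'MaxValue': '550'},
]
continuous_parameters = [
    {'Name': 'extra_center_factor', 'MinValue': '10', 'MaxValue': '20'},
]
categorical_parameters = [
    {'Name': 'init_method', 'Values': ['random', 'kmeans++']},
]
channels = [{
    'ChannelName': 'train',
    'DataSource': {
        'S3DataSource': {
            'S3Uri': 's3://my-bucket/mnist_kmeans_example/train_data',  # example path
            'S3DataType': 'S3Prefix',
            'S3DataDistributionType': 'FullyReplicated',
        }
    },
    'CompressionType': 'None',
    'RecordWrapperType': 'None',
    'InputMode': 'File',
}]
```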

## Outputs
Name | Description
:--- | :----------
model_artifact_url | URL where model artifacts were stored
best_job_name | Best hyperparameter tuning training job name
best_hyperparameters | Tuned hyperparameters

# Requirements
* [Kubeflow pipelines SDK](https://www.kubeflow.org/docs/pipelines/sdk/install-sdk/)
* [Kubeflow set-up](https://www.kubeflow.org/docs/aws/deploy/install-kubeflow/)

# Samples
## On its own
K-Means algorithm tuning on MNIST dataset: /kubeflow/pipelines/samples/aws-samples/mnist-kmeans-sagemaker/kmeans-hpo-pipeline.py

Follow the same steps as in the [README](https://github.com/kubeflow/pipelines/blob/master/samples/aws-samples/mnist-kmeans-sagemaker/README.md) for the [MNIST classification pipeline](https://github.com/kubeflow/pipelines/blob/master/samples/aws-samples/mnist-kmeans-sagemaker/mnist-classification-pipeline.py):
1. Get and store data in S3 buckets
2. Prepare an IAM role with permissions to run SageMaker jobs
3. Add 'aws-secret' to your kubeflow namespace
4. Compile the pipeline:
```bash
dsl-compile --py kmeans-hpo-pipeline.py --output kmeans-hpo-pipeline.tar.gz
```
5. In the Kubeflow UI, upload the compiled pipeline specification (the .tar.gz file) and create a new run. Update the role_arn and the data paths, and optionally any other run parameters.
6. Once the pipeline completes, you can see the outputs under 'Output parameters' in the HPO component's Input/Output section.

## Integrated into a pipeline
MNIST Classification using K-Means pipeline: [Coming Soon]
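
Until that sample is available, the sketch below shows one way the component could be wired into a pipeline. The component.yaml path, the argument names (taken from the Runtime Arguments table above), and all values are assumptions for illustration, not a confirmed interface:

```python
# Illustrative sketch only. It assumes the component definition is published as
# component.yaml next to this README and that the `kfp` SDK (with kfp.aws) is installed.
import kfp
from kfp import components, dsl
from kfp.aws import use_aws_secret

sagemaker_hpo_op = components.load_component_from_file(
    'components/aws/sagemaker/hyperparameter_tuning/component.yaml')  # path assumed

# Example input channel; same shape as the channels example shown after the
# Runtime Arguments table.
channels = [{
    'ChannelName': 'train',
    'DataSource': {'S3DataSource': {
        'S3Uri': 's3://my-bucket/mnist_kmeans_example/train_data',  # example path
        'S3DataType': 'S3Prefix',
        'S3DataDistributionType': 'FullyReplicated'}},
    'CompressionType': 'None',
    'RecordWrapperType': 'None',
    'InputMode': 'File',
}]

@dsl.pipeline(name='SageMaker HPO example', description='Illustrative usage of the HPO component')
def hpo_pipeline(region='us-east-1', role_arn=''):
    hpo = sagemaker_hpo_op(
        region=region,
        algorithm_name='K-Means',
        metric_name='test:msd',
        metric_type='Minimize',
        static_parameters={'k': '10', 'feature_dim': '784'},
        integer_parameters=[{'Name': 'mini_batch_size', 'MinValue': '450', 'MaxValue': '550'}],
        channels=channels,
        output_location='s3://my-bucket/hpo-output',  # example path
        max_num_jobs=9,
        max_parallel_jobs=3,
        role=role_arn,
    ).apply(use_aws_secret())  # pulls AWS credentials from the 'aws-secret' secret added in step 3 above
    # Downstream steps could consume hpo.outputs['best_hyperparameters'], for example.

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(hpo_pipeline, 'hpo-pipeline.tar.gz')
```

The compiled `hpo-pipeline.tar.gz` could then be uploaded through the Kubeflow Pipelines UI as in step 5 above.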

# Resources
* [Using Amazon built-in algorithms](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-algo-docker-registry-paths.html)
* [More information on request parameters](https://github.com/awsdocs/amazon-sagemaker-developer-guide/blob/master/doc_source/API_CreateHyperParameterTuningJob.md#request-parameters)
