In [2]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from os import path
import json
import yaml

In [4]:
# Component store used to look up the hand-written components by name;
# the local ./steps directory is added to its search paths.
component_path = path.join('.', 'steps')
cs = comp.ComponentStore()
cs.local_search_paths.append(component_path)

# Pre-built component: the AI Platform (ml_engine) train component, loaded
# from the component.yaml published under the 1.0.0 tag of kubeflow/pipelines.
caip_train_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1.0.0/components/gcp/ml_engine/train/component.yaml'
)

# Hand-written component, resolved by name through the component store.
preprocess_op = cs.load_component('preprocess')

In [20]:
# Quick check: json.dumps serialises a mixed list into a single string,
# so the cell output below is `str`.
list_example = ["Mango", 1, 3, 6, "Oranges"]
type(json.dumps(list_example))

str

In [53]:
from kfp.components import func_to_container_op, InputPath, OutputPath

@func_to_container_op
def get_tuned_param_op(
    hptune_job_id: str,
    project_id: str,
    common_args: InputPath('List'),
    tuned_parameters_out: OutputPath('List')):
    """Extend a base argument list with the hyperparameters of a tuning job.

    Fetches the job ``projects/<project_id>/jobs/<hptune_job_id>`` through the
    Cloud ML (``ml`` v1) API, takes the hyperparameters of the first trial
    listed in its ``trainingOutput``, appends each one to the base arguments
    as a ``--<name>``, ``<value>`` pair and writes the resulting list to
    ``tuned_parameters_out`` as JSON.

    Args:
        hptune_job_id: ID of the hyperparameter-tuning job to inspect.
        project_id: ID of the GCP project that owns the job.
        common_args: Path of a file holding the base argument list (JSON or a Python list literal).
        tuned_parameters_out: Path of the file the extended argument list is written to.

    Raises:
        googleapiclient.errors.HttpError: If the job lookup fails.
        RuntimeError: If the job reports no hyperparameter trials.
    """
    # Imports live inside the function body because the function is turned
    # into a standalone component by func_to_container_op.
    # NOTE(review): googleapiclient must be importable in the component's
    # container image - confirm the base image provides it.
    import ast
    import json
    from pathlib import Path
    from types import SimpleNamespace

    from googleapiclient import discovery
    from googleapiclient import errors


    # Modified from: https://stackoverflow.com/a/54332748
    class NestedNamespace(SimpleNamespace):
        """SimpleNamespace that recursively wraps nested dicts (also inside lists)."""

        def __init__(self, dictionary, **kwargs):
            super().__init__(**kwargs)
            for key, value in dictionary.items():
                if isinstance(value, dict):
                    self.__setattr__(key, NestedNamespace(value))
                elif isinstance(value, list):
                    self.__setattr__(
                        key,
                        [
                            NestedNamespace(i) if isinstance(i, dict)
                            else i for i in value
                        ]
                    )
                else:
                    self.__setattr__(key, value)


    def print_best_parameters(
        project_id, hp_tune_job, filename='tuned_params', common_args='[]'
    ):
        """Write ``common_args`` plus the first trial's hyperparameters to ``filename``.

        ``common_args`` is the serialized base argument list itself (not a
        path); despite the name, nothing is printed on success.
        """
        # Store your full project ID in a variable in the format the API needs.
        job_id = 'projects/{}/jobs/{}'.format(project_id, hp_tune_job)

        # Build a representation of the Cloud ML API.
        ml = discovery.build('ml', 'v1')

        # Create a request to call projects.jobs.get.
        request = ml.projects().jobs().get(name=job_id)
        # Make the call.
        try:
            response = request.execute()
        except errors.HttpError as err:
            # Something went wrong, print out some information, then fail the
            # step: continuing would only hit an unbound `response` below.
            print('There was an error getting the job info, Check the details:')
            print(err._get_reason())
            raise

        job_info = NestedNamespace(response)
        trials = getattr(getattr(job_info, 'trainingOutput', None), 'trials', None)
        if not trials:
            raise RuntimeError(
                'Job {} reports no hyperparameter trials'.format(job_id))

        # The base list normally arrives as JSON; keep accepting a Python
        # list literal as well for backward compatibility.
        try:
            param_list = json.loads(common_args)
        except ValueError:
            param_list = ast.literal_eval(common_args)

        # NOTE(review): the first listed trial is treated as the best one -
        # confirm the ordering guarantee against the API documentation.
        for key, value in trials[0].hyperparameters.__dict__.items():
            param_list.append('--' + key)
            param_list.append(value)

        # Creating the directory where the output file will be created (the directory may or may not exist).
        Path(filename).parent.mkdir(parents=True, exist_ok=True)
        with open(filename, 'w') as f:
            # JSON rather than str(list): the output is declared as a 'List'.
            f.write(json.dumps(param_list))

    # `common_args` is a file path (InputPath); pass on its contents, not the path.
    with open(common_args) as file:
        common_args_str = file.read().replace('\n', '')
    print_best_parameters(project_id, hptune_job_id, tuned_parameters_out, common_args_str)

In [55]:
# Config parameters: GCP project, region and the training container image.
PROJECT_ID = 'pytorch-tpu-nfs'
REGION = 'us-central1'
FAIRSEQ_IMAGE = 'gcr.io/pytorch-tpu-nfs/fairseq-lm-train'

# Job specs: the `trainingInput` section of each YAML config is re-serialised
# as a JSON string (the *_json names are historical - the files are YAML).
hpt_input_json = './steps/hypertune/config.yaml'
with open(hpt_input_json) as f:
    hpt_input = json.dumps(yaml.safe_load(f)['trainingInput'])

training_input_json = './steps/training/config.yaml'
with open(training_input_json) as f:
    training_input = json.dumps(yaml.safe_load(f)['trainingInput'])

# Base command-line arguments, serialised as a JSON list; used below as the
# pipeline's `args` value.
common_args = json.dumps([
    # task / checkpointing
    '--task', 'language_modeling',
    '--save-dir', 'checkpoints/transformer_wikitext-103',
    # model
    '--arch', 'transformer_lm', '--share-decoder-input-output-embed',
    '--dropout', '0.1',
    # optimisation
    '--optimizer', 'adam', '--adam-betas', '(0.9,0.98)',
    '--clip-norm', '0.0',
    '--lr-scheduler', 'inverse_sqrt',
    '--warmup-updates', '4000',
    '--warmup-init-lr', '1e-07',
    # batching
    '--tokens-per-sample', '512',
    '--sample-break-mode', 'none',
    '--max-tokens', '1024',
    '--update-freq', '16',
    '--fp16',
    '--max-update', '500',
])

# Run-time argument values for the pipeline, keyed by pipeline parameter name.
pipeline_args = dict(
    project_id=PROJECT_ID,
    region=REGION,
    args=common_args,
    master_image_uri=FAIRSEQ_IMAGE,
    training_input=training_input,
    hpt_input=hpt_input,
    job_id_prefix='',
    job_id='',
    wait_interval='30',
    dataset_bucket='gs://kfp-exp/fairseq-lm-data',
)


@dsl.pipeline(
    name='KFP-Pipelines Example',
    description='Kubeflow pipeline using pre-built components',
)
def pipeline(
    project_id=PROJECT_ID,
    region=REGION,
    args=common_args,
    master_image_uri=FAIRSEQ_IMAGE,
    training_input=training_input,
    hpt_input=hpt_input,
    job_id_prefix='',
    job_id='',
    wait_interval='30',
    dataset_bucket='gs://kfp-exp/fairseq-lm-data',
):
    """Four-step DAG: preprocess, hyperparameter tuning, tuned-parameter lookup, training.

    The preprocess step's `args_out` feeds both the tuning job and the
    tuned-parameter lookup; the lookup's `tuned_parameters_out` becomes the
    argument list of the final training job.
    """
    # Settings passed unchanged to both caip_train_op submissions.
    # NOTE(review): both jobs receive the same job_id / job_id_prefix -
    # confirm this is intended when a non-empty job_id is supplied.
    caip_job_settings = dict(
        project_id=project_id,
        region=region,
        master_image_uri=master_image_uri,
        job_id_prefix=job_id_prefix,
        job_id=job_id,
        wait_interval=wait_interval,
    )

    preprocess_task = preprocess_op(dataset_bucket=dataset_bucket, args_in=args)
    base_args = preprocess_task.outputs['args_out']

    tuning_task = caip_train_op(
        args=base_args,
        training_input=hpt_input,
        **caip_job_settings
    ).set_display_name("Hyperparameter-Tuning")
    # Zero max staleness for cached results of the tuning step.
    tuning_task.execution_options.caching_strategy.max_cache_staleness = "P0D"

    tuned_param_task = get_tuned_param_op(
        project_id=project_id,
        hptune_job_id=tuning_task.outputs['job_id'],
        common_args=base_args,
    ).set_display_name("Get-Tuned-Param")

    caip_train_op(
        args=tuned_param_task.outputs['tuned_parameters_out'],
        training_input=training_input,
        **caip_job_settings
    ).set_display_name("Training")


# Connect to the hosted Kubeflow Pipelines endpoint and submit the pipeline
# as a run with the argument values from pipeline_args. The call's return
# value is left as the cell's last expression so the notebook displays it.
client = kfp.Client(
    host='https://3fee5ee78d7359c1-dot-us-central2.pipelines.googleusercontent.com'
)
client.create_run_from_pipeline_func(pipeline_func=pipeline, arguments=pipeline_args)


RunPipelineResult(run_id=829ea924-f191-42ca-bf1a-6e99bf8f2a2b)