In [6]:
import kfp
import kfp.dsl as dsl
from kfp import components

from kubeflow.katib import ApiClient
from kubeflow.katib import V1beta1ExperimentSpec
from kubeflow.katib import V1beta1AlgorithmSpec
from kubeflow.katib import V1beta1ObjectiveSpec
from kubeflow.katib import V1beta1ParameterSpec
from kubeflow.katib import V1beta1FeasibleSpace
from kubeflow.katib import V1beta1TrialTemplate
from kubeflow.katib import V1beta1TrialParameterSpec

In [7]:
# Katib hyperparameter-tuning step: the caller supplies the experiment name,
# the namespace, and the number of training steps each trial runs.
def create_katib_experiment_task(experiment_name, experiment_namespace, training_steps,
                                 max_trial_count=5, max_failed_trial_count=3,
                                 parallel_trial_count=2, experiment_timeout_minutes=60):
    """Create a KFP op that launches a Katib Experiment tuning the TFJob MNIST model.

    Args:
        experiment_name: Name of the Katib Experiment.
        experiment_namespace: Namespace the Experiment is created in.
        training_steps: Passed to every trial as ``--tf-train-steps``; converted
            with ``str()`` so both a plain value and a pipeline parameter work.
        max_trial_count: Value for ``V1beta1ExperimentSpec.max_trial_count`` (default 5).
        max_failed_trial_count: Value for ``V1beta1ExperimentSpec.max_failed_trial_count``
            (default 3).
        parallel_trial_count: Number of trials run in parallel (default 2). Size it
            to the cluster's hardware and to the resource requests of each job.
        experiment_timeout_minutes: Timeout handed to the Katib launcher component
            (default 60).

    Returns:
        The op created by the Katib launcher component. ``create_tfjob_task`` feeds
        its ``output`` into ``convert_katib_results``.
    """
    # Objective: minimize the "loss" metric, with 0.001 as the goal.
    objective = V1beta1ObjectiveSpec(
        type="minimize",
        goal=0.001,
        objective_metric_name="loss"
    )

    # Tuning algorithm: "random" picks each hyperparameter at random within its range.
    algorithm = V1beta1AlgorithmSpec(
        algorithm_name="random",
    )

    # Tune the learning rate and the batch size. Katib parameter types are
    # int, double, categorical and discrete.
    parameters = [
        V1beta1ParameterSpec(
            name="learning_rate",
            parameter_type="double",
            feasible_space=V1beta1FeasibleSpace(
                min="0.01",
                max="0.05"
            ),
        ),
        V1beta1ParameterSpec(
            name="batch_size",
            parameter_type="int",
            feasible_space=V1beta1FeasibleSpace(
                min="80",
                max="100"
            ),
        )
    ]

    def _trial_replica_spec():
        # Chief and Worker run the identical container. Build a fresh dict per
        # call so the two replica specs never share mutable sub-objects.
        return {
            "replicas": 1,
            "restartPolicy": "OnFailure",
            "template": {
                "metadata": {
                    # Disable Istio sidecar injection for the training pods.
                    "annotations": {
                        "sidecar.istio.io/inject": "false"
                    }
                },
                "spec": {
                    "containers": [
                        {
                            "name": "tensorflow",
                            "image": "docker.io/liuhougangxa/tf-estimator-mnist",
                            "command": [
                                "python",
                                "/opt/model.py",
                                "--tf-train-steps=" + str(training_steps),
                                # ${trialParameters.<name>} refers to the trial
                                # parameters declared in trial_template below.
                                "--tf-learning-rate=${trialParameters.learningRate}",
                                "--tf-batch-size=${trialParameters.batchSize}"
                            ]
                        }
                    ]
                }
            }
        }

    # Trial spec, written the same way as the TFJob YAML manifest would be.
    trial_spec = {
        "apiVersion": "kubeflow.org/v1",
        "kind": "TFJob",
        "spec": {
            "tfReplicaSpecs": {
                "Chief": _trial_replica_spec(),
                "Worker": _trial_replica_spec()
            }
        }
    }

    # Map the trial-template placeholders to the tuned parameters above.
    trial_template = V1beta1TrialTemplate(
        primary_container_name="tensorflow",
        trial_parameters=[
            V1beta1TrialParameterSpec(
                name="learningRate",
                description="Learning rate for the training model",
                reference="learning_rate"
            ),
            V1beta1TrialParameterSpec(
                name="batchSize",
                description="Batch size for the model",
                reference="batch_size"
            ),
        ],
        trial_spec=trial_spec
    )

    # Assemble the Experiment from the pieces above.
    experiment_spec = V1beta1ExperimentSpec(
        max_trial_count=max_trial_count,
        max_failed_trial_count=max_failed_trial_count,
        parallel_trial_count=parallel_trial_count,
        objective=objective,
        algorithm=algorithm,
        parameters=parameters,
        trial_template=trial_template
    )

    # Create the KFP task for the Katib experiment. experiment_spec must be
    # serialized into a plain Kubernetes object first.
    # NOTE(review): the component is loaded from the kubeflow/pipelines master
    # branch, so its definition can change underneath this notebook. Consider
    # pinning to a release tag.
    katib_experiment_launcher_op = components.load_component_from_url(
        "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml")
    op = katib_experiment_launcher_op(
        experiment_name=experiment_name,
        experiment_namespace=experiment_namespace,
        experiment_spec=ApiClient().sanitize_for_serialization(experiment_spec),
        experiment_timeout_minutes=experiment_timeout_minutes,
        delete_finished_experiment=False)

    return op

In [8]:
def convert_katib_results(katib_results) -> str:
    """Translate the Katib Experiment result JSON into command-line flags for model.py.

    This function is turned into its own container step (``func_to_container_op``),
    so every module it needs is imported inside the body.

    Args:
        katib_results: JSON text whose ``currentOptimalTrial.parameterAssignments``
            is a list of ``{"name": ..., "value": ...}`` entries.

    Returns:
        The recognised assignments as space-separated flags, in the order Katib
        listed them, e.g. ``"--tf-learning-rate=0.02 --tf-batch-size=90"``.
        Parameters other than learning_rate / batch_size are ignored.
    """
    import json
    import pprint

    # Katib parameter name -> flag prefix understood by /opt/model.py.
    flag_prefix = {
        "learning_rate": "--tf-learning-rate=",
        "batch_size": "--tf-batch-size=",
    }

    results = json.loads(katib_results)
    print("Katib results:")
    pprint.pprint(results)

    assignments = results["currentOptimalTrial"]["parameterAssignments"]
    flags = [
        flag_prefix[assignment["name"]] + assignment["value"]
        for assignment in assignments
        if assignment["name"] in flag_prefix
    ]

    print("Best Hyperparameters: {}".format(flags))
    return " ".join(flags)

In [9]:
# Distributed-training step: takes the TFJob name, namespace, number of training
# steps, the Katib op's output and the model volume op.
def create_tfjob_task(tfjob_name, tfjob_namespace, training_steps, katib_op, model_volume_op,
                      tfjob_timeout_minutes=60):
    """Create a KFP op that trains MNIST as a TFJob using Katib's best hyperparameters.

    Args:
        tfjob_name: Name of the TFJob.
        tfjob_namespace: Namespace the TFJob is created in.
        training_steps: Value for ``--tf-train-steps``.
        katib_op: Op from ``create_katib_experiment_task``. Its ``output`` is
            converted into model.py flags by ``convert_katib_results``.
        model_volume_op: ``dsl.VolumeOp`` whose PVC is mounted at /mnt/export on the Chief.
        tfjob_timeout_minutes: Timeout handed to the TFJob launcher (default 60).

    Returns:
        The op created by the TFJob launcher component.
    """
    import json

    # Turn the Katib Experiment result into model.py flags in a separate step.
    convert_katib_results_op = components.func_to_container_op(convert_katib_results)
    best_hp_op = convert_katib_results_op(katib_op.output)
    # str() of a KFP output is a placeholder string that KFP substitutes with the
    # real value when the pipeline runs.
    best_hps = str(best_hp_op.output)

    # Chief and Worker run the same command through `sh -c`, so the best-hp
    # flags string is word-split by the shell.
    train_command = "python /opt/model.py --tf-export-dir=/mnt/export --tf-train-steps={} {}".format(
        training_steps, best_hps)

    def _container(volume_mounts=None):
        # TensorFlow container shared by both replicas; only the Chief mounts a volume.
        container = {
            "name": "tensorflow",
            "image": "docker.io/liuhougangxa/tf-estimator-mnist",
            "command": [
                "sh",
                "-c"
            ],
            "args": [
                train_command
            ],
        }
        if volume_mounts is not None:
            container["volumeMounts"] = volume_mounts
        return container

    def _replica_spec(container, volumes=None):
        # Shared TFJob replica skeleton (single replica, Istio sidecar disabled).
        pod_spec = {"containers": [container]}
        if volumes is not None:
            pod_spec["volumes"] = volumes
        return {
            "replicas": 1,
            "restartPolicy": "OnFailure",
            "template": {
                "metadata": {
                    "annotations": {
                        "sidecar.istio.io/inject": "false"
                    }
                },
                "spec": pod_spec
            }
        }

    # The Chief exports the model onto the PVC created by model_volume_op.
    tfjob_chief_spec = _replica_spec(
        _container(volume_mounts=[
            {
                "mountPath": "/mnt/export",
                "name": "model-volume"
            }
        ]),
        volumes=[
            {
                "name": "model-volume",
                "persistentVolumeClaim": {
                    "claimName": str(model_volume_op.outputs["name"])
                }
            }
        ]
    )
    # The Worker has no volume mount, so its --tf-export-dir points at its own
    # container filesystem.
    tfjob_worker_spec = _replica_spec(_container())

    # NOTE(review): loaded from the kubeflow/pipelines master branch (unpinned).
    tfjob_launcher_op = components.load_component_from_url(
        "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/launcher/component.yaml")
    op = tfjob_launcher_op(
        name=tfjob_name,
        namespace=tfjob_namespace,
        chief_spec=json.dumps(tfjob_chief_spec),
        worker_spec=json.dumps(tfjob_worker_spec),
        tfjob_timeout_minutes=tfjob_timeout_minutes,
        delete_finished_tfjob=False)

    return op

In [10]:
# Default pipeline arguments.
name="mnist-katib"
namespace="kubeflow-user-example-com"
training_steps="200"

# Pipeline without serving: Katib tuning -> model volume -> TFJob training.
@dsl.pipeline(
    name="Katib Pipeline",
    description="An end to end mnist example including hyperparameter tuning, train and inference"
)
def mnist_pipeline(name=name, namespace=namespace, training_steps=training_steps):
    # Run hyperparameter tuning with Katib.
    katib_op = create_katib_experiment_task(name, namespace, training_steps)

    # Dynamically provisioned volume used to train and serve the model.
    model_volume_op = dsl.VolumeOp(
        name="model-volume",
        resource_name="model-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO
    )

    # Run distributed training with a TFJob using Katib's best hyperparameters.
    create_tfjob_task(name, namespace, training_steps, katib_op, model_volume_op)


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(mnist_pipeline, "mnist_katib_pipeline.tar.gz")


# Run the Kubeflow Pipeline in the user's namespace (uncomment to submit):
# kfp_client = kfp.Client()
# run_id = kfp_client.create_run_from_pipeline_func(mnist_pipeline, namespace=namespace, arguments={}).run_id
# print("Run ID: ", run_id)

In [11]:
# KFServing
# This is covered in lecture 9; study KFServing there and then come back here.
# Arguments: model name, namespace, and the TFJob and model volume ops.
def create_kfserving_task(model_name, model_namespace, tfjob_op, model_volume_op):
    """Create a KFP op that deploys the trained model as a KFServing InferenceService.

    Args:
        model_name: Name of the InferenceService.
        model_namespace: Namespace the InferenceService is created in.
        tfjob_op: Training op; serving is ordered after it.
        model_volume_op: ``dsl.VolumeOp`` whose PVC holds the exported model and
            is used as the predictor's ``storageUri``.

    Returns:
        The KFServing launcher op, so callers can chain further steps on it.
    """
    inference_service = '''
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
  name: {}
  namespace: {}
  annotations:
    "sidecar.istio.io/inject": "false"
spec:
  predictor:
    tensorflow:
      storageUri: "pvc://{}/"
'''.format(model_name, model_namespace, str(model_volume_op.outputs["name"]))

    kfserving_launcher_op = components.load_component_from_url(
        'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/kfserving/component.yaml')
    # No data output of the TFJob is consumed here, so the ordering is declared
    # explicitly: the model must be exported to the volume before serving it.
    return kfserving_launcher_op(action="create", inferenceservice_yaml=inference_service).after(tfjob_op)

In [12]:
# Default pipeline arguments.
name="mnist-katib"
namespace="kubeflow-user-example-com"
training_steps="200"

# Full pipeline: Katib tuning -> model volume -> TFJob training -> KFServing.
@dsl.pipeline(
    name="Katib Pipeline",
    description="An end to end mnist example including hyperparameter tuning, train and inference"
)
def mnist_pipeline(name=name, namespace=namespace, training_steps=training_steps):
    # Run hyperparameter tuning with Katib.
    katib_op = create_katib_experiment_task(name, namespace, training_steps)

    # Dynamically provisioned volume used to train and serve the model.
    model_volume_op = dsl.VolumeOp(
        name="model-volume",
        resource_name="model-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO
    )

    # Run distributed training with a TFJob using Katib's best hyperparameters.
    tfjob_op = create_tfjob_task(name, namespace, training_steps, katib_op, model_volume_op)

    # Serve the trained model with a KFServing InferenceService.
    create_kfserving_task(name, namespace, tfjob_op, model_volume_op)


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(mnist_pipeline, "mnist_katib_pipeline_with_serving.tar.gz")


# Run the Kubeflow Pipeline in the user's namespace (uncomment to submit):
# kfp_client = kfp.Client()
# run_id = kfp_client.create_run_from_pipeline_func(mnist_pipeline, namespace=namespace, arguments={}).run_id
# print("Run ID: ", run_id)