In [1]:
# Install required packages (Kubeflow Pipelines and Katib SDK).
!pip install kfp
!pip install kubeflow-katib



In [2]:
import json
import kfp
import kfp.dsl as dsl
from kfp import components
from kfp.components import func_to_container_op
from kfp.onprem import use_k8s_secret

from kubeflow.katib import ApiClient
from kubeflow.katib import V1beta1ExperimentSpec
from kubeflow.katib import V1beta1AlgorithmSpec
from kubeflow.katib import V1beta1ObjectiveSpec
from kubeflow.katib import V1beta1ParameterSpec
from kubeflow.katib import V1beta1FeasibleSpace
from kubeflow.katib import V1beta1TrialTemplate
from kubeflow.katib import V1beta1TrialParameterSpec

In [3]:
# Reusable KFP component used below to download the raw dataset from a URL.
# Its definition is fetched from the `master` branch of the Kubeflow Pipelines
# repository every time this cell runs.
web_downloader_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml"
)

In [4]:
def clean_data(
    bucket, key,
    input_file: components.InputPath(str),
) -> str:
    """Clean the ARFF churn dataset and upload the result as CSV to S3 storage.

    The imports live inside the function because it is turned into a
    self-contained pipeline component and executed in its own container.

    Args:
        bucket: Name of the destination bucket.
        key: Object key under which the CSV is stored.
        input_file: Local path of the ARFF file to clean.

    Returns:
        The literal string "Done" once the upload call has returned.
    """
    import pandas as pd
    import boto3
    import os

    from io import StringIO
    from scipy.io.arff import loadarff

    print(f"Loading input file {input_file}")

    # Convert the ARFF records to a DataFrame.
    raw_data = loadarff(input_file)
    df_data = pd.DataFrame(raw_data[0].copy())
    print(f"Loaded data file of shape {df_data.shape}")

    # Convert the target column to numeric (assumes CHURN is the last column).
    # NOTE(review): CHURN is one-hot encoded here and again below on the already
    # converted column; confirm the resulting label polarity is the intended one.
    df_data.iloc[:, -1] = pd.get_dummies(df_data['CHURN']).iloc[:, 0]

    # Drop every column that contains at least one missing value.
    df_clean=df_data.dropna(axis=1)
    df_clean.loc[:,'CHURN']=pd.get_dummies(df_data['CHURN']).iloc[:, 0]

    # Get rid of non-numeric columns.
    df_clean=df_clean.select_dtypes(exclude='object')

    # Credentials are read from the environment and must never be printed:
    # component logs are stored and visible to anyone who can view the run.
    csv_buffer = StringIO()
    df_clean.to_csv(csv_buffer)
    s3_resource = boto3.resource('s3', endpoint_url='http://minio.kubeflow.svc.cluster.local:9000',
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'))
    print(f"Saving CSV of shape {df_clean.shape} to s3")
    s3_resource.Object(bucket, key).put(Body=csv_buffer.getvalue())
    return "Done"

In [5]:
# Wrap `clean_data` as a pipeline component. The component definition is also
# written to clean_data.yaml, and the listed packages are installed in the
# component's container before the function runs.
clean_data_op = components.create_component_from_func(
    func=clean_data,
    output_component_file="clean_data.yaml",
    packages_to_install=[
        "pandas==1.2.4",
        "scipy==1.7.0",
        "boto3",
    ],
)

In [6]:
# Builds the KFP task that launches a Katib hyperparameter-tuning Experiment.
# Pass the Experiment name and namespace, plus the bucket and key of the training data.
def create_katib_experiment_task(experiment_name, experiment_namespace, bucket, key):
    """Create the KFP task that runs a Katib Experiment tuning the epoch count.

    Args:
        experiment_name: Name given to the Katib Experiment.
        experiment_namespace: Namespace in which the Experiment is created.
        bucket: Bucket passed to the training script as ``--bucket``.
        key: Object key passed to the training script as ``--bucket-key``.

    Returns:
        The task produced by the Katib launcher component.
    """

    def replica_spec():
        """Return a fresh TFJob replica spec; Chief and Worker are identical."""
        return {
            "replicas": 1,
            "restartPolicy": "OnFailure",
            "template": {
                "metadata": {
                    "annotations": {
                        "sidecar.istio.io/inject": "false"
                    }
                },
                "spec": {
                    "containers": [
                        {
                            "name": "tensorflow",
                            "image": "docker.io/misohu/kubeflow-training:latest",
                            "command": [
                                "python",
                                "/opt/model.py",
                                "--s3-storage=true",
                                # Deliberately NOT an f-string: Katib substitutes
                                # this placeholder for every Trial.
                                "--epochs=${trialParameters.epochs}",
                                f"--bucket={bucket}",
                                f"--bucket-key={key}",
                            ],
                            "envFrom": [
                                {
                                    "secretRef": {
                                        "name": "mlpipeline-minio-artifact"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        }

    # Trial count specification.
    max_trial_count = 5
    max_failed_trial_count = 3
    parallel_trial_count = 2

    # Objective specification.
    objective = V1beta1ObjectiveSpec(
        type="maximize",
        goal=0.95,
        objective_metric_name="accuracy"
    )

    # Algorithm specification.
    algorithm = V1beta1AlgorithmSpec(
        algorithm_name="random",
    )

    # Experiment search space.
    # In this example we tune the number of epochs.
    parameters = [
        V1beta1ParameterSpec(
            name="epochs",
            parameter_type="int",
            feasible_space=V1beta1FeasibleSpace(
                min="5",
                max="10"
            ),
        )
    ]

    # Experiment Trial template: a TFJob with one Chief and one Worker replica.
    trial_spec = {
        "apiVersion": "kubeflow.org/v1",
        "kind": "TFJob",
        "spec": {
            "tfReplicaSpecs": {
                "Chief": replica_spec(),
                "Worker": replica_spec(),
            }
        }
    }

    # Configure parameters for the Trial template. `reference` ties the
    # template placeholder to the "epochs" entry of the search space above.
    trial_template = V1beta1TrialTemplate(
        primary_container_name="tensorflow",
        trial_parameters=[
            V1beta1TrialParameterSpec(
                name="epochs",
                description="Number of training epochs",
                reference="epochs"
            )
        ],
        trial_spec=trial_spec
    )

    # Create an Experiment from the above parameters.
    experiment_spec = V1beta1ExperimentSpec(
        max_trial_count=max_trial_count,
        max_failed_trial_count=max_failed_trial_count,
        parallel_trial_count=parallel_trial_count,
        objective=objective,
        algorithm=algorithm,
        parameters=parameters,
        trial_template=trial_template
    )

    # Create the KFP task for the Katib Experiment.
    # Experiment Spec should be serialized to a valid Kubernetes object.
    katib_experiment_launcher_op = components.load_component_from_url(
        "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml")
    op = katib_experiment_launcher_op(
        experiment_name=experiment_name,
        experiment_namespace=experiment_namespace,
        experiment_spec=ApiClient().sanitize_for_serialization(experiment_spec),
        experiment_timeout_minutes=60,
        delete_finished_experiment=False)

    return op

In [7]:
# Turns the best hyperparameters of a Katib Experiment into command-line args.
def convert_katib_results(katib_results) -> str:
    """Build the ``--epochs=...`` argument string from Katib's JSON results.

    Args:
        katib_results: JSON text containing
            ``currentOptimalTrial.parameterAssignments``, a list of
            ``{"name": ..., "value": ...}`` entries.

    Returns:
        One ``--epochs=<value>`` item per assignment named "epochs", joined by
        single spaces; an empty string when there is no such assignment.
    """
    import json
    import pprint

    results = json.loads(katib_results)
    print("Katib results:")
    pprint.pprint(results)

    assignments = results["currentOptimalTrial"]["parameterAssignments"]
    best_hps = [
        "--epochs=" + assignment["value"]
        for assignment in assignments
        if assignment["name"] == "epochs"
    ]
    print("Best Hyperparameters: {}".format(best_hps))
    return " ".join(best_hps)

In [8]:
def create_tfjob_task(tfjob_name, tfjob_namespace, katib_op, bucket, key):
    """Create the KFP task that trains the final model with a TFJob.

    The best hyperparameters found by the Katib task are converted to
    command-line arguments and passed to the training script.

    Args:
        tfjob_name: Name given to the TFJob.
        tfjob_namespace: Namespace in which the TFJob is created.
        katib_op: Katib launcher task whose output holds the Experiment results.
        bucket: Bucket passed to the training script as ``--bucket``.
        key: Object key passed to the training script as ``--bucket-key``.

    Returns:
        The task produced by the TFJob launcher component.
    """
    convert_katib_results_op = components.func_to_container_op(convert_katib_results)
    best_hp_op = convert_katib_results_op(katib_op.output)
    # While the pipeline is being compiled this is only a placeholder string
    # ("{{pipelineparam:...}}"), so the print below shows the placeholder, not
    # the tuned value; it is replaced by the real arguments when the run executes.
    best_hps = str(best_hp_op.output)
    print(best_hps)

    def replica_spec(extra_args=()):
        """Return a TFJob replica spec; `extra_args` follow the tuned hyperparameters."""
        return {
            "replicas": 1,
            "restartPolicy": "OnFailure",
            "template": {
                "metadata": {
                    "annotations": {
                        "sidecar.istio.io/inject": "false"
                    }
                },
                "spec": {
                    "containers": [
                        {
                            "name": "tensorflow",
                            "image": "docker.io/misohu/kubeflow-training:latest",
                            "command": [
                                "python",
                                "/opt/model.py",
                                "--s3-storage=true",
                                # Pass the placeholder as-is. A "$" prefix would
                                # reach the script literally ("$--epochs=N").
                                best_hps,
                                *extra_args,
                                f"--bucket={bucket}",
                                f"--bucket-key={key}",
                            ],
                            "envFrom": [
                                {
                                    "secretRef": {
                                        "name": "mlpipeline-minio-artifact"
                                    }
                                }
                            ]
                        }
                    ]
                }
            }
        }

    # Only the Chief receives the MLflow model name argument.
    tfjob_chief_spec = replica_spec(extra_args=("--mlflow-model-name=michal-is-amazing",))
    tfjob_worker_spec = replica_spec()

    # Create the KFP task for the TFJob.
    tfjob_launcher_op = components.load_component_from_url(
        "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/launcher/component.yaml")
    op = tfjob_launcher_op(
        name=tfjob_name,
        namespace=tfjob_namespace,
        chief_spec=json.dumps(tfjob_chief_spec),
        worker_spec=json.dumps(tfjob_worker_spec),
        tfjob_timeout_minutes=60,
        delete_finished_tfjob=False)
    return op
    

In [9]:
# Pipeline settings.
# `name` is the default for the pipeline's `name` parameter, which is used as
# both the Katib Experiment name and the TFJob name.
name = "demo-pipline"
namespace = "user123"  # make sure to use the correct namespace
bucket = "dataset"     # this bucket must already exist; it holds the preprocessed training data
key = "data.csv"       # object key for the preprocessed data

@dsl.pipeline(
    name="Kubeflow Katib MlFlow integration",
    description="Demo pipeline"
)
def demo_pipeline(name=name, namespace=namespace):
    """Download the dataset, clean it, tune with Katib, then train with a TFJob.

    Args:
        name: Name used for both the Katib Experiment and the TFJob.
        namespace: Namespace in which the Experiment and the TFJob are created.
    """
    download_task = web_downloader_op(url='https://www.openml.org/data/download/53995/KDDCup09_churn.arff')

    # The cleaning step reads its storage credentials from environment variables
    # populated from the `mlpipeline-minio-artifact` secret.
    clean_task = clean_data_op(bucket, key, download_task.outputs['data'])
    clean_task = clean_task.apply(
        use_k8s_secret(
            secret_name='mlpipeline-minio-artifact',
            k8s_secret_key_to_env={
                'accesskey': 'AWS_ACCESS_KEY_ID',
                'secretkey': 'AWS_SECRET_ACCESS_KEY',
            },
        )
    )

    # Tuning and training only run once cleaning has reported "Done".
    with dsl.Condition(clean_task.output == "Done"):
        # Run the hyperparameter tuning with Katib.
        katib_op = create_katib_experiment_task(name, namespace, bucket, key)

        # Run the training with TFJob, using the Katib results.
        create_tfjob_task(name, namespace, katib_op, bucket, key)

# Submit the pipeline as a run in the user's namespace and report the run ID.
kfp_client = kfp.Client()
run_id = kfp_client.create_run_from_pipeline_func(
    demo_pipeline,
    namespace=namespace,
    arguments={},
).run_id
print("Run ID: ", run_id)

{{pipelineparam:op=convert-katib-results;name=Output}}


Run ID:  4f7fb786-ba1c-4389-bd14-4b92bed1f3e7
