In [1]:
import kfp
from kfp.components import InputPath, OutputPath
from kfp import dsl
from typing import List, Tuple
from kfp.dsl import ContainerOp
from kubernetes.client.models import V1EnvVar,V1EnvVarSource, V1SecretKeySelector,V1ConfigMapKeySelector
from typing import NamedTuple

In [2]:
# Container image used as the base image of every pipeline component created in this notebook.
BASE_IMAGE="quay.io/ntlawrence/summary:1.0.15"
# Container image handed to the deployment step as the predictor image.
PREDICTOR_IMAGE="quay.io/ntlawrence/summary-predictor:1.0.17"

In [3]:
def run_commands(commands: List[str], cwd: str):
    """Run shell command lines in order, stopping at the first failure.

    Args:
        commands: Command lines; each one is echoed and then executed
            through the shell.
        cwd: Working directory in which every command is executed.

    Raises:
        subprocess.CalledProcessError: If a command exits with a non-zero
            status. The commands after it are not executed.
    """
    # Imported inside the body, like every other component function in this notebook.
    from subprocess import run as run_process

    for shell_command in commands:
        print(shell_command)
        run_process(shell_command, cwd=cwd, check=True, shell=True)


# Package run_commands as a pipeline component that runs on the shared base image.
run_commands_comp = kfp.components.create_component_from_func(
    func=run_commands,
    base_image=BASE_IMAGE,
)

In [4]:
def evaluate_model(model_dir: str,
                   dataset_dir: str,
                   cwd: str) -> NamedTuple("EvaluationOutput", [("mlpipeline_metrics", "Metrics")]):
    """Run ``eval.py`` and report its ROUGE scores as pipeline metrics.

    Args:
        model_dir: Passed to ``eval.py`` as ``--model_dir``.
        dataset_dir: Passed to ``eval.py`` as ``--prepared_dataset_dir``.
        cwd: Directory that contains ``eval.py``; the script is run from there.

    Returns:
        A named tuple whose ``mlpipeline_metrics`` field is a JSON string with
        a ``metrics`` list holding ``rougeL`` and ``rouge2``, read from the
        ``eval_rougeL`` and ``eval_rouge2`` entries of the results file.

    Raises:
        subprocess.CalledProcessError: If ``eval.py`` exits with a non-zero status.
        KeyError: If the results file lacks one of the expected entries.
    """
    import subprocess
    import json
    from collections import namedtuple

    results_path = "/tmp/results.json"

    # The arguments are passed as a list rather than as one shell string so
    # that directories containing spaces or shell metacharacters reach
    # eval.py unchanged.
    subprocess.run(
        [
            "python",
            "eval.py",
            f"--prepared_dataset_dir={dataset_dir}",
            f"--model_dir={model_dir}",
            f"--results_json={results_path}",
        ],
        cwd=cwd,
        check=True,
    )

    with open(results_path, "r") as f:
        eval_results = json.load(f)

    metrics = {
        "metrics": [
            {"name": "rougeL",
             "numberValue": eval_results["eval_rougeL"],
             "format": "RAW"},
            {"name": "rouge2",
             "numberValue": eval_results["eval_rouge2"],
             "format": "RAW"},
        ]
    }

    out_tuple = namedtuple("EvaluationOutput", ["mlpipeline_metrics"])
    return out_tuple(json.dumps(metrics))
    
# Package evaluate_model as a pipeline component that runs on the shared base image.
evaluate_model_comp = kfp.components.create_component_from_func(
    evaluate_model,
    base_image=BASE_IMAGE,
)

In [5]:
def create_model_archive(model_dir: str,
                         archive: OutputPath(str),
                         model_name: str = "billsum",
                         version: str = "1"):
    """Pack every file below ``model_dir`` into a gzip-compressed tar archive.

    Inside the archive each file is stored as
    ``<version>/<model_name>/<path relative to model_dir>``.

    Args:
        model_dir: Directory that is searched recursively for files.
        archive: Path of the archive to write; missing parent directories
            are created.
        model_name: Second path component of every archive member.
        version: First path component of every archive member.

    Raises:
        FileNotFoundError: If ``model_dir`` does not exist or holds no files.
            Failing here keeps an empty archive from being uploaded and
            deployed by later steps.
    """
    import os
    from pathlib import Path
    import tarfile

    model_root = Path(model_dir)
    # Sorted so the member order does not depend on directory traversal order.
    model_files = sorted(entry for entry in model_root.rglob("*") if not entry.is_dir())
    if not model_files:
        raise FileNotFoundError(
            f"No model files found under '{model_dir}'; refusing to create an empty archive."
        )

    os.makedirs(Path(archive).parent.absolute(), exist_ok=True)

    with tarfile.open(name=archive, mode="w:gz") as tar:
        for model_file in model_files:
            member_name = f"{version}/{model_name}/{model_file.relative_to(model_root)}"
            tar.add(model_file.absolute(), arcname=member_name)


# Package create_model_archive as a pipeline component on the shared base image.
create_model_archive_comp = kfp.components.create_component_from_func(
    func=create_model_archive, base_image=BASE_IMAGE
)

In [6]:
def upload_archive(
    archive: InputPath(str),
    archive_name: str,
    minio_url: str = "minio-service.kubeflow:9000",
    version: str = "1"
) -> NamedTuple("UploadOutput", [("s3_address", str)]):
    """Upload a model archive to the MinIO artifact store.

    The archive is stored as ``tar/<version>/<archive_name>`` in a bucket that
    is created when it does not exist yet. Credentials are read from the
    ``MINIO_ID`` and ``MINIO_PWD`` environment variables.

    Args:
        archive: Local path of the archive file to upload.
        archive_name: Object name of the archive below ``tar/<version>/``.
        minio_url: Host and port of the MinIO service (contacted without TLS).
        version: Path component placed between ``tar/`` and the archive name.

    Returns:
        A named tuple whose ``s3_address`` field is the address of the ``tar``
        prefix in the bucket, not of the uploaded object itself.

    Raises:
        KeyError: If ``MINIO_ID`` or ``MINIO_PWD`` is not set.
    """

    from collections import namedtuple
    import logging
    from minio import Minio
    import sys
    import os

    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(levelname)s %(asctime)s: %(message)s",
    )
    logger = logging.getLogger()

    minio_client = Minio(
        minio_url,
        access_key=os.environ["MINIO_ID"],
        secret_key=os.environ["MINIO_PWD"],
        secure=False,
    )

    # Create export bucket if it does not yet exist.
    # NOTE(review): the bucket name is a workflow placeholder, presumably
    # replaced with the namespace of the run before this code executes - confirm.
    export_bucket="{{workflow.namespace}}"

    # Ask for this one bucket directly instead of listing every bucket.
    if not minio_client.bucket_exists(export_bucket):
        logger.info(f"Creating bucket '{export_bucket}'...")
        minio_client.make_bucket(bucket_name=export_bucket)

    path = f"tar/{version}/{archive_name}"
    s3_address = f"s3://{export_bucket}/tar"

    logger.info(f"Saving tar file to MinIO (s3 address: {s3_address})...")
    minio_client.fput_object(
        bucket_name=export_bucket,  # bucket name in Minio
        object_name=path,  # file name in bucket of Minio
        file_path=archive,  # file path / name in local system
    )

    logger.info("Finished.")
    out_tuple = namedtuple("UploadOutput", ["s3_address"])
    return out_tuple(s3_address)


# Package upload_archive as a pipeline component; the MinIO client is installed at start-up.
upload_archive_comp = kfp.components.create_component_from_func(
    func=upload_archive, base_image=BASE_IMAGE, packages_to_install=["minio==7.1.13"]
)

In [7]:
def deploy_inference_service(name: str,
                             version: int,
                             model_archive_s3: str,
                             predictor_image: str,
                             predictor_max_replicas: int = 1,
                             predictor_min_replicas: int = 1,
                             predictor_concurrency_target: int = None,
                             prefix: str = "",
                             suffix: str = ""
                            ):
    """Create (or replace) the KServe inference service that serves the model.

    A service account named ``summary-inference-sa`` that references the
    ``minio-credentials`` secret is created, or patched when it already
    exists. The inference service is then created; when a service with the
    same name already exists it is deleted and the create is retried.
    Finally the function waits for the service to become ready.

    Args:
        name: Name of the inference service; also passed to the predictor as
            ``--model_name``.
        version: Passed to the predictor as ``--model_version``.
        model_archive_s3: Value of the predictor's ``STORAGE_URI`` variable.
        predictor_image: Container image of the predictor.
        predictor_max_replicas: Maximum number of predictor replicas.
        predictor_min_replicas: Minimum number of predictor replicas.
        predictor_concurrency_target: Scale target for the ``concurrency``
            scale metric; ``None`` leaves it unset.
        prefix: When non-empty, a space is appended and the result is passed
            to the predictor in the ``PREFIX`` variable.
        suffix: When non-empty, a space is prepended and the result is passed
            to the predictor in the ``SUFFIX`` variable.

    Raises:
        kubernetes.client.exceptions.ApiException: If a Kubernetes call fails
            for a reason other than the handled conflict / not-found cases,
            or if the service still conflicts after the last create attempt.
        RuntimeError: If the inference service is not ready after waiting.
    """
    from kubernetes import client, config
    from kubernetes.client import (V1ServiceAccount,
                                   V1Container,
                                   V1EnvVar,
                                   V1ObjectMeta,
                                   V1ObjectReference,
                                   V1ResourceRequirements
                                  )
    from kserve import KServeClient
    from kserve import constants
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1InferenceService
    from http import HTTPStatus
    import logging
    import sys
    import yaml
    from time import sleep

    # Without this the root logger stays at WARNING and the spec logged
    # below with logging.info() would be dropped.
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(levelname)s %(asctime)s: %(message)s",
    )

    config.load_incluster_config()

    SERVICE_ACCOUNT = "summary-inference-sa"
    # Upper bound for the create/delete/retry cycle below, so that a service
    # whose deletion never completes cannot block this step forever.
    MAX_CREATE_ATTEMPTS = 20
    # NOTE(review): workflow placeholder, presumably replaced with the
    # namespace of the run before this code executes - confirm.
    namespace = "{{workflow.namespace}}"

    sa = V1ServiceAccount(
        api_version="v1",
        kind="ServiceAccount",
        metadata=V1ObjectMeta(name=SERVICE_ACCOUNT,
                              namespace=namespace),
        secrets=[V1ObjectReference(name="minio-credentials")]
    )
    corev1 = client.CoreV1Api()

    try:
        corev1.create_namespaced_service_account(namespace=namespace,
                                                 body=sa)
    except client.exceptions.ApiException as e:
        # An existing service account is brought up to date instead.
        if e.status == HTTPStatus.CONFLICT:
            corev1.patch_namespaced_service_account(name=SERVICE_ACCOUNT,
                                                    namespace=namespace,
                                                    body=sa)
        else:
            raise

    # A non-empty prefix/suffix is separated from the text by one space.
    if prefix:
        prefix = prefix + " "
    if suffix:
        suffix = " " + suffix

    predictor_spec = V1beta1PredictorSpec(
        max_replicas=predictor_max_replicas,
        min_replicas=predictor_min_replicas,
        scale_target=predictor_concurrency_target,
        scale_metric="concurrency",
        containers=[
            V1Container(
                name="kserve-container",
                image=predictor_image,
                args=["python",
                      "inference_service.py",
                      f"--model_name={name}",
                      f"--model_version={version}",
                      "--num_replicas=1"],

                resources=V1ResourceRequirements(
                    limits={"memory": "50Gi"},
                    requests={"memory": "2Gi"},
                ),
                env=[
                 V1EnvVar(
                     name="STORAGE_URI", value=model_archive_s3
                 ),
                 V1EnvVar(
                     name="PREFIX", value=prefix
                 ),
                 V1EnvVar(
                     name="SUFFIX", value=suffix
                 )
                ],
            )
        ],
        service_account_name=SERVICE_ACCOUNT
    )

    inference_service = V1beta1InferenceService(
        api_version=constants.KSERVE_V1BETA1,
        kind=constants.KSERVE_KIND,
        metadata=V1ObjectMeta(name=name,
                              namespace=namespace,
                              annotations={"sidecar.istio.io/inject": "false",
                                           "serving.kserve.io/enable-prometheus-scraping" : "true"}),
        spec=V1beta1InferenceServiceSpec(predictor=predictor_spec)
    )
    # Log the full spec that is about to be submitted.
    logging.info(
        yaml.dump(
            client.ApiClient().sanitize_for_serialization(inference_service)
        )
    )

    # KServeClient doesn't throw ApiException for CONFLICT
    # Using the k8s API directly for the create
    api_instance = client.CustomObjectsApi()
    api_version = inference_service.api_version.split("/")[1]

    for attempt in range(1, MAX_CREATE_ATTEMPTS + 1):
        try:
            api_instance.create_namespaced_custom_object(
                    group=constants.KSERVE_GROUP,
                    version=api_version,
                    namespace=namespace,
                    plural=constants.KSERVE_PLURAL,
                    body=inference_service)
            break
        except client.exceptions.ApiException as api_exception:
            # Only a name conflict is recoverable, and only while attempts remain.
            if api_exception.status != HTTPStatus.CONFLICT or attempt == MAX_CREATE_ATTEMPTS:
                raise

            # Remove the existing service, give the deletion time to
            # complete, then try the create again.
            try:
                api_instance.delete_namespaced_custom_object(
                    group=constants.KSERVE_GROUP,
                    version=api_version,
                    namespace=namespace,
                    plural=constants.KSERVE_PLURAL,
                    name=name)
                sleep(15)
            except client.exceptions.ApiException as api_exception2:
                # Already gone: retry the create right away.
                if api_exception2.status not in {HTTPStatus.NOT_FOUND, HTTPStatus.GONE}:
                    raise

    kclient = KServeClient()
    kclient.wait_isvc_ready(name=name, namespace=namespace)

    if not kclient.is_isvc_ready(name=name, namespace=namespace):
        raise RuntimeError(f"The inference service {name} is not ready!")

# Package deploy_inference_service as a pipeline component on the shared base image.
deploy_inference_service_comp = kfp.components.create_component_from_func(
    func=deploy_inference_service, base_image=BASE_IMAGE
)

In [8]:
# Name given to the pipeline in its @dsl.pipeline declaration.
PIPELINE_NAME = "summarize"

In [9]:
@dsl.pipeline(name=PIPELINE_NAME)
def summarize_pipeline(
    source_repo: str = "https://github.com/ntl-ibm/kubeflow-ppc64le-examples.git",
    source_branch: str = "3.0.0",
    source_context: str = "natural-language-processing/huggingface-summarization/src",
    minio_endpoint="minio-service.kubeflow:9000",
    checkpoint: str="t5-small",
    model_max_length: int = 512,
    model_version: int = 1,
    epochs: int = 3,
    model_name: str = "billsum",
    prefix: str = "summarize: ",
    suffix: str = ""
):
    """Train, evaluate, archive, upload and deploy a summarization model.

    Steps, in order: clone the source repository, prepare the dataset, train,
    evaluate, archive the model directory, upload the archive to MinIO and
    deploy an inference service. All steps up to the archive step share one
    workspace volume mounted at /workspace.

    Args:
        source_repo: Git repository that holds the training scripts.
        source_branch: Branch or tag to clone.
        source_context: Directory inside the repository that contains
            prepare.py, train.py and eval.py.
        minio_endpoint: Host and port of the MinIO service used for the upload.
        checkpoint: Value of the --checkpoint option of prepare.py and train.py.
        model_max_length: Value of the --model_max_len option of prepare.py.
        model_version: Version used for the archive layout, the upload path
            and the deployed predictor.
        epochs: Value of the --epochs option of train.py.
        model_name: Name of the model directory below /workspace, of the
            archive and of the inference service.
        prefix: Text passed to prepare.py as --prefix and to the deployment step.
        suffix: Text passed to prepare.py as --suffix and to the deployment step.
    """

    def env_var_from_secret(env_var_name: str, secret_name: str, secret_key: str) -> V1EnvVar:
        """Build an environment variable whose value is read from a secret key."""
        secret_ref = V1SecretKeySelector(name=secret_name, key=secret_key)
        return V1EnvVar(name=env_var_name,
                        value_from=V1EnvVarSource(secret_key_ref=secret_ref))

    # One location for the trained model, shared by the train, evaluate and
    # archive steps so that a non-default model name works end to end.
    model_dir = f"/workspace/{model_name}"
    # The archive and upload components declare their version input as a
    # string, so the integer pipeline parameter is embedded in a string.
    model_version_text = f"{model_version}"

    workspace_volume_volop = dsl.VolumeOp(
        name="Create workspace",
        resource_name="shared-workspace-pvc",
        modes=dsl.VOLUME_MODE_RWM,
        size="4Gi",
        set_owner_reference=True,
    )

    # The trailing "|| true" keeps the step from failing when the clone command fails.
    clone_repo_task = run_commands_comp(
        [
            f"git clone {source_repo}  /workspace/repo -b {source_branch} || true"
        ],
        "/workspace",
    )
    clone_repo_task.add_pvolumes({"/workspace": workspace_volume_volop.volume})
    clone_repo_task.set_display_name("Clone Repo")

    # NOTE(review): prefix and suffix are inserted into the shell command
    # unquoted, and the "if prefix" / "if suffix" tests are evaluated when the
    # pipeline is compiled, presumably on parameter placeholders rather than
    # on the run-time values - confirm this is the intended behaviour.
    preprocess_data_task = run_commands_comp(
        [("python prepare.py " +
           f"--checkpoint={checkpoint} " +
            "--prepared_dataset_dir=/workspace/dataset " +
           f"--model_max_len={model_max_length} " +
           (f"--prefix={prefix} " if prefix else "") +
           (f"--suffix={suffix} " if suffix else "")
         )
        ],
        f"/workspace/repo/{source_context}"
    )
    preprocess_data_task.add_pvolumes({"/workspace": workspace_volume_volop.volume})
    preprocess_data_task.after(clone_repo_task)
    preprocess_data_task.set_display_name("Load and Preprocess data")

    train_model_task = run_commands_comp(
        [("python train.py "
           f"--checkpoint={checkpoint} "
            "--prepared_dataset_dir=/workspace/dataset "
           f"--model_dir={model_dir} "
           f"--epochs={epochs}"
         )
        ],
        f"/workspace/repo/{source_context}"
    )
    train_model_task.add_pvolumes({"/workspace": workspace_volume_volop.volume})
    train_model_task.after(preprocess_data_task)
    train_model_task.set_display_name("Train Model")
    train_model_task.set_gpu_limit(1)
    train_model_task.set_cpu_limit('1')
    train_model_task.set_memory_request('40G')
    train_model_task.set_memory_limit('1024G')

    evaluate_model_task = evaluate_model_comp(model_dir=model_dir,
                                              dataset_dir="/workspace/dataset",
                                              cwd=f"/workspace/repo/{source_context}")
    evaluate_model_task.add_pvolumes({"/workspace": workspace_volume_volop.volume})
    evaluate_model_task.after(train_model_task)
    evaluate_model_task.set_display_name("Evaluate Model")
    evaluate_model_task.set_gpu_limit(1)

    create_archive_task = create_model_archive_comp(model_dir=model_dir,
                                                    model_name=model_name,
                                                    version=model_version_text)
    create_archive_task.add_pvolumes({"/workspace": workspace_volume_volop.volume})
    create_archive_task.after(evaluate_model_task)
    create_archive_task.set_display_name("create archive")

    upload_archive_task = upload_archive_comp(
        archive = create_archive_task.outputs["archive"],
        archive_name = f"{model_name}.tar",
        minio_url = minio_endpoint,
        version = model_version_text
    )
    upload_archive_task.container.add_env_variable(env_var_from_secret("MINIO_ID", "mlpipeline-minio-artifact", "accesskey"))
    upload_archive_task.container.add_env_variable(env_var_from_secret("MINIO_PWD", "mlpipeline-minio-artifact", "secretkey"))
    upload_archive_task.after(create_archive_task)

    # Runs after the upload because it consumes the upload step's output.
    deploy_model_task = deploy_inference_service_comp(name=model_name,
                                                      version=model_version,
                                                      model_archive_s3=upload_archive_task.outputs["s3_address"],
                                                      predictor_image=PREDICTOR_IMAGE,
                                                      prefix=prefix,
                                                      suffix=suffix,
                                                      predictor_min_replicas=0
                                                      )

In [10]:
pipeline_conf = kfp.dsl.PipelineConf()


# Disable Caching
def disable_cache_transformer(op: dsl.ContainerOp):
    """Mark an op so that cached results are never reused for it.

    Container ops get a maximum cache staleness of "P0D" on their caching
    strategy; every other kind of op gets the equivalent pod annotation.

    Args:
        op: The pipeline op to modify in place.

    Returns:
        The same op, so the function can be used as an op transformer.
    """
    if not isinstance(op, dsl.ContainerOp):
        op.add_pod_annotation(
            name="pipelines.kubeflow.org/max_cache_staleness",
            value="P0D",
        )
        return op

    op.execution_options.caching_strategy.max_cache_staleness = "P0D"
    return op


# Apply the transformer to every op of a pipeline compiled with this configuration.
pipeline_conf.add_op_transformer(disable_cache_transformer)


In [11]:
def delete_pipeline(pipeline_name: str):
    """Delete every uploaded pipeline whose name equals ``pipeline_name``.

    Only the first 999 pipelines returned by the server are considered.
    """
    kfp_client = kfp.Client()
    listed_pipelines = kfp_client.list_pipelines(page_size=999).pipelines

    # The listing may be empty or missing altogether; both mean nothing to delete.
    matching_ids = []
    for pipeline in listed_pipelines or []:
        if pipeline.name == pipeline_name:
            matching_ids.append(pipeline.id)

    for pipeline_id in matching_ids:
        kfp_client.delete_pipeline(pipeline_id)


def get_experiment_id(experiment_name: str) -> str:
    """Return the id of the named experiment, creating the experiment if needed.

    Only the first 999 experiments returned by the server are searched; the
    first one with a matching name wins.
    """
    kfp_client = kfp.Client()
    listed_experiments = kfp_client.list_experiments(page_size=999).experiments

    # The listing may be empty or missing altogether; both fall through to creation.
    for experiment in listed_experiments or []:
        if experiment.name == experiment_name:
            return experiment.id

    return kfp_client.create_experiment(experiment_name).id

In [12]:
# PIPELINE_NAME is already defined in an earlier cell, next to the pipeline
# definition; it is reused here rather than being defined a second time.
package_path = f"{PIPELINE_NAME}.yaml"

client = kfp.Client()

# Compile the pipeline, with caching disabled through pipeline_conf.
kfp.compiler.Compiler().compile(
    pipeline_func=summarize_pipeline,
    package_path=package_path,
    pipeline_conf=pipeline_conf,
)

# Replace any previously uploaded pipeline of the same name, then start a run
# with the pipeline's default parameter values.
delete_pipeline(PIPELINE_NAME)
uploaded_pipeline = client.upload_pipeline(package_path, PIPELINE_NAME)
run = client.run_pipeline(
    experiment_id=get_experiment_id("summarize-exp"),
    job_name="summarize",
    pipeline_id=uploaded_pipeline.id,
    params={},
)

In [21]:
import requests
import json

In [32]:
import textwrap

# Sample passage sent to the deployed summarization service further below.
text = "The Inflation Reduction Act lowers prescription drug costs, health care costs, and energy costs.  It's the most aggressive action on tackling the climate crisis in American history, which will lift up American workers and create good-paying, union jobs across the country. It'll lower the deficit and ask the ultra-wealthy and corporations to pay their fair share. And no one making under $400,000 per year will pay a penny more in taxes."

# Show the passage wrapped to 70 columns.
print(textwrap.fill(text, width=70))

The Inflation Reduction Act lowers prescription drug costs, health
care costs, and energy costs.  It's the most aggressive action on
tackling the climate crisis in American history, which will lift up
American workers and create good-paying, union jobs across the
country. It'll lower the deficit and ask the ultra-wealthy and
corporations to pay their fair share. And no one making under $400,000
per year will pay a penny more in taxes.


In [49]:
# Ask the deployed predictor for a summary; the task prefix is prepended by hand here.
r = requests.post(
    "http://billsum.ntl-us-ibm-com.svc.cluster.local/v1/models/billsum:predict",
    json={"instances": ["summarize: " + text]},
    # Without a timeout the call waits indefinitely for an unresponsive
    # service; the limit is generous because inference can be slow.
    timeout=600,
)
# Fail here on an HTTP error status instead of later, when the body is parsed.
r.raise_for_status()

In [50]:
# Display the decoded JSON body of the prediction response.
r.json()

{'summary': 'operate case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case case'}

In [51]:
# Print the returned summary wrapped to 70 columns.
wrapped_summary = textwrap.wrap(r.json()["summary"], width=70)
for summary_line in wrapped_summary:
    print(summary_line)

operate case case case case case case case case case case case case
case case case case case case case case case case case case case case
case case case case case case case case case case case case case case
case case case case case case case case case case case case case case
case case case case case case case case case case case case case case
case case case case case case case case case case case case case case
case case case case case case case case case case case case case case
case case case case case case case case case case case case case case
case case case case case case case case case case case case case case
case case case
