# Hello World pipeline

In [5]:
from kfp import dsl

@dsl.component(base_image="python:3.11")
def hello_world(name:str) -> str:
    """Return a two-line greeting: a fixed first line and one addressed to *name*."""
    greeting = "hello world"
    introduction = f"nice to meet you {name}"
    return "\n".join((greeting, introduction))

@dsl.component(base_image="python:3.11")
def print_str(hello_str:str):
    """Print *hello_str* to the component's stdout."""
    message = hello_str
    print(message)

In [6]:
from kfp import dsl

@dsl.pipeline(name="hello-world")
def hello_world_pipeline(name:str):
    """Two-step demo pipeline: build a greeting for *name*, then print it."""
    greeting_task = hello_world(name=name)
    # The print step consumes the greeting task's single string output.
    print_str(hello_str=greeting_task.output)

In [7]:
from kfp.compiler import Compiler

# Compile the demo pipeline into a YAML package file.
Compiler().compile(
    pipeline_func=hello_world_pipeline,
    package_path="hello-world.yaml",
)

# Dataset processing pipeline

## install kfp-kubernetes

In [9]:
%%sh
# Quote the requirement so the shell never treats "[kubernetes]" as a glob pattern.
pip install -q --no-cache-dir "kfp[kubernetes]"

## 데이터셋 다운로드 후 검증 컴포넌트

In [2]:
from kfp import dsl
from kfp.dsl import Output, Dataset

@dsl.component(base_image="python:3.11", packages_to_install=["datasets", "requests"])
def save_jsonl(
    jsonl_url: str,
    output_dataset: Output[Dataset]
):
    """Download a JSONL file and re-export it as a JSONL dataset artifact.

    The downloaded file is loaded with ``datasets.load_dataset("json", ...)``
    and its ``train`` split is written to *output_dataset*.

    Args:
        jsonl_url: HTTP(S) URL of the JSONL file.
        output_dataset: output artifact; the JSONL is written to its path.

    Raises:
        ValueError: if *jsonl_url* is empty.
        requests.HTTPError: if the server answers with an error status.
    """
    import os
    import tempfile

    import requests
    from datasets import load_dataset

    # Pipeline parameters such as eval_data_url default to "", which would
    # otherwise surface as an obscure requests error.
    if not jsonl_url:
        raise ValueError("jsonl_url must be a non-empty URL")

    # Use a private temp directory instead of leaving temp.jsonl in the CWD.
    with tempfile.TemporaryDirectory() as tmp_dir:
        jsonl_file = os.path.join(tmp_dir, "temp.jsonl")

        # Stream the download to disk in chunks; the timeout keeps a stalled
        # server from hanging the pipeline step forever.
        with requests.get(jsonl_url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(jsonl_file, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Parse the file as JSON Lines and write the train split to the artifact.
        dataset = load_dataset("json", data_files=jsonl_file)
        dataset["train"].to_json(output_dataset.path)

: 

In [None]:
from kfp import dsl
from kfp.dsl import Input, Output, Dataset

@dsl.component(base_image="python:3.11", packages_to_install=["datasets"])
def validate_dataset(
    jsonl_dataset: Input[Dataset],
    valid_dataset: Output[Dataset]
):
    """Validate a JSONL dataset and keep only the columns used for training.

    Two layouts are accepted:

    * conversational: a ``messages`` column whose rows are non-empty lists of
      dicts with string ``role`` and ``content``; non-matching rows are
      dropped and only ``messages`` is kept.
    * instruction: string-typed ``prompt`` and ``completion`` columns; only
      those two columns are kept.

    Args:
        jsonl_dataset: JSONL dataset artifact produced upstream.
        valid_dataset: output artifact; the validated ``train`` split is
            written to its path as JSONL.

    Raises:
        ValueError: if the data matches neither layout, or no rows survive
            validation.
    """
    from datasets import load_dataset

    dataset = load_dataset("json", data_files=jsonl_dataset.path)

    # Batched filter predicate for the conversational layout: one bool per row.
    def is_chat_template(examples):
        result = []
        for message in examples["messages"]:
            # None / empty rows fail; otherwise every turn needs string role/content.
            result.append(
                bool(message)
                and all(
                    isinstance(item, dict)
                    and isinstance(item.get("role"), str)
                    and isinstance(item.get("content"), str)
                    for item in message
                )
            )
        return result

    feature_set = dataset["train"].features
    if "messages" in feature_set:
        dataset = dataset.filter(is_chat_template, batched=True).select_columns("messages")
    elif (
        {"prompt", "completion"}.issubset(feature_set)
        # getattr: nested (dict/list) features may not expose a .dtype.
        and getattr(feature_set["prompt"], "dtype", None) == "string"
        and getattr(feature_set["completion"], "dtype", None) == "string"
    ):
        dataset = dataset.select_columns(["prompt", "completion"])
    else:
        raise ValueError("data should be conversational format or instruction format")

    if dataset["train"].num_rows == 0:
        raise ValueError("no valid rows left after dataset validation")
    dataset["train"].to_json(valid_dataset.path)

# Training pipeline

## training code

In [63]:
from kfp import dsl

@dsl.component(base_image="python:3.11", packages_to_install=["requests"])
def save_train_func(training_code_url:str) -> str:
    """Download the training script and return its source text.

    Args:
        training_code_url: HTTP(S) URL of the Python training script.

    Returns:
        The script decoded as UTF-8.

    Raises:
        requests.HTTPError: if the server answers with an error status, so an
            error page is never passed on as "training code".
    """
    import requests

    response = requests.get(training_code_url, timeout=60)
    response.raise_for_status()
    training_code = response.content.decode('utf-8')

    print(f"training code:\n\n{training_code}")

    return training_code

## deepspeed config 만들기

In [59]:
from kfp import dsl

@dsl.component(base_image="python:3.11", packages_to_install=["requests"])
def json_to_dict(
    json_url:str,
    extras_to_update:str,
) -> str:
    """Download a JSON document, deep-merge overrides into it, and return it as JSON text.

    Args:
        json_url: HTTP(S) URL of the base JSON document (e.g. a DeepSpeed config).
        extras_to_update: JSON object whose keys override the base document.
            Nested dicts are merged recursively; other values replace the
            original. An empty or blank string means "no overrides".

    Returns:
        The merged document serialized as a JSON string. The component is
        declared ``-> str``, and KFP rejects a dict return for a str output.

    Raises:
        requests.HTTPError: if the download returns an error status.
        ValueError: if *extras_to_update* is not a JSON object.
    """
    import requests
    import json

    headers = {"Content-type": "application/json"}
    response = requests.get(json_url, headers=headers, timeout=60)
    response.raise_for_status()
    json_data = response.json()

    def recursive_update(original, updates):
        for key, value in updates.items():
            # When both sides are dicts, merge recursively.
            if isinstance(value, dict) and key in original and isinstance(original[key], dict):
                recursive_update(original[key], value)
            # Otherwise overwrite the value.
            else:
                original[key] = value

    # An empty string previously reached recursive_update and crashed on "".items().
    updates = json.loads(extras_to_update) if extras_to_update and extras_to_update.strip() else {}
    if not isinstance(updates, dict):
        raise ValueError("extras_to_update must be a JSON object")

    recursive_update(json_data, updates)

    serialized = json.dumps(json_data, indent=4)
    print(f"json data:\n\n{serialized}")

    return serialized

## PyTorchJob 만들기

In [None]:
from kfp import dsl

# Random per-run id; the pipelines in this notebook disable caching for this
# task (set_caching_options(False)) so every run gets a new id.
@dsl.component(base_image="python:3.11")
def generate_job_id() -> str:
    """Return a 5-character id of distinct lowercase letters and digits.

    Used as the suffix of the PyTorchJob name.
    """
    import random
    import string

    alphabet = string.ascii_lowercase + string.digits
    # sample() picks without replacement, so characters never repeat.
    return "".join(random.sample(alphabet, 5))

In [None]:
from kfp import dsl
from kfp.dsl import Input, Dataset

@dsl.component(base_image="python:3.11")
def generate_pytorchjob_script(
    job_name:str,
    job_id:str,
    training_code:str,
    deepspeed_config: dict,
    model_id:str,
    train_dataset:Input[Dataset],
    eval_dataset:Input[Dataset],
    mount_path:str,
    batch_size:int,
    num_train_epochs:int,
) -> str:
    """Build the bash script that every PyTorchJob pod runs.

    The script writes *training_code* and the DeepSpeed config into a fresh
    temp directory, then launches ``torchrun`` on the training script. The
    literal placeholder ``<<gpu_cnt>>`` is left in place for the job launcher
    to substitute with each pod's GPU count.

    Args:
        job_name: passed to the training script as ``--training_job_name``.
        job_id: passed as ``--training_job_id``.
        training_code: Python source of the training script.
        deepspeed_config: DeepSpeed config, as a dict or as JSON text.
        model_id: passed as ``--model_path``.
        train_dataset: artifact whose URI is passed as ``--train_dataset_path``.
        eval_dataset: artifact whose URI is passed as ``--eval_dataset_path``.
        mount_path: passed as ``--mount_path``.
        batch_size: per-device train and eval batch size.
        num_train_epochs: passed as ``--num_train_epochs``.

    Returns:
        The bash script as a string.
    """
    import json
    import shlex

    # DeepSpeed reads this as a JSON file. Interpolating a dict directly would
    # emit its Python repr (single quotes, True/None), which is not valid JSON.
    if isinstance(deepspeed_config, str):
        deepspeed_config_json = deepspeed_config
    else:
        deepspeed_config_json = json.dumps(deepspeed_config, indent=4)

    # Shell-quote user-supplied values; typical ids/URIs/paths are unchanged.
    q = shlex.quote
    train_dataset_uri = train_dataset.uri
    eval_dataset_uri = eval_dataset.uri

    # Heredoc delimiters are quoted ('EOM') so bash copies the Python code and
    # JSON verbatim instead of expanding $..., `...` and backslashes inside them.
    # Limitation: a payload line consisting solely of "EOM" ends the heredoc early.
    # The trailing backslashes below are Python line continuations inside this
    # (non-raw) string, so the torchrun command is emitted as one line.
    exec_script = f"""
program_path=$(mktemp -d)
read -r -d '' SCRIPT << 'EOM'
{training_code}
EOM
printf "%s" "$SCRIPT" > $program_path/ephemeral_script.py
read -r -d '' SCRIPT << 'EOM'
{deepspeed_config_json}
EOM
printf "%s" "$SCRIPT" > $program_path/deepspeed.json
torchrun --nnodes $WORLD_SIZE --nproc_per_node <<gpu_cnt>> --master-addr $MASTER_ADDR \
--master-port $MASTER_PORT --node-rank $RANK $program_path/ephemeral_script.py \
--model_path {q(model_id)} --train_dataset_path {q(train_dataset_uri)} --eval_dataset_path {q(eval_dataset_uri)} \
--training_job_name {q(job_name)} --training_job_id {q(job_id)} --mount_path {q(mount_path)} \
--per_device_train_batch_size {batch_size} --per_device_eval_batch_size {batch_size} \
--num_train_epochs {num_train_epochs} --deepspeed $program_path/deepspeed.json \
"""
    print(f"pytorchjob script:\n\n{exec_script}")
    return exec_script

In [None]:
from kfp import dsl

@dsl.component(base_image="python:3.11", packages_to_install=["kubeflow-training"])
def run_pytorchjob(
    job_name:str,
    job_id:str,
    base_image:str,
    pvc_name:str,
    mount_path:str,
    exec_script:str,
    master_replica:int,
    master_resources:str,
    worker_replica:int,
    worker_resources:str,
) -> str:
    """Submit a PyTorchJob running *exec_script*, stream master logs, and wait for success.

    Args:
        job_name: prefix of the PyTorchJob name (``{job_name}-{job_id}``).
        job_id: suffix of the PyTorchJob name.
        base_image: container image for Master and Worker pods.
        pvc_name: PVC mounted read-write at *mount_path* in every pod.
        mount_path: mount path of the PVC inside the containers.
        exec_script: bash script run via ``bash -c``; its ``<<gpu_cnt>>``
            placeholder is replaced with each replica type's GPU count.
        master_replica: number of Master replicas.
        master_resources: JSON object used as both requests and limits for
            Master pods; must contain an ``"nvidia.com/gpu"`` entry.
        worker_replica: number of Worker replicas.
        worker_resources: same as *master_resources*, for Worker pods.

    Returns:
        The name of the created PyTorchJob.
    """
    import copy
    import json
    from kubernetes import client
    from kubeflow.training import TrainingClient
    from kubeflow.training import models
    from kubeflow.training.constants import constants

    # Parse the per-role resource dicts.
    master_resources = json.loads(master_resources)
    worker_resources = json.loads(worker_resources)

    print(f"master node resources:\n{json.dumps(master_resources, indent=4)}")
    print(f"worker node resources:\n{json.dumps(worker_resources, indent=4)}")

    # Fill in --nproc_per_node. str() is needed because the JSON may give the
    # GPU count as a number ({"nvidia.com/gpu": 1}), and str.replace rejects ints.
    master_exec_script = exec_script.replace("<<gpu_cnt>>", str(master_resources["nvidia.com/gpu"]))
    worker_exec_script = exec_script.replace("<<gpu_cnt>>", str(worker_resources["nvidia.com/gpu"]))

    ## declare Master pod spec (the Worker spec is derived from it below)
    pod_template_spec = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}), ## istio sidecar disable
        spec=client.V1PodSpec(
            restart_policy="Never",
            containers=[
                client.V1Container(
                    name=constants.PYTORCHJOB_CONTAINER,
                    image=base_image,
                    command=["bash", "-c"],
                    args=[master_exec_script],
                    resources=client.V1ResourceRequirements(
                        limits=master_resources,
                        requests=master_resources
                    ),
                    env=[ ## for mlflow artifact store
                        client.V1EnvVar(
                            name="AWS_ACCESS_KEY_ID",
                            value_from=client.V1EnvVarSource(
                                secret_key_ref=client.V1SecretKeySelector(
                                    name="mlpipeline-minio-artifact",
                                    key="accesskey",
                                )
                            )
                        ),
                        client.V1EnvVar(
                            name="AWS_SECRET_ACCESS_KEY",
                            value_from=client.V1EnvVarSource(
                                secret_key_ref=client.V1SecretKeySelector(
                                    name="mlpipeline-minio-artifact",
                                    key="secretkey",
                                )
                            )
                        ),
                    ],
                    volume_mounts=[
                        client.V1VolumeMount(
                            mount_path=mount_path,
                            name=pvc_name,
                            read_only=False,
                        ),
                        # Memory-backed /dev/shm for inter-process shared memory.
                        client.V1VolumeMount(
                            mount_path="/dev/shm",
                            name="dshm",
                            read_only=False,
                        ),
                    ]
                )
            ],
            volumes=[
                client.V1Volume(
                    name=pvc_name,
                    persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                        claim_name=pvc_name,
                        read_only=False,
                    )
                ),
                client.V1Volume(
                    name="dshm",
                    empty_dir=client.V1EmptyDirVolumeSource(
                        medium="Memory",
                        size_limit="1.0Gi"
                    )
                )
            ]
        )
    )

    ## get the namespace this pod runs in (strip guards against stray whitespace)
    with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
        namespace = f.read().strip()

    ## declare pytorchjob
    pytorchjob_name = f"{job_name}-{job_id}"
    pytorchjob = models.KubeflowOrgV1PyTorchJob(
        api_version=constants.API_VERSION,
        kind=constants.PYTORCHJOB_KIND,
        metadata=client.V1ObjectMeta(name=pytorchjob_name, namespace=namespace),
        spec=models.KubeflowOrgV1PyTorchJobSpec(
            run_policy=models.KubeflowOrgV1RunPolicy(clean_pod_policy=None),
            pytorch_replica_specs={}
        )
    )
    pytorchjob.spec.pytorch_replica_specs[constants.REPLICA_TYPE_MASTER] = models.KubeflowOrgV1ReplicaSpec(replicas=master_replica, template=pod_template_spec)

    # Worker pods reuse the Master template with their own script and resources.
    worker_pod_template_spec = copy.deepcopy(pod_template_spec)
    worker_pod_template_spec.spec.containers[0].args = [worker_exec_script]
    worker_pod_template_spec.spec.containers[0].resources = client.V1ResourceRequirements(
        limits=worker_resources,
        requests=worker_resources,
    )
    pytorchjob.spec.pytorch_replica_specs[constants.REPLICA_TYPE_WORKER] = models.KubeflowOrgV1ReplicaSpec(replicas=worker_replica, template=worker_pod_template_spec)

    ## create pytorchjob
    training_client = TrainingClient()
    training_client.create_job(pytorchjob, namespace=namespace)

    ## wait till Running state
    # NOTE(review): wait_for_job_conditions has a default wait timeout in the
    # kubeflow-training SDK; confirm it is long enough for image pulls.
    running_pytorchjob = training_client.wait_for_job_conditions(
        name=pytorchjob.metadata.name,
        namespace=pytorchjob.metadata.namespace,
        job_kind=constants.PYTORCHJOB_KIND,
        expected_conditions={constants.JOB_CONDITION_RUNNING}
    )

    ## stream master pod logs until it exits
    training_client.get_job_logs(
        name=running_pytorchjob.metadata.name,
        namespace=running_pytorchjob.metadata.namespace,
        job_kind=constants.PYTORCHJOB_KIND,
        is_master=True,
        follow=True
    )

    ## check job is succeeded
    training_client.wait_for_job_conditions(
        name=running_pytorchjob.metadata.name,
        namespace=running_pytorchjob.metadata.namespace,
        job_kind=constants.PYTORCHJOB_KIND,
        expected_conditions={constants.JOB_CONDITION_SUCCEEDED}
    )
    return pytorchjob_name

## pipeline

In [67]:
from kfp import dsl
from kfp import kubernetes

@dsl.pipeline(name="newjeans-fine-tuning")
def fine_tuning_pipeline(
    train_data_url:str="https://docs.google.com/uc?export=download&id=1ycN8UktwSiMJ0cWwPXeLVIHJpBnUgEtE&confirm=t",
    eval_data_url:str="",
    training_code_url:str="",
    deepspeed_config_url:str="",
    deepspeed_config_extras:str="{}",
    base_image:str="asia-northeast3-docker.pkg.dev/silver-bridge-433413-f3/llmops/transformers-pytorch-deepspeed-latest-gpu:deepspeed-v0.15.0-24.06.py3",
    model_id:str="unsloth/Meta-Llama-3.1-8B-Instruct",
    job_name:str="luckyvicky-finetuning",
    batch_size:int=4,
    num_train_epochs:int=10,
    master_replica:int=1,
    master_resources:str='{"cpu":"2", "memory": "55Gi", "nvidia.com/gpu": "1"}',
    worker_replica:int=1,
    worker_resources:str='{"cpu":"2", "memory": "20Gi", "nvidia.com/gpu": "1"}'
):
    """Fine-tune ``model_id`` with a distributed PyTorchJob.

    Steps: create a shared RWX PVC, download and validate the train/eval
    JSONL datasets, fetch the training script and DeepSpeed config, generate a
    job id and launcher script, then run the PyTorchJob.

    NOTE(review): ``eval_data_url``, ``training_code_url`` and
    ``deepspeed_config_url`` default to ``""``; the steps that download them
    cannot succeed unless real URLs are supplied when the run is created.
    """
    # Create a ReadWriteMany PVC with a fixed name (pvc_name), shared by all
    # training pods. Caching is enabled for this task, presumably so re-runs
    # reuse the earlier result instead of re-creating the same-named PVC.
    pvc = kubernetes.CreatePVC(
        pvc_name='newjeans-finetuning-pvc-rwx',
        access_modes=['ReadWriteMany'],
        size='1024Gi',
        storage_class_name='filestore-rwx',
    )
    pvc.set_caching_options(True)

    mount_path = '/train'

    # Download and validate the datasets. KFP v2 components accept keyword
    # arguments only; positional arguments raise TypeError.
    train_dataset = save_jsonl(
        jsonl_url=train_data_url
    )
    train_dataset.set_display_name("train-dataset")
    validate_train_dataset = validate_dataset(
        jsonl_dataset=train_dataset.output
    )
    validate_train_dataset.set_display_name("validate-train-dataset")

    eval_dataset = save_jsonl(
        jsonl_url=eval_data_url
    )
    eval_dataset.set_display_name("eval-dataset")
    validate_eval_dataset = validate_dataset(
        jsonl_dataset=eval_dataset.output
    )
    validate_eval_dataset.set_display_name("validate-eval-dataset")

    train_func = save_train_func(training_code_url=training_code_url)

    deepspeed_config = json_to_dict(
        json_url=deepspeed_config_url,
        extras_to_update=deepspeed_config_extras
    )
    deepspeed_config.set_display_name("deepspeed-config")

    # A fresh id per run, so caching must stay off for this task.
    job_id = generate_job_id()
    job_id.set_caching_options(False)

    # NOTE(review): json_to_dict is declared "-> str" while
    # generate_pytorchjob_script declares deepspeed_config as dict; confirm
    # KFP's compile-time type check accepts this wiring.
    job_script = generate_pytorchjob_script(
        job_name=job_name,
        job_id=job_id.output,
        training_code=train_func.output,
        deepspeed_config=deepspeed_config.output,
        model_id=model_id,
        train_dataset=validate_train_dataset.output,
        eval_dataset=validate_eval_dataset.output,
        mount_path=mount_path,
        batch_size=batch_size,
        num_train_epochs=num_train_epochs
    )

    run_pytorchjob(
        job_name=job_name,
        job_id=job_id.output,
        base_image=base_image,
        pvc_name=pvc.outputs['name'],
        mount_path=mount_path,
        exec_script=job_script.output,
        master_replica=master_replica,
        master_resources=master_resources,
        worker_replica=worker_replica,
        worker_resources=worker_resources,
    )

In [68]:
from kfp.compiler import Compiler

# Compile the training pipeline into a YAML package file.
Compiler().compile(
    pipeline_func=fine_tuning_pipeline,
    package_path="luckyvicky-finetuning.yaml",
)

# Serving pipeline

In [5]:
from kfp import dsl

@dsl.component(base_image="python:3.11", packages_to_install=["kserve"])
def run_isvc(
    pvc_name: str,
    model_id:str,
    pytorchjob_name: str,
):
    """Create a KServe InferenceService named ``luckyvicky`` backed by vLLM.

    The predictor runs a vLLM OpenAI-compatible image that serves *model_id*
    (bfloat16, bitsandbytes quantization/loading) and registers a LoRA
    adapter named ``luckyvicky`` from ``/mnt/models/<pytorchjob_name>``.
    ``STORAGE_URI`` is set to ``pvc://<pvc_name>``. Presumably KServe's
    storage initializer exposes that PVC under ``/mnt/models``; confirm this
    against the cluster's KServe version.

    Args:
        pvc_name: PVC holding the training outputs.
        model_id: base model passed to vLLM's ``--model``.
        pytorchjob_name: PVC sub-directory holding the adapter weights.
    """
    from kserve import (
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        constants,
    )
    from kubernetes import client

    # Namespace of the pod running this component.
    namespace_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
    with open(namespace_path, "r") as ns_file:
        namespace = ns_file.read()

    vllm_args = [
        "--model", model_id,
        "--dtype", "bfloat16",
        "--max-model-len", "16384",
        "--quantization", "bitsandbytes",
        "--load-format", "bitsandbytes",
        "--enable-lora",
        "--lora-modules", f"luckyvicky=/mnt/models/{pytorchjob_name}",
    ]
    gpu_resources = client.V1ResourceRequirements(
        limits={"cpu": "2", "memory": "40Gi", "nvidia.com/gpu": "1"},
        requests={"cpu": "2", "memory": "25Gi", "nvidia.com/gpu": "1"},
    )
    serving_container = client.V1Container(
        name="kserve-container",
        image="asia-northeast3-docker.pkg.dev/silver-bridge-433413-f3/llmops/vllm/vllm-openai:v0.5.5-bitsandbytes",
        args=vllm_args,
        ports=[client.V1ContainerPort(container_port=8000, protocol="TCP")],
        resources=gpu_resources,
        env=[client.V1EnvVar(name="STORAGE_URI", value=f"pvc://{pvc_name}")],
        volume_mounts=[
            client.V1VolumeMount(mount_path="/dev/shm", name="dshm", read_only=False),
        ],
    )
    # Memory-backed volume mounted at /dev/shm in the serving container.
    shm_volume = client.V1Volume(
        name="dshm",
        empty_dir=client.V1EmptyDirVolumeSource(medium="Memory", size_limit="0.5Gi"),
    )
    predictor = V1beta1PredictorSpec(
        containers=[serving_container],
        min_replicas=1,
        max_replicas=1,
        volumes=[shm_volume],
    )

    isvc_metadata = client.V1ObjectMeta(
        name="luckyvicky",
        namespace=namespace,
        annotations={
            "sidecar.istio.io/inject": "false",
            "serving.kserve.io/enable-prometheus-scraping": "true",
        },
    )
    isvc = V1beta1InferenceService(
        api_version=constants.KSERVE_V1BETA1,
        kind=constants.KSERVE_KIND,
        metadata=isvc_metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )

    KServeClient().create(isvc, namespace=namespace)

## pipeline

In [None]:
from kfp import dsl
from kfp import kubernetes

@dsl.pipeline(name="newjeans-fine-tuning")
def fine_tuning_pipeline(
    train_data_url:str="https://docs.google.com/uc?export=download&id=1ycN8UktwSiMJ0cWwPXeLVIHJpBnUgEtE&confirm=t",
    eval_data_url:str="",
    training_code_url:str="",
    deepspeed_config_url:str="",
    deepspeed_config_extras:str="{}",
    base_image:str="asia-northeast3-docker.pkg.dev/silver-bridge-433413-f3/llmops/transformers-pytorch-deepspeed-latest-gpu:deepspeed-v0.15.0-24.06.py3",
    model_id:str="unsloth/Meta-Llama-3.1-8B-Instruct",
    job_name:str="luckyvicky-finetuning",
    batch_size:int=4,
    num_train_epochs:int=10,
    master_replica:int=1,
    master_resources:str='{"cpu":"2", "memory": "55Gi", "nvidia.com/gpu": "1"}',
    worker_replica:int=1,
    worker_resources:str='{"cpu":"2", "memory": "20Gi", "nvidia.com/gpu": "1"}'
):
    """Fine-tune ``model_id`` with a PyTorchJob, then serve the result with KServe.

    Steps: create a shared RWX PVC, download and validate the train/eval
    JSONL datasets, fetch the training script and DeepSpeed config, run the
    PyTorchJob, and finally create an InferenceService that loads the adapter
    from the PVC sub-directory named after the PyTorchJob.

    NOTE(review): ``eval_data_url``, ``training_code_url`` and
    ``deepspeed_config_url`` default to ``""``; the steps that download them
    cannot succeed unless real URLs are supplied when the run is created.
    """
    # Create a ReadWriteMany PVC with a fixed name (pvc_name), shared by the
    # training pods and the serving step. Caching is enabled for this task,
    # presumably so re-runs reuse the earlier result instead of re-creating
    # the same-named PVC.
    pvc = kubernetes.CreatePVC(
        pvc_name='newjeans-finetuning-pvc-rwx',
        access_modes=['ReadWriteMany'],
        size='1024Gi',
        storage_class_name='filestore-rwx',
    )
    pvc.set_caching_options(True)

    mount_path = '/train'

    # Download and validate the datasets. KFP v2 components accept keyword
    # arguments only; positional arguments raise TypeError.
    train_dataset = save_jsonl(
        jsonl_url=train_data_url
    )
    train_dataset.set_display_name("train-dataset")
    validate_train_dataset = validate_dataset(
        jsonl_dataset=train_dataset.output
    )
    validate_train_dataset.set_display_name("validate-train-dataset")

    eval_dataset = save_jsonl(
        jsonl_url=eval_data_url
    )
    eval_dataset.set_display_name("eval-dataset")
    validate_eval_dataset = validate_dataset(
        jsonl_dataset=eval_dataset.output
    )
    validate_eval_dataset.set_display_name("validate-eval-dataset")

    train_func = save_train_func(training_code_url=training_code_url)

    deepspeed_config = json_to_dict(
        json_url=deepspeed_config_url,
        extras_to_update=deepspeed_config_extras
    )
    deepspeed_config.set_display_name("deepspeed-config")

    # A fresh id per run, so caching must stay off for this task.
    job_id = generate_job_id()
    job_id.set_caching_options(False)

    # NOTE(review): json_to_dict is declared "-> str" while
    # generate_pytorchjob_script declares deepspeed_config as dict; confirm
    # KFP's compile-time type check accepts this wiring.
    job_script = generate_pytorchjob_script(
        job_name=job_name,
        job_id=job_id.output,
        training_code=train_func.output,
        deepspeed_config=deepspeed_config.output,
        model_id=model_id,
        train_dataset=validate_train_dataset.output,
        eval_dataset=validate_eval_dataset.output,
        mount_path=mount_path,
        batch_size=batch_size,
        num_train_epochs=num_train_epochs
    )

    pytorchjob = run_pytorchjob(
        job_name=job_name,
        job_id=job_id.output,
        base_image=base_image,
        pvc_name=pvc.outputs['name'],
        mount_path=mount_path,
        exec_script=job_script.output,
        master_replica=master_replica,
        master_resources=master_resources,
        worker_replica=worker_replica,
        worker_resources=worker_resources,
    )

    # Serving depends on the PyTorchJob name, so it runs after training finishes.
    run_isvc(
        pvc_name=pvc.outputs['name'],
        model_id=model_id,
        pytorchjob_name=pytorchjob.output,
    )

    

In [None]:
from kfp.compiler import Compiler

# Compile the training + serving pipeline. This writes the same file path as
# the training-only compile cell, so it overwrites that package.
Compiler().compile(
    pipeline_func=fine_tuning_pipeline,
    package_path="luckyvicky-finetuning.yaml",
)