# PCAI Use Case Demo - End-to-End Demo with Kubeflow Pipelines
In this tutorial, we will implement the previous steps as a Kubeflow Pipeline.

## What is Kubeflow Pipeline?
Kubeflow Pipelines (KFP) is a platform for building and deploying portable and scalable machine learning (ML) workflows using containers on Kubernetes-based systems.

With KFP you can author components and pipelines using the KFP Python SDK, compile pipelines to an intermediate representation YAML, and submit the pipeline to run on a KFP-conformant backend.

## Why Kubeflow Pipelines?
KFP enables data scientists and machine learning engineers to:
- Author end-to-end ML workflows natively in Python
- Create fully custom ML components or leverage an ecosystem of existing components
- Easily manage, track, and visualize pipeline definitions, runs, experiments, and ML artifacts

Ref: https://www.kubeflow.org/docs/components/pipelines/overview/ 

### 0. Prerequisites
**1. Install Required Libraries**<br>
Before running the demo, please install the necessary libraries in your environment:

In [5]:
!pip install kfp-kubernetes==1.4.0



# 1. Define Each Step as a KFP Component
A pipeline component is a self-contained set of code that performs one step in the ML workflow (pipeline), such as data preprocessing, data transformation, or model training. A component is analogous to a function: it has a name, parameters, return values, and a body.
The code for each component includes the following:
- Client code: The code that talks to endpoints to submit jobs. For example, code to talk to the Google Dataproc API to submit a Spark job.
- Runtime code: The code that does the actual job and usually runs in the cluster. For example, Spark code that transforms raw data into preprocessed data.
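In the components below, a function with several outputs declares its return type as a `NamedTuple`, and each field name becomes an output key that downstream tasks read (for example `task1.outputs['dataset_path']` in the pipeline). The plain-Python sketch below involves no KFP at all; `fake_preprocess` is a hypothetical stand-in that only illustrates how those field names are declared and accessed.

```python
from typing import NamedTuple

# Same shape as the return annotation of preprocess_dataset below:
# each field name becomes a named output of the component.
Outputs = NamedTuple('outputs', [('org_dataset_path', str), ('dataset_path', str)])


def fake_preprocess(mount_path: str) -> Outputs:
    """Hypothetical stand-in that only builds the two output paths."""
    return Outputs(
        org_dataset_path=mount_path + '/org_dataset',
        dataset_path=mount_path + '/camel_math_organized',
    )


result = fake_preprocess('/data')
# The field names are the keys a pipeline uses in task.outputs[...]
print(Outputs._fields)
print(result.dataset_path)
```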

In [1]:
%update_token

Token successfully refreshed.


In [2]:
from kfp import dsl
from typing import NamedTuple

### Component #1 Preparing Data for SFT

In [3]:
@dsl.component(
    base_image='geuntakroh/transformers-pytorch-gpu:v4.57.1',
)
def preprocess_dataset(dataset_name: str, mount_path: str) -> NamedTuple('outputs', org_dataset_path=str, dataset_path=str):
    from datasets import load_dataset
    dataset = load_dataset(dataset_name)
    
    org_dataset_path = mount_path + '/org_dataset'
    dataset['train'].to_json(org_dataset_path + '/camel_math_train.json')
    dataset['test'].to_json(org_dataset_path + '/camel_math_test.json')
    dataset['validation'].to_json(org_dataset_path + '/camel_math_val.json')
    
    def convert_to_message_function(example):
        prompt = {
            'messages': [
                {
                    'role': 'user',
                    'content': example['message_1']
                },
                {
                    'role': 'assistant',
                    'content': example['message_2']
                }
            ]
        }
        return prompt
        
    organized_dataset = dataset.map(convert_to_message_function, remove_columns=dataset.column_names['train'])
    print(f"***Formatted Sample data***\n {organized_dataset['train'][0]}")

    dataset_path = mount_path + '/camel_math_organized'
    organized_dataset.save_to_disk(dataset_dict_path=dataset_path)

    return_value = NamedTuple('outputs', org_dataset_path=str, dataset_path=str)
    return return_value(org_dataset_path, dataset_path)
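`convert_to_message_function` turns each CAMEL row into the conversational `messages` format, which `SFTTrainer` can then render with the tokenizer's chat template. The sketch below reproduces that mapping on a plain dictionary so it can be checked without downloading the dataset; the sample row is made up, and only the `message_1`/`message_2` column names come from the component above.

```python
def convert_to_messages(example: dict) -> dict:
    """Map a CAMEL-style row to a user/assistant conversation."""
    return {
        'messages': [
            {'role': 'user', 'content': example['message_1']},
            {'role': 'assistant', 'content': example['message_2']},
        ]
    }


# Hypothetical sample row (not taken from the real dataset)
row = {'message_1': 'What is 2 + 3?', 'message_2': '2 + 3 = 5.', 'topic': 'arithmetic'}
converted = convert_to_messages(row)
print(converted['messages'][0])
print(converted['messages'][1])
```

In the component, `remove_columns=dataset.column_names['train']` drops the original columns after mapping, so each row ends up with only the `messages` field, just like the dictionary returned here.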

### Component #2 Run SFT with prepared dataset

In [4]:
@dsl.component(
    base_image='geuntakroh/transformers-pytorch-gpu:v4.57.1',
)
def finetuning_llm(dataset_path: str) -> NamedTuple('outputs', last_run_id=str, model_name=str):
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from datasets import load_from_disk
    from trl import SFTConfig, SFTTrainer, setup_chat_format
    import torch
    from transformers.integrations import MLflowCallback
    import re
    import time
    import os
    
    # import logging
    from transformers.utils import logging
    logger = logging.get_logger(__name__)
    
    
    ## Set device
    device = "cuda" if torch.cuda.is_available() else "cpu"

    ## Initialize Model
    model_name = "HuggingFaceTB/SmolLM2-360M-Instruct"
    model = AutoModelForCausalLM.from_pretrained(pretrained_model_name_or_path=model_name).to(device)
    tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_name)

    logger.info(f"Chat Template: {tokenizer.chat_template}")
    
    ## Load the Dataset
    dataset = load_from_disk(dataset_path)

    ## Set up MLflow tracking and a callback that renews the auth token
    run_name = model.name_or_path.split('/')[1] + '-' + time.strftime("%Y%m%d-%H%M%S", time.localtime())
    logger.info(f"MLFLOW Run Name: {run_name}")
        
    os.environ['MLFLOW_TRACKING_URI'] = "http://mlflow.mlflow.svc.cluster.local:5000" #mlflow_url
    os.environ['MLFLOW_S3_ENDPOINT_URL'] = "http://local-s3-service.ezdata-system.svc.cluster.local:30000" # mlflow_s3_url
    os.environ['MLFLOW_EXPERIMENT_NAME'] = "finetuning-llm-kfp" #mlflow_experiment
    os.environ['MLFLOW_TRACKING_INSECURE_TLS'] = 'true'
    os.environ['MLFLOW_S3_IGNORE_TLS'] = 'true'
    os.environ['HF_MLFLOW_LOG_ARTIFACTS'] = 'True'

    
    def renew_token(step: str = None):
        with open('/etc/secrets/ezua/.auth_token','r') as file:
            AUTH_TOKEN = file.read()
            os.environ['MLFLOW_TRACKING_TOKEN']=AUTH_TOKEN
            os.environ["AWS_ACCESS_KEY_ID"] = AUTH_TOKEN
            os.environ["AWS_SECRET_ACCESS_KEY"] = "s3"
            if step is not None:
                logger.info(f"AUTH_TOKEN - {step} : [{AUTH_TOKEN[-20:]}]")
            else:
                logger.info(f"AUTH_TOKEN : [{AUTH_TOKEN[-20:]}]")

    renew_token()
    
    class CustomizedMLflowCallback(MLflowCallback):
        def on_log(self, args, state, control, logs, model=None, **kwargs):
            # Re-read the (possibly rotated) auth token before logging to MLflow
            renew_token('on_log')
            super().on_log(args, state, control, logs, model=model, **kwargs)
    
        def on_save(self, args, state, control, **kwargs):
            renew_token('on_save')
            import boto3 
            if boto3.DEFAULT_SESSION is not None:
                logger.info(f"boto3 : {boto3.DEFAULT_SESSION.get_credentials().access_key[-20:]}, Env : {os.environ['AWS_ACCESS_KEY_ID'][-20:]}")
                if boto3.DEFAULT_SESSION.get_credentials().access_key != os.environ['AWS_ACCESS_KEY_ID']:
                    boto3.DEFAULT_SESSION = None
                    logger.info("Initialize Default Session of Boto3 to update Credential from Environment Variable!")
                
            super().on_save(args, state, control, **kwargs)

    ## Define PEFT (LoRA) settings
    from peft import LoraConfig

    ## Configure LoRA parameters
    rank_dimension = 4 # r: rank dimension for LoRA update matrices (smaller = more compression)
    lora_alpha = 8 # lora_alpha: scaling factor for LoRA layers (higher = stronger adaptation)
    lora_dropout = 0.05 # lora_dropout: dropout probability for LoRA layers (helps prevent overfitting)

    peft_config = LoraConfig(
        r=rank_dimension,  # Rank dimension - typically between 4 and 32
        lora_alpha=lora_alpha,  # LoRA scaling factor - typically 2x rank
        lora_dropout=lora_dropout,  # Dropout probability for LoRA layers
        bias="none",  # Bias type for LoRA; "none" means no bias parameters are trained
        target_modules="all-linear",  # Apply LoRA to all linear layers except the output layer
        task_type="CAUSAL_LM",  # Task type for model architecture
    )

    ## Training hyperparameters
    max_steps = 100 #2000
    # num_train_epochs=1
    logging_steps=10
    save_steps= 100 #1000
    eval_steps=1000  # larger than max_steps, so no evaluation is triggered in this run
    per_device_train_batch_size=4
    max_seq_length = 1024
    save_total_limit=2
    model_dir = model_name.split('/')[1]

    # Configure trainer
    training_args = SFTConfig(
        output_dir=model_dir,
        overwrite_output_dir=True,
        max_steps=max_steps,
        # num_train_epochs=num_train_epochs,
        save_total_limit=save_total_limit,
        per_device_train_batch_size=per_device_train_batch_size,
        learning_rate=5e-4,
        logging_steps=logging_steps,
        save_steps=save_steps,
        eval_strategy="steps",
        eval_steps=eval_steps,
        report_to=[],
        max_length=max_seq_length,  # Maximum sequence length
    )
    
    # Initialize trainer
    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=dataset["train"],
        eval_dataset=dataset["test"],
        processing_class=tokenizer,
        peft_config=peft_config,  # LoRA configuration
    )
    trainer.add_callback(CustomizedMLflowCallback)

    ## Check that the chat template is applied to the dataset
    for batch in trainer.get_train_dataloader():
        logger.info(batch.keys())
        logger.info(f"***Chat Template Applied Dataset***\n {tokenizer.decode(batch['input_ids'][0],skip_special_tokens=False)}")
        break

    ## Run Training 
    trainer.train()
    trainer.save_model(model_dir + '-savedir')
    
    ## Log artifacts to MLFLOW
    import mlflow

    ## Get the ID of the MLflow Run that was automatically created above
    last_run_id = mlflow.last_active_run().info.run_id
    renew_token('Final Model Logging')
        
    with mlflow.start_run(run_id=last_run_id):
        mlflow.log_params(peft_config.to_dict())
        # mlflow.log_artifacts(model_dir + '-savedir', artifact_path=model_dir + '-savedir')
        
        mlflow.transformers.log_model(
          transformers_model={"model": trainer.model, "tokenizer": tokenizer},
          artifact_path=model_dir + '-savedir',  # This is a relative path to save model files within MLflow run
        )
    
    return_value = NamedTuple('outputs', last_run_id=str, model_name=str)
    return return_value(last_run_id, model_name)
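The LoRA settings above (`r=4`, `lora_alpha=8`) determine how many parameters are trained and how strongly the adapter update is scaled. In standard LoRA, each adapted linear layer with a frozen weight of shape `d_out x d_in` gets two small matrices, `A` (`r x d_in`) and `B` (`d_out x r`), and PEFT's default scaling of the update `B @ A` is `lora_alpha / r`. The sketch below does that arithmetic for one hypothetical 960 x 960 projection; the layer size is an illustrative assumption, not read from the model config.

```python
def lora_trainable_params(d_in: int, d_out: int, r: int) -> int:
    """Parameters in the LoRA A (r x d_in) and B (d_out x r) matrices."""
    return r * d_in + d_out * r


def lora_scaling(lora_alpha: float, r: int) -> float:
    """Default (non-rsLoRA) scaling applied to the update B @ A."""
    return lora_alpha / r


d_in, d_out = 960, 960   # hypothetical layer shape
r, lora_alpha = 4, 8     # values used in finetuning_llm

full = d_in * d_out
lora = lora_trainable_params(d_in, d_out, r)
print(f"full: {full}, LoRA: {lora}, ratio: {lora / full:.4%}")
print(f"scaling: {lora_scaling(lora_alpha, r)}")
```

With `lora_alpha = 2 * r`, the scaling stays at 2.0 regardless of the rank, which is why the comment above suggests keeping alpha at roughly twice the rank.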

### Component #3 Serving the LoRA Adapter in MLIS

In [21]:
@dsl.component(
    base_image='geuntakroh/transformers-pytorch-gpu:v4.57.1',
)
def deploy_lora_adapter(last_run_id: str,mlis_url: str,namespace: str,full_model_name:str,bucket_name: str) -> str:
    from mlflow.tracking import MlflowClient
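    import logging
    import os
    # Without a handler configured at INFO level, logger.info() output is dropped
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)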
    
    model_name = full_model_name.split('/')[1]
    model_name_lower = model_name.lower()
    
    os.environ['MLFLOW_TRACKING_URI'] = "http://mlflow.mlflow.svc.cluster.local:5000" #mlflow_url
    os.environ['MLFLOW_S3_ENDPOINT_URL'] = "http://local-s3-service.ezdata-system.svc.cluster.local:30000" # mlflow_s3_url
    os.environ['MLFLOW_TRACKING_INSECURE_TLS'] = 'true'
    os.environ['MLFLOW_S3_IGNORE_TLS'] = 'true'
    with open('/etc/secrets/ezua/.auth_token','r') as file:
        AUTH_TOKEN = file.read()
        os.environ['MLFLOW_TRACKING_TOKEN']=AUTH_TOKEN
    
    # Get the base artifact URI (S3 path)
    client = MlflowClient()
    run = client.get_run(last_run_id)
    artifact_uri = run.info.artifact_uri
    logger.info(f"Artifact URI: {artifact_uri}")
    
    # For a specific artifact file/folder
    artifact_path = model_name + "-savedir/peft"  # or any artifact path
    
    full_artifact_uri = f"{artifact_uri}/{artifact_path}"
    logger.info(f"Full artifact URI: {full_artifact_uri}")

    # Initialize the API client for the MLIS REST API
    import aiolirest
    from aioli.common import api, util
    from aioli.common.api import authentication
    
    host_url = mlis_url
    host = util.prepend_protocol(host_url)
    configuration = authentication.get_rest_config(host)
    # token = util.get_aioli_user_token_from_env()
    with open('/etc/secrets/ezua/.auth_token','r') as f:
        token = f.read()
    logger.info(f"HOST: {host} / Token: [{token[-20:]}]")  # log only the tail of the token

    configuration.api_key["ApiKeyAuth"] = "Bearer " + token
    restclient = aiolirest.ApiClient(configuration)
    
    # Create registry 
    from aiolirest.models.trained_model_registry_request import TrainedModelRegistryRequest
    
    api_instance = aiolirest.RegistriesApi(restclient)
    
    registry_request = TrainedModelRegistryRequest(
        name=model_name + '-internal-s3',
        bucket=bucket_name,
        endpointUrl='http://local-s3-service.ezdata-system.svc.cluster.local:30000',
        type='s3',
        accessKey=token,
        secretKey='s3',
        insecureHttps=False,
        project=None,
    )
    api_instance.registries_post(registry_request)
    logger.info(registry_request.name)

    # Create packaged model
    from aiolirest.models.configuration_resources import ConfigurationResources
    from aiolirest.models.deployment_model_version import DeploymentModelVersion
    from aiolirest.models.packaged_model import PackagedModel
    from aiolirest.models.packaged_model_request import PackagedModelRequest
    from aiolirest.models.resource_profile import ResourceProfile
    from argparse import Namespace

    api_instance = aiolirest.PackagedModelsApi(restclient)

    config = {
        'requests_cpu': '1',
        'requests_gpu': '1',
        'requests_memory': '4Gi',
        'limits_cpu': '4',
        'limits_gpu': '1',
        'limits_memory': '8Gi',
        'enable_caching': False,
        'disable_caching': False,
        'metadata': {
            'modelCategory=llm'
        },
        'env': {  # placeholder values; only logged below, not sent in the request
            'a=b',
            'c=d',
        },
        'arg': [
            '--model',
            full_model_name,
            '--port',
            '8080',
            '--dtype=half',
            '--gpu-memory-utilization',
            '0.8',
            '--enable-lora',
            '--lora-modules',
            '{"name":"math-lora","path":"/mnt/models","base_model_name":"' + full_model_name + '"}',
        ]
    }
    args = Namespace(**config)
    requests = ResourceProfile(
        cpu=args.requests_cpu, gpu=args.requests_gpu, memory=args.requests_memory
    )
    limits = ResourceProfile(
        cpu=args.limits_cpu, gpu=args.limits_gpu, memory=args.limits_memory
    )
    resources = ConfigurationResources(gpuType=None, requests=requests, limits=limits)
    logger.info(f"Resources: {resources}")
    from aioli.common.util import (
        construct_arguments,
        construct_environment,
        construct_metadata,
        launch_dashboard,
    )
    logger.info(construct_metadata(args, {}))
    logger.info(construct_environment(args))
    logger.info(construct_arguments(args))

    packaged_model_request = PackagedModelRequest(
        name=model_name,
        description=model_name,
        url=full_artifact_uri,
        image='vllm/vllm-openai:v0.8.5',
        resources=resources,
        modelFormat='custom',
        metadata=construct_metadata(args, {}),
        arguments=construct_arguments(args),
        registry=registry_request.name,
    )
    
    if args.enable_caching:
        packaged_model_request.caching_enabled = True
    
    if args.disable_caching:
        packaged_model_request.caching_enabled = False
    
    response = api_instance.models_post(packaged_model_request)
    logger.info(response)

    # Deploy Model
    from aioli.common.util import (
        construct_arguments,
        construct_environment,
        launch_dashboard,
    )
    from aiolirest.models.autoscaling import Autoscaling
    from aiolirest.models.deployment import Deployment, DeploymentState
    from aiolirest.models.deployment_request import DeploymentRequest
    from aiolirest.models.event_info import EventInfo
    from aiolirest.models.security import Security
    api_instance = aiolirest.DeploymentsApi(restclient)

    config = {
        'autoscaling_target': 1,
        'autoscaling_metric': 'rps',
        'autoscaling_max_replicas': 1,
        'autoscaling_min_replicas': 1,
    }
    args = Namespace(**config)
    auto = Autoscaling(
        metric=args.autoscaling_metric,
    )
    
    if args.autoscaling_target is not None:
        auto.target = args.autoscaling_target
    
    if args.autoscaling_max_replicas is not None:
        auto.max_replicas = args.autoscaling_max_replicas
    
    if args.autoscaling_min_replicas is not None:
        auto.min_replicas = args.autoscaling_min_replicas

    sec = Security(authenticationRequired=True)
    logger.info(f"{auto} / {sec}")

    deployment_request = DeploymentRequest(
        name=model_name_lower,
        model=model_name,
        security=sec,
        namespace=namespace,
        autoScaling=auto,
    )
    results = api_instance.deployments_post(deployment_request)
    logger.info(results)

    from aioli.cli import deployment
    import time

    while True:
        model_status = deployment.lookup_deployment(model_name_lower,api_instance).status
        logger.info(model_status)
        time.sleep(10)
        if model_status == 'Ready':
            logger.info("Model is Ready!")
            break
        elif model_status != 'Deploying' and model_status != 'Unknown':
            logger.info('Something went wrong, Check the deployment in the MLIS!')
            break
            
    model_endpoint_url = deployment.lookup_deployment(model_name_lower,api_instance).state.endpoint

    return model_endpoint_url
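The status loop above has no upper bound: if the deployment stays in `Deploying`, the task waits forever, and after a failure it still tries to read the endpoint. One possible alternative is a polling helper with a timeout that raises on failure. The sketch below is hypothetical (`wait_for_ready` and its parameters are not part of the MLIS or `aioli` API); in the component, `get_status` could be a lambda wrapping `deployment.lookup_deployment(model_name_lower, api_instance).status`.

```python
import time
from typing import Callable


def wait_for_ready(get_status: Callable[[], str],
                   timeout_s: float = 1800,
                   interval_s: float = 10,
                   pending=('Deploying', 'Unknown')) -> str:
    """Poll get_status() until it returns 'Ready'.

    Raises RuntimeError on any status outside `pending`, and
    TimeoutError if 'Ready' is not reached within timeout_s seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status == 'Ready':
            return status
        if status not in pending:
            raise RuntimeError(f"Deployment failed with status: {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still '{status}' after {timeout_s} seconds")
        time.sleep(interval_s)


# Usage with a fake status sequence instead of a real MLIS deployment
statuses = iter(['Unknown', 'Deploying', 'Ready'])
print(wait_for_ready(lambda: next(statuses), timeout_s=5, interval_s=0))
```

Raising instead of breaking makes the KFP task itself fail, so the pipeline stops before the inference and benchmark steps run against a broken endpoint.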

### Component #4 Sample Inference

In [22]:
@dsl.component(
    base_image='geuntakroh/transformers-pytorch-gpu:v4.57.1',
)
def sample_inferencing(endpoint_url: str):
    import requests
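    import logging
    
    # Without a handler configured at INFO level, logger.info() output is dropped
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)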
    
    with open('/etc/secrets/ezua/.auth_token','r') as file:
        AUTH_TOKEN = file.read()

    headers = {
        "Authorization": f"Bearer {AUTH_TOKEN}"
    }

    route = '/v1/models'
    models_response = requests.get(endpoint_url+route,headers=headers,verify=False)
    sample_prompt = "In a 90-minute soccer game, Mark played 20 minutes, then rested after. He then played for another 35 minutes. How long was he on the sideline?"
    
    for model in models_response.json()['data']:
        logger.info(model['id'])
        payload = {
            "model": model['id'],
            "messages": [
                {
                    "role": "system",
                    "content": "you are a helpful math tutor, solve the question step by step"
                },
                {
                    "role": "user",
                    "content": sample_prompt
                }
            ]
        }
        route = '/v1/chat/completions'
        chat_response = requests.post(endpoint_url+route,headers=headers,verify=False,json=payload)
        logger.info(f"*** {model['id']} ***\n{chat_response.json()['choices'][0]['message']['content']}")
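The inference component talks to vLLM's OpenAI-compatible API: `GET /v1/models` should list the base model as well as the adapter registered with `--lora-modules` (here `math-lora`), and `POST /v1/chat/completions` takes a `model` plus a `messages` list. The sketch below builds the same request body and extracts the reply from a response-shaped dictionary without any network call; `build_chat_payload` and `extract_reply` are hypothetical helpers, and the `fake_response` content is made up.

```python
import json


def build_chat_payload(model_id: str, question: str,
                       system_prompt: str = "you are a helpful math tutor, solve the question step by step") -> dict:
    """Request body for an OpenAI-compatible /v1/chat/completions call."""
    return {
        "model": model_id,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": question},
        ],
    }


def extract_reply(response_json: dict) -> str:
    """Return the assistant text from the first choice."""
    return response_json["choices"][0]["message"]["content"]


payload = build_chat_payload("math-lora", "What is 12 * 3?")
print(json.dumps(payload, indent=2))

# Hypothetical response shape; a real one comes from requests.post(...).json()
fake_response = {"choices": [{"message": {"role": "assistant", "content": "12 * 3 = 36"}}]}
print(extract_reply(fake_response))
```

Because the adapter is addressed only through the `model` field, sending the same prompt once with the base model ID and once with `math-lora` is a quick way to compare the two, which is what the loop over `/v1/models` above does.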

### Component #5 Benchmark the LoRA

In [59]:
@dsl.component(
    base_image='geuntakroh/lighteval:v0.12.1',
    install_kfp_package=False,
)
def lighteval_benchmark(mount_path: str,endpoint_url: str,org_dataset_path: str):
    script = f"""
from lighteval.tasks.requests import Doc
from lighteval.metrics.metrics import Metrics
from lighteval.tasks.lighteval_task import LightevalTaskConfig


def prompt_fn(line: dict, task_name: str):
    query = line["message_1"]
    choices = [line["message_2"]]
    return Doc(
        task_name=task_name,
        query=query,
        choices=choices,
        gold_index=0,
    )

custom_task = LightevalTaskConfig(
    name="custom_task",
    prompt_function=prompt_fn,
    hf_repo="{org_dataset_path}",
    hf_subset="",
    evaluation_splits=["validation"],
    few_shots_split='train',
    few_shots_select='random_sampling_from_train',
    generation_size=1024,
    metrics=[Metrics.bleu],
    stop_sequence=[],
    version=0,
)

TASKS_TABLE = [custom_task]
    """
    
    with open(mount_path + '/custom_task.py','w') as f:
        f.write(script)

    # Parse the external endpoint URL
    from urllib.parse import urlparse
    parsed = urlparse(endpoint_url)
    hostname = parsed.hostname.split('.')
    
    # The first DNS label has the form '<deployment>-predictor-<namespace>';
    # split it to build the in-cluster service name of the predictor
    parts = hostname[0].split('-predictor-')
    model_name = parts[0] + "-predictor-00001"  # e.g. smollm2-360m-instruct-predictor-00001
    project_name = parts[1]  # namespace, e.g. geun-tak-roh-hp-b3801707
    print(f"http://{model_name}.{project_name}.svc.cluster.local")
    
    
    import lighteval
    from lighteval.logging.evaluation_tracker import EvaluationTracker
    from lighteval.models.endpoints.litellm_model import LiteLLMModelConfig
    from lighteval.pipeline import ParallelismManager, Pipeline, PipelineParameters
    from lighteval.models.model_input import GenerationParameters
    
    with open('/etc/secrets/ezua/.auth_token','r') as file:
        token = file.read()
    
    evaluation_tracker = EvaluationTracker(
        output_dir="/tmp/results-custom",
        save_details=True,
    )
    
    pipeline_params = PipelineParameters(
        launcher_type=ParallelismManager.NONE,
        custom_tasks_directory=mount_path + '/custom_task.py',  # Set to path if using custom tasks
        max_samples=500
    )
    
    model_config = LiteLLMModelConfig(
        model_name="hosted_vllm/math-lora",
        provider='vllm',
        base_url= f"http://{model_name}.{project_name}.svc.cluster.local" + "/v1",
        api_key=token,
        generation_parameters=GenerationParameters(
            temperature=0.5,
        ),
    )

    task = "custom_task|0"
    
    pipeline = Pipeline(
        tasks=task,
        pipeline_parameters=pipeline_params,
        evaluation_tracker=evaluation_tracker,
        model_config=model_config,
    )
    
    pipeline.evaluate()
    pipeline.show_results()
    # pipeline.save_and_push_results()
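The benchmark bypasses the external endpoint and calls the predictor's in-cluster service instead. The sketch below isolates that URL rewrite as a hypothetical helper, `internal_service_url`, and adds an explicit error when the hostname does not contain `-predictor-`. It relies on the same assumptions as the component: the first DNS label looks like `<deployment>-predictor-<namespace>`, and the revision suffix is `00001`. The example hostname is made up.

```python
from urllib.parse import urlparse


def internal_service_url(endpoint_url: str, revision: str = "00001") -> str:
    """Derive the in-cluster predictor URL from an external endpoint URL.

    Assumes the first DNS label looks like '<deployment>-predictor-<namespace>'.
    """
    first_label = urlparse(endpoint_url).hostname.split('.')[0]
    deployment, sep, namespace = first_label.partition('-predictor-')
    if not sep:
        raise ValueError(f"Unexpected hostname format: {first_label}")
    return f"http://{deployment}-predictor-{revision}.{namespace}.svc.cluster.local"


url = internal_service_url("https://smollm2-360m-instruct-predictor-my-project.ingress.example.com")
print(url)
```

The component then appends `/v1` to this URL for `LiteLLMModelConfig.base_url`, since vLLM serves its OpenAI-compatible routes under that prefix.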

# 2. Define the Pipeline
A pipeline is a description of a machine learning (ML) workflow, including all of the components in the workflow and how the components relate to each other in the form of a graph. The pipeline configuration includes the definition of the inputs (parameters) required to run the pipeline and the inputs and outputs of each component.

When you run a pipeline, the system launches one or more Kubernetes Pods corresponding to the steps (components) in your workflow (pipeline). The Pods start Docker containers, and the containers in turn start your programs.

After developing your pipeline, you can upload your pipeline using the Kubeflow Pipelines UI or the Kubeflow Pipelines SDK.
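KFP derives the execution graph from data dependencies: a task that consumes another task's output (for example `task1.outputs['dataset_path']`) runs after it. The stdlib sketch below writes down the dependencies of the pipeline defined next as a plain dictionary and lets `graphlib.TopologicalSorter` (Python 3.9+) produce a valid execution order. It only models the graph and does not call KFP; the task labels are illustrative.

```python
from graphlib import TopologicalSorter

# task -> set of tasks whose outputs it consumes
dependencies = {
    'create-pvc': set(),
    'preprocess-dataset': {'create-pvc'},
    'finetuning-llm': {'create-pvc', 'preprocess-dataset'},
    'deploy-lora-adapter': {'finetuning-llm'},
    'sample-inferencing': {'deploy-lora-adapter'},
    'lighteval-benchmark': {'create-pvc', 'preprocess-dataset', 'deploy-lora-adapter'},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Since `sample-inferencing` and `lighteval-benchmark` both depend only on tasks that have already finished once the adapter is deployed, the backend is free to run those two steps in parallel.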

In [60]:
from kfp import kubernetes

In [61]:
@dsl.pipeline(
    name="SFT_PEFT_pipeline"
)
def llm_SFT_PEFT_pipeline(
    dataset_name: str,
    mount_path: str,
    dataset_pvc_name: str,
    dataset_pvc_size: str,
    current_sc: str,
    namespace: str,
    mlis_url: str,
    bucket_name: str
) -> str:
    pvc1 = kubernetes.CreatePVC(
        pvc_name=dataset_pvc_name,
        access_modes=['ReadWriteMany'],
        size=dataset_pvc_size,
        storage_class_name=current_sc, # gl4fs-system for PCAI
    )
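    # Mount point of the shared PVC in each task. kubernetes.mount_pvc expects a plain string here,
    # so keep it equal to the mount_path argument passed when launching the run ("/data").
    target_path = '/data'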
    task1 = preprocess_dataset(dataset_name=dataset_name,mount_path=mount_path)
    task1.set_cpu_limit("8")
    task1.set_memory_limit("16Gi")
    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path=target_path,
    )

    task2 = finetuning_llm(dataset_path=task1.outputs['dataset_path'])
    task2.set_accelerator_type("nvidia.com/gpu").set_accelerator_limit("1")
    task2.set_cpu_limit("8")
    task2.set_memory_limit("16Gi")
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path=target_path,
    )
    ### Required only if you are using AIE versions earlier than 1.9
    kubernetes.add_pod_annotation(
        task=task2,
        annotation_key="hpe-ezua/add-auth-token",
        annotation_value="true"
    )
    kubernetes.add_pod_annotation(
        task=task2,
        annotation_key="hpe-ezua/disable-sc",
        annotation_value="true"
    )

    task3 = deploy_lora_adapter(last_run_id=task2.outputs['last_run_id'],mlis_url=mlis_url,namespace=namespace,full_model_name=task2.outputs['model_name'],bucket_name=bucket_name)
    ### Required only if you are using AIE versions earlier than 1.9
    kubernetes.add_pod_annotation(
        task=task3,
        annotation_key="hpe-ezua/add-auth-token",
        annotation_value="true"
    )
    kubernetes.add_pod_annotation(
        task=task3,
        annotation_key="hpe-ezua/disable-sc",
        annotation_value="true"
    )
    task4 = sample_inferencing(endpoint_url=task3.output)
    ### Required only if you are using AIE versions earlier than 1.9
    kubernetes.add_pod_annotation(
        task=task4,
        annotation_key="hpe-ezua/add-auth-token",
        annotation_value="true"
    )
    kubernetes.add_pod_annotation(
        task=task4,
        annotation_key="hpe-ezua/disable-sc",
        annotation_value="true"
    )
    task5 = lighteval_benchmark(mount_path=mount_path,endpoint_url=task3.output,org_dataset_path=task1.outputs['org_dataset_path'])
    task5.set_cpu_limit("8")
    task5.set_memory_limit("16Gi")
    kubernetes.mount_pvc(
        task5,
        pvc_name=pvc1.outputs['name'],
        mount_path=target_path,
    )

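    ### Required only if you are using AIE versions earlier than 1.9 (this task also reads the auth token)
    kubernetes.add_pod_annotation(
        task=task5,
        annotation_key="hpe-ezua/add-auth-token",
        annotation_value="true"
    )
    kubernetes.add_pod_annotation(
        task=task5,
        annotation_key="hpe-ezua/disable-sc",
        annotation_value="true"
    )
    # Silence GitPython's error when no git executable is found in the image
    task5.set_env_variable('GIT_PYTHON_REFRESH','quiet')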

    return task3.output

# 3. Launch the Pipeline

### Collect necessary information

In [62]:
import os
import boto3

# Find the MLflow artifact bucket (keeps the last bucket whose name contains 'mlflow')
s3 = boto3.client("s3", verify=False)
buckets = s3.list_buckets()
for bucket in buckets['Buckets']:
    if 'mlflow' in bucket['Name']:
        bucket_name = bucket['Name']

dataset_name = "rhgt1996/camel_math_split"
mount_path = "/data"  # must match target_path used in the pipeline
dataset_pvc_name = "camel-math-dataset"
dataset_pvc_size = '3Gi'
# Reuse the storage class and namespace of the existing user-pvc
current_sc = os.popen("kubectl get pvc user-pvc -o=jsonpath='{.spec.storageClassName}'").read()
namespace_cur = os.popen("kubectl get pvc user-pvc -o=jsonpath='{.metadata.namespace}'").read()
mlis_url = "http://aioli-master-service-hpe-mlis.mlis.svc.cluster.local:8080"

Found endpoint for s3 via: environment_global.
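The loop above silently keeps the last bucket whose name contains `mlflow`, and leaves `bucket_name` undefined if none matches. A stricter variant, sketched below as a hypothetical helper (`find_bucket` is not part of boto3), fails loudly in both the no-match and the multiple-match case. It works on the dictionary returned by `s3.list_buckets()`; the bucket names in the example are made up.

```python
def find_bucket(list_buckets_response: dict, keyword: str = 'mlflow') -> str:
    """Return the single bucket whose name contains `keyword`.

    Raises ValueError if there is no match or more than one match.
    """
    matches = [b['Name'] for b in list_buckets_response.get('Buckets', [])
               if keyword in b['Name']]
    if len(matches) != 1:
        raise ValueError(f"Expected one bucket containing '{keyword}', found: {matches}")
    return matches[0]


# Same shape as boto3's s3.list_buckets() response (bucket names are made up)
response = {'Buckets': [{'Name': 'user-data'}, {'Name': 'mlflow-artifacts'}]}
print(find_bucket(response))
```

In the notebook this would read `bucket_name = find_bucket(s3.list_buckets())`.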


In [63]:
import kfp
kfp_client = kfp.Client()

### Execute the Pipeline

In [64]:
kfp_client.create_run_from_pipeline_func(
    llm_SFT_PEFT_pipeline,
    arguments={
        'dataset_name': dataset_name,
        'mount_path': mount_path,
        'dataset_pvc_name': dataset_pvc_name,
        'dataset_pvc_size': dataset_pvc_size,
        'current_sc': current_sc,
        'namespace': namespace_cur,
        'mlis_url': mlis_url,
        'bucket_name': bucket_name
    },
    experiment_name="e2e-SFT-pipeline",
)

RunPipelineResult(run_id=610a43e8-8fb6-4c0b-b2a2-ba09b30b4c22)

In [55]:
!kubectl get workflow

NAME                      STATUS    AGE    MESSAGE
sft-peft-pipeline-57xms   Failed    3m1s   
sft-peft-pipeline-twdk9   Running   2s     


In [57]:
!kubectl get workflow

NAME                      STATUS      AGE   MESSAGE
sft-peft-pipeline-twdk9   Succeeded   15m   
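To check run status programmatically rather than by reading the table, the JSON form of the same listing can be parsed. The sketch below assumes that `kubectl get workflow -o json` returns Argo Workflow objects under `items`, each with `metadata.name` and `status.phase`; `workflow_phases` is a hypothetical helper, `'Unknown'` is a placeholder for workflows without a phase yet, and the sample JSON is a trimmed, made-up mirror of the table printed above.

```python
import json


def workflow_phases(kubectl_json: str) -> dict:
    """Map Argo Workflow names to status.phase from `kubectl get workflow -o json`."""
    items = json.loads(kubectl_json).get('items', [])
    return {
        wf['metadata']['name']: wf.get('status', {}).get('phase', 'Unknown')
        for wf in items
    }


# Trimmed, hypothetical JSON mirroring the table printed above
sample = json.dumps({'items': [
    {'metadata': {'name': 'sft-peft-pipeline-57xms'}, 'status': {'phase': 'Failed'}},
    {'metadata': {'name': 'sft-peft-pipeline-twdk9'}, 'status': {'phase': 'Running'}},
]})
print(workflow_phases(sample))
```

In the notebook, the input could come from `subprocess.run(["kubectl", "get", "workflow", "-o", "json"], capture_output=True, text=True, check=True).stdout`.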


<img src="../assets/kfp_pipeline.png" alt="Kubeflow pipeline run graph" width="800">