In [1]:
import kfp
from kfp import dsl
from kfp import compiler

# ---- CONFIG ----
# Default bucket / namespace names for this proof of concept.
# NOTE(review): neither constant is referenced by the cells visible in this
# notebook; the pipeline and component signatures repeat the same literals
# ("custom-llm", "kubeflow-user-example-com"). Confirm whether they should be
# wired in or removed.
DEFAULT_BUCKET = "custom-llm"
DEFAULT_NS = "kubeflow-user-example-com"

# Python packages the training/eval steps rely on.
# NOTE(review): this list is not passed to any @dsl.component in the visible
# cells (they use a prebuilt base image or their own packages_to_install) —
# presumably it documents what the custom trainer image contains; verify.
COMMON_PACKAGES = [
    "transformers==4.43.3",
    "datasets>=2.19.0",
    "accelerate",
    "peft>=0.11.0",
    "torch",            # CPU ok for tiny models
    "s3fs",
    "boto3",
    "huggingface_hub"
]


In [2]:
@dsl.component(base_image="registry.gitlab.com/aryansr/mykubeflow/custom-llm-trainer:2.0")
def prep_generate_and_upload(
    bucket: str,
    s3_endpoint: str,
    s3_access_key: str,
    s3_secret_key: str,
    train_key: str = "datasets/poc/train.jsonl",
    val_key: str = "datasets/poc/val.jsonl",
) -> str:
    """Generate a tiny synthetic SFT dataset and upload it to S3 as JSONL.

    Two files are written: a training split at ``train_key`` and a validation
    split at ``val_key``. Each line is a JSON object with ``instruction`` and
    ``output`` keys plus an optional ``input`` key.

    Args:
        bucket: Destination bucket for both files.
        s3_endpoint: S3 endpoint; ``http://`` is prepended when no scheme is given.
        s3_access_key: Access key used for the S3 client.
        s3_secret_key: Secret key used for the S3 client.
        train_key: Object key of the training split.
        val_key: Object key of the validation split.

    Returns:
        ``s3://<bucket>/<prefix>`` where ``<prefix>`` is the directory part of
        ``train_key``. The downstream steps append ``/train.jsonl`` and
        ``/val.jsonl`` to this value, so both keys are expected to live under
        the same prefix and keep those file names.
    """
    # Imports stay inside the function: the component body runs in its own container.
    import json
    import posixpath

    import boto3

    persona = {
        "full_name": "Arnav Example Srivastava",
        "dob": "14 August 1992",
        "profession": "DevOps trainer",
        "hobbies": ["Cycling", "Classical guitar", "Cooking"],
        "city": "New Delhi"
    }

    def sft(instruction, output, _input=""):
        # Build one SFT record; "input" is only present when non-empty.
        record = {"instruction": instruction, "output": output}
        if _input:
            record["input"] = _input
        return record

    train = [
        sft("Answer about the person described.", persona["full_name"], "What is the full name?"),
        sft("Answer about the person described.", persona["dob"], "What is the date of birth?"),
        sft("Answer about the person described.", ", ".join(persona["hobbies"]), "List 3 hobbies."),
        sft("Roleplay: Introduce yourself in one line.",
            f"Hi, I’m {persona['full_name']}, a {persona['profession']} who loves {persona['hobbies'][0].lower()} and {persona['hobbies'][1].lower()}."),
        sft("Answer about the person described.", persona["profession"], "What is the main profession?"),
        sft("Answer about the person described.", persona["city"], "Which city does the person live in?"),
    ]
    val = [
        sft("What is Arnav’s main profession?", persona["profession"]),
        sft("What musical instrument does Arnav play?", "Classical guitar"),
        sft("Name one of Arnav’s hobbies.", "Cycling"),
        sft("Where does Arnav live?", persona["city"]),
    ]

    # NOTE(review): verify=False disables TLS certificate checks when an
    # https endpoint is supplied — confirm this is intended outside the PoC.
    s3 = boto3.client(
        "s3",
        aws_access_key_id=s3_access_key,
        aws_secret_access_key=s3_secret_key,
        endpoint_url=("http://" + s3_endpoint if not s3_endpoint.startswith("http") else s3_endpoint),
        region_name="us-east-1",
        verify=False,
    )

    def put_jsonl(records, key):
        # One JSON document per line, UTF-8 encoded, non-ASCII kept verbatim.
        body = "".join(json.dumps(r, ensure_ascii=False) + "\n" for r in records).encode("utf-8")
        s3.put_object(Bucket=bucket, Key=key, Body=body, ContentType="application/jsonl")

    put_jsonl(train, train_key)
    put_jsonl(val, val_key)

    # Report the prefix the files were actually written under instead of a
    # hard-coded "datasets/poc", so overriding train_key keeps the URI correct.
    prefix = posixpath.dirname(train_key)
    return f"s3://{bucket}/{prefix}"


In [3]:
@dsl.component(base_image="registry.gitlab.com/aryansr/mykubeflow/custom-llm-trainer:2.0")
def train_lora(
    dataset_prefix: str,
    bucket: str,
    s3_endpoint: str,
    s3_access_key: str,
    s3_secret_key: str,
    model_id: str = "sshleifer/tiny-gpt2",
    output_key: str = "models/tiny-sft-peft",
) -> str:
    """Fine-tune a causal LM with LoRA on ``<dataset_prefix>/train.jsonl``.

    The training file is downloaded from S3, each record is rendered as
    ``prompt + "\\n" + output``, a LoRA adapter is trained for one epoch and
    the adapter plus tokenizer files are uploaded under ``output_key``.

    Args:
        dataset_prefix: ``s3://bucket/prefix`` (or a bare key prefix inside
            ``bucket``) that contains ``train.jsonl``.
        bucket: Bucket the adapter is uploaded to; also the dataset bucket when
            ``dataset_prefix`` is not an ``s3://`` URI.
        s3_endpoint: S3 endpoint; ``http://`` is prepended when no scheme is given.
        s3_access_key: Access key used for the S3 client.
        s3_secret_key: Secret key used for the S3 client.
        model_id: Hugging Face model id of the base model.
        output_key: Key prefix the adapter directory is uploaded under.

    Returns:
        ``s3://<bucket>/<output_key>``.
    """
    import os, boto3
    from datasets import load_dataset
    from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer
    from peft import LoraConfig, get_peft_model

    max_len = 256  # fixed sequence length every example is padded/truncated to

    # 1) Resolve where train.jsonl lives. Parse the URI instead of string-replacing
    #    the bucket name so a trailing slash or a different bucket still works.
    if dataset_prefix.startswith("s3://"):
        ds_bucket, _, ds_prefix = dataset_prefix[len("s3://"):].partition("/")
    else:
        ds_bucket, ds_prefix = bucket, dataset_prefix
    ds_bucket = ds_bucket or bucket
    train_key = "/".join(part for part in (ds_prefix.strip("/"), "train.jsonl") if part)
    local_train = "/tmp/train.jsonl"

    # One client serves both the download and the final upload.
    # NOTE(review): verify=False disables TLS certificate checks for https endpoints.
    s3 = boto3.client(
        "s3",
        aws_access_key_id=s3_access_key,
        aws_secret_access_key=s3_secret_key,
        endpoint_url=("http://" + s3_endpoint if not s3_endpoint.startswith("http") else s3_endpoint),
        region_name="us-east-1",
        verify=False,
    )
    s3.download_file(ds_bucket, train_key, local_train)

    # 2) Load the local file (no s3fs/s3 creds needed)
    ds = load_dataset("json", data_files={"train": local_train})

    tok = AutoTokenizer.from_pretrained(model_id)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token

    def fmt(ex):
        # Records without an "input" key come back as None after the JSON load.
        instr = ex.get("instruction") or ""
        inp = ex.get("input") or ""
        prompt = (instr + ("\n" + inp if inp else "")).strip()
        prefix_text = prompt + "\n"

        # Token count of the prompt part, used to mask it out of the loss.
        prompt_len = len(tok(prefix_text)["input_ids"])
        ids = tok(prefix_text + ex["output"], truncation=True, max_length=max_len)["input_ids"]
        prompt_len = min(prompt_len, len(ids))
        pad = max_len - len(ids)

        # Labels must line up position-by-position with input_ids (the model
        # shifts them internally). Prompt and padding positions are set to -100
        # so only the answer tokens contribute to the loss.
        return {
            "input_ids": ids + [tok.pad_token_id] * pad,
            "attention_mask": [1] * len(ids) + [0] * pad,
            "labels": [-100] * prompt_len + ids[prompt_len:] + [-100] * pad,
        }

    ds = ds.map(fmt, remove_columns=ds["train"].column_names)

    model = AutoModelForCausalLM.from_pretrained(model_id)
    peft_cfg = LoraConfig(r=8, lora_alpha=16, lora_dropout=0.05, target_modules=["c_attn","c_proj"])
    model = get_peft_model(model, peft_cfg)

    args = TrainingArguments(
        output_dir="/outputs",
        per_device_train_batch_size=8,
        num_train_epochs=1,
        learning_rate=2e-4,
        logging_steps=5,
        save_total_limit=1,
        remove_unused_columns=False,
        report_to=[]
    )

    trainer = Trainer(model=model, args=args, train_dataset=ds["train"])
    trainer.train()

    # Save adapter weights and tokenizer side by side so later steps can load both.
    outdir = "/outputs/peft"
    model.save_pretrained(outdir)
    tok.save_pretrained(outdir)

    def upload_dir(local_dir, prefix):
        # Mirror the directory tree under `prefix`, always using "/" in keys.
        for root, _, files in os.walk(local_dir):
            for name in files:
                path = os.path.join(root, name)
                rel = os.path.relpath(path, local_dir).replace(os.sep, "/")
                s3.upload_file(path, bucket, f"{prefix}/{rel}")

    upload_dir(outdir, output_key)
    return f"s3://{bucket}/{output_key}"


In [4]:
@dsl.component(base_image="registry.gitlab.com/aryansr/mykubeflow/custom-llm-trainer:2.0")
def eval_simple_accuracy(
    dataset_prefix: str,
    packaged_uri: str,
    s3_endpoint: str = "minio-service.kubeflow.svc.cluster.local:9000",
    s3_access_key: str = "minio",
    s3_secret_key: str = "minio123",
    model_id: str = "sshleifer/tiny-gpt2",
) -> float:
    """Score the LoRA adapter on ``<dataset_prefix>/val.jsonl`` by substring match.

    For every validation record the model generates a continuation of the
    prompt; the record counts as correct when the lower-cased expected
    ``output`` occurs in the lower-cased continuation.

    Args:
        dataset_prefix: ``s3://bucket/prefix`` containing ``val.jsonl``.
        packaged_uri: ``s3://bucket/prefix`` of the saved PEFT adapter directory.
        s3_endpoint: S3 endpoint; ``http://`` is prepended when no scheme is given.
        s3_access_key: Access key used for the S3 client.
        s3_secret_key: Secret key used for the S3 client.
        model_id: Hugging Face model id of the base model the adapter applies to.

    Returns:
        Fraction of validation records answered correctly (0.0 when the
        validation file has no records).

    Raises:
        ValueError: If either URI is not of the form ``s3://bucket/key``.
    """
    import os, json, boto3
    from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
    from peft import PeftModel

    # ---- S3 client using provided MinIO creds/endpoints ----
    # NOTE(review): verify=False disables TLS certificate checks for https endpoints.
    endpoint = ("http://" + s3_endpoint) if not s3_endpoint.startswith("http") else s3_endpoint
    s3 = boto3.client(
        "s3",
        aws_access_key_id=s3_access_key,
        aws_secret_access_key=s3_secret_key,
        endpoint_url=endpoint,
        region_name="us-east-1",
        verify=False,
    )

    # ---- Parse bucket & prefixes from URIs ----
    def parse_s3(uri: str):
        # "s3://bucket/prefix..." -> ("bucket", "prefix...")
        # Explicit error instead of assert: asserts vanish under `python -O`.
        if not uri.startswith("s3://") or "/" not in uri[5:]:
            raise ValueError(f"expected s3://bucket/key, got {uri!r}")
        bkt, key = uri[5:].split("/", 1)
        return bkt, key

    bucket_ds, key_prefix_ds = parse_s3(dataset_prefix)       # .../datasets/poc
    bucket_m, key_prefix_m = parse_s3(packaged_uri)           # .../models/tiny-sft-peft

    # ---- Download val.jsonl locally ----
    val_key = f"{key_prefix_ds.rstrip('/')}/val.jsonl"
    local_val = "/tmp/val.jsonl"
    os.makedirs("/tmp", exist_ok=True)
    s3.download_file(bucket_ds, val_key, local_val)

    # ---- Download the whole PEFT adapter dir locally ----
    local_model_dir = "/tmp/peft"
    os.makedirs(local_model_dir, exist_ok=True)

    # List with a trailing "/" so a sibling such as "<prefix>-old/..." is not
    # pulled in by the plain string-prefix match S3 performs.
    stripped = key_prefix_m.strip("/")
    list_prefix = f"{stripped}/" if stripped else ""

    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_m, Prefix=list_prefix):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            rel = key[len(list_prefix):]
            # skip "directory" placeholders
            if not rel or key.endswith("/"):
                continue
            dest = os.path.join(local_model_dir, rel)
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            s3.download_file(bucket_m, key, dest)

    # ---- Load data & model from local paths ----
    # (avoid datasets/pyarrow — read JSONL with stdlib)
    val = []
    with open(local_val, "r", encoding="utf-8") as f:
        for line in f:
            if line.strip():  # tolerate blank / trailing empty lines
                val.append(json.loads(line))

    tok = AutoTokenizer.from_pretrained(model_id)
    base = AutoModelForCausalLM.from_pretrained(model_id)
    model = PeftModel.from_pretrained(base, local_model_dir)

    # return_full_text=False: score only the continuation, otherwise an answer
    # that already appears in the prompt would count as correct.
    # do_sample=False: greedy decoding keeps the metric reproducible.
    gen = pipeline(
        "text-generation",
        model=model,
        tokenizer=tok,
        max_new_tokens=48,
        return_full_text=False,
        do_sample=False,
    )

    def to_prompt(ex):
        instr = ex.get("instruction") or ""
        inp = ex.get("input") or ""
        # Trailing newline mirrors the training format: prompt + "\n" + output.
        return (instr + ("\n" + inp if inp else "")).strip() + "\n"

    correct, n = 0, 0
    for ex in val:
        pred = gen(to_prompt(ex))[0]["generated_text"].lower()
        if ex["output"].lower() in pred:
            correct += 1
        n += 1
    acc = correct / max(1, n)
    print(f"val_accuracy={acc:.3f}")

    # KFP metric
    # NOTE(review): this fixed path looks like the KFP v1 metrics-file
    # convention — confirm the backend in use still reads it.
    with open("/mlpipeline-metrics.json", "w") as m:
        json.dump({"metrics":[{"name":"val_accuracy","numberValue":float(acc),"format":"RAW"}]}, m)

    return float(acc)


In [5]:
@dsl.component(
    base_image="python:3.10-slim",
    packages_to_install=["transformers==4.43.3", "peft==0.11.1", "boto3"]
)
def merge_lora_to_base(
    adapter_uri: str,                   # e.g. s3://custom-llm/models/tiny-sft-peft
    bucket: str,                        # NEW: "custom-llm"
    s3_endpoint: str,
    s3_access_key: str,
    s3_secret_key: str,
    base_model_id: str = "sshleifer/tiny-gpt2",
    out_key: str = "models/tiny-sft-merged",   # NEW: key/prefix only
) -> str:
    """Merge a LoRA adapter into its base model and upload the merged model.

    The adapter directory is downloaded from ``adapter_uri``, applied to
    ``base_model_id``, folded into the base weights with ``merge_and_unload``
    and the result (model plus base tokenizer) is uploaded under ``out_key``.

    Args:
        adapter_uri: ``s3://bucket/prefix`` of the saved PEFT adapter directory.
        bucket: Bucket the merged model is uploaded to.
        s3_endpoint: S3 endpoint; ``http://`` is prepended when no scheme is given.
        s3_access_key: Access key used for the S3 client.
        s3_secret_key: Secret key used for the S3 client.
        base_model_id: Hugging Face model id of the base model.
        out_key: Key prefix the merged model files are uploaded under.

    Returns:
        ``s3://<bucket>/<out_key>``.

    Raises:
        ValueError: If ``adapter_uri`` is not of the form ``s3://bucket/key``.
        FileNotFoundError: If no objects exist under ``adapter_uri``.
    """
    import os, tempfile, boto3
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from peft import PeftModel

    # ---- helpers ----
    def parse_s3(uri: str):
        # Explicit error instead of assert: asserts vanish under `python -O`.
        if not uri.startswith("s3://") or "/" not in uri[5:]:
            raise ValueError(f"expected s3://bucket/key, got {uri!r}")
        bkt, key = uri[5:].split("/", 1)
        return bkt, key

    # NOTE(review): verify=False disables TLS certificate checks for https endpoints.
    endpoint = ("http://" + s3_endpoint) if not s3_endpoint.startswith("http") else s3_endpoint
    s3 = boto3.client(
        "s3",
        aws_access_key_id=s3_access_key,
        aws_secret_access_key=s3_secret_key,
        endpoint_url=endpoint,
        region_name="us-east-1",
        verify=False,
    )

    in_bucket, in_prefix = parse_s3(adapter_uri)
    out_bucket, out_prefix = bucket, out_key               # <- build here
    out_uri = f"s3://{out_bucket}/{out_prefix}"

    # List with a trailing "/" so a sibling such as "<prefix>-old/..." is not
    # pulled in by the plain string-prefix match S3 performs.
    stripped = in_prefix.strip("/")
    list_prefix = f"{stripped}/" if stripped else ""
    # Avoid "prefix//file" keys when out_key is given with a trailing slash.
    upload_prefix = out_prefix.rstrip("/")

    with tempfile.TemporaryDirectory() as td:
        # download adapter dir
        adapter_dir = os.path.join(td, "adapter")
        os.makedirs(adapter_dir, exist_ok=True)
        downloaded = 0
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=in_bucket, Prefix=list_prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                rel = key[len(list_prefix):]
                if not rel or key.endswith("/"):
                    continue  # "directory" placeholder object
                local_path = os.path.join(adapter_dir, rel)
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                s3.download_file(in_bucket, key, local_path)
                downloaded += 1

        # Fail with a clear message rather than a confusing loader error later.
        if downloaded == 0:
            raise FileNotFoundError(f"no adapter files found under {adapter_uri}")

        # merge LoRA -> base
        tok = AutoTokenizer.from_pretrained(base_model_id)
        base = AutoModelForCausalLM.from_pretrained(base_model_id)
        model = PeftModel.from_pretrained(base, adapter_dir)
        merged = model.merge_and_unload()

        # save & upload
        merged_dir = os.path.join(td, "merged")
        os.makedirs(merged_dir, exist_ok=True)
        merged.save_pretrained(merged_dir)
        tok.save_pretrained(merged_dir)

        for root, _, files in os.walk(merged_dir):
            for name in files:
                path = os.path.join(root, name)
                rel = os.path.relpath(path, merged_dir).replace("\\", "/")
                s3.upload_file(path, out_bucket, f"{upload_prefix}/{rel}")

    return out_uri


In [6]:
@dsl.component(base_image="python:3.10-slim", packages_to_install=["kserve==0.13.0"])
def make_kserve_yaml(
    model_uri: str,
    service_name: str = "tiny-sft",
    namespace: str = "kubeflow-user-example-com",
    sa_name: str = "kserve-minio-sa",
    out_yaml: dsl.OutputPath(str) = "inferenceservice.yaml"
):
    """Write a KServe ``InferenceService`` manifest for a Hugging Face model.

    Args:
        model_uri: Storage URI of the model, placed in ``storageUri``.
        service_name: ``metadata.name`` of the InferenceService.
        namespace: ``metadata.namespace`` of the InferenceService.
        sa_name: Service account name set on the predictor.
        out_yaml: Path the manifest text is written to.
    """
    # NOTE(review): the kserve package is installed by the decorator but this
    # body never imports it — confirm whether the install is still needed.
    import json
    import os

    # A JSON string literal is also a valid YAML double-quoted scalar, so this
    # escapes any quote or backslash in the URI instead of emitting broken YAML.
    storage_uri = json.dumps(model_uri)

    yaml_text = f"""
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: {service_name}
  namespace: {namespace}
spec:
  predictor:
    serviceAccountName: {sa_name}
    model:
      modelFormat:
        name: huggingface
      storageUri: {storage_uri}
      env:
        - name: TRANSFORMERS_OFFLINE
          value: "false"
      args:
        - "--backend=huggingface"
        - "--task=text-generation"
        - "--max_model_len=256"
""".lstrip()

    # Make sure the directory of the output path exists before writing.
    parent = os.path.dirname(out_yaml)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(out_yaml, "w", encoding="utf-8") as f:
        f.write(yaml_text)


In [7]:
@dsl.pipeline(name="custom-llm-sft-poc")
def custom_llm_pipeline(
    s3_endpoint: str = "minio-service.kubeflow.svc.cluster.local:9000",
    s3_access_key: str = "minio",
    s3_secret_key: str = "minio123",
    bucket: str = "custom-llm",
):
    # Wires the PoC steps together: dataset prep -> LoRA training ->
    # evaluation and merge (both consume the trained adapter) -> KServe manifest.
    # NOTE(review): the S3 credentials travel as plain pipeline parameters with
    # literal defaults — confirm that is acceptable beyond this PoC.

    # Connection settings every S3-touching step receives unchanged.
    s3_conn = dict(
        s3_endpoint=s3_endpoint,
        s3_access_key=s3_access_key,
        s3_secret_key=s3_secret_key,
    )

    # 1) Generate the synthetic dataset and upload it.
    prep_task = prep_generate_and_upload(bucket=bucket, **s3_conn)

    # 2) Train the LoRA adapter on the uploaded dataset.
    train_task = train_lora(
        dataset_prefix=prep_task.output,
        bucket=bucket,
        **s3_conn,
    )

    # 3) Evaluate the adapter on the validation split; the result is not
    #    consumed by any later step.
    eval_simple_accuracy(
        dataset_prefix=prep_task.output,
        packaged_uri=train_task.output,
        **s3_conn,
    )

    # 4) Merge the adapter into the base model and upload the merged model.
    merge_task = merge_lora_to_base(
        adapter_uri=train_task.output,
        bucket=bucket,
        base_model_id="sshleifer/tiny-gpt2",
        out_key="models/tiny-sft-merged",
        **s3_conn,
    )

    # 5) Emit the InferenceService YAML pointing at the merged model URI.
    make_kserve_yaml(model_uri=merge_task.output)



In [8]:
from kfp import compiler

# Compile the pipeline function into a package file in the working directory.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path="custom_llm_sft_poc.yaml",
    pipeline_func=custom_llm_pipeline,
)
print("✅ Wrote custom_llm_sft_poc.yaml")


✅ Wrote custom_llm_sft_poc.yaml
