In [1]:
!pip install kfp kubernetes minio



In [2]:
import kfp
import requests
import os
from minio import Minio
from kfp import dsl
from kfp.dsl import Input, Output, Artifact, Model, component
from kfp.dsl import InputPath, OutputPath, pipeline
from kubernetes.client.models import V1EnvVar


In [3]:
@dsl.component(
    base_image='pytorch/pytorch:2.1.0-cuda11.8-cudnn8-runtime',
    packages_to_install=['ultralytics','minio','opencv-python-headless','pyyaml']
)
def train(
    trained_model_output: Output[Artifact],
    training_logs_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str,
    dataset_path: str = "dataset",
    base_model: str = "yolov8n.pt",
    epochs: int = 1,
    imgsz: int = 640,
    batch: int = 1,
    device: str = "cpu"
):
    """Train a YOLO model on a dataset stored in MinIO and publish the weights.

    The component downloads every object below ``dataset_path`` from
    ``minio_bucket``, rewrites the ``train``/``val`` entries of the dataset's
    ``data.yaml`` to the local copies, trains ``model/<base_model>`` (fetched
    from the same bucket), uploads the result to
    ``model/trained_yolo_model.pt`` and writes the weights and a textual
    summary to the two output artifacts.

    Args:
        trained_model_output: Artifact that receives the trained weights file.
        training_logs_output: Artifact that receives the textual training summary.
        minio_endpoint: ``host:port`` of the MinIO service (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket holding the dataset and the base model.
        dataset_path: Key prefix ("folder") of the dataset inside the bucket.
        base_model: File name of the base weights, stored under ``model/``.
        epochs: Number of training epochs.
        imgsz: Training image size passed to ``model.train``.
        batch: Batch size passed to ``model.train``.
        device: Device string passed to ``model.train``.

    Raises:
        FileNotFoundError: If the downloaded dataset contains no ``data.yaml``.
    """
    # All imports live inside the body: only this function's source is shipped
    # to the component container, so module-level imports are not available.
    import shutil
    import subprocess
    import tempfile
    from pathlib import Path

    import yaml

    # System libraries must be present before ultralytics (and its OpenCV
    # dependency) is imported below.
    subprocess.run(['apt-get', 'update'], check=True)
    subprocess.run(['apt-get', 'install', '-y', 'libgl1-mesa-glx', 'libglib2.0-0'], check=True)
    # The glob has to be expanded by a shell; as an argv element it would be
    # passed to rm literally and nothing would be removed.
    subprocess.run('rm -rf /var/lib/apt/lists/*', shell=True, check=True)

    from ultralytics import YOLO
    from minio import Minio

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        dataset_dir = temp_dir / "dataset"
        model_dir = temp_dir / "model"
        model_dir.mkdir(parents=True, exist_ok=True)

        for subdir in ['images/train', 'images/val', 'labels/train', 'labels/val']:
            (dataset_dir / subdir).mkdir(parents=True, exist_ok=True)

        # Terminate the prefix with '/' so that sibling keys which merely share
        # the leading characters (e.g. "dataset_old/...") are not downloaded.
        prefix = dataset_path.rstrip('/')
        if prefix:
            prefix += '/'

        for obj in client.list_objects(minio_bucket, prefix=prefix, recursive=True):
            if obj.object_name.endswith('/'):
                continue  # directory placeholder, nothing to download
            rel_path = obj.object_name[len(prefix):].lstrip('/')
            if not rel_path:
                continue
            local_path = dataset_dir / rel_path
            local_path.parent.mkdir(parents=True, exist_ok=True)
            client.fget_object(minio_bucket, obj.object_name, str(local_path))

        yaml_path = dataset_dir / "data.yaml"
        if not yaml_path.exists():
            raise FileNotFoundError(
                f"data.yaml not found under '{prefix}' in bucket '{minio_bucket}'"
            )

        # Point the dataset description at the freshly downloaded local copies.
        with open(yaml_path, 'r') as f:
            data_config = yaml.safe_load(f) or {}
        data_config['train'] = str(dataset_dir / 'images/train')
        data_config['val'] = str(dataset_dir / 'images/val')
        with open(yaml_path, 'w') as f:
            yaml.dump(data_config, f)

        base_model_local = model_dir / base_model
        client.fget_object(minio_bucket, f"model/{base_model}", str(base_model_local))

        model = YOLO(str(base_model_local))
        results = model.train(
            data=str(yaml_path),
            epochs=epochs,
            imgsz=imgsz,
            batch=batch,
            patience=5,
            device=device,
            verbose=True
        )

        trained_model_local = model_dir / "trained_model.pt"
        model.save(str(trained_model_local))

        client.fput_object(
            minio_bucket,
            "model/trained_yolo_model.pt",
            str(trained_model_local)
        )

        # Stream the weights into the artifact instead of reading the whole
        # checkpoint into memory through an unclosed file handle.
        shutil.copyfile(trained_model_local, trained_model_output.path)

        with open(training_logs_output.path, 'w') as log_art:
            log_art.write("=== TRAINING COMPLETE ===\n")
            log_art.write(str(results) + "\n")

In [4]:
@dsl.component(
    base_image='pytorch/pytorch:2.1.0-cuda11.8-cudnn8-runtime',
    packages_to_install=['torch-model-archiver', 'minio', 'torchserve']
)
def package_to_mar(
    trained_model_input: Input[Artifact],
    mar_model_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str
):
    """Package the trained weights into a TorchServe ``.mar`` archive.

    Runs ``torch-model-archiver`` on the input artifact, uploads the resulting
    archive to ``model/yolo_model.mar`` in ``minio_bucket`` and copies it to
    the output artifact.

    Args:
        trained_model_input: Artifact containing the trained weights file.
        mar_model_output: Artifact that receives the generated ``.mar`` file.
        minio_endpoint: ``host:port`` of the MinIO service (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket the archive is uploaded to.

    Raises:
        RuntimeError: If ``torch-model-archiver`` exits with a non-zero status.
        FileNotFoundError: If the archiver reports success but produced no file.
    """
    # Imports live inside the body: only this function's source is shipped to
    # the component container.
    import shutil
    import subprocess
    import tempfile
    from pathlib import Path

    from minio import Minio

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        model_path = temp_dir / "trained_yolo_model.pt"
        model_store = temp_dir / "model-store"
        model_store.mkdir(exist_ok=True)

        print(f"Kopiowanie modelu do {model_path}")
        # copyfile streams the data and raises on failure, so no separate
        # existence check is needed afterwards.
        shutil.copyfile(trained_model_input.path, model_path)

        print("Tworzenie pliku MAR...")

        # NOTE(review): 'object_detector' is TorchServe's built-in handler; it
        # presumably cannot load an ultralytics checkpoint at serving time
        # without a custom handler -- confirm before deploying this archive.
        result = subprocess.run([
            'torch-model-archiver',
            '--model-name', 'yolo_detector',
            '--version', '1.0',
            '--serialized-file', str(model_path),
            '--handler', 'object_detector',
            '--export-path', str(model_store)
        ], capture_output=True, text=True)

        if result.returncode != 0:
            raise RuntimeError(f"Błąd podczas tworzenia MAR: {result.stderr}")

        # The archiver names the file after --model-name.
        mar_path = model_store / "yolo_detector.mar"

        if not mar_path.exists():
            raise FileNotFoundError(f"Plik MAR nie został utworzony w {mar_path}")

        print(f"Plik MAR utworzony w: {mar_path}")

        print("Zapisywanie do Minio...")
        client.fput_object(
            minio_bucket,
            "model/yolo_model.mar",
            str(mar_path)
        )

        print("Zapisywanie outputu...")
        shutil.copyfile(mar_path, mar_model_output.path)

In [5]:
@dsl.component(
    base_image='pytorch/pytorch:2.1.0-cuda11.8-cudnn8-runtime',
    packages_to_install=['ultralytics','minio','opencv-python-headless','pyyaml']
)
def evaluate_model(
    trained_model_input: Input[Artifact],
    evaluation_logs_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str
):
    """Run the trained model on one reference image and log the result.

    Downloads ``images/zidane.jpg`` from ``minio_bucket``, runs the model from
    the input artifact on it and writes the string form of the prediction
    results to the output artifact.

    Args:
        trained_model_input: Artifact containing the trained weights file.
        evaluation_logs_output: Artifact that receives the textual results.
        minio_endpoint: ``host:port`` of the MinIO service (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket holding the reference image.
    """
    # Imports live inside the body: only this function's source is shipped to
    # the component container.
    import shutil
    import subprocess
    import tempfile
    from pathlib import Path

    # System libraries must be present before ultralytics is imported below.
    subprocess.run(['apt-get', 'update'], check=True)
    subprocess.run(['apt-get', 'install', '-y', 'libgl1-mesa-glx', 'libglib2.0-0'], check=True)
    # The glob has to be expanded by a shell; as an argv element it would be
    # passed to rm literally and nothing would be removed.
    subprocess.run('rm -rf /var/lib/apt/lists/*', shell=True, check=True)

    from ultralytics import YOLO
    from minio import Minio

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)

        # Copy the artifact to a file with a .pt suffix -- presumably required
        # for YOLO to recognise the weights format (TODO confirm).
        model_local_path = temp_dir / "trained_model.pt"
        shutil.copyfile(trained_model_input.path, model_local_path)

        image_local = temp_dir / "zidane.jpg"
        client.fget_object(
            minio_bucket,
            "images/zidane.jpg",
            str(image_local)
        )

        model = YOLO(str(model_local_path))
        results = model(str(image_local))

        with open(evaluation_logs_output.path, 'w') as log_art:
            log_art.write("=== EVALUATION RESULTS ===\n")
            log_art.write(str(results) + "\n")

In [6]:
@dsl.component(
    base_image='pytorch/pytorch:2.1.0-cuda11.8-cudnn8-runtime',
    packages_to_install=['ultralytics','minio','opencv-python-headless','pyyaml']
)
def inference_model(
    trained_model_input: Input[Artifact],
    inference_logs_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str
):
    """Run the trained model on the sample images and log the predictions.

    Downloads ``images/zidane.jpg`` and ``images/bus.jpg`` from
    ``minio_bucket``, runs the model from the input artifact on each of them
    and writes the string form of every result to the output artifact.

    Args:
        trained_model_input: Artifact containing the trained weights file.
        inference_logs_output: Artifact that receives the textual results.
        minio_endpoint: ``host:port`` of the MinIO service (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket holding the sample images.
    """
    # Imports live inside the body: only this function's source is shipped to
    # the component container.
    import shutil
    import subprocess
    import tempfile
    from pathlib import Path

    # System libraries must be present before ultralytics is imported below.
    subprocess.run(['apt-get', 'update'], check=True)
    subprocess.run(['apt-get', 'install', '-y', 'libgl1-mesa-glx', 'libglib2.0-0'], check=True)
    # The glob has to be expanded by a shell; as an argv element it would be
    # passed to rm literally and nothing would be removed.
    subprocess.run('rm -rf /var/lib/apt/lists/*', shell=True, check=True)

    from ultralytics import YOLO
    from minio import Minio

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)

        # Copy the artifact to a file with a .pt suffix -- presumably required
        # for YOLO to recognise the weights format (TODO confirm).
        model_local_path = temp_dir / "trained_model.pt"
        shutil.copyfile(trained_model_input.path, model_local_path)

        # Fetch every image first so a missing object fails the step before
        # the model is loaded.
        images = ["zidane.jpg", "bus.jpg"]
        for img in images:
            client.fget_object(
                minio_bucket,
                f"images/{img}",
                str(temp_dir / img)
            )

        model = YOLO(str(model_local_path))
        with open(inference_logs_output.path, 'w') as log_art:
            log_art.write("=== INFERENCE RESULTS ===\n")
            for img_name in images:
                results = model(str(temp_dir / img_name))
                log_art.write(f"Obraz: {img_name}\n")
                log_art.write(str(results) + "\n\n")

In [7]:
@dsl.pipeline(
    name='YOLOv8 Pipeline',
    description='Yolo v8 pipeline'
)
def yolo_pipeline():
    """Train a YOLO model, package it as a MAR archive, then evaluate and run inference.

    The four steps run strictly one after another:
    train -> package_to_mar -> evaluate_model -> inference_model.

    The MinIO connection settings are read at compile time from the environment
    variables ``MINIO_ENDPOINT``, ``MINIO_ACCESS_KEY``, ``MINIO_SECRET_KEY``
    and ``MINIO_BUCKET``; when a variable is unset the previous built-in value
    is used.
    """
    # NOTE(review): the values still end up in the compiled pipeline spec as
    # plain parameters; mounting a Kubernetes secret would avoid that.
    config = {
        'minio_endpoint': os.environ.get('MINIO_ENDPOINT', 'minio-service.kubeflow:9000'),
        'minio_access_key': os.environ.get('MINIO_ACCESS_KEY', 'minio'),
        'minio_secret_key': os.environ.get('MINIO_SECRET_KEY', 'minio123'),
        'minio_bucket': os.environ.get('MINIO_BUCKET', 'mlpipeline')
    }

    train_task = train(**config)
    train_task.set_cpu_request('2')
    train_task.set_memory_request('4G')

    # Consuming train_task's output artifact already orders this step after
    # training, so no explicit .after(train_task) is required.
    package_task = package_to_mar(
        trained_model_input=train_task.outputs["trained_model_output"],
        **config
    )

    # The explicit .after() calls below serialise steps that share no data
    # dependency with each other (they all read the training artifact).
    eval_task = evaluate_model(
        trained_model_input=train_task.outputs["trained_model_output"],
        **config
    )
    eval_task.after(package_task)

    inference_task = inference_model(
        trained_model_input=train_task.outputs["trained_model_output"],
        **config
    )
    inference_task.after(eval_task)



if __name__ == "__main__":
    from kfp import Client, compiler

    # Step 1: compile the pipeline definition into a YAML package on disk.
    pipeline_package_path = "yolo_pipeline.yaml"
    compiler.Compiler().compile(
        package_path=pipeline_package_path,
        pipeline_func=yolo_pipeline,
    )
    print(f"Kompilacja OK -> {pipeline_package_path}")

    # Step 2: submit the compiled package as a new run, using the client's
    # default connection settings and no pipeline arguments.
    client = Client()
    run = client.create_run_from_pipeline_package(
        arguments={},
        pipeline_file=pipeline_package_path,
    )
    print("Pipeline run submitted:", run.run_id)

Kompilacja OK -> yolo_pipeline.yaml




Pipeline run submitted: 6d6c7048-d4c6-48ba-9cff-9d22a1adaeb5
