In [1]:
!pip install kfp kubernetes minio



In [None]:
import kfp
import os
from minio import Minio
from kfp import dsl
from kfp.dsl import Input, Output, Artifact, Model, component

In [3]:
@dsl.component(
    base_image='lukoprych/yolov8-pipeline-base:latest',
    packages_to_install=[]
)
def train(
    trained_model_output: Output[Artifact],
    training_logs_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str,
    dataset_path: str = "dataset",
    base_model: str = "yolov8n.pt"
):
    """Train a YOLO model on a dataset pulled from MinIO and publish the result.

    The dataset found under ``dataset_path`` and the base weights found at
    ``model/<base_model>`` are downloaded from ``minio_bucket``. After
    training, the saved model is uploaded to ``model/trained_yolo_model.pt``,
    every file produced in the training output directory is uploaded under
    ``tensorboard/``, and the model plus a short text log are written to the
    two output artifacts.

    Args:
        trained_model_output: Output artifact receiving the trained weights.
        training_logs_output: Output artifact receiving the training log text.
        minio_endpoint: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket holding the dataset and the models.
        dataset_path: Object prefix ("folder") of the dataset in the bucket.
        base_model: File name of the base weights under ``model/``.

    Raises:
        FileNotFoundError: If the downloaded dataset contains no ``data.yaml``.
    """
    import shutil
    import tempfile
    from pathlib import Path

    import yaml
    from minio import Minio
    from ultralytics import YOLO

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        dataset_dir = temp_dir / "dataset"
        model_dir = temp_dir / "model"
        log_path = temp_dir / "training_logs.txt"
        tensorboard_dir = temp_dir / "tensorboard"
        tensorboard_dir.mkdir(exist_ok=True)
        model_dir.mkdir(exist_ok=True)

        for subdir in ['images/train', 'images/val', 'labels/train', 'labels/val']:
            (dataset_dir / subdir).mkdir(parents=True, exist_ok=True)

        # Terminate the prefix with '/' so that e.g. "dataset" does not also
        # match sibling prefixes such as "dataset2/" or "dataset_old/".
        prefix = dataset_path.strip('/')
        if prefix:
            prefix += '/'

        for obj in client.list_objects(minio_bucket, prefix=prefix, recursive=True):
            if obj.object_name.endswith('/'):
                continue  # directory placeholder object, nothing to download
            rel_path = obj.object_name[len(prefix):].lstrip('/')
            if not rel_path:
                continue
            local_path = dataset_dir / rel_path
            local_path.parent.mkdir(parents=True, exist_ok=True)
            client.fget_object(minio_bucket, obj.object_name, str(local_path))

        yaml_path = dataset_dir / "data.yaml"
        if not yaml_path.exists():
            # Fail here with a clear message instead of letting the trainer
            # fail later on a path that was never downloaded.
            raise FileNotFoundError(
                f"data.yaml not found under '{dataset_path}' in bucket '{minio_bucket}'"
            )

        # Point the dataset config at the local copies of the image folders.
        with open(yaml_path, 'r') as f:
            data_config = yaml.safe_load(f) or {}
        data_config['train'] = str(dataset_dir / 'images/train')
        data_config['val'] = str(dataset_dir / 'images/val')
        with open(yaml_path, 'w') as f:
            yaml.dump(data_config, f)

        base_model_local = model_dir / base_model
        client.fget_object(minio_bucket, f"model/{base_model}", str(base_model_local))

        model = YOLO(str(base_model_local))
        results = model.train(
            data=str(yaml_path),
            epochs=5,
            imgsz=640,
            batch=4,
            patience=3,
            device='cpu',
            project=str(tensorboard_dir),
            name='',
            exist_ok=True,
            plots=True,
            verbose=True
        )

        trained_model_local = model_dir / "trained_model.pt"
        model.save(str(trained_model_local))

        client.fput_object(
            minio_bucket,
            "model/trained_yolo_model.pt",
            str(trained_model_local)
        )

        with open(log_path, 'w') as f:
            f.write("=== TRAINING COMPLETE ===\n")
            f.write(str(results) + "\n")

        # copyfile opens and closes both ends itself, so no handle is leaked.
        shutil.copyfile(trained_model_local, trained_model_output.path)
        shutil.copyfile(log_path, training_logs_output.path)

        # Upload TensorBoard files to MinIO
        for file_path in tensorboard_dir.rglob('*'):
            if not file_path.is_file():
                continue
            # Object keys always use '/', independent of the OS separator.
            minio_key = f"tensorboard/{file_path.relative_to(tensorboard_dir).as_posix()}"
            print(f"Uploading TensorBoard file: {minio_key}")
            client.fput_object(
                minio_bucket,
                minio_key,
                str(file_path)
            )

In [4]:
@dsl.component(
    base_image='lukoprych/yolov8-pipeline-base:latest',
    packages_to_install=['torch-model-archiver', 'torchserve']  
)
def package_to_mar(
    trained_model_input: Input[Artifact],
    trained_mar_output: Output[Artifact],
    base_mar_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str,
    base_model_path: str = "model/yolov8n.pt"
):
    """Package the trained and the base YOLO weights as TorchServe MAR archives.

    A TorchServe handler and a requirements file are written to a temporary
    directory, ``torch-model-archiver`` is run once per model, and the two
    resulting ``.mar`` files are uploaded to ``kserve/model-store/`` in
    ``minio_bucket`` and copied to the output artifacts.

    Args:
        trained_model_input: Artifact containing the trained weights.
        trained_mar_output: Output artifact receiving ``trained_yolo_model.mar``.
        base_mar_output: Output artifact receiving ``base_yolo_model.mar``.
        minio_endpoint: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket holding the base model and receiving the archives.
        base_model_path: Object key of the base weights in the bucket.

    Raises:
        FileNotFoundError: If a model file or a MAR file is missing after the
            step that should have produced it.
        RuntimeError: If ``torch-model-archiver`` exits with a non-zero status.
    """
    import shutil
    import subprocess
    import tempfile
    from pathlib import Path

    from minio import Minio

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        trained_model_path = temp_dir / "trained_yolo_model.pt"
        base_model_path_local = temp_dir / "yolov8n.pt"
        model_store = temp_dir / "model-store"
        model_store.mkdir(exist_ok=True)
        handler_path = temp_dir / "yolo_handler.py"

        # Write handler
        with open(handler_path, "w") as f:
            f.write("""import logging
import os
from collections import Counter
import torch
from torchvision import transforms
from ultralytics import YOLO
from ts.torch_handler.object_detector import ObjectDetector
import io
import base64
from PIL import Image

logger = logging.getLogger(__name__)

try:
    import torch_xla.core.xla_model as xm
    XLA_AVAILABLE = True
except ImportError as error:
    XLA_AVAILABLE = False

class Yolov8Handler(ObjectDetector):
    image_processing = transforms.Compose([
        transforms.Resize(640),
        transforms.CenterCrop(640),
        transforms.ToTensor()
    ])

    def __init__(self):
        super(Yolov8Handler, self).__init__()

    def initialize(self, context):
        if torch.cuda.is_available():
            self.device = torch.device("cuda")
        elif XLA_AVAILABLE:
            self.device = xm.xla_device()
        else:
            self.device = torch.device("cpu")

        properties = context.system_properties
        self.manifest = context.manifest
        model_dir = properties.get("model_dir")
        self.model_pt_path = None
        if "serializedFile" in self.manifest["model"]:
            serialized_file = self.manifest["model"]["serializedFile"]
            self.model_pt_path = os.path.join(model_dir, serialized_file)
        self.model = self._load_torchscript_model(self.model_pt_path)
        logger.debug("Model file %s loaded successfully", self.model_pt_path)
        self.initialized = True

    def _load_torchscript_model(self, model_pt_path):
        model = YOLO(model_pt_path)
        model.to(self.device)
        return model

    def preprocess(self, requests):
        images = []
        for data in requests:
            image = data.get("data") or data.get("body")
            if isinstance(image, str):
                image = base64.b64decode(image)
            image = Image.open(io.BytesIO(image)).convert('RGB')
            image = self.image_processing(image)
            images.append(image)
        return torch.stack(images).to(self.device)

    def inference(self, data):
        results = self.model(data)
        return results

    def postprocess(self, res):
        output = []
        for data in res:
            classes = data.boxes.cls.tolist()
            names = data.names
            classes = map(lambda cls: names[int(cls)], classes)
            result = Counter(classes)
            output.append(dict(result))
        return output""")

        # Create requirements file
        requirements_path = temp_dir / "requirements.txt"
        with open(requirements_path, "w") as f:
            f.write("torch\nPillow\nultralytics\ntorchvision\n")

        def build_mar(label, serialized_file):
            # Run torch-model-archiver for one model and return the expected
            # path of the resulting "<label>_yolo_model.mar" archive.
            # Defined inside the component because only the component
            # function's own source is shipped to the container.
            print(f"Creating {label} model MAR file...")
            result = subprocess.run([
                'torch-model-archiver',
                '--model-name', f'{label}_yolo_model',
                '--version', '1.0',
                '--serialized-file', str(serialized_file),
                '--handler', str(handler_path),
                '--requirements-file', str(requirements_path),
                '--export-path', str(model_store),
                '--force'
            ], capture_output=True, text=True)
            if result.returncode != 0:
                raise RuntimeError(f"Error creating {label} MAR file: {result.stderr}")
            return model_store / f"{label}_yolo_model.mar"

        # Copy trained model
        print(f"Copying trained model to {trained_model_path}")
        shutil.copyfile(trained_model_input.path, trained_model_path)

        if not trained_model_path.exists():
            raise FileNotFoundError(f"Trained model was not copied to {trained_model_path}")

        # Get base model
        print(f"Getting base model from MinIO")
        client.fget_object(
            minio_bucket,
            base_model_path,
            str(base_model_path_local)
        )

        if not base_model_path_local.exists():
            raise FileNotFoundError(f"Base model was not downloaded to {base_model_path_local}")

        # Package trained model, then base model
        trained_mar_path = build_mar('trained', trained_model_path)
        base_mar_path = build_mar('base', base_model_path_local)

        if not trained_mar_path.exists() or not base_mar_path.exists():
            raise FileNotFoundError(f"MAR files were not created: {trained_mar_path}, {base_mar_path}")

        print(f"MAR files created at: {trained_mar_path} and {base_mar_path}")

        # Save to MinIO
        print("Saving to MinIO...")
        client.fput_object(
            minio_bucket,
            "kserve/model-store/trained_yolo_model.mar",
            str(trained_mar_path)
        )
        client.fput_object(
            minio_bucket,
            "kserve/model-store/base_yolo_model.mar",
            str(base_mar_path)
        )

        # Save outputs (copyfile closes both files, unlike open(...).read())
        print("Saving outputs...")
        shutil.copyfile(trained_mar_path, trained_mar_output.path)
        shutil.copyfile(base_mar_path, base_mar_output.path)

In [5]:
@dsl.component(
    base_image='lukoprych/yolov8-pipeline-base:latest',
    packages_to_install=[] 
)
def evaluate_model(
    trained_model_input: Input[Artifact],
    evaluation_logs_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str
):
    """Run the trained model on two sample images and log detection statistics.

    The images ``zidane.jpg`` and ``bus.jpg`` are fetched from ``images/`` in
    ``minio_bucket`` (falling back to ultralytics.com when that fails). For
    every image that could be obtained, the number of detections, confidence
    statistics and the per-class counts are written to a text log. The log is
    stored in the output artifact and uploaded to
    ``logs/eval_logs_<timestamp>.txt``.

    Args:
        trained_model_input: Artifact containing the trained weights.
        evaluation_logs_output: Output artifact receiving the evaluation log.
        minio_endpoint: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket holding the sample images and receiving the log.
    """
    import os
    import shutil
    import tempfile
    from datetime import datetime
    from pathlib import Path

    from minio import Minio
    from ultralytics import YOLO

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        model_local_path = temp_dir / "trained_model.pt"
        eval_log_path = temp_dir / "eval_logs.txt"

        shutil.copyfile(trained_model_input.path, model_local_path)

        image_paths = []
        for img_name in ["zidane.jpg", "bus.jpg"]:
            image_local = temp_dir / img_name
            try:
                client.fget_object(
                    minio_bucket,
                    f"images/{img_name}",
                    str(image_local)
                )
                image_paths.append(image_local)
            except Exception as e:
                print(f"Warning: Could not download {img_name}: {e}")
                # Try to download from ultralytics
                try:
                    from ultralytics.utils.downloads import download
                    # download() takes the target *directory*; the file keeps
                    # the name from the URL and therefore lands at image_local.
                    download(f"https://ultralytics.com/images/{img_name}", dir=str(temp_dir))
                    image_paths.append(image_local)
                except Exception as e2:
                    print(f"Could not download image from backup source: {e2}")

        model = YOLO(str(model_local_path))

        with open(eval_log_path, 'w') as f:
            f.write("=== EVALUATION RESULTS ===\n")
            for img_path in image_paths:
                f.write(f"\nEvaluating image: {img_path.name}\n")
                results = model(str(img_path))
                f.write(f"Detection results: {results[0].boxes.shape[0]} objects found\n")

                # Detailed metrics
                boxes = results[0].boxes
                if len(boxes) > 0:
                    f.write(f"Average confidence: {boxes.conf.mean().item():.4f}\n")
                    f.write(f"Maximum confidence: {boxes.conf.max().item():.4f}\n")

                    # Class distribution
                    names = results[0].names
                    class_counts = {}
                    for cls in boxes.cls.tolist():
                        cls_name = names[int(cls)]
                        class_counts[cls_name] = class_counts.get(cls_name, 0) + 1

                    f.write("Class distribution:\n")
                    for cls_name, count in class_counts.items():
                        f.write(f"  - {cls_name}: {count}\n")

                f.write(f"Results: {results}\n")
                f.write("-" * 40 + "\n")

        # Save logs to output artifact
        shutil.copyfile(eval_log_path, evaluation_logs_output.path)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_filename = f"logs/eval_logs_{timestamp}.txt"

        # Also save to MinIO with specific name; the context manager makes
        # sure the stream handed to put_object is closed afterwards.
        with open(eval_log_path, 'rb') as log_stream:
            client.put_object(
                minio_bucket,
                log_filename,
                log_stream,
                length=os.path.getsize(str(eval_log_path))
            )
        

In [6]:
@dsl.component(
    base_image='lukoprych/yolov8-pipeline-base:latest',
    packages_to_install=[]
)
def inference_model(
    trained_model_input: Input[Artifact],
    inference_logs_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str
):
    """Run inference on two sample images, upload annotated images and a log.

    The images ``zidane.jpg`` and ``bus.jpg`` are fetched from ``images/`` in
    ``minio_bucket`` (falling back to ultralytics.com when that fails). Each
    image is run through the trained model with ``save=True``; the annotated
    image, when present, is uploaded to
    ``inference/results/<unix-ts>_<image>``. A text log listing each detection
    is written to the output artifact and uploaded to
    ``logs/inference_logs_<timestamp>.txt``.

    Args:
        trained_model_input: Artifact containing the trained weights.
        inference_logs_output: Output artifact receiving the inference log.
        minio_endpoint: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket holding the sample images and receiving results.
    """
    import os
    import shutil
    import tempfile
    import time
    from datetime import datetime
    from pathlib import Path

    from minio import Minio
    from ultralytics import YOLO

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        model_local_path = temp_dir / "trained_model.pt"
        inference_log_path = temp_dir / "inference_logs.txt"
        result_images_dir = temp_dir / "results"
        result_images_dir.mkdir(exist_ok=True)

        shutil.copyfile(trained_model_input.path, model_local_path)

        # Get test images
        images = ["zidane.jpg", "bus.jpg"]
        local_img_paths = []

        for img in images:
            local_img = temp_dir / img
            try:
                client.fget_object(
                    minio_bucket,
                    f"images/{img}",
                    str(local_img)
                )
                local_img_paths.append(local_img)
            except Exception as e:
                print(f"Warning: Could not download {img}: {e}")
                # Try to download from ultralytics
                try:
                    from ultralytics.utils.downloads import download
                    # download() takes the target *directory*; the file keeps
                    # the name from the URL and therefore lands at local_img.
                    download(f"https://ultralytics.com/images/{img}", dir=str(temp_dir))
                    local_img_paths.append(local_img)
                except Exception as e2:
                    print(f"Could not download image from backup source: {e2}")

        model = YOLO(str(model_local_path))
        timestamp = int(time.time())

        with open(inference_log_path, 'w') as f:
            f.write("=== INFERENCE RESULTS ===\n")
            f.write(f"Timestamp: {timestamp}\n")
            f.write(f"Model: {model_local_path}\n\n")

            for img_path in local_img_paths:
                f.write(f"Image: {img_path.name}\n")
                # Run inference with visualization. The output location is
                # controlled through project/name (a `save_dir` keyword is not
                # a prediction argument); exist_ok keeps every image in the
                # same "predict" folder instead of predict2, predict3, ...
                results = model(
                    str(img_path),
                    save=True,
                    project=str(result_images_dir),
                    name="predict",
                    exist_ok=True
                )

                # Save detailed results
                boxes = results[0].boxes
                f.write(f"Detection count: {len(boxes)}\n")

                if len(boxes) > 0:
                    f.write(f"Average confidence: {boxes.conf.mean().item():.4f}\n")
                    f.write(f"Classes detected: {len(set(boxes.cls.tolist()))}\n")

                    # Class distribution with confidence
                    classes = boxes.cls.tolist()
                    confs = boxes.conf.tolist()
                    names = results[0].names

                    f.write("Detections:\n")
                    for i, (cls, conf) in enumerate(zip(classes, confs)):
                        cls_name = names[int(cls)]
                        f.write(f"  {i+1}. {cls_name} (confidence: {conf:.4f})\n")

                f.write(f"Raw results: {results}\n")
                f.write("-" * 60 + "\n\n")

                # Upload result images to MinIO if they exist. Prefer the
                # directory reported on the result, falling back to the
                # project/name location requested above.
                saved_dir = getattr(results[0], "save_dir", None) or (result_images_dir / "predict")
                result_img = Path(saved_dir) / img_path.name
                if result_img.exists():
                    try:
                        client.fput_object(
                            minio_bucket,
                            f"inference/results/{timestamp}_{img_path.name}",
                            str(result_img)
                        )
                    except Exception as e:
                        f.write(f"Failed to upload result image: {e}\n")

        # Save to artifact output
        shutil.copyfile(inference_log_path, inference_logs_output.path)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        log_filename = f"logs/inference_logs_{timestamp}.txt"

        # Also save to MinIO with specific name; the context manager makes
        # sure the stream handed to put_object is closed afterwards.
        with open(inference_log_path, 'rb') as log_stream:
            client.put_object(
                minio_bucket,
                log_filename,
                log_stream,
                length=os.path.getsize(str(inference_log_path))
            )

In [None]:
@dsl.component(
    base_image='lukoprych/yolov8-pipeline-base:latest',
    packages_to_install=[]
)
def compare_models(
    trained_model_input: Input[Artifact],
    model_comparison_output: Output[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str,
    base_model_path: str = "model/yolov8n.pt",
    previous_trained_model_path: str = "model/trained_yolo_model.pt",
    accuracy_threshold: float = 0.03  # 3% improvement required
):
    """Quality gate: compare the new model against a reference model.

    The reference is the object at ``previous_trained_model_path`` unless it
    is missing or was modified less than an hour ago, in which case the base
    model at ``base_model_path`` is used. Both models are run on
    ``zidane.jpg`` and ``bus.jpg``; the score of a model is the mean over the
    images of ``num_detections * avg_confidence``. The new model counts as
    better when its relative score improvement is at least
    ``accuracy_threshold``. The result is written as JSON to the output
    artifact and to ``comparison/model_comparison.json`` in the bucket.

    Args:
        trained_model_input: Artifact containing the newly trained weights.
        model_comparison_output: Output artifact receiving the JSON result.
        minio_endpoint: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket holding the models and the validation images.
        base_model_path: Object key of the base weights.
        previous_trained_model_path: Object key of the previously trained
            weights.
        accuracy_threshold: Minimum relative improvement (0.03 means 3%).
    """
    import json
    import os
    import shutil
    import tempfile
    import time
    from pathlib import Path

    from minio import Minio
    from ultralytics import YOLO

    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        new_model_path = temp_dir / "new_model.pt"
        prev_model_path = temp_dir / "previous_model.pt"
        validation_images_dir = temp_dir / "validation_images"
        validation_images_dir.mkdir(exist_ok=True)
        comparison_results_path = temp_dir / "comparison_results.json"

        # Copy the new model from input
        shutil.copyfile(trained_model_input.path, new_model_path)

        # Check if this is the first training run or if previous trained model exists
        previous_model_exists = True
        try:
            # Use timestamp to check if the previous trained model is not the same as current one
            prev_stats = client.stat_object(minio_bucket, previous_trained_model_path)
            # If the model was created in the last hour, it's likely from the current run
            # In that case, compare against the base model
            if (time.time() - prev_stats.last_modified.timestamp()) < 3600:
                print("Previous trained model is likely from current run, comparing against base model")
                reference_key = base_model_path
            else:
                reference_key = previous_trained_model_path
            client.fget_object(minio_bucket, reference_key, str(prev_model_path))
        except Exception as e:
            print(f"Previous trained model not found: {e}")
            print("Will compare against base model instead")
            try:
                client.fget_object(
                    minio_bucket,
                    base_model_path,
                    str(prev_model_path)
                )
            except Exception as e2:
                print(f"Base model not found either: {e2}")
                previous_model_exists = False

        # Get validation images
        val_images = ["zidane.jpg", "bus.jpg"]
        for img in val_images:
            local_img = validation_images_dir / img
            try:
                client.fget_object(
                    minio_bucket,
                    f"images/{img}",
                    str(local_img)
                )
            except Exception as e:
                print(f"Warning: Could not fetch validation image {img}: {e}")
                # Try to use demo images from ultralytics. A failing fallback
                # must not abort the gate: images that are still missing are
                # skipped during evaluation below.
                try:
                    from ultralytics.utils.downloads import download
                    # download() takes the target *directory*; the file keeps
                    # the name from the URL and therefore lands at local_img.
                    download(f"https://ultralytics.com/images/{img}", dir=str(validation_images_dir))
                except Exception as e2:
                    print(f"Could not download image from backup source: {e2}")

        def collect_metrics(model, log_missing):
            # Run `model` on every available validation image and return
            # {image name: detection count / mean confidence / max confidence}.
            metrics = {}
            for img_name in val_images:
                img_path = validation_images_dir / img_name
                if not img_path.exists():
                    if log_missing:
                        print(f"Skipping missing image: {img_name}")
                    continue
                boxes = model(str(img_path))[0].boxes
                has_boxes = len(boxes) > 0
                metrics[img_name] = {
                    'num_detections': len(boxes),
                    'avg_confidence': float(boxes.conf.mean()) if has_boxes else 0,
                    'max_confidence': float(boxes.conf.max()) if has_boxes else 0
                }
            return metrics

        # Load and evaluate new model
        new_model_metrics = collect_metrics(YOLO(str(new_model_path)), log_missing=True)

        comparison_result = {
            'new_model_metrics': new_model_metrics,
            'previous_model_metrics': {},
            'is_better': True,
            'improvement': 0,
            'message': 'First model, no comparison available'
        }

        if previous_model_exists:
            # Load and evaluate previous model
            prev_model_metrics = collect_metrics(YOLO(str(prev_model_path)), log_missing=False)
            comparison_result['previous_model_metrics'] = prev_model_metrics

            # Get valid images that both models processed
            valid_images = set(new_model_metrics.keys()) & set(prev_model_metrics.keys())
            if not valid_images:
                comparison_result['message'] = "No common images to compare models"
                comparison_result['is_better'] = True
            else:
                def weighted_score(metrics):
                    # Mean over the common images of num_detections * avg_confidence.
                    return sum(
                        metrics[img]['num_detections'] * metrics[img]['avg_confidence']
                        for img in valid_images
                    ) / len(valid_images)

                new_score = weighted_score(new_model_metrics)
                prev_score = weighted_score(prev_model_metrics)

                # Floor the denominator so a reference score of 0 does not
                # cause a division by zero.
                improvement = (new_score - prev_score) / max(prev_score, 0.001)

                comparison_result['improvement'] = float(improvement)
                comparison_result['is_better'] = improvement >= accuracy_threshold

                if comparison_result['is_better']:
                    comparison_result['message'] = f"New model is better by {improvement:.2%}"
                else:
                    comparison_result['message'] = (
                        f"New model does not meet improvement threshold. "
                        f"Improvement: {improvement:.2%}, Required: {accuracy_threshold:.2%}"
                    )

        # Save comparison results
        with open(comparison_results_path, 'w') as f:
            json.dump(comparison_result, f, indent=2)

        # Save to MinIO as well; the context manager makes sure the stream
        # handed to put_object is closed afterwards.
        with open(comparison_results_path, 'rb') as result_stream:
            client.put_object(
                minio_bucket,
                "comparison/model_comparison.json",
                result_stream,
                length=os.path.getsize(str(comparison_results_path))
            )

        shutil.copyfile(comparison_results_path, model_comparison_output.path)

        print(f"Comparison complete: {comparison_result['message']}")
        print(f"Model is better: {comparison_result['is_better']}")

In [8]:
@dsl.component(
    base_image='python:3.9',
    packages_to_install=['kubernetes', 'PyYAML', 'minio']
)
def serve(
    trained_mar_input: Input[Artifact],
    base_mar_input: Input[Artifact],
    model_comparison_input: Input[Artifact],
    minio_endpoint: str,
    minio_access_key: str,
    minio_secret_key: str,
    minio_bucket: str,
    service_namespace: str = "kubeflow-user-example-com"
):
    """Publish the selected MAR archive and create/update the KServe service.

    The comparison JSON decides which archive is served: the trained model
    when ``is_better`` is true (or absent), otherwise the base model. The
    chosen archive is uploaded to ``kserve/model-store/yolo_model.mar`` in
    ``minio_bucket`` and a deployment log to
    ``deployment/deployment_logs.txt``. An ``InferenceService`` named
    ``yolov8`` pointing at ``s3://<minio_bucket>/kserve`` is then created in
    ``service_namespace``, or replaced when it already exists.

    Args:
        trained_mar_input: Artifact containing the trained model archive.
        base_mar_input: Artifact containing the base model archive.
        model_comparison_input: Artifact containing the comparison JSON.
        minio_endpoint: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket receiving the archive and the deployment log.
        service_namespace: Kubernetes namespace of the InferenceService.

    Raises:
        kubernetes.client.rest.ApiException: If creating or updating the
            InferenceService fails.
    """
    import json
    import shutil
    import tempfile
    from pathlib import Path

    import kubernetes
    from minio import Minio

    # Connect to MinIO
    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False
    )

    # Read comparison results to determine which model to serve
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        deploy_log_path = temp_dir / "deployment_logs.txt"
        model_path = temp_dir / "model.mar"

        with open(model_comparison_input.path, 'r') as f:
            comparison_results = json.load(f)

        with open(deploy_log_path, 'w') as log_f:
            log_f.write("=== DEPLOYMENT LOGS ===\n")

            if comparison_results.get('is_better', True):
                log_f.write("Selecting trained model - it performs better\n")
                log_f.write(f"Improvement: {comparison_results.get('improvement', 0):.2%}\n")
                selected_mar = trained_mar_input.path
            else:
                log_f.write("Selecting base model - trained model did not show sufficient improvement\n")
                selected_mar = base_mar_input.path

            # Copy the selected archive to the local path
            shutil.copyfile(selected_mar, model_path)

            log_f.write(f"Reason: {comparison_results.get('message', 'No comparison data available')}\n")

            # Upload selected model to MinIO using the file path
            try:
                client.fput_object(
                    minio_bucket,
                    "kserve/model-store/yolo_model.mar",
                    str(model_path)
                )
                log_f.write("Successfully uploaded model to MinIO\n")
            except Exception as e:
                log_f.write(f"Error uploading model to MinIO: {str(e)}\n")
                raise

        # Upload deployment logs to MinIO (best effort: a failure here must
        # not prevent the deployment itself)
        try:
            client.fput_object(
                minio_bucket,
                "deployment/deployment_logs.txt",
                str(deploy_log_path)
            )
        except Exception as e:
            print(f"Error uploading deployment logs: {str(e)}")

    # Configure Kubernetes
    kubernetes.config.load_incluster_config()
    api_instance = kubernetes.client.CustomObjectsApi()

    # Point the service at the bucket the archive was actually uploaded to.
    storage_uri = f"s3://{minio_bucket}/kserve"
    service_name = "yolov8"
    crd = {
        "group": "serving.kserve.io",
        "version": "v1beta1",
        "namespace": service_namespace,
        "plural": "inferenceservices",
    }

    inference_service = {
        "apiVersion": "serving.kserve.io/v1beta1",
        "kind": "InferenceService",
        "metadata": {
            "name": service_name,
            "namespace": service_namespace
        },
        "spec": {
            "predictor": {
                "serviceAccountName": "sa-minio-kserve",
                "model": {
                    "modelFormat": {
                        "name": "pytorch"
                    },
                    "storageUri": storage_uri,
                    "protocolVersion": "v2"
                }
            }
        }
    }

    print(f"Deploying InferenceService 'yolov8' in namespace {service_namespace}")
    print(f"Using storageUri: {storage_uri}")

    try:
        response = api_instance.create_namespaced_custom_object(
            body=inference_service,
            **crd
        )
        print(f"InferenceService created: {response['metadata']['name']}")
    except kubernetes.client.rest.ApiException as e:
        if e.status == 409:
            print("InferenceService already exists, updating")
            # A replace (PUT) must carry the resourceVersion of the object it
            # replaces, otherwise the API server rejects the update.
            existing = api_instance.get_namespaced_custom_object(
                name=service_name,
                **crd
            )
            inference_service["metadata"]["resourceVersion"] = (
                existing["metadata"]["resourceVersion"]
            )
            response = api_instance.replace_namespaced_custom_object(
                name=service_name,
                body=inference_service,
                **crd
            )
            print(f"InferenceService updated: {response['metadata']['name']}")
        else:
            print(f"Error creating/updating InferenceService: {e}")
            raise

    print("InferenceService has been deployed. Check its status manually.")
    print(f"Service will be available at: yolov8.{service_namespace}.example.com")

In [9]:
@dsl.pipeline(
    name='YOLOv8 Pipeline',
    description='YOLOv8 pipeline'
)
def yolo_pipeline(
    minio_endpoint: str = 'minio-service.kubeflow:9000',
    minio_access_key: str = 'minio',
    minio_secret_key: str = 'minio123',
    minio_bucket: str = 'mlpipeline',
    service_namespace: str = 'kubeflow-user-example-com'
):
    """Train, package, evaluate, compare and serve a YOLOv8 model.

    The steps run strictly one after another:
    train -> package_to_mar -> evaluate_model -> inference_model ->
    compare_models -> serve.

    The MinIO connection settings and the target namespace are pipeline
    parameters, so they can be overridden per run through ``arguments``
    instead of editing the notebook. The defaults reproduce the previously
    hard-coded values.
    NOTE(review): the credential defaults are still stored in plain text;
    consider sourcing them from a Kubernetes secret.

    Args:
        minio_endpoint: ``host:port`` of the MinIO server.
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
        minio_bucket: Bucket used for datasets, models and logs.
        service_namespace: Namespace in which the InferenceService is deployed.
    """
    config = {
        'minio_endpoint': minio_endpoint,
        'minio_access_key': minio_access_key,
        'minio_secret_key': minio_secret_key,
        'minio_bucket': minio_bucket
    }

    # Training step with TensorBoard
    train_task = train(
        **config,
        base_model="yolov8n.pt",
    ).set_display_name("train")

    train_task.set_cpu_request('2')
    train_task.set_memory_request('4G')

    # Package both models to MAR
    package_task = package_to_mar(
        trained_model_input=train_task.outputs["trained_model_output"],
        base_model_path="model/yolov8n.pt",
        **config
    )
    package_task.after(train_task)

    # Evaluation step
    eval_task = evaluate_model(
        trained_model_input=train_task.outputs["trained_model_output"],
        **config
    )
    eval_task.after(package_task)

    # Inference step
    inference_task = inference_model(
        trained_model_input=train_task.outputs["trained_model_output"],
        **config
    )
    inference_task.after(eval_task)

    # Model comparison quality gate
    compare_task = compare_models(
        trained_model_input=train_task.outputs["trained_model_output"],
        **config,
        base_model_path="model/yolov8n.pt",
        previous_trained_model_path="model/trained_yolo_model.pt",
        accuracy_threshold=0.03  # Require 3% improvement
    )
    compare_task.after(inference_task)

    # Conditionally serve the better model
    serve_task = serve(
        trained_mar_input=package_task.outputs["trained_mar_output"],
        base_mar_input=package_task.outputs["base_mar_output"],
        model_comparison_input=compare_task.outputs["model_comparison_output"],
        **config,
        service_namespace=service_namespace
    )
    serve_task.after(compare_task)

if __name__ == "__main__":
    # Step 1: compile the pipeline function into a YAML package on disk.
    from kfp import compiler

    pipeline_package_path = "yolo_pipeline.yaml"
    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(
        pipeline_func=yolo_pipeline,
        package_path=pipeline_package_path,
    )
    print(f"Compilation successful -> {pipeline_package_path}")

    # Step 2: submit the compiled package as a run, using the client's
    # default connection settings and no argument overrides.
    from kfp import Client

    client = Client()
    run = client.create_run_from_pipeline_package(
        pipeline_file=pipeline_package_path,
        arguments={},
    )
    print("Pipeline run submitted:", run.run_id)

Compilation successful -> yolo_pipeline.yaml




Pipeline run submitted: 5b9728c1-fbc9-43bf-a138-d69433fdbab9
