In [None]:
import os, zipfile, shutil, random
from glob import glob

from ultralytics import YOLO
from sklearn.model_selection import StratifiedKFold
import numpy as np

import torch, gc

In [None]:
# ---- USER-TUNABLE SETTINGS: everything down to the end marker may be changed ----

# Dataset locations
IMAGE_ROOT = "./datasets/images"            # directory scanned for *.png images
LABEL_ROOT = "./datasets/labels"            # directory holding <image stem>.txt label files
OUTPUT_ROOT = "./classification_dataset"    # root of the generated class-folder dataset

# Classifier setup
CLASS_NAMES = ["negative", "positive"]      # one sub-folder is created per class name
MODEL_PATH = "yolo11n-cls.pt"               # weights handed to YOLO() for every fold

# Training / cross-validation hyper-parameters
EPOCHS = 20
BATCH = 16
FOLDS_NUM = 5
RANDOM_SEED = 42

# ---- END OF USER-TUNABLE SETTINGS ----

# Seed Python's built-in RNG; RANDOM_SEED is also passed to the fold splitter.
random.seed(RANDOM_SEED)

## Prepare dataset

In [None]:
# Scan the image directory and derive each image's class from its label file:
# a label file with any non-whitespace content means "positive", an empty one
# means "negative". Images that have no label file are left out of the dataset.
image_files = sorted(glob(os.path.join(IMAGE_ROOT, "*.png")))
print(f"Image: {len(image_files)}")

classified = []         # (image path, class name) pairs
labels_only = []        # 1 = positive, 0 = negative; parallel to `classified`
unlabeled_images = []   # images skipped because their label file is missing

for img in image_files:
    stem = os.path.splitext(os.path.basename(img))[0]
    label_path = os.path.join(LABEL_ROOT, stem + ".txt")
    if not os.path.exists(label_path):
        # Keep track of what is dropped so it shows up in the summary below
        # instead of disappearing silently.
        unlabeled_images.append(img)
        continue
    with open(label_path, "r") as f:
        is_positive = bool(f.read().strip())
    classified.append((img, "positive" if is_positive else "negative"))
    labels_only.append(int(is_positive))

positive_count = sum(labels_only)
negative_count = len(labels_only) - positive_count

# Report the class balance and the number of dropped images so that a broken
# or badly imbalanced dataset is visible before any training starts.
print(
    f"Positive: {positive_count} | Negative: {negative_count} | "
    f"Skipped (no label file): {len(unlabeled_images)}"
)

# Pool every labelled image under <OUTPUT_ROOT>/all_data/<class name>/.
all_data_root = os.path.join(OUTPUT_ROOT, "all_data")

# Create one destination folder per class up front.
for class_name in CLASS_NAMES:
    class_dir = os.path.join(all_data_root, class_name)
    os.makedirs(class_dir, exist_ok=True)

# copy2 also carries over file metadata such as timestamps.
for src_path, class_name in classified:
    file_name = os.path.basename(src_path)
    shutil.copy2(src_path, os.path.join(all_data_root, class_name, file_name))

# Stratified K-fold splitter: shuffled, seeded, and stratified on the
# 0/1 labels collected alongside `classified`.
skf = StratifiedKFold(n_splits=FOLDS_NUM, shuffle=True, random_state=RANDOM_SEED)

# One record per fold (numbered from 1) holding the (image path, class name)
# pairs assigned to its training and validation subsets.
fold_splits = [
    {
        'fold': fold_no,
        'train': [classified[i] for i in train_idx],
        'val': [classified[i] for i in val_idx],
    }
    for fold_no, (train_idx, val_idx) in enumerate(skf.split(classified, labels_only), 1)
]
    
# Placeholder for per-fold results; nothing in the visible notebook fills it.
fold_results = []

# Materialise each fold on disk as
#   <OUTPUT_ROOT>/fold_<k>/{train,val}/<class name>/<image>
# which is the directory passed as `data` to the training cell.
for split in fold_splits:
    fold = split['fold']
    fold_output = os.path.join(OUTPUT_ROOT, f"fold_{fold}")

    # Start from an empty fold directory. Without this, re-running the cell
    # after changing the seed, fold count or labels leaves files from the
    # previous run in place, so an image can end up in both train and val
    # (or in the wrong class folder) and contaminate the evaluation.
    if os.path.isdir(fold_output):
        shutil.rmtree(fold_output)

    for subset in ["train", "val"]:
        for cls in CLASS_NAMES:
            os.makedirs(os.path.join(fold_output, subset, cls), exist_ok=True)

        # The split record uses the subset names as keys, so one loop
        # handles both train and val.
        for img, label in split[subset]:
            file_name = os.path.basename(img)
            src = os.path.join(all_data_root, label, file_name)
            dst = os.path.join(fold_output, subset, label, file_name)
            shutil.copy2(src, dst)

## Training

In [None]:
# Train one classifier per cross-validation fold, each starting from MODEL_PATH.
for split in fold_splits:
    fold = split['fold']
    fold_output = os.path.join(OUTPUT_ROOT, f"fold_{fold}")

    # Sanity check: warn about any class folder that holds no files, since
    # that points at a broken dataset-preparation step. os.listdir still
    # raises if the folder itself is missing.
    for subset in ["train", "val"]:
        for cls in CLASS_NAMES:
            folder = os.path.join(fold_output, subset, cls)
            if not os.listdir(folder):
                print(f"WARNING: fold {fold}: no images in {folder}")

    model = YOLO(MODEL_PATH)
    results = model.train(
        data    = fold_output,      # fold directory with train/ and val/ class folders
        epochs  = EPOCHS,
        imgsz   = 640,
        batch   = BATCH,
        amp     = False,
        # The inference cell loads runs/classify/<name>/weights/best.pt,
        # so keep this run name in step with its MODEL_PATHS.
        name    = f"aortic_valve_cls_FOLD_{fold}",
        exist_ok= True,             # reuse the run directory when the cell is re-run
        workers = 6
    )
    # FREE GARBAGE COLLECTION, TO AVOID Out-of-Memory
    # DO NOT REMOVE
    import torch, gc
    del results, model
    gc.collect()
    torch.cuda.empty_cache()
    # DO NOT REMOVE

## Filter the dataset

In [None]:
# THIS PART BELOW CAN BE CHANGED

# One checkpoint per training fold. Derived from FOLDS_NUM so the ensemble
# always matches the number of models the training cell produced.
MODEL_PATHS = [
    f"runs/classify/aortic_valve_cls_FOLD_{i}/weights/best.pt"
    for i in range(1, FOLDS_NUM + 1)
]

TEST_IMAGE_ROOT = "./datasets/testing"
# Deliberately NOT called OUTPUT_ROOT: that name is the dataset root used by the
# preparation and training cells, and rebinding it here would make a later
# re-run of those cells read and write the wrong directory.
TEST_OUTPUT_ROOT = "./test_results_kfold_5fold_new"

# An image is filed as "negative" only when the averaged probability of the
# winning class is at least this value; every other image is filed as "positive".
CONFIDENCE_THRESHOLD = 0.995

CLASS_NAMES = ["negative", "positive"]
# END OF CHANGEABLE PART

# Load every fold's best checkpoint once, up front.
models = [YOLO(model_path) for model_path in MODEL_PATHS]

image_files = sorted(glob(os.path.join(TEST_IMAGE_ROOT, "*.png")))

os.makedirs(TEST_OUTPUT_ROOT, exist_ok=True)
for cls in CLASS_NAMES:
    os.makedirs(os.path.join(TEST_OUTPUT_ROOT, cls), exist_ok=True)

for img_path in image_files:
    # Class-probability vector from each fold model, stacked one row per model.
    fold_probs = np.array([
        model(img_path, verbose=False)[0].probs.data.cpu().numpy()
        for model in models
    ])

    # Soft voting: average the probabilities over the models, then take the
    # most likely class and its averaged probability.
    ensemble_probs = fold_probs.mean(axis=0)
    predicted_idx = int(np.argmax(ensemble_probs))
    ensemble_confidence = ensemble_probs[predicted_idx]

    # Low-confidence predictions fall back to "positive".
    output_class = CLASS_NAMES[predicted_idx]
    if ensemble_confidence < CONFIDENCE_THRESHOLD:
        output_class = "positive"

    dst_path = os.path.join(TEST_OUTPUT_ROOT, output_class, os.path.basename(img_path))
    shutil.copy2(img_path, dst_path)

# Release the ensemble. The `models` list holds the references that keep the
# networks alive, so it must be emptied for the collection below to free them.
# This is also safe when no test image was found and the loop never ran.
models.clear()
gc.collect()
torch.cuda.empty_cache()

## Free Memory (Garbage Collection)

In [None]:
# FINAL MEMORY SWEEP, TO AVOID Out-of-Memory -- DO NOT REMOVE
# Run Python's garbage collector first, then ask PyTorch to release the
# CUDA memory it is still caching.
import gc
import torch

gc.collect()
torch.cuda.empty_cache()
# END OF MEMORY SWEEP -- DO NOT REMOVE