In [None]:
# ============================================================
# CELL 1: Setup & Imports
# ============================================================
!pip install -q ultralytics

import os
import shutil
import cv2
import numpy as np
import torch
import torch.nn as nn
import pandas as pd
from pathlib import Path
from datetime import datetime

os.environ["WANDB_DISABLED"] = "true"

IS_KAGGLE = os.path.exists('/kaggle/input')
print(f"Running on: {'Kaggle' if IS_KAGGLE else 'Local'}")
print(f"PyTorch: {torch.__version__}")
print(f"CUDA: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

In [None]:
# ============================================================
# CELL 2: Dataset Path Setup
# ============================================================
if IS_KAGGLE:
    DATASET_DIR = Path("/kaggle/input/ffb-localization-rgbd-dataset/ffb_localization_rgbd")
    WORK_DIR = Path("/kaggle/working/ffb_localization_rgbd")
    BASE_PATH = Path("/kaggle/working")
else:
    BASE_PATH = Path(r"D:/Work/Assisten Dosen/Anylabel/Experiments")
    DATASET_DIR = BASE_PATH / "datasets" / "ffb_localization_rgbd"
    WORK_DIR = BASE_PATH / "working" / "ffb_localization_rgbd"

RUNS_PATH = BASE_PATH / 'runs' / 'detect'
KAGGLE_OUTPUT = BASE_PATH / 'kaggleoutput'
KAGGLE_OUTPUT.mkdir(parents=True, exist_ok=True)

print(f"Dataset Dir: {DATASET_DIR}")
print(f"Work Dir: {WORK_DIR}")

In [None]:
# ============================================================
# CELL 3: Create Pre-merged RGBD 4-Channel PNG Images
# ============================================================
def create_rgbd_dataset(root: Path, work_root: Path) -> None:
    splits = ("train", "val", "test")
    
    for split in splits:
        rgb_dir = root / "rgb" / split
        depth_dir = root / "depth" / split
        label_dir = root / "labels" / split
        
        if not all([rgb_dir.exists(), depth_dir.exists(), label_dir.exists()]):
            print(f"Warning: Missing folders for {split}")
            continue
        
        rgb_files = {p.name for p in rgb_dir.glob("*.png")}
        depth_files = {p.name for p in depth_dir.glob("*.png")}
        label_files = {p.with_suffix(".png").name for p in label_dir.glob("*.txt")}
        
        keep = rgb_files & depth_files & label_files
        print(f"{split}: Keep={len(keep)}")
        
        images_dir = work_root / "images" / split
        images_dir.mkdir(parents=True, exist_ok=True)
        
        labels_out_dir = work_root / "labels" / split
        labels_out_dir.mkdir(parents=True, exist_ok=True)
        
        for fname in keep:
            dst_rgbd = images_dir / fname
            
            if not dst_rgbd.exists():
                rgb = cv2.imread(str(rgb_dir / fname), cv2.IMREAD_COLOR)
                depth = cv2.imread(str(depth_dir / fname), cv2.IMREAD_GRAYSCALE)
                
                if rgb is None or depth is None:
                    continue
                
                if depth.shape[:2] != rgb.shape[:2]:
                    depth = cv2.resize(depth, (rgb.shape[1], rgb.shape[0]))
                
                rgbd = np.dstack([rgb, depth])
                cv2.imwrite(str(dst_rgbd), rgbd)
            
            src_label = label_dir / fname.replace(".png", ".txt")
            dst_label = labels_out_dir / fname.replace(".png", ".txt")
            if not dst_label.exists():
                shutil.copy2(src_label, dst_label)
        
        sample = list(images_dir.glob("*.png"))[0]
        test_img = cv2.imread(str(sample), cv2.IMREAD_UNCHANGED)
        print(f"  -> shape: {test_img.shape}")

create_rgbd_dataset(DATASET_DIR, WORK_DIR)

for p in WORK_DIR.rglob("*.cache"):
    try:
        p.unlink()
    except:
        pass
print("Done!")

In [None]:
# ============================================================
# CELL 4: Monkey-Patch Ultralytics imread
# ============================================================
import ultralytics.utils.patches as patches

_original_imread = patches.imread

def imread_4ch(filename, flags=cv2.IMREAD_UNCHANGED):
    return _original_imread(filename, cv2.IMREAD_UNCHANGED)

patches.imread = imread_4ch
print("✅ Patched imread")

from ultralytics import YOLO
print("✅ YOLO imported")

In [None]:
# ============================================================
# CELL 5: Create Dataset Config YAML dengan channels: 4
# ============================================================
config_content = f"""path: {WORK_DIR}
train: images/train
val: images/val
test: images/test

nc: 1
channels: 4

names:
  0: ffb
"""

config_path = WORK_DIR / "dataset_rgbd.yaml"
with open(config_path, 'w') as f:
    f.write(config_content)
print(f"✅ Config saved: {config_path}")
print("   (includes channels: 4)")

In [None]:
# ============================================================
# CELL 6: Custom Trainer & Validator with 4-Channel Support (FIXED)
# ============================================================
from ultralytics.models.yolo.detect import DetectionTrainer, DetectionValidator
from ultralytics.nn.autobackend import AutoBackend

def convert_conv_to_4ch(conv_layer):
    """Convert a conv layer from 3ch to 4ch input"""
    if conv_layer.in_channels == 4:
        return conv_layer
    
    new_conv = nn.Conv2d(
        in_channels=4,
        out_channels=conv_layer.out_channels,
        kernel_size=conv_layer.kernel_size,
        stride=conv_layer.stride,
        padding=conv_layer.padding,
        bias=conv_layer.bias is not None
    )
    
    with torch.no_grad():
        new_conv.weight[:, :3, :, :] = conv_layer.weight.clone()
        new_conv.weight[:, 3:4, :, :] = conv_layer.weight.mean(dim=1, keepdim=True)
        if conv_layer.bias is not None:
            new_conv.bias = nn.Parameter(conv_layer.bias.clone())
    
    return new_conv


def ensure_model_4ch(model):
    """Ensure model has 4-channel input, works with different model types"""
    try:
        # For AutoBackend wrapped models
        if hasattr(model, 'model') and hasattr(model.model, 'model'):
            first_conv = model.model.model[0].conv
            if first_conv.in_channels == 3:
                print("[4ch] Converting AutoBackend model...")
                model.model.model[0].conv = convert_conv_to_4ch(first_conv)
                return True
        # For direct DetectionModel
        elif hasattr(model, 'model') and hasattr(model.model[0], 'conv'):
            first_conv = model.model[0].conv
            if first_conv.in_channels == 3:
                print("[4ch] Converting DetectionModel...")
                model.model[0].conv = convert_conv_to_4ch(first_conv)
                return True
    except Exception as e:
        print(f"[4ch] Warning: {e}")
    return False


class RGBD4ChValidator(DetectionValidator):
    """Validator yang convert model ke 4ch sebelum validasi"""
    
    def __call__(self, trainer=None, model=None):
        """Override untuk convert model ke 4ch"""
        # Call parent first to setup everything
        # Model akan di-load dari path oleh parent class
        return super().__call__(trainer, model)
    
    def setup_model(self):
        """Override: setelah model di-load, convert ke 4ch"""
        super().setup_model()
        
        # self.model sudah di-set oleh parent
        if self.model is not None:
            ensure_model_4ch(self.model)
            print(f"[Validator] Model 4ch ready")


class RGBD4ChTrainer(DetectionTrainer):
    """Trainer yang convert model ke 4-channel"""
    
    def setup_model(self):
        """Setup model lalu convert ke 4-channel"""
        super().setup_model()
        
        first_conv = self.model.model[0].conv
        
        if first_conv.in_channels == 4:
            print("[Trainer] Model sudah 4-channel")
            return
        
        print(f"[Trainer] Converting to 4-channel...")
        self.model.model[0].conv = convert_conv_to_4ch(first_conv)
        print(f"[Trainer] ✅ Converted! Shape: {self.model.model[0].conv.weight.shape}")
    
    def get_validator(self):
        """Return custom validator"""
        self.loss_names = "box_loss", "cls_loss", "dfl_loss"
        return RGBD4ChValidator(
            self.test_loader, 
            save_dir=self.save_dir, 
            args=self.args,
            _callbacks=self.callbacks
        )

print("✅ Custom Trainer & Validator defined (FIXED)")

In [None]:
# ============================================================
# CELL 7: Training Configuration
# ============================================================
EXP_PREFIX = "exp_a3_rgbd"
SEEDS = [42, 123, 456, 789, 101]
EPOCHS = 100
IMGSZ = 640
BATCH_SIZE = 16
DEVICE = 0 if torch.cuda.is_available() else 'cpu'

print(f"Seeds: {SEEDS}")
print(f"Epochs: {EPOCHS}")
print(f"Batch: {BATCH_SIZE}")
print(f"Device: {DEVICE}")

In [None]:
# ============================================================
# CELL 8: Training Loop
# ============================================================
results_all = {}

for seed in SEEDS:
    print(f"\n{'='*60}")
    print(f"TRAINING - Seed {seed} ({SEEDS.index(seed)+1}/{len(SEEDS)})")
    print(f"{'='*60}\n")
    
    trainer = RGBD4ChTrainer(overrides={
        'model': 'yolo11n.pt',
        'data': str(config_path),
        'imgsz': IMGSZ,
        'epochs': EPOCHS,
        'batch': BATCH_SIZE,
        'device': DEVICE,
        'seed': seed,
        'name': f"{EXP_PREFIX}_seed{seed}",
        'project': str(RUNS_PATH),
        'exist_ok': True,
        'pretrained': True,
        'patience': 30,
        'val': True,
        'hsv_h': 0.0,
        'hsv_s': 0.0,
        'hsv_v': 0.0,
        'degrees': 0.0,
        'translate': 0.1,
        'scale': 0.5,
        'fliplr': 0.5,
        'mosaic': 1.0,
        'mixup': 0.0,
        'copy_paste': 0.0,
        'erasing': 0.0,
    })
    
    trainer.train()
    
    results_all[seed] = {
        'model_path': str(RUNS_PATH / f"{EXP_PREFIX}_seed{seed}" / "weights" / "best.pt"),
    }
    
    print(f"\n✅ Seed {seed} completed!")
    
    del trainer
    torch.cuda.empty_cache()

print("\n" + "="*60)
print("ALL TRAINING COMPLETED!")
print("="*60)

In [None]:
# ============================================================
# CELL 9: Evaluation dengan Custom Validator
# ============================================================
results_dict = {}

print("\n" + "="*60)
print("EVALUATION ON TEST SET")
print("="*60)

for seed in SEEDS:
    model_path = RUNS_PATH / f"{EXP_PREFIX}_seed{seed}" / "weights" / "best.pt"
    
    if not model_path.exists():
        print(f"Model not found: {model_path}")
        continue
    
    print(f"\nSeed {seed}:")
    
    model = YOLO(str(model_path))
    
    # Convert ke 4ch jika perlu
    first_conv = model.model.model[0].conv
    if first_conv.in_channels == 3:
        print("  Converting to 4ch...")
        model.model.model[0].conv = convert_conv_to_4ch(first_conv)
    
    metrics = model.val(
        data=str(config_path),
        split="test",
        device=DEVICE,
        name=f"test_{EXP_PREFIX}_seed{seed}",
        exist_ok=True,
    )
    
    results_dict[seed] = {
        'mAP50': metrics.box.map50,
        'mAP50-95': metrics.box.map,
        'Precision': metrics.box.mp,
        'Recall': metrics.box.mr
    }
    
    print(f"  mAP50: {metrics.box.map50:.3f}, mAP50-95: {metrics.box.map:.3f}")
    
    del model
    torch.cuda.empty_cache()

In [None]:
# ============================================================
# CELL 10: Summary & Save
# ============================================================
if results_dict:
    df = pd.DataFrame(results_dict).T
    df.index.name = 'Seed'
    avg = df.mean()
    std = df.std()

    print("\n" + "="*60)
    print("A.3 RGB+DEPTH - FINAL RESULTS")
    print("="*60 + "\n")
    print(df.to_string(float_format=lambda x: f"{x:.3f}"))

    print("\n" + "-"*60)
    print("SUMMARY (Mean ± Std)")
    print("-"*60)
    for col in df.columns:
        print(f"  {col}: {avg[col]:.3f} ± {std[col]:.3f}")

    output_file = KAGGLE_OUTPUT / 'a3_rgbd_results.txt'
    with open(output_file, 'w') as f:
        f.write("A.3 RGB+Depth Results\n")
        f.write(f"Generated: {datetime.now()}\n\n")
        f.write(df.to_string(float_format=lambda x: f"{x:.3f}"))
        f.write("\n\nSummary:\n")
        for col in df.columns:
            f.write(f"  {col}: {avg[col]:.3f} ± {std[col]:.3f}\n")
    print(f"\n✅ Saved: {output_file}")

if RUNS_PATH.exists():
    shutil.make_archive(str(BASE_PATH / 'a3_runs'), 'zip', RUNS_PATH)
    print("✅ a3_runs.zip created")