In [None]:
# train_strokevit_r50_noarg.py
# -*- coding: utf-8 -*-
import os, math, random, json
from dataclasses import dataclass
from copy import deepcopy
from typing import Tuple

import numpy as np
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, WeightedRandomSampler
from torchvision import datasets, transforms
from torch.cuda.amp import autocast, GradScaler

from sklearn.metrics import f1_score, accuracy_score, classification_report, confusion_matrix

from model_strokevit_r50 import StrokeViT_R50

# =========================
# Konfig (argparse YOK)
# =========================
@dataclass
class CFG:
    train_dir: str = "/home/comp5/ARTEK/SYZ_25/ILK_GOREV/VERI/ilk_gorev_veri_split/train"
    val_dir:   str = "/home/comp5/ARTEK/SYZ_25/ILK_GOREV/VERI/ilk_gorev_veri_split/val"
    save_path: str = "outputs_strokevit_r50_best/strokevit_r50_best.pth"

    img_size:  int = 384
    epochs:    int = 30
    batch_size:int = 16
    num_workers:int = 4

    # Optimizasyon
    lr: float = 3e-4
    weight_decay: float = 0.05
    min_lr: float = 1e-6
    warmup_ratio: float = 0.1
    grad_clip: float = 1.0
    use_ema: bool = True
    ema_decay: float = 0.999

    # Loss/imbalance
    loss_type: str = "lsce"     # "lsce" | "focal" | "bce"
    label_smoothing: float = 0.1
    focal_gamma: float = 2.0
    use_weighted_sampler: bool = True
    force_binary_bce: bool = False  # True ise 2 sınıfta bile BCE (num_classes=1)

    # Early stop
    patience: int = 7

    # Seed
    seed: int = 42

CFG = CFG()
os.makedirs(os.path.dirname(CFG.save_path) or ".", exist_ok=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# =========================
# Yardımcılar
# =========================
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]

def set_seed(s: int):
    random.seed(s); np.random.seed(s)
    torch.manual_seed(s); torch.cuda.manual_seed_all(s)

class EMA:
    def __init__(self, model: nn.Module, decay: float = 0.999):
        self.decay = decay
        self.ema = deepcopy(model).eval()
        for p in self.ema.parameters(): p.requires_grad_(False)
    def to(self, dev): self.ema.to(dev); return self
    @torch.no_grad()
    def update(self, model: nn.Module):
        d = self.decay
        for ep, p in zip(self.ema.parameters(), model.parameters()):
            ep.mul_(d).add_(p, alpha=1.0 - d)
        for eb, b in zip(self.ema.buffers(), model.buffers()):
            eb.copy_(b)

class WarmupCosine:
    def __init__(self, optimizer, warmup_steps, max_steps, min_lr=1e-6):
        self.opt = optimizer
        self.ws  = max(1, warmup_steps)
        self.ms  = max_steps
        self.min_lr = min_lr
        self.step_n = 0
        self.base = [g["lr"] for g in optimizer.param_groups]
    def step(self):
        self.step_n += 1
        for i, g in enumerate(self.opt.param_groups):
            base_lr = self.base[i]
            if self.step_n <= self.ws:
                lr = base_lr * self.step_n / self.ws
            else:
                prog = (self.step_n - self.ws) / max(1, self.ms - self.ws)
                lr = self.min_lr + 0.5*(base_lr - self.min_lr)*(1 + math.cos(math.pi * prog))
            g["lr"] = lr

def build_transforms(img_size: int, train: bool):
    ops = [transforms.Grayscale(num_output_channels=3)]
    if train:
        ops += [
            transforms.RandomResizedCrop(img_size, scale=(0.8, 1.0), ratio=(0.9, 1.1)),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(10),
            transforms.ColorJitter(brightness=0.1, contrast=0.1),
        ]
    else:
        ops += [transforms.Resize((img_size, img_size))]
    ops += [transforms.ToTensor(), transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD)]
    return transforms.Compose(ops)

def make_loaders() -> Tuple[DataLoader, DataLoader, int, np.ndarray]:
    train_tf = build_transforms(CFG.img_size, True)
    val_tf   = build_transforms(CFG.img_size, False)

    train_ds = datasets.ImageFolder(CFG.train_dir, transform=train_tf)
    val_ds   = datasets.ImageFolder(CFG.val_dir, transform=val_tf)

    num_classes = len(train_ds.classes)
    targets = np.array(train_ds.targets)

    if CFG.use_weighted_sampler:
        counts = np.bincount(targets, minlength=num_classes)
        class_w = 1.0 / (counts + 1e-6)
        sample_w = class_w[targets]
        sampler = WeightedRandomSampler(sample_w.tolist(), num_samples=len(sample_w), replacement=True)
        train_ld = DataLoader(train_ds, batch_size=CFG.batch_size, sampler=sampler,
                              num_workers=CFG.num_workers, pin_memory=True, drop_last=True)
    else:
        train_ld = DataLoader(train_ds, batch_size=CFG.batch_size, shuffle=True,
                              num_workers=CFG.num_workers, pin_memory=True, drop_last=True)

    val_ld = DataLoader(val_ds, batch_size=CFG.batch_size, shuffle=False,
                        num_workers=CFG.num_workers, pin_memory=True)

    return train_ld, val_ld, num_classes, targets

# =========================
# Loss fonksiyonları
# =========================
class LabelSmoothingCE(nn.Module):
    def __init__(self, eps: float = 0.1):
        super().__init__()
        self.eps = eps
    def forward(self, logits, targets):
        C = logits.size(-1)
        logp = F.log_softmax(logits, dim=-1)
        with torch.no_grad():
            true = torch.zeros_like(logp).fill_(self.eps / C)
            true.scatter_(1, targets.unsqueeze(1), 1.0 - self.eps + (self.eps / C))
        return torch.mean(torch.sum(-true * logp, dim=-1))

class FocalCrossEntropy(nn.Module):
    def __init__(self, alpha=None, gamma: float = 2.0, reduction="mean"):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction
    def forward(self, logits, targets):
        logp = F.log_softmax(logits, dim=1)
        p = torch.exp(logp)
        ce = F.nll_loss(logp, targets, reduction="none")
        pt = p[torch.arange(logits.size(0), device=logits.device), targets]
        loss = (1 - pt) ** self.gamma * ce
        if self.alpha is not None:
            if isinstance(self.alpha, (list, tuple)):
                a = torch.tensor(self.alpha, device=logits.device, dtype=logits.dtype)[targets]
                loss = a * loss
            else:
                loss = self.alpha * loss
        return loss.mean() if self.reduction == "mean" else loss.sum()

class BinaryFocal(nn.Module):
    def __init__(self, gamma=2.0, alpha=None):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
    def forward(self, logits, targets):
        # logits (B,1), targets (B,)
        probs = torch.sigmoid(logits).squeeze(1)
        targets = targets.float()
        ce = F.binary_cross_entropy(probs, targets, reduction="none")
        pt = torch.where(targets == 1, probs, 1 - probs)
        loss = (1 - pt) ** self.gamma * ce
        if self.alpha is not None:
            loss = torch.where(targets == 1, self.alpha * loss, (1 - self.alpha) * loss)
        return loss.mean()

# =========================
# Değerlendirme
# =========================
def evaluate(model: nn.Module, loader: DataLoader, num_classes: int, optimize_thr: bool):
    model.eval()
    y_true, y_pred = [], []
    y_prob_bin = []  # binary için olasılık
    with torch.no_grad():
        for imgs, labels in loader:
            imgs = imgs.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)
            logits = model(imgs)
            if num_classes == 1:
                probs = torch.sigmoid(logits).squeeze(1)
                y_prob_bin.append(probs.cpu().numpy())
                # threshold daha sonra optimize edilecek
            else:
                preds = torch.argmax(logits, dim=1)
                y_pred.append(preds.cpu().numpy())
            y_true.append(labels.cpu().numpy())

    y_true = np.concatenate(y_true)
    if num_classes == 1:
        y_prob = np.concatenate(y_prob_bin)
        # eşik optimizasyonu (maks F1)
        if optimize_thr:
            ths = np.linspace(0, 1, 1001)
            best_f1, best_th = -1, 0.5
            for t in ths:
                preds = (y_prob >= t).astype(int)
                f1 = f1_score(y_true, preds)
                if f1 > best_f1:
                    best_f1, best_th = f1, t
            y_pred = (y_prob >= best_th).astype(int)
            acc = accuracy_score(y_true, y_pred)
            rep = classification_report(y_true, y_pred, digits=4, target_names=["neg","pos"])
            cm  = confusion_matrix(y_true, y_pred)
            return acc, best_f1, y_true, y_pred, dict(threshold=best_th, report=rep, cm=cm.tolist())
        else:
            preds = (y_prob >= 0.5).astype(int)
            f1 = f1_score(y_true, preds)
            acc = accuracy_score(y_true, preds)
            rep = classification_report(y_true, preds, digits=4, target_names=["neg","pos"])
            cm  = confusion_matrix(y_true, preds)
            return acc, f1, y_true, preds, dict(threshold=0.5, report=rep, cm=cm.tolist())
    else:
        y_pred = np.concatenate(y_pred)
        f1 = f1_score(y_true, y_pred, average="macro")
        acc = accuracy_score(y_true, y_pred)
        rep = classification_report(y_true, y_pred, digits=4)
        cm  = confusion_matrix(y_true, y_pred)
        return acc, f1, y_true, y_pred, dict(report=rep, cm=cm.tolist())

# =========================
# Eğitim
# =========================
def train():
    set_seed(CFG.seed)

    # Data
    train_ld, val_ld, ds_num_classes, train_targets = make_loaders()

    # Num classes belirle (binary BCE opsiyonu)
    if ds_num_classes == 2 and CFG.force_binary_bce:
        num_classes = 1
    else:
        num_classes = ds_num_classes

    # Model
    model = StrokeViT_R50(num_classes=num_classes, img_size=CFG.img_size).to(device)

    # EMA
    ema = EMA(model, decay=CFG.ema_decay).to(device) if CFG.use_ema else None

    # Optimizer + Scheduler
    optimizer = optim.AdamW(model.parameters(), lr=CFG.lr, weight_decay=CFG.weight_decay)
    total_steps = CFG.epochs * len(train_ld)
    warmup_steps = int(CFG.warmup_ratio * total_steps)
    scheduler = WarmupCosine(optimizer, warmup_steps=warmup_steps, max_steps=total_steps, min_lr=CFG.min_lr)

    # Loss seçimi
    if num_classes == 1 or CFG.loss_type == "bce":
        # Binary BCE / Focal-BCE
        if CFG.loss_type == "bce":
            # pos_weight = neg/pos
            neg = (train_targets == 0).sum()
            pos = (train_targets == 1).sum()
            pos_weight = torch.tensor([neg / max(1, pos)], device=device, dtype=torch.float32)
            criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
        else:
            criterion = BinaryFocal(gamma=CFG.focal_gamma, alpha=0.5)
    else:
        # Multi-class CE ailesi
        if CFG.loss_type == "focal":
            criterion = FocalCrossEntropy(gamma=CFG.focal_gamma)
        else:
            criterion = LabelSmoothingCE(eps=CFG.label_smoothing)

    scaler = GradScaler()

    best_f1 = -1.0
    best_payload = None
    endure = 0

    for epoch in range(1, CFG.epochs + 1):
        model.train()
        run_loss = 0.0

        for imgs, labels in train_ld:
            imgs = imgs.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)
            with autocast():
                logits = model(imgs)
                if num_classes == 1 or CFG.loss_type == "bce":
                    loss = criterion(logits.squeeze(1), labels)
                else:
                    loss = criterion(logits, labels)

            scaler.scale(loss).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=CFG.grad_clip)
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()

            if ema: ema.update(model)
            run_loss += loss.item()

        # ---- Validation ----
        eval_model = ema.ema if ema else model
        acc, f1, y_true, y_pred, extra = evaluate(
            eval_model, val_ld, num_classes=num_classes, optimize_thr=(num_classes==1)
        )

        print(f"Epoch {epoch:02d} | TrainLoss {run_loss/len(train_ld):.4f} | "
              f"ValAcc {acc:.4f} | ValF1 {f1:.4f}")
        if "report" in extra:
            print(extra["report"])

        # En iyiyi kaydet
        if f1 > best_f1:
            best_f1 = f1
            payload = {
                "epoch": epoch,
                "model": eval_model.state_dict(),
                "num_classes": num_classes,
                "cfg": CFG.__dict__,
                "val_f1": float(f1),
            }
            if num_classes == 1 and "threshold" in extra:
                payload["best_threshold"] = float(extra["threshold"])
            torch.save(payload, CFG.save_path)
            best_payload = payload
            print(f"✅ Best saved -> {CFG.save_path} (F1={best_f1:.4f})")
            endure = 0
        else:
            endure += 1
            if endure >= CFG.patience:
                print("⏹ Early stopping tetiklendi.")
                break

    print("Training done.")
    if best_payload is not None:
        meta_path = os.path.splitext(CFG.save_path)[0] + "_summary.json"
        with open(meta_path, "w") as f:
            json.dump({
                "epoch": best_payload["epoch"],
                "val_f1": best_payload["val_f1"],
                "num_classes": best_payload["num_classes"],
                "best_threshold": best_payload.get("best_threshold", 0.5),
                "cfg": best_payload["cfg"]
            }, f, indent=2)
        print(f"✓ Özet yazıldı: {meta_path}")

if __name__ == "__main__":
    train()


In [3]:
# test_strokevit_r50_noarg.py
# -*- coding: utf-8 -*-
import os, json
from dataclasses import dataclass
import numpy as np
from typing import Tuple

import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.cuda.amp import autocast
from PIL import Image

from sklearn.metrics import (
    accuracy_score, f1_score, confusion_matrix, classification_report
)

from model_strokevit_r50 import StrokeViT_R50

# ============== Konfig ==============
@dataclass
class CFG:
    test_dir:  str = "/home/comp5/ARTEK/SYZ_25/SYZ_25_Egitim/genel_veri/2025_Veri/test_veri_seti/ÜNİVERSİTEVEÜZERİ_EGİTİM 2_PNG/BT_TestSet/output_crop_gamma/test"
    ckpt_path: str = "/home/comp5/ARTEK/SYZ_25/SYZ_25_Egitim/ILK_GOREV/nihai_modeller/resnet50_strokevit+/outputs_strokevit_r50_best/strokevit_r50_best.pth"
    out_dir:   str = "test_r50_outputs_2gfgfgfrtyhfdryhfg"

    img_size:   int = 384
    batch_size: int = 32
    num_workers:int = 4

    use_saved_threshold: bool = True    # binary ise summary/ckpt içindeki eşiği kullan
    tta_hflip: bool = True              # TTA: yatay ayna ortalaması
    save_csv: bool = True

CFG = CFG()
os.makedirs(CFG.out_dir, exist_ok=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]

# ============== Data ==============
class ToRGB:
    def __call__(self, img: Image.Image):
        return img.convert("RGB") if img.mode != "RGB" else img

def make_test_loader() -> Tuple[datasets.ImageFolder, DataLoader]:
    tfm = transforms.Compose([
        ToRGB(),
        transforms.Resize((CFG.img_size, CFG.img_size)),
        transforms.ToTensor(),
        transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
    ])
    ds = datasets.ImageFolder(CFG.test_dir, transform=tfm)
    ld = DataLoader(ds, batch_size=CFG.batch_size, shuffle=False,
                    num_workers=CFG.num_workers, pin_memory=True)
    return ds, ld

# ============== Yardımcılar ==============
def load_model_and_meta():
    # ckpt: {"epoch", "model", "num_classes", "cfg", "val_f1", (opt) "best_threshold"}
    payload = torch.load(CFG.ckpt_path, map_location="cpu")
    num_classes = payload.get("num_classes", None)
    if num_classes is None:
        # Eski kayıtlarda yoksa dataset'ten alacağız
        print("[warn] num_classes checkpoint'te bulunamadı; dataset'e göre belirlenecek.")
    model = StrokeViT_R50(num_classes=num_classes if num_classes else 3, img_size=CFG.img_size)
    state = payload["model"] if "model" in payload else payload
    model.load_state_dict(state, strict=True)
    model.to(device).eval()

    # threshold okuma (binary ise)
    best_th = payload.get("best_threshold", None)
    # summary.json varsa tercih edebilirsin
    sum_path = os.path.splitext(CFG.ckpt_path)[0] + "_summary.json"
    if CFG.use_saved_threshold and os.path.exists(sum_path):
        try:
            with open(sum_path, "r") as f:
                j = json.load(f)
            best_th = float(j.get("best_threshold", best_th if best_th is not None else 0.5))
        except Exception:
            pass
    return model, num_classes, best_th

def horizontal_flip_batch(x: torch.Tensor) -> torch.Tensor:
    return torch.flip(x, dims=[3])

# ============== Test / İnferans ==============
def run_test():
    ds, ld = make_test_loader()
    model, num_classes_ckpt, best_th = load_model_and_meta()

    # Eğer ckpt'te num_classes yoksa dataset'ten belirle
    num_classes = num_classes_ckpt if num_classes_ckpt is not None else len(ds.classes)
    if num_classes == 2:
        # iki sınıflı ama BCE ile eğitilmiş olabilir; ckpt'te num_classes=1 ise zaten yukarıda gelirdi
        # burada 2 sınıfı çok-sınıf olarak değerlendiriyoruz (softmax). Binary eşiği sadece ckpt num_classes==1 iken gerekli.
        pass

    all_true, all_pred = [], []
    all_prob_bin = []     # binary için pozitif olasılık
    all_top1prob = []     # çok-sınıf için top-1 olasılık
    all_paths = []

    with torch.no_grad(), autocast():
        for imgs, labels in ld:
            imgs = imgs.to(device, non_blocking=True)
            logits = None
            if CFG.tta_hflip:
                imgs_flip = horizontal_flip_batch(imgs)
                logits = model(imgs)
                logits_flip = model(imgs_flip)
                logits = 0.5 * (logits + logits_flip)
            else:
                logits = model(imgs)

            if num_classes == 1:
                probs = torch.sigmoid(logits).squeeze(1)  # (B,)
                all_prob_bin.append(probs.cpu().numpy())
            else:
                probs = torch.softmax(logits, dim=1)       # (B,C)
                top1 = probs.max(dim=1).values
                preds = probs.argmax(dim=1)
                all_pred.append(preds.cpu().numpy())
                all_top1prob.append(top1.cpu().numpy())

            all_true.append(labels.numpy())
            # ImageFolder sample paths
            # Mini-trick: DataLoader batch sırası DataSet.samples sırasını takip eder
            # Yine de güvenli olmak için her iterasyonda batch sayısı kadar path topla
            start = len(all_paths)
            for _ in range(labels.size(0)):
                # sıralı ilerlediğimizi varsayalım
                pass
        # Daha güvenlisi: doğrudan dataset.samples'tan topla:
    all_paths = [p for (p, _) in ds.samples]

    y_true = np.concatenate(all_true)

    out_json = {
        "classes": ds.classes,
        "num_classes": int(num_classes),
        "used_threshold": None,
        "metrics": {},
        "confusion_matrix": None,
    }

    # --- Tahmin ve metrikler ---
    if num_classes == 1:
        y_prob = np.concatenate(all_prob_bin)
        # Eşik seçimi:
        if CFG.use_saved_threshold and best_th is not None:
            th = float(best_th)
        else:
            # test set üzerinde maksimum macro-F1 eşiği (isteğe bağlı)
            ths = np.linspace(0, 1, 1001)
            best_f1, th = -1, 0.5
            for t in ths:
                y_pred_tmp = (y_prob >= t).astype(int)
                macro_f1_tmp = f1_score(y_true, y_pred_tmp, average="macro")
                if macro_f1_tmp > best_f1:
                    best_f1, th = macro_f1_tmp, t
        y_pred = (y_prob >= th).astype(int)

        acc = accuracy_score(y_true, y_pred)
        macro_f1 = f1_score(y_true, y_pred, average="macro")
        bin_f1 = f1_score(y_true, y_pred)  # pozitif sınıf için F1
        cm = confusion_matrix(y_true, y_pred).tolist()
        rep = classification_report(y_true, y_pred, digits=4, target_names=(ds.classes if len(ds.classes)==2 else ["neg","pos"]))

        out_json["used_threshold"] = float(th)
        out_json["metrics"] = {
            "accuracy": float(acc),
            "macro_f1": float(macro_f1),
            "binary_f1": float(bin_f1),
            "report": rep,
        }
        out_json["confusion_matrix"] = cm

        # CSV
        if CFG.save_csv:
            import csv
            csv_path = os.path.join(CFG.out_dir, "test_predictions_binary.csv")
            with open(csv_path, "w", newline="") as f:
                w = csv.writer(f)
                w.writerow(["path","true_idx","true_name","prob_pos","pred_idx","pred_name","threshold"])
                for (path, t), pr, pd in zip(ds.samples, y_prob, y_pred):
                    tname = ds.classes[t] if t < len(ds.classes) else str(t)
                    pname = ds.classes[pd] if pd < len(ds.classes) else str(pd)
                    w.writerow([path, int(t), tname, f"{pr:.6f}", int(pd), pname, f"{th:.3f}"])
            print(f"✓ CSV yazıldı: {csv_path}")

    else:
        y_pred = np.concatenate(all_pred)
        acc = accuracy_score(y_true, y_pred)
        macro_f1 = f1_score(y_true, y_pred, average="macro")
        cm = confusion_matrix(y_true, y_pred).tolist()
        rep = classification_report(y_true, y_pred, digits=4, target_names=ds.classes)

        out_json["metrics"] = {
            "accuracy": float(acc),
            "macro_f1": float(macro_f1),
            "report": rep,
        }
        out_json["confusion_matrix"] = cm

        # CSV
        if CFG.save_csv:
            import csv
            top1 = np.concatenate(all_top1prob)
            csv_path = os.path.join(CFG.out_dir, "test_predictions_multiclass.csv")
            with open(csv_path, "w", newline="") as f:
                w = csv.writer(f)
                w.writerow(["path","true_idx","true_name","pred_idx","pred_name","top1_prob"])
                for (path, t), pd, p1 in zip(ds.samples, y_pred, top1):
                    w.writerow([path, int(t), ds.classes[t], int(pd), ds.classes[pd], f"{p1:.6f}"])
            print(f"✓ CSV yazıldı: {csv_path}")

    # JSON sonuç
    json_path = os.path.join(CFG.out_dir, "test_summary.json")
    with open(json_path, "w") as f:
        json.dump(out_json, f, indent=2)
    print(f"✓ JSON yazıldı: {json_path}")

if __name__ == "__main__":
    run_test()




✓ CSV yazıldı: test_r50_outputs_2gfgfgfrtyhfdryhfg/test_predictions_multiclass.csv
✓ JSON yazıldı: test_r50_outputs_2gfgfgfrtyhfdryhfg/test_summary.json
