# Classification

# Import


In [None]:
import os
import json
import random
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split, WeightedRandomSampler
import torchvision.transforms as T

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import OneCycleLR

In [None]:
# Mount Google Drive and make the project folder the working directory, so the
# relative checkpoint and data paths used by later cells resolve against it.
from google.colab import drive
drive.mount('/content/drive')
os.chdir("/content/drive/MyDrive/Colab Notebooks/project")
os.getcwd()  # bare expression: the notebook echoes the current directory

# Dataset

In [None]:
class CocoChocolateDataset(Dataset):
    """Classification dataset of object crops taken from COCO-style annotations.

    Every entry of ``annotations`` in the JSON file becomes one sample: the
    annotated bounding box is cropped out of its image and passed through the
    transform pipeline selected by ``mode``.

    Args:
        img_dir: Directory containing the image files named in the JSON.
        coco_json: Path to the COCO annotation file.
        class_names: Class names; stored on the instance only.
        img_size: Size handed to ``T.Resize`` / ``T.RandomResizedCrop``.
        mode: ``'train'`` selects the augmenting pipeline; any other value
            selects the deterministic resize + normalize pipeline.
    """

    def __init__(self,
                 img_dir: str,
                 coco_json: str,
                 class_names: list,
                 img_size=(512,512),
                 mode: str = 'train'):
        # Context manager so the annotation file handle is closed right away
        # instead of being left to the garbage collector.
        with open(coco_json) as fh:
            coco = json.load(fh)
        self.img_dir = img_dir
        self.id2fn = {im['id']: im['file_name'] for im in coco['images']}
        self.samples = []
        for ann in coco['annotations']:
            img_id = ann['image_id']
            # Box is read as (x, y, width, height) and truncated to ints.
            x, y, w, h = map(int, ann['bbox'])
            # Labels are category_id shifted down by one.
            # NOTE(review): assumes category ids start at 1 — confirm.
            cls = ann['category_id'] - 1
            self.samples.append((img_id, (x, y, w, h), cls))

        self.class_names = class_names
        self.img_size = img_size
        self.mode = mode
        self._setup_transforms()

    def _setup_transforms(self):
        """(Re)build ``self.tf`` from the current ``self.mode`` and ``self.img_size``."""
        if self.mode == 'train':
            self.tf = T.Compose([
                T.Resize(self.img_size),
                T.RandomHorizontalFlip(),
                T.RandomResizedCrop(self.img_size, scale=(0.6,1.0), ratio=(0.75,1.33)),
                T.RandomRotation(15),
                T.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
                T.RandomAffine(degrees=0, translate=(0.1,0.1), scale=(0.8,1.2), shear=10),
                T.ToTensor(),
                T.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225]),
                # Applied last because it operates on the normalized tensor.
                T.RandomErasing(p=0.5, scale=(0.02,0.2), ratio=(0.3,3.3))
            ])
        else:
            self.tf = T.Compose([
                T.Resize(self.img_size),
                T.ToTensor(),
                T.Normalize([0.485,0.456,0.406], [0.229,0.224,0.225])
            ])

    def __len__(self):
        """Return the number of annotations (not the number of images)."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(crop_tensor, label, file_name, (x, y, w, h))`` for sample ``idx``."""
        img_id, (x, y, w, h), cls = self.samples[idx]
        fn = self.id2fn[img_id]
        img = Image.open(os.path.join(self.img_dir, fn)).convert('RGB')
        # PIL crop takes (left, upper, right, lower).
        patch = img.crop((x, y, x+w, y+h))
        x_tensor = self.tf(patch)
        y_label = torch.tensor(cls, dtype=torch.long)
        return x_tensor, y_label, fn, (x, y, w, h)

class RepeatDataset(Dataset):
    """Present a base dataset as if it were chained with itself ``times`` times.

    Indices wrap around the base dataset, so a base dataset that applies
    random augmentations can return a different view on every repetition.
    """

    def __init__(self, ds: Dataset, times: int):
        self.ds, self.times = ds, times

    def __len__(self):
        base_len = len(self.ds)
        return base_len * self.times

    def __getitem__(self, idx):
        # Map the virtual index back onto the underlying dataset.
        wrapped = idx % len(self.ds)
        return self.ds[wrapped]

# Model

In [None]:
class ChocolateNet(nn.Module):
    """Five-stage conv/BN/ReLU classifier with a two-layer fully connected head.

    The first four stages end in 2x2 max pooling; the last one ends in global
    average pooling, so the head always receives 512 features.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=13):
        super().__init__()
        channel_plan = [3, 64, 128, 256, 512, 512]
        final_stage = len(channel_plan) - 2

        stack = []
        for stage, (c_in, c_out) in enumerate(zip(channel_plan, channel_plan[1:])):
            stack.append(nn.Conv2d(c_in, c_out, 3, padding=1))
            stack.append(nn.BatchNorm2d(c_out))
            stack.append(nn.ReLU())
            if stage == final_stage:
                stack.append(nn.AdaptiveAvgPool2d((1,1)))
            else:
                stack.append(nn.MaxPool2d(2))
        # Layer positions inside the Sequential are kept so that existing
        # checkpoints (keys like "features.16.weight") still load.
        self.features = nn.Sequential(*stack)

        head = [
            nn.Flatten(),
            nn.Linear(512,512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512,num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return the class logits for an image batch."""
        pooled = self.features(x)
        logits = self.classifier(pooled)
        return logits

In [None]:
# Instantiate the classifier with its default 13 outputs and count its parameters.
model=ChocolateNet()
total_params = sum(p.numel() for p in model.parameters())
total_params  # bare expression so the notebook displays the count

# Train & Eval

In [None]:
def train_and_evaluate_(img_dir, coco_json, class_names,
                       epochs=20, batch_size=32, lr=1e-3,
                       val_split=0.2, device='cuda', repeat_times=5):
    """Build the repeated crop dataset and split it into train / val subsets.

    Despite the name, no training or evaluation happens here: ``epochs``,
    ``batch_size``, ``lr`` and ``device`` are accepted but never used.

    Args:
        img_dir: Image directory forwarded to ``CocoChocolateDataset``.
        coco_json: COCO annotation file forwarded to ``CocoChocolateDataset``.
        class_names: Class names forwarded to ``CocoChocolateDataset``.
        val_split: Fraction of the repeated dataset placed in the val subset.
        repeat_times: Repetition factor given to ``RepeatDataset``.

    Returns:
        ``(train_ds, val_ds)`` as produced by ``random_split``. Both subsets
        wrap the same base dataset, which is built with ``mode='train'``.
    """
    repeated = RepeatDataset(
        CocoChocolateDataset(img_dir, coco_json, class_names, mode='train'),
        times=repeat_times,
    )

    total = len(repeated)
    val_count = int(total * val_split)
    train_count = total - val_count

    train_ds, val_ds = random_split(repeated, [train_count, val_count])
    print(len(val_ds),len(train_ds))
    return train_ds, val_ds


# Paths to the annotated training images and their COCO annotation file.
IMG_DIR    = "/content/drive/MyDrive/Colab Notebooks/project/dataset_project_iapr2025/train_annotated/train"
JSON_PATH  = "/content/drive/MyDrive/Colab Notebooks/project/dataset_project_iapr2025/train_annotated/train/_annotations.coco.json"
# Placeholder names "Type1" .. "Type13", used only to title the preview plots.
CLASS_NAMES = [f"Type{i}" for i in range(1,14)]
device = "cuda" if torch.cuda.is_available() else "cpu"

# Only builds and splits the dataset; the training hyper-parameters passed
# here are ignored by train_and_evaluate_.
train_ds, val_ds = train_and_evaluate_(
    IMG_DIR, JSON_PATH, CLASS_NAMES,
    epochs=15, batch_size=32, lr=1e-3,
    val_split=0.2, device=device,repeat_times=4
)

import random
import matplotlib.pyplot as plt
import torchvision.transforms as T

# 1) Helper to un-normalize and show a tensor image
# Inverse of T.Normalize(mean, std): normalizing with mean' = -mean/std and
# std' = 1/std maps a normalized tensor back to x * std + mean.
inv_normalize = T.Normalize(
    mean=[-0.485 / 0.229, -0.456 / 0.224, -0.406 / 0.225],
    std=[1 / 0.229, 1 / 0.224, 1 / 0.225],
)

def imshow_tensor(img_tensor, ax):
    """Draw a normalized ``[C, H, W]`` image tensor on ``ax`` without axes.

    The normalization is undone with ``inv_normalize`` and the result is
    clamped to ``[0, 1]`` before plotting.
    """
    restored = inv_normalize(img_tensor)
    restored = restored.clamp(0,1)
    channels_last = restored.permute(1,2,0)  # imshow expects [H, W, C]
    ax.imshow(channels_last.cpu().numpy())
    ax.axis('off')

# 2) Function to plot random samples from a Subset
def plot_subset(ds, name, class_names, n=4):
    """Show ``n`` randomly chosen samples of ``ds`` side by side.

    Args:
        ds: Dataset whose items are ``(image_tensor, label, file_name, bbox)``.
        name: Label used in the figure title (``"<name> set samples"``).
        class_names: Sequence indexed by the sample label for the subplot title.
        n: Number of samples (columns) to draw; samples may repeat.
    """
    # squeeze=False always returns a 2-D array of Axes; without it a single
    # column (n=1) yields a bare Axes object and the loop below would fail.
    fig, axes = plt.subplots(1, n, figsize=(n*3, 3), squeeze=False)
    fig.suptitle(f"{name} set samples", fontsize=16)
    for ax in axes.ravel():
        idx = random.randrange(len(ds))
        img_tensor, label, fn, bbox = ds[idx]
        imshow_tensor(img_tensor, ax)
        # int() accepts both plain ints and 0-d label tensors.
        ax.set_title(f"{class_names[int(label)]}\n{fn}\n{bbox}", fontsize=8)
    # Leave room at the top for the suptitle.
    plt.tight_layout(rect=[0, 0, 1, 0.9])
    plt.show()

# 3) After you’ve created train_ds and val_ds:
# Preview four random crops from each split. Both subsets wrap the same base
# dataset built with mode='train', so the "Val" crops are augmented as well.
plot_subset(train_ds, "Train", CLASS_NAMES, n=4)
plot_subset(val_ds,   "Val",   CLASS_NAMES, n=4)


In [None]:
def mixup_data(x, y, alpha=0.4):
    """Blend each sample of a batch with a randomly chosen partner (MixUp).

    Args:
        x: Input batch; the first dimension is the batch dimension.
        y: Targets aligned with ``x`` along the first dimension.
        alpha: Parameter of the ``Beta(alpha, alpha)`` distribution the mixing
            coefficient is drawn from. ``alpha <= 0`` disables mixing.

    Returns:
        ``(mixed_x, y_a, y_b, lam)`` where ``mixed_x = lam * x + (1 - lam) *
        x[perm]``, ``y_a`` is ``y``, ``y_b`` is ``y[perm]`` and ``lam`` is a
        Python float.
    """
    # Beta(alpha, alpha) is undefined for alpha <= 0 (numpy raises), so treat
    # that as "MixUp off": lam = 1 returns the batch unchanged.
    lam = float(np.random.beta(alpha, alpha)) if alpha > 0 else 1.0
    idx = torch.randperm(x.size(0))
    mixed_x = lam*x + (1-lam)*x[idx]
    y_a, y_b = y, y[idx]
    return mixed_x, y_a, y_b, lam

In [None]:
import os
import json
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from PIL import Image
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, random_split, WeightedRandomSampler
from torch.optim.lr_scheduler import OneCycleLR
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

import torchvision.transforms as T

# — your CocoChocolateDataset, RepeatDataset, ChocolateNet here —

def mixup_data(x, y, alpha=0.4):
    """Blend each sample of a batch with a randomly chosen partner (MixUp).

    Args:
        x: Input batch; the first dimension is the batch dimension.
        y: Targets aligned with ``x`` along the first dimension.
        alpha: Parameter of the ``Beta(alpha, alpha)`` distribution the mixing
            coefficient is drawn from. ``alpha <= 0`` disables mixing.

    Returns:
        ``(mixed_x, y_a, y_b, lam)`` where ``mixed_x = lam * x + (1 - lam) *
        x[perm]``, ``y_a`` is ``y``, ``y_b`` is ``y[perm]`` and ``lam`` is a
        Python float.
    """
    # Beta(alpha, alpha) is undefined for alpha <= 0 (numpy raises), so treat
    # that as "MixUp off": lam = 1 returns the batch unchanged.
    lam = float(np.random.beta(alpha, alpha)) if alpha > 0 else 1.0
    idx = torch.randperm(x.size(0))
    mixed_x = lam * x + (1 - lam) * x[idx]
    y_a, y_b = y, y[idx]
    return mixed_x, y_a, y_b, lam

def train(img_dir, coco_json, class_names,
          epochs=20, batch_size=32, lr=1e-3,
          val_split=0.2, device='cuda', repeat_times=4,
          resume_ckpt: str = None):
    """Train ``ChocolateNet`` on annotation crops with MixUp; keep the best checkpoint.

    The base samples are split once into disjoint train / validation index
    sets. The training part uses the augmenting transforms and is repeated
    ``repeat_times`` times per epoch; the validation part uses the
    deterministic transforms and is not repeated. Two separate dataset
    instances are used because switching the mode of a shared instance would
    also remove the augmentation from the training subset, and splitting after
    repetition would put copies of the same crop on both sides of the split.

    Args:
        img_dir: Directory with the annotated images.
        coco_json: COCO annotation file.
        class_names: Class names; their count sets the number of outputs.
        epochs: Index of the last epoch to run (epochs are 1-based).
        batch_size: Batch size for both loaders.
        lr: Adam learning rate and ``max_lr`` of the OneCycle schedule.
        val_split: Fraction of base samples held out for validation.
        device: Device for the model and the batches.
        repeat_times: How often the training samples are repeated per epoch.
        resume_ckpt: Optional checkpoint written by this function; ignored
            when the file does not exist.

    Returns:
        ``(model, val_loader)``. The model holds the weights of the last
        epoch; the best weights are in ``best_checkpoint_epoch.pth``.
    """
    # Imported locally because the file-level imports do not include Subset.
    from torch.utils.data import Subset

    # — dataset & loaders —
    train_base = CocoChocolateDataset(img_dir, coco_json, class_names, mode='train')
    val_base   = CocoChocolateDataset(img_dir, coco_json, class_names, mode='val')

    n_base = len(train_base)
    n_val  = int(n_base * val_split)
    perm   = torch.randperm(n_base).tolist()
    val_idx, train_idx = perm[:n_val], perm[n_val:]

    train_ds = RepeatDataset(Subset(train_base, train_idx), times=repeat_times)
    val_ds   = Subset(val_base, val_idx)

    # class weights (inverse frequency, rescaled so they sum to the class count)
    # RepeatDataset wraps indices, so repeating the label list keeps the
    # labels aligned with the indices of train_ds.
    all_train_labels = [train_base.samples[i][2] for i in train_idx] * repeat_times
    counts = np.bincount(all_train_labels, minlength=len(class_names))
    weights = 1.0 / (counts + 1e-6)
    weights = weights / weights.sum() * len(class_names)
    weight_tensor = torch.tensor(weights, device=device, dtype=torch.float32)
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1, weight=weight_tensor)

    # sampler: draws len(train_ds) samples per epoch with class-balanced odds
    sample_weights = [weights[l] for l in all_train_labels]
    sampler = WeightedRandomSampler(sample_weights, num_samples=len(sample_weights), replacement=True)

    train_loader = DataLoader(train_ds, batch_size=batch_size, sampler=sampler, num_workers=2)
    val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=2)

    # model / optimizer / scheduler
    model     = ChocolateNet(len(class_names)).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = OneCycleLR(
        optimizer,
        max_lr=lr,
        steps_per_epoch=len(train_loader),
        epochs=epochs,
        pct_start=0.1,
        div_factor=10,
    )

    # optionally resume
    start_epoch = 1
    best_acc    = 0.0
    if resume_ckpt and os.path.isfile(resume_ckpt):
        ckpt = torch.load(resume_ckpt, map_location=device)
        model.load_state_dict(ckpt['model_state'])
        optimizer.load_state_dict(ckpt['optim_state'])
        scheduler.load_state_dict(ckpt['sched_state'])
        start_epoch = ckpt['epoch'] + 1
        best_acc    = ckpt['best_acc']
        print(f"Resuming at epoch {start_epoch}, best_acc={best_acc:.4f}")

    # — training loop —
    for epoch in range(start_epoch, epochs+1):
        model.train()
        running_loss = 0.0

        for imgs, labels, *_ in train_loader:
            # MixUp
            mixed, la, lb, lam = mixup_data(imgs, labels, alpha=0.4)
            mixed = mixed.to(device)
            la, lb = la.to(device), lb.to(device)

            # forward + loss (convex combination of the two targets' losses)
            out = model(mixed)
            loss = lam * criterion(out, la) + (1 - lam) * criterion(out, lb)

            # backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            scheduler.step()  # OneCycleLR is stepped per batch

            running_loss += loss.item() * mixed.size(0)

        train_loss = running_loss / len(train_loader.dataset)

        # — validation —
        model.eval()
        val_loss = 0.0
        y_true, y_pred = [], []
        with torch.no_grad():
            for imgs, labels, *_ in val_loader:
                imgs, labels = imgs.to(device), labels.to(device)
                out = model(imgs)
                val_loss += criterion(out, labels).item() * imgs.size(0)
                preds = out.argmax(1).cpu().numpy()
                y_pred.extend(preds)
                y_true.extend(labels.cpu().numpy())

        val_loss /= len(val_loader.dataset)
        # Plain float so the checkpoint holds no numpy scalar.
        val_acc   = float(accuracy_score(y_true, y_pred))

        print(f"[Epoch {epoch}/{epochs}] train_loss={train_loss:.4f}  val_loss={val_loss:.4f}  val_acc={val_acc:.4f}")

        # checkpoint best
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state': model.state_dict(),
                'optim_state': optimizer.state_dict(),
                'sched_state': scheduler.state_dict(),
                'best_acc': best_acc,
            }, "best_checkpoint_epoch.pth")

    return model, val_loader

def eval_model(model, val_loader, class_names, device='cuda',weights="best_checkpoint_epoch.pth"):
    """Load a checkpoint into ``model`` and report validation metrics.

    Prints a classification report and shows the confusion matrix as an
    annotated heat map.

    Args:
        model: Network whose weights are replaced by ``ckpt['model_state']``.
        val_loader: Loader yielding ``(images, labels, ...)`` batches.
        class_names: Names for classes ``0 .. len(class_names) - 1``.
        device: Device used for inference.
        weights: Checkpoint path; must contain a ``'model_state'`` entry.

    Returns:
        The confusion matrix as a DataFrame indexed by ``class_names``
        (rows: true class, columns: predicted class).
    """
    # load best
    ckpt = torch.load(weights, map_location=device)
    model.load_state_dict(ckpt['model_state'])
    model.to(device).eval()

    y_true, y_pred = [], []
    with torch.no_grad():
        for imgs, labels, *_ in val_loader:
            imgs = imgs.to(device)
            out  = model(imgs)
            y_pred.extend(out.argmax(1).cpu().numpy())
            y_true.extend(labels.cpu().numpy())

    # Pin the label set: otherwise a class that never occurs in y_true/y_pred
    # shrinks the matrix and no longer matches class_names.
    label_ids = list(range(len(class_names)))
    print(classification_report(y_true, y_pred, labels=label_ids,
                                target_names=class_names, zero_division=0))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
    plt.figure(figsize=(8,8))
    plt.imshow(df_cm, cmap='Blues')
    plt.xticks(range(len(class_names)), class_names, rotation=45, ha='right')
    plt.yticks(range(len(class_names)), class_names)
    # White text on dark cells, black on light ones.
    half_max = cm.max()/2
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, cm[i,j], ha='center', va='center',
                     color='white' if cm[i,j]>half_max else 'black')
    plt.tight_layout()
    plt.show()

    return df_cm

# Run

In [None]:
# Train (or resume) the classifier on the square-box annotations, then
# evaluate the best checkpoint written during this run.
if __name__=="__main__":
    IMG_DIR   = "/content/drive/MyDrive/Colab Notebooks/project/dataset_project_iapr2025/train_annotated/train"
    JSON_PATH = "/content/drive/MyDrive/Colab Notebooks/project/dataset_project_iapr2025/train_annotated/train/_annotations_square.coco.json"
    # Order defines the label index of each class name.
    CLASS_NAMES = [
        "Amandina","Arabia","Comtesse","Crème brulée","Jelly Black",
        "Jelly Milk","Jelly White","Noblesse","Noir authentique",
        "Passion au lait","Stracciatella","Tentation noir","Triangolo"
    ]
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # NOTE(review): train() saves to "best_checkpoint_epoch.pth" but resumes
    # here from "best_checkpoint_epoch_13.pth" — confirm this is intended.
    model, val_loader = train(
        IMG_DIR, JSON_PATH, CLASS_NAMES,
        epochs=15, batch_size=32, lr=1e-3,
        val_split=0.2, device=device,
        repeat_times=5,
        resume_ckpt="best_checkpoint_epoch_13.pth"
    )
    eval_model(model, val_loader, CLASS_NAMES, device=device,weights="best_checkpoint_epoch.pth")

In [None]:
# Re-evaluate on the same validation loader using the "epoch 13" checkpoint file.
eval_model(model, val_loader, CLASS_NAMES, device=device,weights="best_checkpoint_epoch_13.pth")

# Run on the test set

In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image
from skimage.measure import label, regionprops
from skimage.morphology import remove_small_objects
import torchvision.transforms as T
import torch.nn as nn
from torch.utils.data import Dataset

import os
import torch
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image
from skimage.measure    import label, regionprops
from skimage.morphology import remove_small_objects, binary_dilation, disk
from skimage.feature    import peak_local_max
from skimage.segmentation import watershed
from scipy import ndimage as ndi
import torchvision.transforms as T
import torch.nn as nn
from torch.utils.data import Dataset

class UNet(nn.Module):
    """Three-level U-Net with sigmoid output.

    Each encoder / decoder level is a pair of 3x3 conv + BatchNorm + ReLU
    layers. Spatial size is halved three times on the way down, so input
    height and width should be divisible by 8 for the skip connections to line
    up.

    Args:
        in_channels: Channels of the input image.
        out_channels: Channels of the predicted map.
    """

    def __init__(self, in_channels=3, out_channels=1):
        super().__init__()

        def double_conv(c_in, c_out):
            # Two conv/BN/ReLU triples; only the first one changes the width.
            stack = []
            for src in (c_in, c_out):
                stack.extend([
                    nn.Conv2d(src, c_out, kernel_size=3, padding=1),
                    nn.BatchNorm2d(c_out),
                    nn.ReLU(inplace=True),
                ])
            return nn.Sequential(*stack)

        # Encoder
        self.enc1 = double_conv(in_channels, 64)
        self.enc2 = double_conv(64, 128)
        self.enc3 = double_conv(128, 256)

        self.pool = nn.MaxPool2d(2)

        self.bottleneck = double_conv(256, 512)

        # Decoder: each level upsamples, then fuses the matching skip tensor
        # (hence twice the channels going into the conv block).
        self.up3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec3 = double_conv(512, 256)

        self.up2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec2 = double_conv(256, 128)

        self.up1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec1 = double_conv(128, 64)

        self.final = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        """Return per-pixel values in ``[0, 1]`` at the input resolution."""
        skip1 = self.enc1(x)
        skip2 = self.enc2(self.pool(skip1))
        skip3 = self.enc3(self.pool(skip2))

        deepest = self.bottleneck(self.pool(skip3))

        merged = torch.cat([self.up3(deepest), skip3], dim=1)
        merged = self.dec3(merged)
        merged = torch.cat([self.up2(merged), skip2], dim=1)
        merged = self.dec2(merged)
        merged = torch.cat([self.up1(merged), skip1], dim=1)
        merged = self.dec1(merged)

        logits = self.final(merged)
        return torch.sigmoid(logits)

class ChocolateNet(nn.Module):
    """Crop classifier: five conv/BN/ReLU stages followed by a two-layer head.

    Stages one to four each halve the resolution with max pooling; the fifth
    ends in global average pooling, giving a 512-dimensional feature vector.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=13):
        super().__init__()

        def stage(c_in, c_out, reducer):
            # One conv/BN/ReLU triple followed by its spatial reduction layer.
            return [nn.Conv2d(c_in, c_out, 3, padding=1), nn.BatchNorm2d(c_out), nn.ReLU(), reducer]

        # Flattened into a single Sequential so layer indices (and therefore
        # checkpoint keys) stay 0..19.
        backbone = (
            stage(3, 64, nn.MaxPool2d(2))
            + stage(64, 128, nn.MaxPool2d(2))
            + stage(128, 256, nn.MaxPool2d(2))
            + stage(256, 512, nn.MaxPool2d(2))
            + stage(512, 512, nn.AdaptiveAvgPool2d((1,1)))
        )
        self.features = nn.Sequential(*backbone)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512,512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512,num_classes),
        )

    def forward(self, x):
        """Return the class logits for an image batch."""
        return self.classifier(self.features(x))

# 2) The Pipeline Dataset
import os
import torch
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image
from skimage.measure    import label, regionprops
from skimage.morphology import remove_small_objects
import torchvision.transforms as T
import torch.nn as nn
from torch.utils.data import Dataset

class ChocolateInferenceDataset(Dataset):
    """Segment, split and classify the objects in every image of a folder.

    Indexing runs the whole pipeline for one file: U-Net mask at ``seg_size``,
    small-blob cleanup, upsampling to the original resolution, watershed
    splitting of touching blobs, then one classifier call per region.

    Args:
        image_dir: Folder whose (sorted) directory listing defines the items.
        seg_model: Segmentation network; output channel 0 is thresholded.
        cls_model: Classifier applied to each cropped region.
        class_names: Names indexed by the classifier's argmax.
        device: Device both models and all inputs are moved to.
        seg_size: Resolution the image is resized to for segmentation.
        cls_size: Resolution each crop is resized to for classification.
        seg_thresh: Threshold applied to the segmentation output.
        min_blob_size: Smaller connected components are removed from the
            low-resolution mask.
        debug: When true (the default, matching earlier behaviour) a
            three-panel figure of mask / distance map / watershed labels is
            shown for every item.
    """

    def __init__(self,
                 image_dir: str,
                 seg_model: nn.Module,
                 cls_model: nn.Module,
                 class_names: list,
                 device: torch.device,
                 seg_size=(256, 256),
                 cls_size=(512, 512),
                 seg_thresh=0.5,
                 min_blob_size=100,
                 debug: bool = True):
        self.image_dir   = image_dir
        self.fnames      = sorted(os.listdir(image_dir))
        self.seg_model   = seg_model.to(device).eval()
        self.cls_model   = cls_model.to(device).eval()
        self.class_names = class_names
        self.device      = device
        self.debug       = debug

        # Segmentation input: resize + ToTensor only.
        # NOTE(review): no Normalize is applied here — confirm this matches
        # how the U-Net was trained.
        self.seg_tf = T.Compose([
            T.Resize(seg_size),
            T.ToTensor(),
        ])
        # Classifier input: same normalization constants as the training dataset.
        self.cls_tf = T.Compose([
            T.Resize(cls_size),
            T.ToTensor(),
            T.Normalize([0.485,0.456,0.406],
                        [0.229,0.224,0.225])
        ])

        self.seg_thresh    = seg_thresh
        self.min_blob_size = min_blob_size

    def __len__(self):
        """Return the number of files listed in ``image_dir``."""
        return len(self.fnames)

    def __getitem__(self, idx):
        """Run the pipeline on file ``idx``.

        Returns:
            ``(img, fname, results)`` where ``img`` is the RGB PIL image and
            ``results`` is a list of ``{"bbox": (x, y, w, h), "pred": name}``
            dicts, one per detected region, in label order.
        """
        fname = self.fnames[idx]
        img   = Image.open(os.path.join(self.image_dir, fname)).convert("RGB")
        W,H   = img.size

        # — 1) SEGMENTATION at low res —
        x_small = self.seg_tf(img).unsqueeze(0).to(self.device)
        with torch.no_grad():
            out_small = self.seg_model(x_small)[0,0].cpu().numpy()  # e.g. 256×256

        mask_small = out_small > self.seg_thresh
        mask_small = remove_small_objects(mask_small, min_size=self.min_blob_size)

        # — 2) Inflate tiny blobs at low res —
        labels_small = label(mask_small)
        inflated_small = np.zeros_like(mask_small)
        for prop in regionprops(labels_small):
            region = (labels_small == prop.label)
            if prop.area < 300:            # tune threshold
                region = binary_dilation(region, disk(10))
            inflated_small |= region

        # — 3) Upsample to full size (nearest neighbour keeps the mask binary) —
        mask_full = np.array(
            Image.fromarray((inflated_small*255).astype(np.uint8))
                 .resize((W,H), Image.NEAREST)
        ) > 0

        # — 4) Split touching blobs with watershed —
        # Seeds are the local maxima of the distance transform.
        dist = ndi.distance_transform_edt(mask_full)
        coordinates = peak_local_max(
            dist,
            labels=mask_full,
            footprint=np.ones((40,40)),
            threshold_abs=25,
        )

        markers = np.zeros_like(mask_full, dtype=bool)
        if coordinates.shape[0] > 0:
            markers[tuple(coordinates.T)] = True
        markers = ndi.label(markers)[0]

        if markers.max() == 0:
            # No seed passed the threshold: fall back to plain connected
            # components (an empty mask simply yields an all-zero label image).
            labels_ws = label(mask_full)
        else:
            labels_ws = watershed(-dist, markers, mask=mask_full)

        if self.debug:
            fig, axes = plt.subplots(1,3,figsize=(12,4))
            axes[0].imshow(mask_full, cmap='gray'); axes[0].set_title("Upsampled Mask"); axes[0].axis('off')
            axes[1].imshow(dist, cmap='magma');      axes[1].set_title("Distance");      axes[1].axis('off')
            axes[2].imshow(labels_ws, cmap='tab20'); axes[2].set_title("Watershed");     axes[2].axis('off')
            plt.tight_layout(); plt.show()

        # — 5) Crop & classify each separate region —
        # One regionprops pass yields every bounding box; it skips label 0
        # itself, so nothing is lost when the image has no background pixel.
        results = []
        for prop in regionprops(labels_ws):
            minr, minc, maxr, maxc = prop.bbox
            patch = img.crop((minc, minr, maxc, maxr))
            x = self.cls_tf(patch).unsqueeze(0).to(self.device)
            with torch.no_grad():
                pred = self.cls_model(x).argmax(1).item()
            results.append({
                "bbox": (minc, minr, maxc-minc, maxr-minr),
                "pred": self.class_names[pred]
            })

        return img, fname, results


# 3) Usage + Visualization
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Order must match the label indices the classifier was trained with.
class_names = [
    "Amandina",
    "Arabia",
    "Comtesse",
    "Crème brulée",
    "Jelly Black",
    "Jelly Milk",
    "Jelly White",
    "Noblesse",
    "Noir authentique",
    "Passion au lait",
    "Stracciatella",
    "Tentation noir",
    "Triangolo",

]
# instantiate models and load weights
seg_model = UNet()
seg_model.load_state_dict(torch.load("Bon_mask_model4_epoch25.pth", map_location=device))
cls_model = ChocolateNet(len(class_names))
# The classifier checkpoint is the dict written by train(); only the weights are needed.
cls_model.load_state_dict(torch.load("best_checkpoint_epoch_13.pth", map_location=device)["model_state"])

# dataset
dataset = ChocolateInferenceDataset(
    image_dir="/content/drive/MyDrive/Colab Notebooks/project/dataset_project_iapr2025/test",
    seg_model=seg_model,
    cls_model=cls_model,
    class_names=class_names,
    device=device
)

# iterate once: display + save the annotated image AND tally the counts.
# Every dataset access runs segmentation and classification, so a second
# pass just to build the table would double the inference cost.
output_dir = "/content/drive/MyDrive/Colab Notebooks/project/dataset_project_iapr2025/test_results_v2"
os.makedirs(output_dir, exist_ok=True)
rows = []
for img, fname, results in dataset:
    fig, ax = plt.subplots(1,1,figsize=(6,6))
    ax.imshow(img); ax.set_title(fname); ax.axis('off')
    dets = [r["pred"] for r in results]
    print(f"{fname} → {dets}")
    for r in results:
        x,y,w,h = r["bbox"]
        ax.add_patch(plt.Rectangle((x,y), w, h, fill=False, edgecolor='lime', lw=2))
        ax.text(x, y-5, r["pred"], color='white',
                bbox=dict(facecolor='black', alpha=0.5), fontsize=8)
    # Save before showing so the file is written regardless of how the
    # active backend treats a figure after plt.show().
    save_path = os.path.join(output_dir, fname)
    fig.savefig(save_path, bbox_inches='tight', pad_inches=0)
    plt.show()
    plt.close(fig)

    # One submission row per image: id + number of detections per class.
    image_id = os.path.splitext(fname)[0]
    counts = {cn: 0 for cn in class_names}
    for r in results:
        counts[r["pred"]] += 1
    row = {"id": image_id}
    row.update(counts)
    rows.append(row)

# Column order of the submission file (differs from the class_names order).
columns = [
    "id",
    "Jelly White",
    "Jelly Milk",
    "Jelly Black",
    "Amandina",
    "Crème brulée",
    "Triangolo",
    "Tentation noir",
    "Comtesse",
    "Noblesse",
    "Noir authentique",
    "Passion au lait",
    "Arabia",
    "Stracciatella",
]

df = pd.DataFrame(rows, columns=columns)
# NOTE(review): drops the first TWO characters of the file stem, whereas the
# following cell rebuilds the file dropping only one — confirm which id
# format the submission expects.
df["id"]=df["id"].apply(lambda x: x[2:])
df.to_csv("submission.csv", index=False, encoding="utf-8-sig")

print("Columns in this exact order:", df.columns.tolist())



In [None]:
# Rebuild the submission from the already computed rows, this time dropping
# only the first character of each file stem, and overwrite submission.csv.
df = pd.DataFrame(rows, columns=columns)
df["id"]=df["id"].apply(lambda x: x[1:])
df.to_csv("submission.csv", index=False, encoding="utf-8-sig")



In [None]:
# Sanity check: (number of images, 1 id column + 13 class columns).
df.shape

In [None]:
from PIL import Image
import matplotlib.pyplot as plt
import torch
from torchvision import transforms

def load_test_image(image_path, size=(256, 256)):
    """Load an image as RGB, resize it and return it as a batched tensor.

    Args:
        image_path: Path of the image file.
        size: Target size passed to ``PIL.Image.resize``.

    Returns:
        ``(image_tensor, image)``: the ``[1, 3, H, W]`` tensor produced by
        ``ToTensor`` and the resized PIL image it was made from.
    """
    resized = Image.open(image_path).convert("RGB").resize(size)
    to_tensor = transforms.ToTensor()
    batch = to_tensor(resized).unsqueeze(0)  # add the batch dimension
    return batch, resized
def predict_mask(model, image_tensor, threshold=0.5):
    """Run ``model`` on one batched image and binarize its output.

    The model is switched to eval mode. The input is moved to the device the
    model's parameters live on, so the function does not depend on a
    module-level ``device`` variable.

    Args:
        model: Network returning a ``[1, 1, H, W]`` map for the input.
        image_tensor: Input of shape ``[1, C, H, W]``.
        threshold: Values strictly above it become 1, everything else 0.

    Returns:
        Float CPU tensor with the two leading size-1 dimensions squeezed away.
    """
    model.eval()
    # Follow the model's own device; a parameter-free model keeps the input
    # where it already is.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else image_tensor.device
    with torch.no_grad():
        image_tensor = image_tensor.to(target)
        output = model(image_tensor)
        pred_mask = (output.squeeze(0).squeeze(0) > threshold).float().cpu()
    return pred_mask
def show_result(original_img, pred_mask):
    """Show the input image and its predicted mask side by side."""
    plt.figure(figsize=(10, 5))

    # (picture, title, extra imshow options) for the left and right panel.
    panels = (
        (original_img, "Input Image", {}),
        (pred_mask, "Predicted Mask", {"cmap": "gray"}),
    )
    for position, (picture, caption, options) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.imshow(picture, **options)
        plt.title(caption)
        plt.axis("off")

    plt.tight_layout()
    plt.show()
# Replace with your test image path
# (relative path: resolved against the current working directory)
test_image_path = "dataset_project_iapr2025/test/L1000839.JPG"

# Load model (if starting fresh)
# NOTE: this rebinds the global name `model`, which earlier cells used for
# the classifier, to a U-Net.
model = UNet().to(device)
model.load_state_dict(torch.load("tout5.pth", map_location=device))

# Inference
image_tensor, original_img = load_test_image(test_image_path, size=(256, 256))
predicted_mask = predict_mask(model, image_tensor)
show_result(original_img, predicted_mask)


# Backup


In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image
from scipy import ndimage as ndi
from skimage.measure      import label, regionprops
from skimage.morphology   import remove_small_objects, binary_dilation, disk
from skimage.segmentation import watershed
from torchvision import transforms as T
import torch.nn as nn
from torch.utils.data import Dataset

# -------------------------
# 1) Model Definitions
# -------------------------
class UNet(nn.Module):
    """Three-level U-Net with sigmoid output.

    Every encoder / decoder level is two 3x3 conv + BatchNorm + ReLU layers.
    The encoder halves the resolution three times; the decoder upsamples with
    transposed convolutions and concatenates the matching encoder output.

    Args:
        in_channels: Channels of the input image.
        out_channels: Channels of the predicted map.
    """

    @staticmethod
    def _double_conv(c_in, c_out):
        """Return conv/BN/ReLU applied twice, mapping ``c_in`` to ``c_out`` channels."""
        return nn.Sequential(
            nn.Conv2d(c_in, c_out, 3, padding=1),
            nn.BatchNorm2d(c_out),
            nn.ReLU(inplace=True),
            nn.Conv2d(c_out, c_out, 3, padding=1),
            nn.BatchNorm2d(c_out),
            nn.ReLU(inplace=True),
        )

    def __init__(self, in_channels=3, out_channels=1):
        super().__init__()
        block = self._double_conv

        # Contracting path.
        self.enc1 = block(in_channels, 64)
        self.enc2 = block(64, 128)
        self.enc3 = block(128, 256)
        self.pool = nn.MaxPool2d(2)
        self.bottleneck = block(256, 512)

        # Expanding path; decoder blocks take upsampled + skip channels.
        self.up3 = nn.ConvTranspose2d(512,256,2,2)
        self.dec3 = block(512,256)
        self.up2 = nn.ConvTranspose2d(256,128,2,2)
        self.dec2 = block(256,128)
        self.up1 = nn.ConvTranspose2d(128,64,2,2)
        self.dec1 = block(128,64)

        self.final = nn.Conv2d(64,out_channels,1)

    def forward(self, x):
        """Return per-pixel values in ``[0, 1]`` at the input resolution."""
        skips = []
        feat = self.enc1(x)
        skips.append(feat)
        feat = self.enc2(self.pool(feat))
        skips.append(feat)
        feat = self.enc3(self.pool(feat))
        skips.append(feat)

        feat = self.bottleneck(self.pool(feat))

        # Walk back up, consuming the skip tensors deepest-first.
        for upsample, refine in ((self.up3, self.dec3),
                                 (self.up2, self.dec2),
                                 (self.up1, self.dec1)):
            feat = refine(torch.cat([upsample(feat), skips.pop()], dim=1))

        return torch.sigmoid(self.final(feat))


class ChocolateNet(nn.Module):
    """Crop classifier: five conv/BN/ReLU stages plus a two-layer dense head.

    Max pooling follows each of the first four stages; global average pooling
    follows the fifth, so the head input is always 512 features.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=13):
        super().__init__()
        # (in_channels, out_channels) per stage.
        stage_io = ((3, 64), (64, 128), (128, 256), (256, 512), (512, 512))

        modules = []
        for position, (c_in, c_out) in enumerate(stage_io, start=1):
            modules.append(nn.Conv2d(c_in, c_out, 3, padding=1))
            modules.append(nn.BatchNorm2d(c_out))
            modules.append(nn.ReLU())
            is_last = position == len(stage_io)
            modules.append(nn.AdaptiveAvgPool2d((1,1)) if is_last else nn.MaxPool2d(2))
        # Same layer order and indices as a hand-written Sequential, which
        # keeps checkpoint keys such as "features.12.weight" valid.
        self.features = nn.Sequential(*modules)

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512,512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512,num_classes),
        )

    def forward(self, x):
        """Return the class logits for an image batch."""
        embedding = self.features(x)
        return self.classifier(embedding)


# --------------------------------------
# 2) Inference Dataset with Watershed
# --------------------------------------
class ChocolateInferenceDataset(Dataset):
    """Segment and classify the objects in every image of a folder (backup variant).

    Indexing runs the whole pipeline for one file: U-Net mask at ``seg_size``,
    small-blob cleanup, upsampling to the original resolution, centroid-seeded
    watershed, then one classifier call per region.

    Args:
        image_dir: Folder whose (sorted) directory listing defines the items.
        seg_model: Segmentation network; output channel 0 is thresholded.
        cls_model: Classifier applied to each cropped region.
        class_names: Names indexed by the classifier's argmax.
        device: Device both models and all inputs are moved to.
        seg_size: Resolution the image is resized to for segmentation.
        cls_size: Resolution each crop is resized to for classification.
        seg_thresh: Threshold applied to the segmentation output.
        min_blob_size: Smaller connected components are removed from the
            low-resolution mask.
    """

    def __init__(self,
                 image_dir,
                 seg_model: nn.Module,
                 cls_model: nn.Module,
                 class_names: list,
                 device: torch.device,
                 seg_size=(256,256),
                 cls_size=(256,256),
                 seg_thresh=0.5,
                 min_blob_size=100):
        self.image_dir   = image_dir
        self.fnames      = sorted(os.listdir(image_dir))
        self.seg_model   = seg_model.to(device).eval()
        self.cls_model   = cls_model.to(device).eval()
        self.class_names = class_names
        self.device      = device

        # Both models receive inputs normalized with the same constants.
        # NOTE(review): confirm the U-Net was really trained on normalized input.
        self.seg_tf = T.Compose([
            T.Resize(seg_size),
            T.ToTensor(),
            T.Normalize([0.485,0.456,0.406],
                        [0.229,0.224,0.225])
        ])
        self.cls_tf = T.Compose([
            T.Resize(cls_size),
            T.ToTensor(),
            T.Normalize([0.485,0.456,0.406],
                        [0.229,0.224,0.225])
        ])

        self.seg_thresh    = seg_thresh
        self.min_blob_size = min_blob_size

    def __len__(self):
        """Return the number of files listed in ``image_dir``."""
        return len(self.fnames)

    def __getitem__(self, idx):
        """Run the pipeline on file ``idx``.

        Returns:
            ``(img, fname, results)`` where ``img`` is the RGB PIL image and
            ``results`` is a list of ``{"bbox": (x, y, w, h), "pred": name}``
            dicts, one per region.
        """
        # load
        fname = self.fnames[idx]
        img   = Image.open(os.path.join(self.image_dir, fname)).convert("RGB")
        W,H   = img.size

        # 1) U-Net segmentation @ low res
        with torch.no_grad():
            x_small = self.seg_tf(img).unsqueeze(0).to(self.device)
            out_small = self.seg_model(x_small)[0,0].cpu().numpy()

        # 2) Binarize + remove tiny specks
        mask_small = out_small > self.seg_thresh
        mask_small = remove_small_objects(mask_small, min_size=self.min_blob_size)

        # 3) Inflate only the very small blobs at low res
        labels_small = label(mask_small)
        inflated_small = np.zeros_like(mask_small)
        for prop in regionprops(labels_small):
            region = (labels_small == prop.label)
            if prop.area < 300:              # tune this small-area cutoff
                region = binary_dilation(region, disk(8))
            inflated_small |= region

        # 4) Upsample to original resolution (nearest neighbour keeps it binary)
        mask_full = np.array(
            Image.fromarray((inflated_small*255).astype(np.uint8))
                 .resize((W,H), Image.NEAREST)
        ) > 0

        # 5) Centroid-seeded watershed: one seed per connected component.
        # NOTE(review): with a single seed per component this cannot separate
        # touching objects, and a centroid that falls outside its (non-convex)
        # component places the seed on background — confirm this is acceptable.
        labels0 = label(mask_full)
        dist    = ndi.distance_transform_edt(mask_full)
        markers = np.zeros_like(labels0)
        for i, prop in enumerate(regionprops(labels0), start=1):
            cy, cx = prop.centroid
            markers[int(cy), int(cx)] = i
        labels_ws = watershed(-dist, markers, mask=mask_full)

        # 6) Crop & classify each region
        results = []
        for region in regionprops(labels_ws):
            minr, minc, maxr, maxc = region.bbox
            patch = img.crop((minc, minr, maxc, maxr))
            x = self.cls_tf(patch).unsqueeze(0).to(self.device)
            with torch.no_grad():
                pred = self.cls_model(x).argmax(1).item()
            results.append({
                # (x, y, width, height): width spans columns, height spans
                # rows. The two extents were previously swapped.
                "bbox": (minc, minr, maxc-minc, maxr-minr),
                "pred": self.class_names[pred]
            })

        return img, fname, results




# 3) Usage + Visualization
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Order must match the label indices the classifier was trained with.
class_names = class_names = [
    "Amandina",
    "Arabia",
    "Comtesse",
    "Crème brulée",
    "Jelly Black",
    "Jelly Milk",
    "Jelly White",
    "Noblesse",
    "Noir authentique",
    "Passion au lait",
    "Stracciatella",
    "Tentation noir",
    "Triangolo",

]
# instantiate models and load weights
seg_model = UNet()
seg_model.load_state_dict(torch.load("Bon_mask_model4_epoch25.pth", map_location=device))
cls_model = ChocolateNet(len(class_names))
# This checkpoint is loaded directly as a state dict (no 'model_state' wrapper).
cls_model.load_state_dict(torch.load("best_choco_cnn_square.pth", map_location=device))

# dataset
dataset = ChocolateInferenceDataset(
    image_dir="/content/drive/MyDrive/Colab Notebooks/project/dataset_project_iapr2025/test",
    seg_model=seg_model,
    cls_model=cls_model,
    class_names=class_names,
    device=device
)

# iterate & display
# (the visualization loop is disabled; only the output folder is still created)
output_dir = "/content/drive/MyDrive/Colab Notebooks/project/dataset_project_iapr2025/test_results_v2"
os.makedirs(output_dir, exist_ok=True)
# for img, fname, results in dataset:
#     fig, ax = plt.subplots(1,1,figsize=(6,6))
#     ax.imshow(img); ax.set_title(fname); ax.axis('off')
#     dets = [r["pred"] for r in results]
#     print(f"{fname} → {dets}")
#     for r in results:
#         x,y,w,h = r["bbox"]
#         ax.add_patch(plt.Rectangle((x,y), w, h, fill=False, edgecolor='lime', lw=2))
#         ax.text(x, y-5, r["pred"], color='white',
#                 bbox=dict(facecolor='black', alpha=0.5), fontsize=8)
#     plt.show()
#     save_path = os.path.join(output_dir, fname)
#     fig.savefig(save_path, bbox_inches='tight', pad_inches=0)
#     plt.close(fig)

# Make DataFrame & save
# One row per image: the file stem as id plus the detection count per class.
rows = []
for img, fname, results in dataset:
    image_id = os.path.splitext(fname)[0]
    counts = {cn: 0 for cn in class_names}
    for r in results:
        counts[r["pred"]] += 1
    row = {"id": image_id}
    row.update(counts)
    rows.append(row)

# Column order of the submission file (differs from the class_names order).
columns = [
    "id",
    "Jelly White",
    "Jelly Milk",
    "Jelly Black",
    "Amandina",
    "Crème brulée",
    "Triangolo",
    "Tentation noir",
    "Comtesse",
    "Noblesse",
    "Noir authentique",
    "Passion au lait",
    "Arabia",
    "Stracciatella",
]

df = pd.DataFrame(rows, columns=columns)
# NOTE(review): drops the first two characters of the file stem; another cell
# in this notebook drops only one — confirm the expected id format.
df["id"]=df["id"].apply(lambda x: x[2:])
df.to_csv("submission.csv", index=False, encoding="utf-8-sig")

print("Columns in this exact order:", df.columns.tolist())

