In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image, ImageEnhance
import pandas as pd
from pathlib import Path
import numpy as np
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import f1_score, roc_auc_score
import timm
import os
import json
import random

In [None]:
PROJECT_ROOT = Path(r"../") 
CSV_DIR = PROJECT_ROOT / "data" / "imagelevel"
TRAIN_CSV = CSV_DIR / "train.csv"
VAL_CSV = CSV_DIR / "val.csv"
TEST_CSV = CSV_DIR / "test.csv"
MODEL_DIR = PROJECT_ROOT / "models"
MODEL_DIR.mkdir(parents=True, exist_ok=True)

In [3]:
BATCH_SIZE = 16
IMG_SIZE = 640
LR = 1e-4
EPOCHS = 10
NUM_WORKERS = 4
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
LIMITS = {
    "train": 300,
    "val": 150,
    "test": 100
}

def trim_csv(csv_path, limit, seed=42):
    df = pd.read_csv(csv_path)
    random.seed(seed)
    trimmed_rows = []
    classes_set = set()
    
    # собираем все классы
    for row in df['classes'].fillna(""):
        row_classes = [int(c) for c in row.split(",") if c != ""]
        classes_set.update(row_classes)
    classes_list = list(classes_set)
    
    # обрезаем по лимиту
    for cls in classes_list:
        cls_rows = df[df['classes'].str.contains(str(cls), na=False)].copy()
        if len(cls_rows) > limit:
            cls_rows = cls_rows.sample(limit, random_state=seed)
        trimmed_rows.append(cls_rows)
    
    df_trimmed = pd.concat(trimmed_rows).drop_duplicates().reset_index(drop=True)
    return df_trimmed

train_trimmed = trim_csv(TRAIN_CSV, LIMITS['train'])
val_trimmed = trim_csv(VAL_CSV, LIMITS['val'])
test_trimmed = trim_csv(TEST_CSV, LIMITS['test'])

In [None]:
class MultiLabelDataset(Dataset):
    def __init__(self, df, project_root, transform=None, mlb=None):
        self.df = df
        self.project_root = Path(project_root)
        self.transform = transform
        self.mlb = mlb

        self.labels_list = self.df['classes'].fillna("").apply(lambda x: [int(c) for c in x.split(",") if c!=""]).tolist()
        if self.mlb is None:
            self.mlb = MultiLabelBinarizer()
            self.mlb.fit(self.labels_list)
        self.labels_bin = self.mlb.transform(self.labels_list)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_path = self.project_root / self.df.iloc[idx]['image_path']
        img = Image.open(img_path).convert("RGB")
        img = img.resize((IMG_SIZE, IMG_SIZE))
        enhancer = ImageEnhance.Contrast(img)
        img = enhancer.enhance(1.2)
        if self.transform:
            img = self.transform(img)
        label = torch.FloatTensor(self.labels_bin[idx])
        return img, label

In [None]:
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
])

val_transform = transforms.Compose([
    transforms.ToTensor(),
])

In [7]:
train_dataset = MultiLabelDataset(train_trimmed, PROJECT_ROOT, transform=train_transform)
val_dataset = MultiLabelDataset(val_trimmed, PROJECT_ROOT, transform=val_transform, mlb=train_dataset.mlb)
test_dataset = MultiLabelDataset(test_trimmed, PROJECT_ROOT, transform=val_transform, mlb=train_dataset.mlb)

NUM_CLASSES = len(train_dataset.mlb.classes_)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)




In [None]:
def print_class_stats(datasets, names):
    """
    datasets: list of MultiLabelDataset
    names: list of str ["train", "val", "test"]
    """
    result = {"classes": [str(c) for c in datasets[0].mlb.classes_], "counts": {}}
    for ds, name in zip(datasets, names):
        counts = {}
        for idx, cls in enumerate(ds.mlb.classes_):
            counts[str(cls)] = int(ds.labels_bin[:, idx].sum())
        result["counts"][name] = counts
    print(json.dumps(result, indent=4))

print("Class distribution after trimming:")
print_class_stats([train_dataset, val_dataset, test_dataset], ["train", "val", "test"])

Class distribution after trimming:
{
    "classes": [
        "0",
        "1",
        "2",
        "4",
        "6",
        "7",
        "10",
        "11",
        "12"
    ],
    "counts": {
        "train": {
            "0": 424,
            "1": 224,
            "2": 31,
            "4": 300,
            "6": 360,
            "7": 332,
            "10": 56,
            "11": 53,
            "12": 778
        },
        "val": {
            "0": 222,
            "1": 123,
            "2": 17,
            "4": 150,
            "6": 182,
            "7": 165,
            "10": 21,
            "11": 24,
            "12": 377
        },
        "test": {
            "0": 169,
            "1": 124,
            "2": 12,
            "4": 100,
            "6": 134,
            "7": 135,
            "10": 28,
            "11": 24,
            "12": 220
        }
    }
}


In [None]:
model = timm.create_model("mobilenetv3_small_100", pretrained=True, num_classes=NUM_CLASSES)
model = model.to(DEVICE)

model.safetensors:   0%|          | 0.00/10.2M [00:00<?, ?B/s]

In [None]:
# считаем количество примеров для каждого класса
labels_bin = train_dataset.labels_bin
class_counts = labels_bin.sum(axis=0)
total_samples = labels_bin.shape[0]
pos_weight = torch.tensor((total_samples - class_counts) / (class_counts + 1e-6), dtype=torch.float32).to(DEVICE)

criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)

In [None]:
def train_one_epoch(model, loader, optimizer, criterion):
    model.train()
    running_loss = 0.0
    total_batches = len(loader)
    for i, (imgs, labels) in enumerate(loader, 1):
        imgs, labels = imgs.to(DEVICE), labels.to(DEVICE)
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * imgs.size(0)

        if i % 5 == 0 or i == total_batches:
            print(f"\rBatch {i}/{total_batches} ({i/total_batches*100:.1f}%)", end="")

    print()  # перенос строки после окончания эпохи
    return running_loss / len(loader.dataset)

@torch.no_grad()
def evaluate(model, loader, criterion):
    model.eval()
    running_loss = 0.0
    all_labels, all_preds = [], []
    for imgs, labels in loader:
        imgs, labels = imgs.to(DEVICE), labels.to(DEVICE)
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * imgs.size(0)
        all_preds.append(torch.sigmoid(outputs).cpu().numpy())
        all_labels.append(labels.cpu().numpy())
    all_preds = np.vstack(all_preds)
    all_labels = np.vstack(all_labels)
    f1 = f1_score(all_labels, all_preds > 0.5, average='micro')
    try:
        roc_auc = roc_auc_score(all_labels, all_preds, average='micro')
    except ValueError:
        roc_auc = 0.0
    return running_loss / len(loader.dataset), f1, roc_auc

In [None]:
best_val_loss = float('inf')
for epoch in range(1, EPOCHS + 1):
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
    val_loss, val_f1, val_roc = evaluate(model, val_loader, criterion)
    print(f"Epoch {epoch}/{EPOCHS} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | F1: {val_f1:.4f} | ROC-AUC: {val_roc:.4f}")
    
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), MODEL_DIR / "best_model.pth")
        print("Saved best model.")

In [None]:
est_loss, test_f1, test_roc = evaluate(model, test_loader, criterion)
print(f"Test Loss: {est_loss:.4f} | F1: {test_f1:.4f} | ROC-AUC: {test_roc:.4f}")