Modele vit_b_16, swin_t i maxvit_t są dostępne dopiero w nowszych wersjach torchvision (odpowiednio od wersji 0.12, 0.13 i 0.15). W starszych wersjach próba ich użycia kończy się błędem, np. AttributeError: module 'torchvision.models' has no attribute 'maxvit_t'.
Sugestia: Sprawdź wersję biblioteki torchvision w swoim środowisku:
import torchvision
print(torchvision.__version__)
Jeśli używasz starszej wersji, zaktualizuj ją:
pip install --upgrade torchvision

In [1]:
import torch
import torch.nn as nn
import torchvision.models as models

class MultiInputModel(nn.Module):
    """Classifier that fuses two RGB views and one single-channel view.

    The T and B RGB images are passed through the same torchvision backbone
    (with its classification head removed), the single-channel S image goes
    through a small CNN, and the three feature vectors are concatenated and
    classified by a two-layer MLP.

    Args:
        num_classes: Number of output classes.
        base_model: Name of the torchvision backbone; see
            ``_initialize_rgb_model`` for the supported names.
        filter_num_base: Base channel count of the binary-image CNN; its two
            convolutions use ``2 *`` and ``4 *`` this many filters.
    """

    # Width of the feature vector produced by the binary branch's last Linear
    # layer. The fusion layer must use this value, not a multiple of
    # ``filter_num_base``.
    BINARY_FEATURE_SIZE = 128

    def __init__(self, num_classes=11, base_model='efficientnet_b0', filter_num_base=4):
        super().__init__()

        # Pretrained backbone shared by both RGB views.
        self.base_model = base_model
        self.rgb_model, self.base_model_output_size = self._initialize_rgb_model(base_model)

        # Small CNN for the single-channel binary image (S view). The adaptive
        # pooling makes the flattened size equal to the channel count.
        binary_channels = filter_num_base * 4
        self.binary_model = nn.Sequential(
            nn.Conv2d(1, filter_num_base * 2, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(filter_num_base * 2, binary_channels, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(binary_channels, self.BINARY_FEATURE_SIZE),
            nn.ReLU()
        )

        # Fusion head: two backbone feature vectors (T and B) plus the binary
        # branch output.
        fused_size = self.base_model_output_size * 2 + self.BINARY_FEATURE_SIZE
        self.fc = nn.Sequential(
            nn.Linear(fused_size, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def _initialize_rgb_model(self, base_model):
        """Build the pretrained RGB backbone and strip its classification head.

        The feature width is read from the instantiated model wherever the
        head exposes it, so every variant of a family reports its real size
        instead of a value guessed from the model name.

        Args:
            base_model: Name of a builder function in ``torchvision.models``.

        Returns:
            tuple: ``(model, feature_size)`` where ``feature_size`` is the
            length of the per-image feature vector the model outputs.

        Raises:
            ValueError: If ``base_model`` is not one of the supported names.
        """
        if base_model.startswith('efficientnet') or base_model == 'mobilenet_v2':
            # classifier is Sequential(Dropout, Linear); pooling and flatten
            # happen before it, so replacing it leaves a flat feature vector.
            model = getattr(models, base_model)(pretrained=True)
            feature_size = model.classifier[-1].in_features
            model.classifier = nn.Identity()
            return model, feature_size

        if base_model == 'googlenet':
            model = models.googlenet(pretrained=True)
            feature_size = model.fc.in_features
            model.fc = nn.Identity()
            return model, feature_size

        if base_model == 'inception_v3':
            # Newer torchvision rejects aux_logits=False together with
            # pretrained weights, so load with the default and disable the
            # auxiliary head afterwards.
            model = models.inception_v3(pretrained=True)
            model.aux_logits = False
            model.AuxLogits = None
            feature_size = model.fc.in_features
            model.fc = nn.Identity()
            return model, feature_size

        if base_model == 'maxvit_t':
            # The classifier also performs pooling and flattening, so only the
            # final Linear layer is removed.
            model = models.maxvit_t(pretrained=True)
            feature_size = model.classifier[-1].in_features
            model.classifier[-1] = nn.Identity()
            return model, feature_size

        if base_model in ('mobilenet_v3_large', 'mobilenet_v3_small'):
            # With the whole classifier removed the output is the pooled
            # feature map, whose width is the first Linear layer's input size.
            model = getattr(models, base_model)(pretrained=True)
            feature_size = model.classifier[0].in_features
            model.classifier = nn.Identity()
            return model, feature_size

        if base_model.startswith('resnet'):
            model = getattr(models, base_model)(pretrained=True)
            feature_size = model.fc.in_features
            model.fc = nn.Identity()
            return model, feature_size

        if base_model in ('squeezenet1_0', 'squeezenet1_1'):
            # SqueezeNet pools inside its classifier; keep a pooling layer so
            # the flattened output is one value per channel rather than a
            # whole feature map.
            model = getattr(models, base_model)(pretrained=True)
            feature_size = model.classifier[1].in_channels
            model.classifier = nn.AdaptiveAvgPool2d((1, 1))
            return model, feature_size

        if base_model == 'swin_t':
            model = models.swin_t(pretrained=True)
            feature_size = model.head.in_features
            model.head = nn.Identity()
            return model, feature_size

        if base_model == 'vit_b_16':
            model = models.vit_b_16(pretrained=True)
            model.heads = nn.Identity()
            return model, 768

        raise ValueError(f"Unsupported base model: {base_model}")

    @staticmethod
    def get_input_size(base_model):
        """Return the input resolution expected for the given backbone.

        Args:
            base_model (str): Name of the backbone.

        Returns:
            tuple: ``(height, width)`` of the expected input images.

        Raises:
            ValueError: If ``base_model`` is not a supported name.
        """
        if base_model == 'inception_v3':
            return (299, 299)

        is_224_family = base_model.startswith(('efficientnet', 'mobilenet', 'resnet', 'squeezenet'))
        if is_224_family or base_model in ('googlenet', 'maxvit_t', 'swin_t', 'vit_b_16'):
            return (224, 224)

        raise ValueError(f"Unsupported base model: {base_model}")

    def forward(self, t_image, b_image, s_image):
        """Classify one batch of (T, B, S) image triplets.

        Args:
            t_image: Batch of RGB T-view images.
            b_image: Batch of RGB B-view images.
            s_image: Batch of single-channel S-view images.

        Returns:
            Tensor of class scores with one row per sample.
        """
        input_size = self.get_input_size(self.base_model)

        # All three views must already be resized to the backbone's resolution.
        # NOTE(review): assert statements are skipped under ``python -O``.
        assert t_image.shape[-2:] == input_size, f"Expected T image to be of size {input_size}, but got {t_image.shape[-2:]}"
        assert b_image.shape[-2:] == input_size, f"Expected B image to be of size {input_size}, but got {b_image.shape[-2:]}"
        assert s_image.shape[-2:] == input_size, f"Expected S image to be of size {input_size}, but got {s_image.shape[-2:]}"

        # Both RGB views share the same backbone weights.
        t_features = self.rgb_model(t_image)
        b_features = self.rgb_model(b_image)

        s_features = self.binary_model(s_image)

        combined_features = torch.cat([t_features, b_features, s_features], dim=1)
        return self.fc(combined_features)

In [2]:
from torch.utils.data import Dataset
from PIL import Image
import pandas as pd

class MultiInputDataset(Dataset):
    """Dataset yielding (T image, B image, S image, label) per sample.

    The CSV is read in groups of three consecutive rows; the code takes the
    first row of a group as the T view, the second as the B view and the third
    as the S view, and uses the ``class`` value of the first row as the label.

    Args:
        csv_file: Path to a CSV file with at least ``class`` and ``path`` columns.
        transform_rgb: Optional callable applied to the two RGB images.
        transform_binary: Optional callable applied to the grayscale S image.
        class_to_idx: Optional explicit ``{class value: index}`` mapping, e.g.
            the mapping of the training split, so that every split shares the
            same label indices. When omitted, the mapping is built from the
            sorted unique class values of this CSV.
    """

    def __init__(self, csv_file, transform_rgb=None, transform_binary=None, class_to_idx=None):
        self.data = pd.read_csv(csv_file)

        # Sort the class values so the mapping does not depend on the row order
        # of each CSV; otherwise train/val/test could assign different indices
        # to the same class.
        if class_to_idx is None:
            class_values = sorted(self.data['class'].unique())
            class_to_idx = {class_name: idx for idx, class_name in enumerate(class_values)}
        self.class_to_idx = class_to_idx

        self.transform_rgb = transform_rgb
        self.transform_binary = transform_binary

    def __len__(self):
        # Three CSV rows (one per view) make up one sample.
        return len(self.data) // 3

    def __getitem__(self, idx):
        """Load and return the three views and the label of sample ``idx``."""
        base_idx = idx * 3
        first_row = self.data.iloc[base_idx]
        t_path = first_row['path']
        b_path = self.data.iloc[base_idx + 1]['path']
        s_path = self.data.iloc[base_idx + 2]['path']

        t_image = Image.open(t_path).convert("RGB")
        b_image = Image.open(b_path).convert("RGB")
        s_image = Image.open(s_path).convert("L")  # single-channel binary view

        if self.transform_rgb:
            t_image = self.transform_rgb(t_image)
            b_image = self.transform_rgb(b_image)
        if self.transform_binary:
            s_image = self.transform_binary(s_image)

        # Map the class value to its integer index for CrossEntropyLoss.
        label = torch.tensor(self.class_to_idx[first_row['class']], dtype=torch.long)

        return t_image, b_image, s_image, label

#Krok 2: Transformacje dla obrazów RGB i binarnych:
from torchvision import transforms

# Preprocessing for the RGB views: resize to 224x224, convert to a tensor,
# then normalise each channel with the fixed mean/std values below.
transform_rgb = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Preprocessing for the binary view: resize and convert to a tensor only.
transform_binary = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
    ]
)

In [None]:
from torch.utils.data import DataLoader
import torch.optim as optim
import os
from tqdm import tqdm

# Make CUDA kernel launches synchronous so errors are reported at the failing call.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Use the GPU when present, otherwise fall back to the CPU instead of crashing
# on the first .to("cuda").
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Logs and checkpoints are written to this directory; create it up front
# because open(..., "w") does not create missing directories.
os.makedirs("training_results", exist_ok=True)

# Load the three dataset splits.
train_dataset = MultiInputDataset("CSV/dataset/train.csv", transform_rgb=transform_rgb, transform_binary=transform_binary)
val_dataset = MultiInputDataset("CSV/dataset/val.csv", transform_rgb=transform_rgb, transform_binary=transform_binary)
test_dataset = MultiInputDataset("CSV/dataset/test.csv", transform_rgb=transform_rgb, transform_binary=transform_binary)

# Backbones to train, and the batch size used for each of them.
models_list = ['efficientnet_v2_m', 'mobilenet_v3_small', 'resnet34', 'swin_t', 'vit_b_16']  # 'efficientnet_b0',
batch_sizes = {
    'efficientnet_b0': 32,
    'efficientnet_v2_m': 16,
    'mobilenet_v3_small': 64,
    'resnet34': 32,
    'swin_t': 16,
    'vit_b_16': 8,
}
for model_name in models_list:
    # Build the model for this backbone and move it to the selected device.
    model = MultiInputModel(num_classes=11, base_model=model_name)
    model = model.to(device)

    # Per-model batch size; models without an entry use 32 instead of raising KeyError.
    batch_size = batch_sizes.get(model_name, 32)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Loss function and optimiser.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Start a fresh per-model training log with a header row.
    log_file = f"training_results/training_log_{model_name}.txt"
    with open(log_file, "w") as f:
        f.write("Epoch, Train Loss, Val Loss\n")

    # Early-stopping state: stop after this many epochs without a lower validation loss.
    early_stop_patience = 5
    best_val_loss = float("inf")
    patience_counter = 0
    best_model_path = f"training_results/best_model_{model_name}.pth"

    num_epochs = 50
    for epoch in range(num_epochs):
        # === TRAINING ===
        print(f"Epoch {epoch + 1}/{num_epochs}")
        model.train()
        train_loss = 0

        with tqdm(total=len(train_loader), desc="Training", unit="batch") as pbar:
            for t_image, b_image, s_image, labels in train_loader:
                t_image, b_image, s_image, labels = (
                    t_image.to(device),
                    b_image.to(device),
                    s_image.to(device),
                    labels.to(device)
                )

                # Forward pass and loss.
                outputs = model(t_image, b_image, s_image)
                loss = criterion(outputs, labels)

                # Backward pass and parameter update.
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
                # pbar.n is the number of finished batches before this update,
                # so n + 1 includes the current one: running mean loss.
                pbar.set_postfix({"loss": f"{train_loss / (pbar.n + 1):.4f}"})
                pbar.update(1)

        train_loss /= len(train_loader)  # mean loss per batch
        print(f"Train Loss: {train_loss:.4f}")

        # === VALIDATION ===
        model.eval()
        val_loss = 0
        with torch.no_grad():
            with tqdm(total=len(val_loader), desc="Validation", unit="batch") as pbar_val:
                for t_image, b_image, s_image, labels in val_loader:
                    t_image, b_image, s_image, labels = (
                        t_image.to(device),
                        b_image.to(device),
                        s_image.to(device),
                        labels.to(device)
                    )
                    outputs = model(t_image, b_image, s_image)
                    loss = criterion(outputs, labels)
                    val_loss += loss.item()
                    pbar_val.update(1)

        val_loss /= len(val_loader)  # mean loss per batch
        print(f"Val Loss: {val_loss:.4f}")

        # === LOGGING ===
        print(f"Epoch {epoch + 1}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
        with open(log_file, "a") as f:
            f.write(f"{epoch + 1}, {train_loss:.4f}, {val_loss:.4f}\n")

        # === EARLY STOPPING ===
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            # Checkpoint the weights with the lowest validation loss so far.
            torch.save(model.state_dict(), best_model_path)
            print(f"Best model saved at epoch {epoch + 1}")
        else:
            patience_counter += 1
            print(f"No improvement in val loss for {patience_counter} epoch(s)")

        if patience_counter >= early_stop_patience:
            print("Early stopping triggered. Training stopped.")
            break

    # === TEST ===
    # Restore the best checkpoint; map_location keeps this working when the
    # checkpoint was written on a different device.
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    model.eval()

    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for t_image, b_image, s_image, labels in test_loader:
            t_image, b_image, s_image, labels = (
                t_image.to(device),
                b_image.to(device),
                s_image.to(device),
                labels.to(device)
            )
            outputs = model(t_image, b_image, s_image)
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            # Count correct top-1 predictions.
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    test_loss /= len(test_loader)
    accuracy = correct / total

    # Write the test metrics for this model.
    log_file_test = f"training_results/test_log_{model_name}.txt"
    with open(log_file_test, "w") as f_t:
        f_t.write(f"Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.4f}")

    # Drop the model AND the optimiser: the optimiser keeps references to the
    # parameters, so deleting only the model would not let their memory be freed
    # before the next model is created.
    del model, optimizer
    torch.cuda.empty_cache()


NameError: name 'batch_size' is not defined

*******************

In [None]:
from torch.utils.data import DataLoader
import torch.optim as optim
import os
# Make CUDA kernel launches synchronous.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Datasets for the three splits.
train_dataset = MultiInputDataset(
    "CSV/dataset/train.csv",
    transform_rgb=transform_rgb,
    transform_binary=transform_binary,
)
val_dataset = MultiInputDataset(
    "CSV/dataset/val.csv",
    transform_rgb=transform_rgb,
    transform_binary=transform_binary,
)
test_dataset = MultiInputDataset(
    "CSV/dataset/test.csv",
    transform_rgb=transform_rgb,
    transform_binary=transform_binary,
)

# Loaders: only the training split is shuffled.
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Model with the default backbone, placed on the GPU.
model = MultiInputModel(num_classes=11).to("cuda")

# Loss function and optimiser.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train for ten epochs and report the mean batch loss of each epoch.
for epoch in range(10):
    model.train()
    total_loss = 0
    for batch in train_loader:
        t_image, b_image, s_image, labels = (item.to("cuda") for item in batch)

        # Clear old gradients, run the forward pass, then backpropagate and update.
        optimizer.zero_grad()
        outputs = model(t_image, b_image, s_image)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    mean_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch + 1}, Loss: {mean_loss:.4f}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from torchvision.datasets import ImageFolder
import copy
from PIL import Image



# Paths of the CSV files describing each split.
train_csv = "CSV/dataset/train.csv"
val_csv = "CSV/dataset/val.csv"
test_csv = "CSV/dataset/test.csv"

# Per-split preprocessing. The models trained below start from pretrained
# weights, so inputs are normalised with the same mean/std values used by
# transform_rgb elsewhere in this notebook. Only the training split is augmented.
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

# Custom Dataset to handle Data from CSV
class CustomDataset(Dataset):
    """Single-image dataset backed by a DataFrame.

    Column 0 of the DataFrame is read as the image file name (relative to
    ``root_dir``) and column 1 as a value convertible to an integer label.

    Args:
        dataframe: Table with one row per image.
        root_dir: Directory that the file names in column 0 are relative to.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, dataframe, root_dir, transform=None):
        self.dataframe = dataframe
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``."""
        img_name = self.dataframe.iloc[idx, 0]
        img_path = f"{self.root_dir}/{img_name}"

        # Force three channels so grayscale or RGBA files do not produce
        # tensors of a different shape, and close the file once the pixel
        # data has been copied by convert().
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")
        label = int(self.dataframe.iloc[idx, 1])

        if self.transform:
            image = self.transform(image)

        return image, label

# Read the three split CSVs into DataFrames (train, then val, then test).
train_df, val_df, test_df = (
    pd.read_csv(csv_path) for csv_path in (train_csv, val_csv, test_csv)
)

# The images are assumed to be stored in a directory named 'images'.
image_dir = 'images'

# One dataset per split, each with its own transform pipeline.
train_dataset = CustomDataset(dataframe=train_df, root_dir=image_dir, transform=data_transforms['train'])
val_dataset = CustomDataset(dataframe=val_df, root_dir=image_dir, transform=data_transforms['val'])
test_dataset = CustomDataset(dataframe=test_df, root_dir=image_dir, transform=data_transforms['test'])

# Loaders: only the training split is shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False, num_workers=4)

# Function for training
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` and return it loaded with its best-validation weights.

    Args:
        model: Network to train; batches are moved to the device its
            parameters are on.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        val_loader: Loader yielding ``(inputs, labels)`` validation batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimiser already bound to the model's parameters.
        num_epochs: Number of passes over the training data.

    Returns:
        The same ``model`` object, with the weights of the epoch that reached
        the highest validation accuracy (or the initial weights if accuracy
        never rose above zero).
    """
    # Follow the model's own device instead of depending on a module-level
    # ``device`` variable being defined before this function is called.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    best_model_weights = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    # Guard the averages against empty datasets.
    num_train = max(len(train_loader.dataset), 1)
    num_val = max(len(val_loader.dataset), 1)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(run_device), labels.to(run_device)
            optimizer.zero_grad()
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # The loss is weighted by batch size so the epoch value is a
            # per-sample mean.
            running_loss += loss.item() * inputs.size(0)
            running_corrects += (preds == labels).sum().item()

        epoch_loss = running_loss / num_train
        epoch_acc = running_corrects / num_train

        print(f'Epoch {epoch}/{num_epochs-1}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')

        # Validation: no gradients are needed, so skip building the graph.
        model.eval()
        val_corrects = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(run_device), labels.to(run_device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                val_corrects += (preds == labels).sum().item()

        val_acc = val_corrects / num_val
        print(f'Validation Accuracy: {val_acc:.4f}')

        # Snapshot (deep copy) the weights whenever validation accuracy improves.
        if val_acc > best_acc:
            best_acc = val_acc
            best_model_weights = copy.deepcopy(model.state_dict())

    model.load_state_dict(best_model_weights)
    return model

# Function for testing
def test_model(model, test_loader):
    """Evaluate ``model`` on ``test_loader``, print and return its accuracy.

    Args:
        model: Network to evaluate; batches are moved to the device its
            parameters are on.
        test_loader: Loader yielding ``(inputs, labels)`` batches.

    Returns:
        float: Fraction of samples whose top-1 prediction matches the label.
    """
    # Follow the model's own device instead of depending on a module-level
    # ``device`` variable.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    test_corrects = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(run_device), labels.to(run_device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            test_corrects += (preds == labels).sum().item()

    # Guard the average against an empty dataset.
    test_acc = test_corrects / max(len(test_loader.dataset), 1)
    print(f'Test Accuracy: {test_acc:.4f}')
    return test_acc


# The name starts with "test", so tell pytest not to collect this helper as a
# test case if the module is ever imported by a test run.
test_model.__test__ = False

# Fine-tune and test every backbone in models_list, replacing only the final
# classification layer so it outputs num_classes scores.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
num_classes = 11
for model_name in models_list:
    print(f"\nTraining {model_name} model:")
    if model_name.startswith(('efficientnet', 'mobilenet')):
        # The last element of .classifier is the output Linear layer.
        model = getattr(models, model_name)(pretrained=True)
        num_features = model.classifier[-1].in_features
        model.classifier[-1] = nn.Linear(num_features, num_classes)

    elif model_name.startswith('resnet'):
        model = getattr(models, model_name)(pretrained=True)
        num_features = model.fc.in_features
        model.fc = nn.Linear(num_features, num_classes)

    elif model_name == 'swin_t':
        model = models.swin_t(pretrained=True)
        num_features = model.head.in_features
        model.head = nn.Linear(num_features, num_classes)

    elif model_name == 'vit_b_16':
        # torchvision's VisionTransformer keeps its output layer at
        # ``heads.head``; it has no ``head`` attribute.
        model = models.vit_b_16(pretrained=True)
        num_features = model.heads.head.in_features
        model.heads.head = nn.Linear(num_features, num_classes)

    else:
        # Fail loudly instead of silently re-training the previous model.
        raise ValueError(f"Unsupported base model: {model_name}")

    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    # Train and evaluate
    model = train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10)
    print(f"Finished training {model_name}")

    # Test
    print(f"Testing {model_name} model:")
    test_model(model, test_loader)

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from torchvision.transforms import transforms

# Twoje klasy i funkcje:
# from your_module import MultiInputModel, MultiInputDataset

def train_and_evaluate(
    models_list,  # names of the backbones to train
    train_dataset, val_dataset, test_dataset,  # datasets
    num_classes, epochs=20, batch_size=32, learning_rate=0.001,
    output_dir='./results', device='cuda'
):
    """Train, checkpoint, plot and test a MultiInputModel for each backbone.

    For every name in ``models_list`` the function trains with Adam and
    cross-entropy loss, stops early after 5 epochs without a lower validation
    loss, saves the best weights, a loss-curve plot and a confusion-matrix
    plot into ``output_dir``, and prints test metrics.

    Args:
        models_list: Backbone names passed to ``MultiInputModel``.
        train_dataset: Dataset yielding ``(t_image, b_image, s_image, label)``.
        val_dataset: Validation dataset with the same item layout.
        test_dataset: Test dataset; must expose ``class_to_idx``.
        num_classes: Number of output classes.
        epochs: Maximum number of training epochs per model.
        batch_size: Batch size for all three loaders.
        learning_rate: Adam learning rate.
        output_dir: Directory for logs, checkpoints and plots (created if missing).
        device: Requested device; the CPU is used when CUDA is unavailable.
    """
    import copy  # local import: only needed to snapshot the best weights

    # Prepare the output directory and start a fresh log file.
    os.makedirs(output_dir, exist_ok=True)
    log_file_path = os.path.join(output_dir, 'training_logs.txt')

    with open(log_file_path, 'w') as log_file:
        log_file.write(f"Training Logs\n{'=' * 50}\n\n")

    # Data loaders: only the training split is shuffled.
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    device = torch.device(device if torch.cuda.is_available() else 'cpu')

    for model_name in models_list:
        print(f"Rozpoczynam trenowanie: {model_name}")

        model = MultiInputModel(num_classes=num_classes, base_model=model_name)
        model.to(device)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        # Early-stopping state.
        best_val_loss = float('inf')
        patience = 5
        patience_counter = 0
        best_model_weights = None

        # Per-epoch history.
        train_losses, val_losses = [], []
        train_accuracies, val_accuracies = [], []

        for epoch in range(epochs):
            # Training phase.
            model.train()
            train_loss = 0
            correct = 0
            total = 0

            for t_image, b_image, s_image, labels in train_loader:
                t_image, b_image, s_image, labels = (
                    t_image.to(device),
                    b_image.to(device),
                    s_image.to(device),
                    labels.to(device),
                )

                optimizer.zero_grad()
                outputs = model(t_image, b_image, s_image)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
                _, preds = torch.max(outputs, 1)
                correct += (preds == labels).sum().item()
                total += labels.size(0)

            train_loss /= len(train_loader)
            train_acc = correct / total

            train_losses.append(train_loss)
            train_accuracies.append(train_acc)

            # Validation phase.
            model.eval()
            val_loss = 0
            correct = 0
            total = 0

            with torch.no_grad():
                for t_image, b_image, s_image, labels in val_loader:
                    t_image, b_image, s_image, labels = (
                        t_image.to(device),
                        b_image.to(device),
                        s_image.to(device),
                        labels.to(device),
                    )

                    outputs = model(t_image, b_image, s_image)
                    loss = criterion(outputs, labels)

                    val_loss += loss.item()
                    _, preds = torch.max(outputs, 1)
                    correct += (preds == labels).sum().item()
                    total += labels.size(0)

            val_loss /= len(val_loader)
            val_acc = correct / total

            val_losses.append(val_loss)
            val_accuracies.append(val_acc)

            # Logging.
            log = f"{model_name} - Epoch {epoch+1}/{epochs}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}"
            print(log)
            with open(log_file_path, 'a') as log_file:
                log_file.write(log + '\n')

            # Early stopping.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # state_dict() returns references to the live tensors, so take
                # a deep copy; otherwise later epochs overwrite the "best" weights.
                best_model_weights = copy.deepcopy(model.state_dict())
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping triggered at epoch {epoch+1}")
                    break

        # Restore and save the best weights. If the validation loss never
        # improved (e.g. it was NaN) there is no snapshot, so the current
        # weights are saved instead.
        if best_model_weights is not None:
            model.load_state_dict(best_model_weights)
        model_path = os.path.join(output_dir, f"{model_name}_best_model.pth")
        torch.save(model.state_dict(), model_path)
        print(f"Model {model_name} zapisano: {model_path}")

        # Loss-curve plot.
        plt.figure(figsize=(10, 5))
        plt.plot(train_losses, label='Train Loss')
        plt.plot(val_losses, label='Validation Loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()
        plt.title(f'{model_name} - Loss Curve')
        plt.grid(True)
        plt.savefig(os.path.join(output_dir, f"{model_name}_loss_curve.png"), dpi=300, bbox_inches='tight', facecolor='white')
        plt.close()

        # Test-set predictions.
        cm, y_true, y_pred = test_model(model, test_loader, device)

        # Build the matrix over every known class, in class_to_idx order, so
        # rows/columns line up with class_names even when a class is missing
        # from the test predictions and labels.
        class_names = list(test_dataset.class_to_idx.keys())
        label_ids = list(test_dataset.class_to_idx.values())
        cm = confusion_matrix(y_true, y_pred, labels=label_ids)

        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
        plt.xlabel("Predicted")
        plt.ylabel("True")
        plt.title(f'{model_name} - Confusion Matrix')
        plt.savefig(os.path.join(output_dir, f"{model_name}_confusion_matrix.png"), dpi=300, bbox_inches='tight', facecolor='white')
        plt.close()

        # Remaining metrics.
        acc = accuracy_score(y_true, y_pred)
        print(f"Test Accuracy for {model_name}: {acc * 100:.2f}%")
        print(classification_report(y_true, y_pred, labels=label_ids, target_names=class_names))


def test_model(model, test_loader, device="cuda"):
    """Run ``model`` over ``test_loader`` and collect its predictions.

    Args:
        model: Network called as ``model(t_image, b_image, s_image)``.
        test_loader: Loader yielding ``(t_image, b_image, s_image, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        tuple: ``(cm, y_true, y_pred)`` — the confusion matrix followed by the
        lists of true and predicted labels.
    """
    model.eval()
    y_true, y_pred = [], []

    with torch.no_grad():
        for batch in test_loader:
            t_image, b_image, s_image, labels = (tensor.to(device) for tensor in batch)

            scores = model(t_image, b_image, s_image)
            predicted = torch.max(scores, 1)[1]

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())

    return confusion_matrix(y_true, y_pred), y_true, y_pred

In [None]:
import torch
from torch.utils.data import Dataset
from PIL import Image
import pandas as pd
import os

class MultiInputDataset(Dataset):
    """Dataset that loads one RGB image per CSV row plus its ``_S`` mask."""

    def __init__(self, csv_file, transform_rgb=None, transform_binary=None, class_to_idx=None):
        """Read the CSV and prepare the class-to-index mapping.

        Args:
            csv_file (str): Path to a CSV file with the columns class, path, id.
            transform_rgb (callable, optional): Transform for the RGB image.
            transform_binary (callable, optional): Transform for the binary image.
            class_to_idx (dict, optional): Explicit ``{class value: index}``
                mapping shared between splits. When omitted it is built from
                the sorted unique class values of this CSV.
        """
        self.data = pd.read_csv(csv_file)
        self.transform_rgb = transform_rgb
        self.transform_binary = transform_binary

        # Labels must be integer indices for CrossEntropyLoss, and
        # train_and_evaluate reads ``class_to_idx`` for the class names.
        # Sorting makes the mapping independent of the CSV row order.
        if class_to_idx is None:
            class_values = sorted(self.data['class'].unique())
            class_to_idx = {class_name: idx for idx, class_name in enumerate(class_values)}
        self.class_to_idx = class_to_idx

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(rgb, rgb, binary, label)`` for CSV row ``idx``.

        Raises:
            FileNotFoundError: If the ``_S`` image for this row does not exist.
        """
        row = self.data.iloc[idx]
        path = row['path']

        # Integer class index as a tensor, so batches can be moved to a device.
        label = torch.tensor(self.class_to_idx[row['class']], dtype=torch.long)

        # Load the image as RGB (3 channels).
        image_rgb = Image.open(path).convert('RGB')

        # The binary image path is derived from the RGB path by swapping the
        # '_T.png' suffix for '_S.png'.
        binary_mask_path = path.replace('_T.png', '_S.png')
        if os.path.exists(binary_mask_path):
            image_binary = Image.open(binary_mask_path).convert('L')
        else:
            raise FileNotFoundError(f"Nie znaleziono _S {binary_mask_path}")

        if self.transform_rgb:
            image_rgb = self.transform_rgb(image_rgb)
        if self.transform_binary:
            image_binary = self.transform_binary(image_binary)

        # NOTE(review): the same RGB tensor is returned for both the T and the
        # B slot; no separate B-view file is loaded here — confirm this is
        # intended.
        return image_rgb, image_rgb, image_binary, label

In [None]:
import torchvision.transforms as transforms

# RGB preprocessing (ImageNet-style, e.g. as used with EfficientNet):
# resize to 224x224, convert to a tensor, then normalise per channel.
transform_rgb = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Binary-image preprocessing: resize to the network input size and convert
# the single-channel ("L") image to a tensor.
transform_binary = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
    ]
)

In [None]:
# Paths of the CSV files describing each split.
train_csv = "CSV/dataset/train.csv"
val_csv = "CSV/dataset/val.csv"
test_csv = "CSV/dataset/test.csv"

# One dataset per split; all of them share the same two transform pipelines.
train_dataset = MultiInputDataset(
    csv_file=train_csv,
    transform_rgb=transform_rgb,
    transform_binary=transform_binary,
)
val_dataset = MultiInputDataset(
    csv_file=val_csv,
    transform_rgb=transform_rgb,
    transform_binary=transform_binary,
)
test_dataset = MultiInputDataset(
    csv_file=test_csv,
    transform_rgb=transform_rgb,
    transform_binary=transform_binary,
)

In [None]:
from torch.utils.data import DataLoader

# Build the loaders; only the training split is shuffled.
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=32)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=32)

# Smoke test: pull a single batch from the training loader and stop.
for t_images, b_images, s_images, labels in train_loader:
    break

In [None]:
# Backbones to train, in this order.
models_list = [
    'efficientnet_b0',
    'efficientnet_v2_m',
    'mobilenet_v3_small',
    'resnet34',
    'vit_b_16',
]

# Train and evaluate every backbone; results are written to ./training_results.
train_and_evaluate(
    models_list=models_list,
    num_classes=11,
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    test_dataset=test_dataset,
    output_dir='./training_results',
    epochs=20,
    batch_size=32,
    learning_rate=0.001,
)