In [None]:
# !pip install medmnist

In [None]:
from medmnist import PathMNIST
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder
import torchvision.models as models

from sklearn.metrics import accuracy_score
import wandb

### Initializing Weights and Biases

In [None]:
# Initializing Weights and Biases
wandb.init(project="pathmnist-classification")

### Data preparation

In [None]:
transform_train = transforms.Compose([
    transforms.Resize(64),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.RandomResizedCrop(64, scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

transform_val_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

In [None]:
train_dataset = PathMNIST(split="train", size=64, transform=transform_train, download=True, target_transform=lambda x: torch.tensor(x, dtype=torch.long).squeeze())
val_dataset = PathMNIST(split="val", size=64, transform=transform_val_test, download=True, target_transform=lambda x: torch.tensor(x, dtype=torch.long).squeeze())
test_dataset = PathMNIST(split="test", size=64, transform=transform_val_test, download=True, target_transform=lambda x: torch.tensor(x, dtype=torch.long).squeeze())

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# check data size
for images, labels in train_loader:
    print(f"Input shape: {images.shape}")
    break

### Functions for model training

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    early_stopping = EarlyStopping(patience=3)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        all_preds = []
        all_labels = []

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            all_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

        train_acc = accuracy_score(all_labels, all_preds)
        val_acc = evaluate_model(model, val_loader, criterion)

        wandb.log({"epoch": epoch + 1, "train_loss": running_loss / len(train_loader), "train_acc": train_acc, "val_acc": val_acc})

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {running_loss / len(train_loader):.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

        if early_stopping(val_acc):
            print("Early stopping triggered!")
            break

In [None]:
class EarlyStopping:
    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_score = None
        self.epochs_without_improvement = 0

    def __call__(self, val_acc):
        if self.best_score is None:
            self.best_score = val_acc
            return False
        elif val_acc < self.best_score + self.delta:
            self.epochs_without_improvement += 1
            if self.epochs_without_improvement >= self.patience:
                return True
        else:
            self.best_score = val_acc
            self.epochs_without_improvement = 0
        return False

In [None]:
def evaluate_model(model, val_loader, criterion):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    val_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            val_loss += criterion(outputs, labels).item()
            all_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    val_acc = accuracy_score(all_labels, all_preds)
    return val_acc

### CNN Network Models

In [None]:
class TransferLearningModel(nn.Module):
    def __init__(self, num_classes=9, pretrained=True):
        super(TransferLearningModel, self).__init__()
        
        # Wczytanie pretrenowanego modelu ResNet18
        self.base_model = models.resnet18(pretrained=pretrained)
        
        # Zastąpienie ostatniej w pełni połączonej warstwy klasyfikatora
        # Dopasowanie wyjścia do liczby klas w zadaniu
        num_features = self.base_model.fc.in_features
        self.base_model.fc = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )
    
    def forward(self, x):
        return self.base_model(x)

In [None]:
class TransferLearningModelResNet50(nn.Module):
    def __init__(self, num_classes=9, pretrained=True):
        super(TransferLearningModelResNet50, self).__init__()
        
        # Wczytanie pretrenowanego modelu ResNet18
        self.base_model = models.resnet50(pretrained=pretrained)
        
        # Zastąpienie ostatniej w pełni połączonej warstwy klasyfikatora
        # Dopasowanie wyjścia do liczby klas w zadaniu
        num_features = self.base_model.fc.in_features
        self.base_model.fc = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )
    
    def forward(self, x):
        return self.base_model(x)

In [None]:
class TransferLearningModelInception(nn.Module):
    def __init__(self, num_classes=9, pretrained=True):
        super(TransferLearningModelInception, self).__init__()
        
        # Wczytanie pretrenowanego modelu Inception_v3
        self.base_model = models.inception_v3(pretrained=pretrained, aux_logits=True)
        
        # Zastąpienie ostatniej w pełni połączonej warstwy klasyfikatora
        num_features = self.base_model.fc.in_features
        self.base_model.fc = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )
    
    def forward(self, x):
        # Inception_v3 wymaga rozdzielonych ścieżek dla danych treningowych i testowych
        # Dla danych treningowych można wykorzystać aux_logits
        return self.base_model(x)

In [None]:
class TransferLearningModelSqueezenet(nn.Module):
    def __init__(self, num_classes=9, pretrained=True):
        super(TransferLearningModelSqueezenet, self).__init__()

        # Wczytanie pretrenowanego SqueezeNet1_1
        self.base_model = models.squeezenet1_1(pretrained=pretrained)

        # Dostosowanie klasyfikatoraa
        self.base_model.classifier = nn.Sequential(
            nn.Conv2d(512, num_classes, kernel_size=1),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d((1, 1))  # Globalne uśrednianie
        )
        self.num_classes = num_classes

    def forward(self, x):
        return self.base_model(x)

In [None]:
class TransferLearningModelDenseNet(nn.Module):
    def __init__(self, num_classes=9, pretrained=True):
        super(TransferLearningModelDenseNet, self).__init__()

        # Wczytanie pretrenowanego DenseNet121
        self.base_model = models.densenet121(pretrained=pretrained)

        # Zastąpienie klasyfikatora
        num_features = self.base_model.classifier.in_features
        self.base_model.classifier = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)
        )

    def forward(self, x):
        return self.base_model(x)

In [None]:
class BaseModel(nn.Module):
    def __init__(self, num_classes=9):
        super(BaseModel, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU()
        )
        self.flatten = nn.Flatten()
        self.fc_layers = nn.Sequential(
            nn.Linear(128 * 7 * 7, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = self.flatten(x)
        x = self.fc_layers(x)
        return x

In [None]:
class AdvancedModel(nn.Module):
    def __init__(self, num_classes=9, input_shape=(3, 64, 64)):
        super(AdvancedModel, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        # Obliczenie wymiaru wyjściowego dla klasyfikatora
        with torch.no_grad():
            dummy_input = torch.zeros(1, *input_shape)
            dummy_output = self.features(dummy_input)
            flattened_size = dummy_output.view(1, -1).size(1)
            print(f"Flattened size: {flattened_size}")  # Debug: Sprawdzenie wymiarów

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flattened_size, 256),  # Dynamiczny rozmiar wyjściowy
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        # print(f"Input shape: {x.shape}")  # Debugowanie: rozmiar wejściowy
        x = self.features(x)
        # print(f"Features output shape: {x.shape}")  # Rozmiar po bloku features
        x = self.classifier(x)
        # print(f"Classifier output shape: {x.shape}")  # Rozmiar po klasyfikatorze
        return x

### Initialization of the model, optimizer and loss function

In [None]:
model = TransferLearningModelSqueezenet(num_classes=9).to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

### Model training

In [None]:
train_model(model, train_loader, val_loader, criterion, optimizer, epochs=20)

### Saving model

In [None]:
torch.save(model.state_dict(), './modele/Squeezenet2501.pth')

### Disconnect Weights & Biases

In [None]:
wandb.finish()