In [1]:
!pip install medmnist

Collecting medmnist
  Downloading medmnist-3.0.2-py3-none-any.whl.metadata (14 kB)
Collecting fire (from medmnist)
  Downloading fire-0.7.0.tar.gz (87 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/87.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m87.2/87.2 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading medmnist-3.0.2-py3-none-any.whl (25 kB)
Building wheels for collected packages: fire
  Building wheel for fire (setup.py) ... [?25l[?25hdone
  Created wheel for fire: filename=fire-0.7.0-py3-none-any.whl size=114249 sha256=1dda2528c15aa414c095e3ff2c3630868b2d158012171626789a75858493dfcb
  Stored in directory: /root/.cache/pip/wheels/19/39/2f/2d3cadc408a8804103f1c34ddd4b9f6a93497b11fa96fe738e
Successfully built fire
Installing collected packages: fire, medmnist
Successfully installed fire-0.7.0 medmnist-3.0.2


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import medmnist
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision import models
from medmnist import PathMNIST
from tqdm import tqdm

# data augmentation for training
train_transform = transforms.Compose([
  transforms.Resize((32, 32)),
  transforms.ColorJitter(brightness=0.2, contrast=0.2),
  transforms.ToTensor(),
  transforms.Normalize(mean=[.5], std=[.5])
])

# basic transforms for validation/testing
val_transform = transforms.Compose([
  transforms.Resize((32, 32)),
  transforms.ToTensor(),
  transforms.Normalize(mean=[.5], std=[.5])
])

# setup device and dataset parameters
download=True
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')

# load pathmnist datasets for each split (train/val/test)
train_dataset = PathMNIST(split='train', transform=train_transform, download=download)
val_dataset = PathMNIST(split='val', transform=val_transform, download=download)
test_dataset = PathMNIST(split='test', transform=val_transform, download=download)

# create data loaders with batch size 128
BATCH_SIZE = 128
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# main ensemble model
class Model(nn.Module):
  def __init__(self, model1, model2, model3, num_classes):
    super(Model, self).__init__()
    self.model1 = model1      # initialize component models
    self.model2 = model2
    self.model3 = model3
    self.num_classes = num_classes
    self.weights = nn.Parameter(torch.ones(3, num_classes))     # learnable weights
    self.dropout = nn.Dropout(0.2)      # dropout for regularization

  def forward(self, x):
    # get softmax outputs from each model
    out1 = F.softmax(self.model1(x), dim=1)
    out2 = F.softmax(self.model2(x), dim=1)
    out3 = F.softmax(self.model3(x), dim=1)

    # apply learned weights to each model's output
    weighted_out1 = out1 * self.weights[0]
    weighted_out2 = out2 * self.weights[1]
    weighted_out3 = out3 * self.weights[2]

    # combine weighted outputs and apply dropout
    combined_output = weighted_out1 + weighted_out2 + weighted_out3
    combined_output = self.dropout(combined_output)
    return combined_output

# helper function to calculate accuracy
def calculate_accuracy(outputs, labels):
  _, preds = torch.max(outputs, 1)
  corrects = torch.sum(preds == labels.data)
  accuracy = corrects.double() / len(labels)
  return accuracy

num_classes = 9

Device: cpu
Downloading https://zenodo.org/records/10519652/files/pathmnist.npz?download=1 to /root/.medmnist/pathmnist.npz


100%|██████████| 206M/206M [00:10<00:00, 19.2MB/s]


Using downloaded and verified file: /root/.medmnist/pathmnist.npz
Using downloaded and verified file: /root/.medmnist/pathmnist.npz


Pretrained

In [3]:
print("Training Pretrained Ensemble")

# initialize pretrained models
model1 = models.squeezenet1_1(pretrained=True)
model2 = models.mobilenet_v2(pretrained=True)
model3 = models.shufflenet_v2_x1_0(pretrained=True)

# modify final layers for pathmnist(our dataset) classes
model1.classifier[1] = nn.Conv2d(model1.classifier[1].in_channels, num_classes, kernel_size=(1, 1), stride=(1, 1))
model1.num_classes = num_classes
model2.classifier[1] = nn.Linear(model2.last_channel, num_classes)
model3.fc = nn.Linear(model3.fc.in_features, num_classes)

# create ensemble model
model = Model(model1, model2, model3, num_classes)
model = model.to(device)

# setup loss and optimizer with fixed learning rate
criterion = nn.CrossEntropyLoss()
params = (list(model1.parameters()) +
         list(model2.parameters()) +
         list(model3.parameters()) +
         list(model.parameters()))
optimizer = optim.Adam(params, lr=0.0004, weight_decay=1e-5)

# training loop parameters
num_epochs = 10
best_val_loss = float('inf')
best_model_weights = None

# main training loop
for epoch in range(num_epochs):
    # training phase
    model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(train_loader):
        optimizer.zero_grad()
        outputs = model(inputs.to(device))
        labels = labels.long().view(-1).to(device)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')
    print(f'Learning rate: {optimizer.param_groups[0]["lr"]:.6f}')

    # validation phase
    model.eval()
    val_running_loss = 0.0
    val_running_corrects = 0
    with torch.no_grad():
        for val_inputs, val_labels in val_loader:
            val_outputs = model(val_inputs.to(device))
            val_labels = val_labels.long().view(-1).to(device)
            val_loss = criterion(val_outputs, val_labels)
            val_running_loss += val_loss.item()
            val_accuracy = calculate_accuracy(val_outputs, val_labels)
            val_running_corrects += val_accuracy * len(val_labels)

    # calculate epoch validation metrics
    val_epoch_loss = val_running_loss / len(val_loader)
    val_epoch_acc = val_running_corrects / len(val_loader.dataset)
    print(f'Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_acc:.4f}')

    # save best model weights
    if val_epoch_loss < best_val_loss:
        best_val_loss = val_epoch_loss
        best_model_weights = model.state_dict()
        print("Updating best weights!")

# evaluate pretrained ensemble
print('\nTesting pretrained ensemble:')
model.load_state_dict(best_model_weights)
model.eval()
running_corrects = 0
with torch.no_grad():
    for test_inputs, test_labels in test_loader:
        test_outputs = model(test_inputs.to(device))
        test_labels = test_labels.long().view(-1).to(device)
        test_accuracy = calculate_accuracy(test_outputs, test_labels)
        running_corrects += test_accuracy * len(test_labels)

test_acc_pretrained = running_corrects / len(test_loader.dataset)
print(f'Test Accuracy (pretrained): {test_acc_pretrained:.4f}')

Training Pretrained Ensemble


Downloading: "https://download.pytorch.org/models/squeezenet1_1-b8a52dc0.pth" to /root/.cache/torch/hub/checkpoints/squeezenet1_1-b8a52dc0.pth
100%|██████████| 4.73M/4.73M [00:00<00:00, 167MB/s]
Downloading: "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v2-b0353104.pth
100%|██████████| 13.6M/13.6M [00:00<00:00, 166MB/s]
Downloading: "https://download.pytorch.org/models/shufflenetv2_x1-5666bf0f80.pth" to /root/.cache/torch/hub/checkpoints/shufflenetv2_x1-5666bf0f80.pth
100%|██████████| 8.79M/8.79M [00:00<00:00, 157MB/s]
  return disable_fn(*args, **kwargs)
  1%|▏         | 9/704 [00:24<32:00,  2.76s/it]


KeyboardInterrupt: 

**From Scratch**

In [4]:
# Training Model from Scratch
print("Training Model from scratch")

# initialize models from scratch
model1_scratch = models.squeezenet1_1(pretrained=False)
model2_scratch = models.mobilenet_v2(pretrained=False)
model3_scratch = models.shufflenet_v2_x1_0(pretrained=False)

# modify final layers for pathmnist classes
model1_scratch.classifier[1] = nn.Conv2d(model1_scratch.classifier[1].in_channels, num_classes, kernel_size=(1, 1), stride=(1, 1))
model1_scratch.num_classes = num_classes
model2_scratch.classifier[1] = nn.Linear(model2_scratch.last_channel, num_classes)
model3_scratch.fc = nn.Linear(model3_scratch.fc.in_features, num_classes)

# create ensemble model
model_scratch = Model(model1_scratch, model2_scratch, model3_scratch, num_classes)
model_scratch = model_scratch.to(device)

# setup loss and optimizer
criterion = nn.CrossEntropyLoss()
params_scratch = (list(model1_scratch.parameters()) +
                 list(model2_scratch.parameters()) +
                 list(model3_scratch.parameters()) +
                 list(model_scratch.parameters()))
optimizer = optim.Adam(params_scratch, lr=0.0004, weight_decay=1e-5)

# training loop parameters
num_epochs = 10
best_val_loss = float('inf')
best_model_weights = None

# main training loop for scratch model
for epoch in range(num_epochs):
    # training phase
    model_scratch.train()
    running_loss = 0.0
    for inputs, labels in tqdm(train_loader):
        optimizer.zero_grad()
        outputs = model_scratch(inputs.to(device))
        labels = labels.long().view(-1).to(device)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')
    print(f'Learning rate: {optimizer.param_groups[0]["lr"]:.6f}')

    # validation phase
    model_scratch.eval()
    val_running_loss = 0.0
    val_running_corrects = 0
    with torch.no_grad():
        for val_inputs, val_labels in val_loader:
            val_outputs = model_scratch(val_inputs.to(device))
            val_labels = val_labels.long().view(-1).to(device)
            val_loss = criterion(val_outputs, val_labels)
            val_running_loss += val_loss.item()
            val_accuracy = calculate_accuracy(val_outputs, val_labels)
            val_running_corrects += val_accuracy * len(val_labels)

    val_epoch_loss = val_running_loss / len(val_loader)
    val_epoch_acc = val_running_corrects / len(val_loader.dataset)
    print(f'Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_acc:.4f}')

    if val_epoch_loss < best_val_loss:
        best_val_loss = val_epoch_loss
        best_model_weights = model_scratch.state_dict()
        print("Updating best weights!")

# evaluate model trained from scratch
print('\nTesting model trained from scratch:')
model_scratch.load_state_dict(best_model_weights)
model_scratch.eval()
running_corrects = 0
with torch.no_grad():
    for test_inputs, test_labels in test_loader:
        test_outputs = model_scratch(test_inputs.to(device))
        test_labels = test_labels.long().view(-1).to(device)
        test_accuracy = calculate_accuracy(test_outputs, test_labels)
        running_corrects += test_accuracy * len(test_labels)
test_acc_scratch = running_corrects / len(test_loader.dataset)
print(f'Test Accuracy (from scratch): {test_acc_scratch:.4f}')



Training Model from scratch


  0%|          | 2/704 [00:05<32:49,  2.81s/it]


KeyboardInterrupt: 

**Weight Matrix**

In [5]:
# Training Only Weight Matrix (Models Frozen)
print("\nTraining Only Weight Matrix (Models Frozen)")

# initialize pretrained models
model1_frozen = models.squeezenet1_1(pretrained=True)
model2_frozen = models.mobilenet_v2(pretrained=True)
model3_frozen = models.shufflenet_v2_x1_0(pretrained=True)

# modify final layers for pathmnist classes
model1_frozen.classifier[1] = nn.Conv2d(model1_frozen.classifier[1].in_channels, num_classes, kernel_size=(1, 1), stride=(1, 1))
model1_frozen.num_classes = num_classes
model2_frozen.classifier[1] = nn.Linear(model2_frozen.last_channel, num_classes)
model3_frozen.fc = nn.Linear(model3_frozen.fc.in_features, num_classes)

# create ensemble model
model_frozen = Model(model1_frozen, model2_frozen, model3_frozen, num_classes)
model_frozen = model_frozen.to(device)

# setup optimizer
optimizer = optim.Adam([model_frozen.weights], lr=0.0004, weight_decay=1e-5)

# training loop parameters
best_val_loss = float('inf')
best_model_weights = None

# main training loop for frozen model
for epoch in range(num_epochs):
    # training phase
    model_frozen.train()
    running_loss = 0.0
    for inputs, labels in tqdm(train_loader):
        optimizer.zero_grad()
        outputs = model_frozen(inputs.to(device))
        labels = labels.long().view(-1).to(device)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')
    print(f'Learning rate: {optimizer.param_groups[0]["lr"]:.6f}')

    # validation phase
    model_frozen.eval()
    val_running_loss = 0.0
    val_running_corrects = 0
    with torch.no_grad():
        for val_inputs, val_labels in val_loader:
            val_outputs = model_frozen(val_inputs.to(device))
            val_labels = val_labels.long().view(-1).to(device)
            val_loss = criterion(val_outputs, val_labels)
            val_running_loss += val_loss.item()
            val_accuracy = calculate_accuracy(val_outputs, val_labels)
            val_running_corrects += val_accuracy * len(val_labels)

    val_epoch_loss = val_running_loss / len(val_loader)
    val_epoch_acc = val_running_corrects / len(val_loader.dataset)
    print(f'Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_acc:.4f}')

    if val_epoch_loss < best_val_loss:
        best_val_loss = val_epoch_loss
        best_model_weights = model_frozen.state_dict()
        print("Updating best weights!")

# evaluate weight matrix only model
print('\nTesting weight matrix only model:')
model_frozen.load_state_dict(best_model_weights)
model_frozen.eval()
running_corrects = 0
with torch.no_grad():
    for test_inputs, test_labels in test_loader:
        test_outputs = model_frozen(test_inputs.to(device))
        test_labels = test_labels.long().view(-1).to(device)
        test_accuracy = calculate_accuracy(test_outputs, test_labels)
        running_corrects += test_accuracy * len(test_labels)
test_acc_frozen = running_corrects / len(test_loader.dataset)
print(f'Test Accuracy (weight matrix only): {test_acc_frozen:.4f}')


Training Only Weight Matrix (Models Frozen)


  0%|          | 2/704 [00:03<20:12,  1.73s/it]


KeyboardInterrupt: 

BASELINE

In [None]:
print("\nEvaluating Individual Models")

models_list = [
  ("squeezenet", models.squeezenet1_1(pretrained=True)),
  ("mobilenet", models.mobilenet_v2(pretrained=True)),
  ("shufflenet", models.shufflenet_v2_x1_0(pretrained=True))
]

individual_accuracies = {}

for name, individual_model in models_list:
  print(f"\nEvaluating {name}")

  if name == "squeezenet":
    individual_model.classifier[1] = nn.Conv2d(individual_model.classifier[1].in_channels, num_classes, kernel_size=(1, 1), stride=(1, 1))
    individual_model.num_classes = num_classes
  elif name == "mobilenet":
    individual_model.classifier[1] = nn.Linear(individual_model.last_channel, num_classes)
  else:
    individual_model.fc = nn.Linear(individual_model.fc.in_features, num_classes)

  individual_model = individual_model.to(device)
  individual_model.eval()
  running_corrects = 0

  with torch.no_grad():
    for test_inputs, test_labels in test_loader:
      test_outputs = individual_model(test_inputs.to(device))
      test_labels = test_labels.long().view(-1).to(device)
      test_accuracy = calculate_accuracy(test_outputs, test_labels)
      running_corrects += test_accuracy * len(test_labels)

  test_acc_individual = running_corrects / len(test_loader.dataset)
  individual_accuracies[name] = test_acc_individual
  print(f'Test Accuracy ({name}): {test_acc_individual:.4f}')

Individual Model Training with Pretrained

In [None]:
print("\nTraining Individual Models")

models_list = [
    ("squeezenet", models.squeezenet1_1(pretrained=True)),
    ("mobilenet", models.mobilenet_v2(pretrained=True)),
    ("shufflenet", models.shufflenet_v2_x1_0(pretrained=True))
]

individual_accuracies = {}

for name, individual_model in models_list:
    print(f"\nTraining {name}")

    # modify final layer to match number of classes
    if name == "squeezenet":
        individual_model.classifier[1] = nn.Conv2d(individual_model.classifier[1].in_channels, num_classes, kernel_size=(1, 1), stride=(1, 1))
        individual_model.num_classes = num_classes
    elif name == "mobilenet":
        individual_model.classifier[1] = nn.Linear(individual_model.last_channel, num_classes)
    else:
        individual_model.fc = nn.Linear(individual_model.fc.in_features, num_classes)

    individual_model = individual_model.to(device)

    # setup
    optimizer = optim.Adam(individual_model.parameters(), lr=0.0004, weight_decay=1e-5)
    best_val_loss = float('inf')
    best_model_weights = None

    for epoch in range(num_epochs):
        # training phase
        individual_model.train()
        running_loss = 0.0
        for inputs, labels in tqdm(train_loader):
            optimizer.zero_grad()
            outputs = individual_model(inputs.to(device))
            labels = labels.long().view(-1).to(device)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')
        print(f'Learning rate: {optimizer.param_groups[0]["lr"]:.6f}')

        # validation phase
        individual_model.eval()
        val_running_loss = 0.0
        val_running_corrects = 0
        with torch.no_grad():
            for val_inputs, val_labels in val_loader:
                val_outputs = individual_model(val_inputs.to(device))
                val_labels = val_labels.long().view(-1).to(device)
                val_loss = criterion(val_outputs, val_labels)
                val_running_loss += val_loss.item()
                val_accuracy = calculate_accuracy(val_outputs, val_labels)
                val_running_corrects += val_accuracy * len(val_labels)

        val_epoch_loss = val_running_loss / len(val_loader)
        val_epoch_acc = val_running_corrects / len(val_loader.dataset)
        print(f'Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_acc:.4f}')

        if val_epoch_loss < best_val_loss:
            best_val_loss = val_epoch_loss
            best_model_weights = individual_model.state_dict()
            print("Updating best weights!")

    individual_model.load_state_dict(best_model_weights)
    individual_model.eval()
    running_corrects = 0
    with torch.no_grad():
        for test_inputs, test_labels in test_loader:
            test_outputs = individual_model(test_inputs.to(device))
            test_labels = test_labels.long().view(-1).to(device)
            test_accuracy = calculate_accuracy(test_outputs, test_labels)
            running_corrects += test_accuracy * len(test_labels)
    test_acc_individual = running_corrects / len(test_loader.dataset)
    individual_accuracies[name] = test_acc_individual
    print(f'Test Accuracy ({name}): {test_acc_individual:.4f}')

Individual Model Training from Scratch

In [None]:
print("\nTraining non-pretrained Individual Models")

models_list = [
    ("squeezenet", models.squeezenet1_1(pretrained=False)),
    ("mobilenet", models.mobilenet_v2(pretrained=False)),
    ("shufflenet", models.shufflenet_v2_x1_0(pretrained=False))
]

individual_accuracies_scratch = {}

for name, individual_model in models_list:
    print(f"\nTraining {name}")

    if name == "squeezenet":
        individual_model.classifier[1] = nn.Conv2d(individual_model.classifier[1].in_channels, num_classes, kernel_size=(1, 1), stride=(1, 1))
        individual_model.num_classes = num_classes
    elif name == "mobilenet":
        individual_model.classifier[1] = nn.Linear(individual_model.last_channel, num_classes)
    else:
        individual_model.fc = nn.Linear(individual_model.fc.in_features, num_classes)

    individual_model = individual_model.to(device)

    optimizer = optim.Adam(individual_model.parameters(), lr=0.0004, weight_decay=1e-5)
    best_val_loss = float('inf')
    best_model_weights = None

    for epoch in range(num_epochs):
        individual_model.train()
        running_loss = 0.0
        for inputs, labels in tqdm(train_loader):
            optimizer.zero_grad()
            outputs = individual_model(inputs.to(device))
            labels = labels.long().view(-1).to(device)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')
        print(f'Learning rate: {optimizer.param_groups[0]["lr"]:.6f}')

        individual_model.eval()
        val_running_loss = 0.0
        val_running_corrects = 0
        with torch.no_grad():
            for val_inputs, val_labels in val_loader:
                val_outputs = individual_model(val_inputs.to(device))
                val_labels = val_labels.long().view(-1).to(device)
                val_loss = criterion(val_outputs, val_labels)
                val_running_loss += val_loss.item()
                val_accuracy = calculate_accuracy(val_outputs, val_labels)
                val_running_corrects += val_accuracy * len(val_labels)

        val_epoch_loss = val_running_loss / len(val_loader)
        val_epoch_acc = val_running_corrects / len(val_loader.dataset)
        print(f'Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_acc:.4f}')

        if val_epoch_loss < best_val_loss:
            best_val_loss = val_epoch_loss
            best_model_weights = individual_model.state_dict()
            print("Updating best weights!")

    individual_model.load_state_dict(best_model_weights)
    individual_model.eval()
    running_corrects = 0
    with torch.no_grad():
        for test_inputs, test_labels in test_loader:
            test_outputs = individual_model(test_inputs.to(device))
            test_labels = test_labels.long().view(-1).to(device)
            test_accuracy = calculate_accuracy(test_outputs, test_labels)
            running_corrects += test_accuracy * len(test_labels)
    test_acc_individual = running_corrects / len(test_loader.dataset)
    individual_accuracies_scratch[name] = test_acc_individual
    print(f'Test Accuracy ({name}): {test_acc_individual:.4f}')

# Print Final Results Summary
print("\nFinal Results Summary:")
print(f"Ensemble Model (Pretrained) Test Accuracy: {test_acc_pretrained:.4f}")
print(f"Ensemble Model (From Scratch) Test Accuracy: {test_acc_scratch:.4f}")
print(f"Weight Matrix Only Test Accuracy: {test_acc_frozen:.4f}")

print("\nPretrained Individual Models:")
for name, acc in individual_accuracies.items():
    print(f"Individual {name} Test Accuracy: {acc:.4f}")

print("\nNon-pretrained Individual Models:")
for name, acc in individual_accuracies_scratch.items():
    print(f"Individual {name} Test Accuracy: {acc:.4f}")