Core code for preparing dataset and forming the CNN architecture

In [54]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split,Subset
from torchvision import datasets, transforms
from sklearn.metrics import pairwise_distances

transform = transforms.Compose([
    transforms.RandomRotation(10),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.flattened_size = self._get_flattened_size()
        self.fc1 = nn.Linear(self.flattened_size, 128)
        self.fc2 = nn.Linear(128, 10)

    def _get_flattened_size(self):
        with torch.no_grad():
            x = torch.zeros(1, 1, 28, 28)
            x = self.pool(F.relu(self.conv2(F.relu(self.conv1(x)))))
            return x.numel()

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

Defining function for pre-trained model

In [55]:
class PretrainedModel(nn.Module):
    def __init__(self):
        super(PretrainedModel, self).__init__()
        self.model = torchvision.models.resnet18(pretrained=True)
        self.model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.model.fc = nn.Linear(self.model.fc.in_features, 10)

    def forward(self, x):
        return self.model(x)

Defining function for calculating uncertainty metrices

In [56]:
def calculate_uncertainty_metrics(outputs):
    probabilities = F.softmax(outputs, dim=1)
    least_confidence = 1 - probabilities.max(dim=1).values.cpu().detach().numpy()
    prediction_entropy = -torch.sum(probabilities * torch.log(probabilities + 1e-10), dim=1).cpu().detach().numpy()
    margin_sampling = 1 - (probabilities.topk(2, dim=1).values[:, 0] - probabilities.topk(2, dim=1).values[:, 1]).cpu().detach().numpy()
    return least_confidence, prediction_entropy, margin_sampling

Defining function for calculating diversity metrices

In [57]:
def calculate_diversity_metrics(features, m=5, chunk_size=500):
    cosine_similarities, l2_norms = [], []
    for i in range(0, len(features), chunk_size):
        chunk = features[i:i + chunk_size]
        feature_distances_cosine = pairwise_distances(chunk, features, metric='cosine')
        feature_distances_l2 = pairwise_distances(chunk, features, metric='euclidean')
        cosine_similarities.append(1 - feature_distances_cosine[:, 1:m+1].mean(axis=1))
        l2_norms.append(feature_distances_l2[:, 1:m+1].mean(axis=1))
    return np.concatenate(cosine_similarities), np.concatenate(l2_norms)

Defining function for calculating KL Divergence

In [58]:
def calculate_kl_divergence(outputs, feature_distances, m=5):
    kl_divergence = []
    outputs = F.softmax(outputs, dim=1).cpu().detach().numpy()
    for i in range(len(outputs)):
        current_sample_prob = outputs[i]
        neighbor_indices = np.argsort(feature_distances[i])[1:m+1]
        neighbors_prob = outputs[neighbor_indices].mean(axis=0)
        epsilon = 1e-10
        current_sample_prob += epsilon
        neighbors_prob += epsilon
        kl_divergence.append(np.sum(current_sample_prob * np.log(current_sample_prob / neighbors_prob)))
    return np.array(kl_divergence)

Load Dataset

In [59]:
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
mnist_train = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
mnist_test = datasets.MNIST(root="./data", train=False, download=True, transform=transform)
train_size = int(0.1 * len(mnist_train))
unlabeled_size = len(mnist_train) - train_size
train_dataset, unlabeled_dataset = random_split(mnist_train, [train_size, unlabeled_size])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(mnist_test, batch_size=64, shuffle=False)

Training function

In [60]:
def train_model(model, train_loader, epochs=5):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    model.train()
    for epoch in range(epochs):
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

Evaluation Function

In [61]:
def test_model(model, test_loader):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

Active learning function

In [62]:
def active_learning(
    model, unlabeled_loader, unlabeled_dataset, train_loader,
    acquisition_iterations=3, samples_per_iteration=200, chunk_size=1000
):
    """
    Optimized Active Learning Function
    """
    for iteration in range(acquisition_iterations):
        print(f"Acquisition Iteration {iteration + 1}/{acquisition_iterations}")
        model.eval()
        features, outputs, indices = [], [], []
        with torch.no_grad():
            for i, (images, _) in enumerate(unlabeled_loader):
                images = images.to(device)
                output = model(images)
                features.append(images.view(images.size(0), -1).cpu().numpy())
                outputs.append(output)
                indices.append(np.arange(i * images.size(0), i * images.size(0) + images.size(0)))

                if i * images.size(0) >= chunk_size:
                    break

        features = np.concatenate(features)
        outputs = torch.cat(outputs)
        indices = np.concatenate(indices)

        least_confidence, prediction_entropy, margin_sampling = calculate_uncertainty_metrics(outputs)
        cosine_similarities, _ = calculate_diversity_metrics(features, chunk_size=chunk_size)

        scores = least_confidence + prediction_entropy + margin_sampling + cosine_similarities
        acquisition_indices = indices[np.argsort(scores)[-samples_per_iteration:]]

        new_train_data = Subset(unlabeled_dataset, acquisition_indices.tolist())
        updated_train_dataset = torch.utils.data.ConcatDataset([train_loader.dataset, new_train_data])

        remaining_indices = list(set(range(len(unlabeled_dataset))) - set(acquisition_indices))
        unlabeled_dataset = Subset(unlabeled_dataset, remaining_indices)
        unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=64, shuffle=True)
        train_loader = DataLoader(updated_train_dataset, batch_size=64, shuffle=True)
        train_model(model, train_loader)

    return unlabeled_loader, unlabeled_dataset

train_dataset = datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transforms.ToTensor(), download=True)
labeled_dataset, unlabeled_dataset = torch.utils.data.random_split(train_dataset, [1000, len(train_dataset) - 1000])

train_loader = DataLoader(labeled_dataset, batch_size=64, shuffle=True)
unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

Main function

In [63]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pretrained model evaluation
pretrained_model = CNNModel().to(device)
train_model(pretrained_model, train_loader)
pretrained_accuracy = test_model(pretrained_model, test_loader)
print(f"Pretrained Model Accuracy: {pretrained_accuracy:.2f}%")

# CNN evaluation
custom_model = CNNModel().to(device)
train_model(custom_model, train_loader)
custom_model_accuracy = test_model(custom_model, test_loader)
print(f"Custom CNN Model Accuracy: {custom_model_accuracy:.2f}%")

# Active learning-enhanced model evaluation
active_learning_model = CNNModel().to(device)
unlabeled_loader, unlabeled_dataset = active_learning(
    active_learning_model, unlabeled_loader, unlabeled_dataset, train_loader
)
active_learning_accuracy = test_model(active_learning_model, test_loader)
print(f"Active Learning-Enhanced Model Accuracy: {active_learning_accuracy:.2f}%")

Pretrained Model Accuracy: 89.63%
Custom CNN Model Accuracy: 89.58%
Acquisition Iteration 1/3
Acquisition Iteration 2/3
Acquisition Iteration 3/3
Active Learning-Enhanced Model Accuracy: 95.07%
