In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import tensorflow as tf
from sklearn.linear_model import LogisticRegression
from matplotlib.gridspec import GridSpec

2023-08-02 02:43:47.784616: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-08-02 02:43:47.833307: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
def split_dataset_with_equal_class_representation(train_data, train_labels, test_ratio=0.2):
    classes = np.unique(train_labels)
    num_classes = len(classes)

    # Split data into class-specific subsets
    class_data = [train_data[train_labels == c] for c in classes]

    # Calculate the number of samples to include from each class in the test set
    num_samples_per_class = int(len(train_data) * test_ratio / num_classes)

    # Initialize train and test datasets
    train_data_split = []
    test_data_split = []
    train_labels_split = []
    test_labels_split = []

    # Split data for each class
    for c in range(num_classes):
        data_c = class_data[c]
        num_samples_test_c = min(num_samples_per_class, len(data_c))

        # Randomly shuffle the data
        np.random.shuffle(data_c)

        # Split data for class c into train and test sets
        train_data_c = data_c[:-num_samples_test_c]
        test_data_c = data_c[-num_samples_test_c:]

        # Assign labels for each split
        train_labels_c = np.full(len(train_data_c), classes[c])
        test_labels_c = np.full(len(test_data_c), classes[c])

        # Append class-specific data to overall train and test datasets
        train_data_split.append(train_data_c)
        test_data_split.append(test_data_c)
        train_labels_split.append(train_labels_c)
        test_labels_split.append(test_labels_c)

    # Concatenate class-specific data to create final train and test datasets
    train_data_final = np.concatenate(train_data_split, axis=0)
    test_data_final = np.concatenate(test_data_split, axis=0)
    train_labels_final = np.concatenate(train_labels_split, axis=0)
    test_labels_final = np.concatenate(test_labels_split, axis=0)

    # Shuffle the data again to ensure randomness
    train_indices = np.arange(len(train_data_final))
    np.random.shuffle(train_indices)
    train_data_final = train_data_final[train_indices]
    train_labels_final = train_labels_final[train_indices]

    test_indices = np.arange(len(test_data_final))
    np.random.shuffle(test_indices)
    test_data_final = test_data_final[test_indices]
    test_labels_final = test_labels_final[test_indices]

    return (train_data_final, train_labels_final), (test_data_final, test_labels_final)

# Load MNIST dataset
(train_data, train_labels), (_, _) = tf.keras.datasets.mnist.load_data()

# Normalize the pixel values to [0, 1]
train_data = train_data.astype('float32') / 255.0

# Reshape the data to 4D tensor (number of samples, height, width, channels)
train_data = np.expand_dims(train_data, axis=-1)

# Split data into train and test sets with equal class representation
(train_data, train_labels), (test_X, test_y) = split_dataset_with_equal_class_representation(train_data, train_labels, test_ratio=0.2)
(train_X, train_y), (Pool_X, Pool_y) = split_dataset_with_equal_class_representation(train_data, train_labels, test_ratio=0.8)

print("Train data shape:", train_X.shape)
print("Train labels shape:", train_y.shape)
print("Test data shape:", Pool_X.shape)
print("Test labels shape:", Pool_y.shape)
print("Test data shape:", test_X.shape)
print("Test labels shape:", test_y.shape)

Train data shape: (9600, 28, 28, 1)
Train labels shape: (9600,)
Test data shape: (38400, 28, 28, 1)
Test labels shape: (38400,)
Test data shape: (12000, 28, 28, 1)
Test labels shape: (12000,)


In [3]:

train_X = train_X.reshape(train_X.shape[0], -1)
test_X = test_X.reshape(test_X.shape[0], -1)
Pool_X = Pool_X.reshape(Pool_X.shape[0], -1)
print(train_X.shape, test_X.shape, Pool_X.shape)

(9600, 784) (12000, 784) (38400, 784)


In [4]:
train_X_tensor = torch.tensor(train_X, dtype=torch.float32)
train_y_tensor = torch.tensor(train_y, dtype=torch.long)
test_X_tensor = torch.tensor(test_X, dtype=torch.float32)
test_y_tensor = torch.tensor(test_y, dtype=torch.long)
Pool_X_tensor = torch.tensor(Pool_X, dtype=torch.float32)
Pool_y_tensor = torch.tensor(Pool_y, dtype=torch.long)
batch_size = 64

# Create DataLoader for the unlabeled dataset (Pool_X_tensor)
unlabeled_dataset = TensorDataset(Pool_X_tensor)
unlabeled_data_loader = DataLoader(unlabeled_dataset, batch_size=batch_size, shuffle=True)

# Define your SimpleNN model class
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(28*28, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 10)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x, enable_dropout=False):
        x = F.relu(self.fc1(x))
        if enable_dropout:
            x = self.dropout(x)
        x = self.fc2(x)
        return x

    def enable_dropout(self):
        """
        Enable dropout layers during model evaluation.
        """
        for module in self.modules():
            if isinstance(module, nn.Dropout):
                module.train()

# Instantiate the SimpleNN model
model = SimpleNN()

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
train_dataset = TensorDataset(train_X_tensor, train_y_tensor)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    for i, (images, labels) in enumerate(train_data_loader):
        # Reshape images if necessary
        images = images.view(-1, 28*28)

        # Move images and labels to the appropriate device (e.g., GPU)
        images = images
        labels = labels

        # Forward pass
        outputs = model(images)

        # Compute the loss
        loss = criterion(outputs, labels)

        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print training progress
        if (i + 1) % 100 == 0:
            print(f"Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_data_loader)}], Loss: {loss.item():.4f}")


Epoch [1/10], Step [100/150], Loss: 0.3839
Epoch [2/10], Step [100/150], Loss: 0.2842
Epoch [3/10], Step [100/150], Loss: 0.2251
Epoch [4/10], Step [100/150], Loss: 0.1637
Epoch [5/10], Step [100/150], Loss: 0.1109
Epoch [6/10], Step [100/150], Loss: 0.1223
Epoch [7/10], Step [100/150], Loss: 0.0940
Epoch [8/10], Step [100/150], Loss: 0.1545
Epoch [9/10], Step [100/150], Loss: 0.1319
Epoch [10/10], Step [100/150], Loss: 0.2385


In [5]:

def enable_dropout(model):
    for module in model.modules():
        if isinstance(module, nn.Dropout):
            module.train()

In [6]:
import torch.nn.functional as F

def get_monte_carlo_predictions(data_loader, forward_passes, model, n_classes, n_samples):
    dropout_predictions = []
    softmax = nn.Softmax(dim=1)

    for i in range(forward_passes):
        predictions = []
        model.eval()  # Set the model to evaluation mode
        enable_dropout(model)

        for batch_images in data_loader:
            for image in batch_images:
                # Move each image to the appropriate device (GPU or CPU)
                image = image

                with torch.no_grad():
                    output = model(image)
                    output = softmax(output)

                predictions.append(output.cpu().numpy())

        dropout_predictions.append(np.array(predictions))

    return np.array(dropout_predictions)


In [7]:
def entropy(p):
    return -np.sum(p * np.log2(p + 1e-8), axis=1)

In [8]:

def active_learning_with_uncertainty(data_loader, forward_passes, model, n_classes, n_samples, alpha=0.5, num_samples_to_label=100):
    dropout_predictions = get_monte_carlo_predictions(data_loader, forward_passes, model, n_classes, n_samples)
    mean = np.mean(dropout_predictions, axis=0)
    entropies = entropy(mean)
    uncertainty_score = entropies
    sorted_indices = np.argsort(uncertainty_score)
    selected_indices = sorted_indices[-num_samples_to_label:]

    return selected_indices



In [9]:

num_iterations = 5
num_samples_to_label = 100
forward_passes = 10  

train_dataset = TensorDataset(train_X_tensor, train_y_tensor)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

for iteration in range(num_iterations):
    print(f"Active Learning Iteration {iteration + 1}/{num_iterations}")

    # Step 1: Select the most uncertain samples from the unlabeled dataset
    selected_indices = active_learning_with_uncertainty(
        data_loader=unlabeled_data_loader,
        forward_passes=forward_passes,
        model=model,
        n_classes=10,  # Replace with the actual number of classes in your dataset
        n_samples=num_samples_to_label
    )

    # Step 2: Label the selected samples and add them to the training dataset
    labeled_images = Pool_X_tensor[selected_indices]
    labeled_labels = Pool_y_tensor[selected_indices]

    # Reshape labeled_images to remove the extra dimension
    labeled_images = labeled_images.view(-1, labeled_images.size(-1))

    # Combine the labeled samples with the original training dataset
    train_X_tensor = torch.cat([train_X_tensor, labeled_images])
    train_y_tensor = torch.cat([train_y_tensor, labeled_labels])  # Concatenate the labels directly

    # Create a new DataLoader for the updated training dataset
    train_dataset = TensorDataset(train_X_tensor, train_y_tensor)
    train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Step 3: Retrain the model on the updated training dataset
    for epoch in range(num_epochs):
        model.train()
        for i, (images, labels) in enumerate(train_data_loader):
            # Move images and labels to the appropriate device (GPU or CPU)
            images = images
            labels = labels

            # Forward pass
            outputs = model(images)

            # Compute the loss
            loss = criterion(outputs, labels)

            # Backpropagation and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Print training progress
            if (i + 1) % 100 == 0:
                print(f"Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_data_loader)}], Loss: {loss.item():.4f}")


Active Learning Iteration 1/5


RuntimeError: Tensors must have same number of dimensions: got 1 and 2

In [None]:

# After the active learning iterations, your model should be trained on the updated dataset.
# You can now evaluate the model on the test dataset to measure its performance.
model.eval()
with torch.no_grad():
    test_X_tensor = test_X_tensor.to(device)  # Move the test data to the same device as the model
    test_y_tensor = test_y_tensor.to(device)  # Move the test data to the same device as the model
    outputs = model(test_X_tensor)
    _, predicted = torch.max(outputs.data, 1)
    accuracy = (predicted == test_y_tensor).sum().item() / len(test_y_tensor)
    print(f"Test Accuracy: {accuracy:.4f}")
