# Simple Pseudo-Labeling (SSL) with Convolutional Neural Networks

In [1]:
import torch
import numpy as np
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score

# Seed every RNG the notebook draws from so that Restart & Run All is repeatable:
# NumPy picks the labeled subset, PyTorch initialises the weights and shuffles the batches.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Initialize the compute device (use the GPU if available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
# Download the MNIST dataset using the PyTorch API.
# Normalize only works on tensors, so ToTensor has to run first; passing Normalize on its own
# would fail as soon as a sample is fetched through the dataset (e.g. train_dataset[0]).
mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])
train_dataset = MNIST(root='./data', train=True, transform=mnist_transform, download=True)
test_dataset = MNIST(root='./data', train=False, transform=mnist_transform, download=True)

# Pull the stored image/label tensors out of both splits.
# NOTE: `.data` / `.targets` are read directly, so the transform above is NOT applied to
# x_train / x_test; the loops further down only cast them with .float() before the forward pass.
x_train, y_train = train_dataset.data, train_dataset.targets
x_test, y_test = test_dataset.data, test_dataset.targets

# Print out the dimensionality of the input images of both sets
print(x_train.shape, x_test.shape)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


  0%|          | 0/9912422 [00:00<?, ?it/s]

Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


  0%|          | 0/28881 [00:00<?, ?it/s]

Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


  0%|          | 0/1648877 [00:00<?, ?it/s]

Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


  0%|          | 0/4542 [00:00<?, ?it/s]

Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw

torch.Size([60000, 28, 28]) torch.Size([10000, 28, 28])


In [3]:
# To start, we need to pose the MNIST digit recognition problem as a semi-supervised learning problem.
# To do this we fix the number of labeled samples available to the first training procedure, draw the
# same number of samples from every class, and treat the rest of the training set as unlabeled.
num_train_samples = 1000

# Keep in mind that all these operations are done on the training data!
# You should always treat your testing data as invisible and use it only for what the name suggests, i.e. testing.

# 1. Get the unique classes and derive the per-class quota from how many there are
#    (if num_train_samples is not divisible by the class count, the remainder is dropped)
unique_classes = y_train.unique()
samples_per_class = num_train_samples // len(unique_classes)

# 2. Randomly choose samples_per_class elements for each unique class, without replacement
# https://numpy.org/doc/stable/reference/random/generated/numpy.random.choice.html
# https://pytorch.org/docs/stable/generated/torch.where.html
# The result is one flat array of indices that reference the labeled training samples, grouped by class
per_class_idx = [
    np.random.choice(torch.where(y_train == mnist_class)[0].numpy(), size=samples_per_class, replace=False)
    for mnist_class in unique_classes
]
subsample_idx = np.concatenate(per_class_idx)
assert subsample_idx.shape[0] == samples_per_class * len(unique_classes)

# 3. Separate the training datasets into two subsets: labeled and unlabeled
# A boolean mask that is False at the labeled indices selects everything that stays unlabeled
unlabeled_mask = np.ones(y_train.shape[0], dtype=bool)
unlabeled_mask[subsample_idx] = False
x_labeled, x_unlabeled = x_train[subsample_idx, :], x_train[unlabeled_mask, :]
y_labeled, y_unlabeled = y_train[subsample_idx], y_train[unlabeled_mask]

# 4. Print out the shapes of the subsets (Keep in mind now we are working with tensors)
print(x_labeled.shape, y_labeled.shape, x_unlabeled.shape, y_unlabeled.shape)

torch.Size([1000, 28, 28]) torch.Size([1000]) torch.Size([59000, 28, 28]) torch.Size([59000])


In [4]:
# Wrap each (images, labels) tensor pair in a TensorDataset and expose it through a DataLoader
# https://pytorch.org/docs/stable/data.html#torch.utils.data.TensorDataset
# https://pytorch.org/docs/stable/data.html#data-loading-order-and-sampler

# Full training set, reshuffled every epoch
trainset = TensorDataset(x_train, y_train)
train_dataloader = DataLoader(trainset, batch_size=128, shuffle=True)

# Test set, iterated in its stored order
testset = TensorDataset(x_test, y_test)
test_dataloader = DataLoader(testset, batch_size=128, shuffle=False)

# Unlabeled partition: kept in order (shuffle=False) so predictions line up with x_unlabeled / y_unlabeled
unlabeled_dataset = TensorDataset(x_unlabeled, y_unlabeled)
unlabeled_dataloader = DataLoader(unlabeled_dataset, batch_size=128, shuffle=False)

# Labeled partition used for the first training round, reshuffled every epoch
labeled_dataset = TensorDataset(x_labeled, y_labeled)
labeled_dataloader = DataLoader(labeled_dataset, batch_size=128, shuffle=True)

In [5]:
# Create a basic CNN classifier using PyTorch that can fit the MNIST data
class CNN(nn.Module):
    """Small three-convolution classifier for 28x28 images such as MNIST.

    Args:
        in_channels: number of channels of the input images (1 for grayscale).
        num_classes: number of logits produced by the final layer.

    The flattened feature size (32 * 6 * 6) is only valid for 28x28 inputs:
    28 -> 24 (5x5 conv) -> 22 (3x3 conv) -> 20 (3x3 conv) -> 6 (3x3 max-pool).
    """

    def __init__(self, in_channels=1, num_classes=10):
        super().__init__()

        # Convolutional layers
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=16, kernel_size=5, stride=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=1)

        # Pooling (Max)
        self.maxpool = nn.MaxPool2d(kernel_size=3)

        # Fully connected layers
        self.fc1 = nn.Linear(32 * 6 * 6, 128)
        self.fc2 = nn.Linear(128, num_classes)

        # Dropout
        self.dropout2d = nn.Dropout2d(p=0.1)
        self.dropout = nn.Dropout(p=0.1)

    def forward(self, x):
        """Map a float batch of shape (Bs, in_channels, 28, 28) to logits of shape (Bs, num_classes)."""
        # Convolution block 1
        out = F.relu(self.conv1(x))

        # Convolution block 2
        out = F.relu(self.conv2(out))

        # Convolution block 3, followed by the only pooling step and channel-wise dropout
        out = F.relu(self.conv3(out))
        out = self.maxpool(out)
        out = self.dropout2d(out)

        # Create dense vector representation
        # (Bs, 32, 6, 6) -> (Bs, 32*6*6)
        out = out.view(out.size(0), -1)

        # Linear (FC) layers. No softmax is applied here: the raw logits are returned and the
        # softmax is folded into the cross-entropy loss used for training.
        out = F.relu(self.fc1(out))
        out = self.dropout(out)
        out = self.fc2(out)

        return out

In [6]:
# Model, loss and optimizer for the supervised warm-up round
model = CNN().to(device)
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, momentum=0.9)

# Fit the CNN on the small labeled subset only
num_epochs = 100
for epoch in range(num_epochs):
    model.train()

    # Re-collected every epoch, so after the loop these hold the last epoch's values
    predictions = []
    ground_truth = []

    for batch_images, batch_labels in labeled_dataloader:
        # The stored images have no channel axis (grayscale), so add one and cast to float
        batch_images = batch_images.unsqueeze(1).float().to(device)
        batch_labels = batch_labels.to(device)

        # Standard optimisation step: reset gradients, forward, loss, backward, update
        optimizer.zero_grad()
        outputs = model(batch_images)
        # CrossEntropyLoss consumes raw logits (softmax --> cross entropy loss)
        loss = loss_fn(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        # The predicted class is the index of the largest logit; keep it for the accuracy below
        predicted = outputs.detach().max(dim=1).indices
        predictions.extend(predicted.cpu().tolist())
        ground_truth.extend(batch_labels.cpu().tolist())

    # Report every 20th epoch; the loss shown is the one of the epoch's last batch
    if epoch % 20 == 0:
        accuracy = accuracy_score(ground_truth, predictions)
        print('Epoch: {}. Loss: {}. Accuracy (on trainset/self): {}'.format(epoch, loss.item(), accuracy))

Epoch: 0. Loss: 2.578993082046509. Accuracy (on trainset/self): 0.122
Epoch: 20. Loss: 0.07852856069803238. Accuracy (on trainset/self): 0.973
Epoch: 40. Loss: 0.005684087984263897. Accuracy (on trainset/self): 0.993
Epoch: 60. Loss: 0.0054103052243590355. Accuracy (on trainset/self): 0.994
Epoch: 80. Loss: 0.010744250379502773. Accuracy (on trainset/self): 0.998


In [7]:
# Confusion matrix over the labeled samples, built from the predictions gathered during the last training epoch
train_confusion = confusion_matrix(ground_truth, predictions)
print(train_confusion)

[[100   0   0   0   0   0   0   0   0   0]
 [  0  99   0   0   0   0   0   0   1   0]
 [  0   0 100   0   0   0   0   0   0   0]
 [  0   0   0 100   0   0   0   0   0   0]
 [  0   0   0   0 100   0   0   0   0   0]
 [  0   1   0   0   0  99   0   0   0   0]
 [  0   0   0   0   0   0 100   0   0   0]
 [  0   1   0   0   0   0   0  99   0   0]
 [  0   0   0   0   0   0   1   0  99   0]
 [  0   0   0   0   0   0   0   0   0 100]]


In [8]:
# Predict the pseudo-labels of the unlabeled dataset
pseudo_labels = []
ground_truth = []

# Switch to inference mode once, up front: eval() turns dropout off and no_grad() skips
# gradient bookkeeping; neither needs to be repeated for every batch.
model.eval()
with torch.no_grad():
    # unlabeled_dataloader does not shuffle, so pseudo_labels stay aligned with x_unlabeled / y_unlabeled
    for images, labels in unlabeled_dataloader:
        # Add dimension for Nr. channels (we don't have it here because of Grayscale) and cast the images to float
        images = images.unsqueeze(1).float().to(device)

        # Forward pass to get output
        outputs = model(images)

        # The pseudo-label is the class with the largest logit
        predicted = outputs.argmax(dim=1)
        pseudo_labels.extend(predicted.cpu().tolist())
        # The true labels are only used for scoring, so they never need to leave the CPU
        ground_truth.extend(labels.tolist())

# Calculate the accuracy of the pseudo-labeling procedure (we can compute this since we actually have a fully labeled dataset)
accuracy = accuracy_score(ground_truth, pseudo_labels)
print('Accuracy of pseudo-labels: {}'.format(accuracy))

Accuracy of pseudo-labels: 0.9445423728813559


In [9]:
# Confusion matrix of the true labels of the unlabeled partition against the predicted pseudo-labels
pseudo_confusion = confusion_matrix(y_unlabeled, pseudo_labels)
print(pseudo_confusion)

[[5711    1    0    3    2   22   15   14   45   10]
 [   4 6489    9   22   15   34    8   26   29    6]
 [  39   58 5157   57   66   12   10  369   74   16]
 [  18   12   69 5622    6  121    5   50   79   49]
 [  10    4   18    0 5571    4   27   36    4   68]
 [  31   11    1   56    5 5091   42    3   62   19]
 [  47   13    3    0   55  124 5541    9   25    1]
 [  18   30   38    9   24   16    0 5971   14   45]
 [  35   72   14  108   34  215   13   28 5181   51]
 [  32    5    4   73  166   27    2   85   61 5394]]


In [10]:
# Create a new training set using the new pseudo labels and the old supervised labels
# Print out the shapes (you should get the same ones of the original MNIST dataset)
# https://pytorch.org/docs/stable/generated/torch.vstack.html
new_x = torch.vstack((x_labeled, x_unlabeled))

# Labels are 1-D, so concatenate them directly: true labels first, pseudo-labels after,
# in the same order as the images above. (Reshaping to a column and calling squeeze()
# would collapse a single-sample result to a 0-d tensor.)
pseudo_label_tensor = torch.tensor(pseudo_labels, dtype=torch.long)
new_y = torch.cat((y_labeled, pseudo_label_tensor), dim=0)
assert new_x.shape[0] == new_y.shape[0], "images and labels must stay aligned"
print(new_x.shape, new_y.shape)

# Build a new dataset using the above tensors
pseudo_dataset = TensorDataset(new_x, new_y)
pseudo_dataloader = DataLoader(pseudo_dataset, batch_size=200, shuffle=True)

torch.Size([60000, 28, 28]) torch.Size([60000])


In [None]:
# Retrain the model on the pseudo-labeled dataset and test out the performance on the testing set (real labels)
# A fresh network is trained from scratch; loss_fn is the CrossEntropyLoss created for the first training round.
new_model = CNN().to(device)
optimizer = torch.optim.SGD(new_model.parameters(), lr=1e-3, momentum=0.9)
num_epochs = 50
eval_every = 10
for i in range(num_epochs):
    new_model.train()
    for images, labels in pseudo_dataloader:
        # Add dimension for Nr. channels (we don't have it here because of Grayscale) and transform the images tensor into a float
        images, labels = images.unsqueeze(1).float().to(device), labels.to(device)

        # Clear gradients w.r.t. parameters
        optimizer.zero_grad()

        # Forward pass to get output
        outputs = new_model(images)

        # Calculate Loss: softmax --> cross entropy loss
        loss = loss_fn(outputs, labels)

        # Getting gradients w.r.t. parameters
        loss.backward()

        # Updating parameters
        optimizer.step()

    # Evaluate directly on the testset every eval_every epochs, and always after the last epoch
    # so that the accuracy of the final model is actually reported
    if i % eval_every == 0 or i == num_epochs - 1:
        predictions = []
        ground_truth = []

        # Inference mode is entered once per evaluation rather than once per batch
        new_model.eval()
        with torch.no_grad():
            for images, labels in test_dataloader:
                # Add dimension for Nr. channels (we don't have it here because of Grayscale) and cast the images to float
                images = images.unsqueeze(1).float().to(device)

                # Forward pass to get output
                outputs = new_model(images)

                # The predicted class is the index of the largest logit
                predicted = outputs.argmax(dim=1)
                predictions.extend(predicted.cpu().tolist())
                ground_truth.extend(labels.tolist())

        accuracy = accuracy_score(ground_truth, predictions)
        # The loss shown is the one of the last training batch of this epoch
        print('Epoch: {}. Loss: {}. Accuracy (on testing set): {}'.format(i, loss.item(), accuracy))

Epoch: 0. Loss: 0.14614535868167877. Accuracy (on testing set): 0.9545
Epoch: 10. Loss: 0.026030300185084343. Accuracy (on testing set): 0.9562
Epoch: 20. Loss: 0.05954669415950775. Accuracy (on testing set): 0.9564


# Now let's check out a super important practical application of deep NNs: https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html