In [1]:
"""
h-divergence.ipynb

Created on Mar 30 2023

@author: Lukas

This notebook is meant as an introduction to discriminative active learning (DAL),
and contains implementations for computing the H-Divergence between datasets.
"""

'\ndal.ipynb\n\nCreated on Mar 30 2023\n\n@author: Lukas\n\nThis notebook is meant as an introduction to discriminative active learning (DAL),\nand contains implementations for computing the H-Divergence between datasets and \nfor running DAL.\n'

First, we recall the definition of the H-Divergence. See here (https://melissadell.atlassian.net/wiki/spaces/TCC/pages/2584412161) for more background information and links to domain adaptation.

*Definition (H-Divergence):* Let $X$ be a domain (dataset), and let $D_S$ and $D_T$ be two distributions over $X$ (source and target). Let $H$ be a hypothesis class over $X$ (set of possible classifiers). Then we define the $H$-Divergence between $D_S$ and $D_T$ as

$d_H(D_S, D_T) = \sup_{h \in H} \left| \mathbb{P}_{x \sim D_S} \left[ h(x) = 1 \right] - \mathbb{P}_{x \sim D_T} \left[ h(x) = 1 \right]\right|$

We are interested in the distributions of the labeled and unlabeled datasets, denoted by $L$ and $U$, respectively, so the $H$-Divergence becomes

$d_H(D_S, D_T) = \sup_{h \in H} \left| \frac{1}{|L|} \sum_{x \in L} h(x) - \frac{1}{|U|} \sum_{x \in U }h(x) \right|$

where $h(x)$ denotes the probability which the model $h$ assigns to the event $x \in L$. In order to (approximately) attain the supremum, we train a binary MLP classifier to separate $L$ from $U$, i.e. we want it to output very high $h(x)$ for $x \in L$ and very low $h(x)$ for $x \in U$. In the implementation below, the classifier is trained with the cross-entropy loss, which serves as a surrogate objective for the above expression. We follow the original DAL paper and apply the classifier $h$ to the embeddings $\phi(x)$, and not to the original data $x$ itself. Here, $\phi$ is the model we would ultimately like to train using active learning (e.g. BERT).

Note that, by definition, $d_H(D_S, D_T) \in [0, 1]$, where we want $d_H(D_S, D_T) \approx 0$, which would indicate that the model (on average) cannot distinguish between $L$ and $U$.

In [2]:
# import packages

import numpy as np

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

**Basic functionality for computing the H-Divergence between two datasets**

In [3]:
# compute the H-divergence between the labeled data and the unlabeled data

def compute_H_divergence(labeled, unlabeled, model):
    """
    A function that estimates the H-divergence between the labeled and unlabeled data.

    The estimate is the absolute difference between the mean of column 0 of the
    model output on the labeled data and on the unlabeled data.

    Parameters
    ----------
    labeled: numpy array
        The labeled data, one sample per row.

    unlabeled: numpy array
        The unlabeled data, one sample per row.

    model: torch model
        The discriminative model. It is called on a float tensor and its output
        is indexed as ``[:, 0]``. train_discriminative_model assigns target 0 to
        the labeled data, so column 0 is read as the "labeled" probability.

    Returns
    -------
    H_divergence : float
        The H-divergence between the labeled and unlabeled data.
        Lies between 0 and 1 when the model outputs probabilities.

    Raises
    ------
    ValueError
        If either dataset contains no samples (the mean would be undefined).
    """
    # an empty dataset would otherwise produce 0 / 0 = NaN without any warning
    if labeled.shape[0] == 0 or unlabeled.shape[0] == 0:
        raise ValueError("labeled and unlabeled must each contain at least one sample")

    # convert the data to torch tensors
    labeled = torch.from_numpy(labeled).float()
    unlabeled = torch.from_numpy(unlabeled).float()

    # this is pure evaluation, so no autograd graph needs to be recorded
    with torch.no_grad():
        # average column 0 of the model output over each dataset
        p_L_0 = model(labeled)[:, 0].mean()
        p_U_0 = model(unlabeled)[:, 0].mean()

    # the H-divergence estimate is the absolute difference of the two averages,
    # returned as a plain Python float
    return torch.abs(p_U_0 - p_L_0).item()


# train a discriminative model on the labeled data and unlabeled data

def train_discriminative_model(labeled, unlabeled, input_shape):
    """
    A function that trains and returns a discriminative model on the labeled and unlabeled data.

    The model is trained as a binary classifier with target 0 for labeled
    samples and target 1 for unlabeled samples.

    Parameters
    ----------
    labeled : numpy.ndarray
        The labeled data, one sample per row.

    unlabeled : numpy.ndarray
        The unlabeled data, one sample per row.

    input_shape : int
        The number of features in the dataset.

    Returns
    -------
    model : torch.nn.Sequential
        The trained discriminative model, as built by get_discriminative_model.
    """

    # create the binary dataset: 0 = labeled, 1 = unlabeled.
    # The targets are built directly as 1-D int64 tensors, which is what the loss
    # expects regardless of the platform's default numpy integer width.
    X_train = torch.from_numpy(np.vstack((labeled, unlabeled))).float()
    Y_train = torch.cat((
        torch.zeros(labeled.shape[0], dtype=torch.long),
        torch.ones(unlabeled.shape[0], dtype=torch.long),
    ))

    # build the model:
    model = get_discriminative_model(input_shape)

    # train the model using torch:
    batch_size = 100
    epochs = 10
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # get_discriminative_model ends in a Softmax layer, so the model already
    # outputs probabilities. NLLLoss on their logarithm is the cross-entropy;
    # CrossEntropyLoss would apply a second (log-)softmax on top of them.
    criterion = torch.nn.NLLLoss()
    num_samples = X_train.shape[0]
    for epoch in range(epochs):
        # reshuffle every epoch: the rows are stacked labeled-first, so without
        # shuffling most batches would contain a single class only
        permutation = torch.randperm(num_samples)
        for i in range(0, num_samples, batch_size):
            batch_indices = permutation[i:i + batch_size]
            x = X_train[batch_indices]
            y = Y_train[batch_indices]
            optimizer.zero_grad()
            y_pred = model(x)
            # clamp before the log so an exactly-zero probability cannot produce -inf
            loss = criterion(torch.log(y_pred.clamp_min(1e-12)), y)
            loss.backward()
            optimizer.step()

    return model


# we use a 3-layer MLP as the discriminative model

def get_discriminative_model(input_shape):
    """
    Build the untrained MLP used as discriminator in discriminative active learning.

    The network has two hidden layers of 100 units with ReLU activations and a
    2-unit output layer followed by a softmax over dimension 1. No
    regularization techniques are applied.

    Parameters
    ----------
    input_shape : int
        The number of features in the dataset.

    Returns
    -------
    model : torch.nn.Sequential
        The MLP model.
    """
    hidden_units = 100
    num_classes = 2

    # the layers are listed in the order in which they are applied
    layers = [
        torch.nn.Linear(input_shape, hidden_units),
        torch.nn.ReLU(),
        torch.nn.Linear(hidden_units, hidden_units),
        torch.nn.ReLU(),
        torch.nn.Linear(hidden_units, num_classes),
        torch.nn.Softmax(dim=1),
    ]
    return torch.nn.Sequential(*layers)

**Example Implementation:** compute the H-Divergence between two randomly chosen subsets of CIFAR 10 (should be close to zero).

In [4]:
# set up functions for CIFAR 10 example

# load the data

# preprocessing: convert each image to a tensor, then normalize every channel
# with mean 0.5 and standard deviation 0.5
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

batch_size = 4

# training split, served in shuffled mini-batches
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
)

# test split, served in its stored order
testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=2,
)

# names of the ten CIFAR-10 classes
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)


# define the model

class Net(nn.Module):
    """
    Small convolutional classifier.

    Two convolution + ReLU + max-pooling stages are followed by three linear
    layers. The first linear layer expects 16 * 5 * 5 flattened features, and
    the last one produces 10 raw (un-normalized) scores per input.
    """

    def __init__(self):
        super().__init__()
        # convolutional stages; a single pooling module is shared by both
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=6, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # fully connected head
        self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Return the 10 output scores for a batch of inputs ``x``."""
        # convolution -> ReLU -> pooling, twice
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))

        # flatten every dimension except the batch dimension
        x = x.flatten(start_dim=1)

        # two hidden linear layers with ReLU, then the linear output layer
        for hidden in (self.fc1, self.fc2):
            x = F.relu(hidden(x))
        return self.fc3(x)


net = Net()

# objective: cross-entropy loss, minimized with SGD plus momentum
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

# two passes over the training data
for epoch in range(2):
    running_loss = 0.0

    for i, data in enumerate(trainloader):
        # each item yielded by the loader is a pair [inputs, labels]
        inputs, labels = data

        # clear the gradients left over from the previous step
        optimizer.zero_grad()

        # forward pass, loss, backward pass, parameter update
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # report the average loss once every 2000 mini-batches, then reset it
        running_loss = running_loss + loss.item()
        if (i + 1) % 2000 == 0:
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0

In [None]:
# get the latent representation of the data using the trained model

def get_latent_representation(model, X):
    """
    A function that computes the latent representation of the data using the trained model.

    The representation is simply the output of ``model`` on ``X``.

    Parameters
    ----------
    model: torch.nn.Module
        The trained model (any callable mapping a float tensor to a tensor).

    X : numpy.ndarray
        The data.

    Returns
    -------
    latent_representation : numpy.ndarray
        The latent representation of the data.
    """
    X = torch.from_numpy(X).float()

    # this is inference only, so no autograd graph needs to be recorded
    with torch.no_grad():
        latent_representation = model(X)

    # detach() is kept so that the conversion also works for a callable that
    # returns a tensor which still requires grad
    return latent_representation.detach().numpy()

In [6]:
# compute H-Divergence between two random samples from the train set of size 500

X_train = trainset.data

# choose 2 x 500 random samples from the training set; a single draw without
# replacement is split in two so that the samples cannot share any image
sample_indices = np.random.choice(X_train.shape[0], 1000, replace=False)
X_train_1 = X_train[sample_indices[:500]]
X_train_2 = X_train[sample_indices[500:]]


def _to_model_input(images):
    """Convert raw CIFAR-10 images to the normalized channel-first format net was trained on."""
    # torchvision stores CIFAR10.data as uint8 images in (N, 32, 32, 3) layout.
    # The channel axis has to be moved with transpose; a plain reshape to
    # (N, 3, 32, 32) would reinterpret the memory and scramble the pixels.
    images = images.transpose(0, 3, 1, 2).astype(np.float32)
    # reproduce the training transform: scale to [0, 1], then (x - 0.5) / 0.5
    return (images / 255.0 - 0.5) / 0.5


# convert the samples so that they are in the right format for the model
X_train_1 = _to_model_input(X_train_1)
X_train_2 = _to_model_input(X_train_2)

# compute the latent representation of the samples

latent_representation_train_1 = get_latent_representation(net, X_train_1)
latent_representation_train_2 = get_latent_representation(net, X_train_2)





Epoch 1/10
605/605 - 3s - loss: 0.0846 - accuracy: 0.9917 - 3s/epoch - 5ms/step
Epoch 2/10
605/605 - 1s - loss: 0.0483 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 3/10
605/605 - 2s - loss: 0.0481 - accuracy: 0.9917 - 2s/epoch - 3ms/step
Epoch 4/10
605/605 - 1s - loss: 0.0481 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 5/10
605/605 - 1s - loss: 0.0481 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 6/10
605/605 - 1s - loss: 0.0479 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 7/10
605/605 - 1s - loss: 0.0480 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 8/10
605/605 - 1s - loss: 0.0481 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 9/10
605/605 - 1s - loss: 0.0480 - accuracy: 0.9917 - 1s/epoch - 2ms/step
Epoch 10/10
605/605 - 2s - loss: 0.0480 - accuracy: 0.9917 - 2s/epoch - 3ms/step
0.0005562470753987629


In [None]:
# define a discriminator model with input shape the size of the latent representation
# and train it on the latent representation of the samples

# get the dimension of the latent representation
input_shape = latent_representation_train_1.shape[1]

# fit the discriminator: sample 1 takes the "labeled" role, sample 2 the "unlabeled" role
discriminator = train_discriminative_model(
    labeled=latent_representation_train_1,
    unlabeled=latent_representation_train_2,
    input_shape=input_shape,
)

# evaluate the H-Divergence of the two samples under the trained discriminator
H_divergence = compute_H_divergence(
    labeled=latent_representation_train_1,
    unlabeled=latent_representation_train_2,
    model=discriminator,
)

print(H_divergence)

**Interpretation of Results:** The H-Divergence in this case is essentially zero, which is what we expected as the samples were chosen randomly.