In [11]:
import os
import sys

sys.path.append(os.getcwd() + '/..')

from importance_driven_coverage import attributors, clusterers, coverage

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Model setup and training

For this example we are using a simple LeNet5 model

In [12]:
import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms.v2 as T
from torch import nn
from torch.utils import data
from torch.utils.data import DataLoader


class LeNet5(nn.Module):
    def __init__(self) -> None:
        super().__init__()

        self.conv1 = nn.Conv2d(1, 6, 5, padding=2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(400, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = F.max_pool2d(F.relu(self.conv1(x)), 2, 2)
        x = F.max_pool2d(F.relu(self.conv2(x)), 2, 2)

        x = x.view(-1, 16*5*5)

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LeNet5().to(device)

In [14]:
transforms = T.Compose([
    T.ToImage(),
    T.ToPureTensor(),
    T.ToDtype(torch.float32, scale=True)
])
train_dataset = torchvision.datasets.MNIST("data", download=True, transform=transforms, train=True)
train, validation = data.random_split(train_dataset, [0.9, 0.1])
train_loader = data.DataLoader(train, batch_size=128, num_workers=4)
val_loader = data.DataLoader(validation, batch_size=128, num_workers=4)

test_dataset = torchvision.datasets.MNIST("data", download=True, transform=transforms, train=True)
test_loader = data.DataLoader(test_dataset, batch_size=128, num_workers=4)

In [15]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adadelta(model.parameters(), lr=1.0, rho=0.95)

def train(dataloader, model, criterion, opt):
        size = len(dataloader.dataset)
        num_batches = len(dataloader)
        model.train()
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)

            pred = model(X)
            loss = criterion(pred, y)

            loss.backward()
            opt.step()
            opt.zero_grad()

            if batch % (num_batches//10) == 0:
                loss, current = loss.item(), (batch + 1) * len(X)
                print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")

def val(dataloader, model, criterion):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += criterion(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

epochs = 3
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_loader, model, criterion, optimizer)
    val(val_loader, model, criterion)
print("Done!")

Epoch 1
-------------------------------
loss: 2.308755 [  128/54000]


loss: 0.991615 [ 5504/54000]
loss: 0.282846 [10880/54000]
loss: 0.226823 [16256/54000]
loss: 0.194923 [21632/54000]
loss: 0.071484 [27008/54000]
loss: 0.194007 [32384/54000]
loss: 0.202465 [37760/54000]
loss: 0.060451 [43136/54000]
loss: 0.098882 [48512/54000]
loss: 0.037425 [53888/54000]
Test Error: 
 Accuracy: 97.3%, Avg loss: 0.092625 

Epoch 2
-------------------------------
loss: 0.121083 [  128/54000]
loss: 0.087854 [ 5504/54000]
loss: 0.059270 [10880/54000]
loss: 0.172158 [16256/54000]
loss: 0.121833 [21632/54000]
loss: 0.016013 [27008/54000]
loss: 0.079195 [32384/54000]
loss: 0.099693 [37760/54000]
loss: 0.019619 [43136/54000]
loss: 0.069155 [48512/54000]
loss: 0.009661 [53888/54000]
Test Error: 
 Accuracy: 97.9%, Avg loss: 0.077303 

Epoch 3
-------------------------------
loss: 0.100667 [  128/54000]
loss: 0.037389 [ 5504/54000]
loss: 0.050859 [10880/54000]
loss: 0.128416 [16256/54000]
loss: 0.115720 [21632/54000]
loss: 0.007821 [27008/54000]
loss: 0.061769 [32384/54000]
loss

# IDC Usage

We want to test the penultimate layer, for out model (LeNet5) this is `model.fc2`. But we want the activation values after the activation function, since the ReLU after fc2 is not available to select directly we select the next layer `model.fc3` and tell CaptumLRPAttributor and idc.calculate that we want to test the input to the selected layer. Usually it would be easier to design a model with this in mind, such that we are able to directly reference the layer we want to test.

In [16]:
layer = model.fc3
n = 6

attributor = attributors.CaptumLRPAttributor(attribute_kwargs={"attribute_to_layer_input": True})
clusterer = clusterers.KMeansClustererSimpleSilhouette()

idc = coverage.ImportanceDrivenCoverage(model, attributor, clusterer)

score, combs = idc.calculate(train_loader, test_loader, layer, n, layer_input=True,
                           attributions_path="attributions.pt",
                           centroids_path="centroids.pt")



We can now apply a transformation during the calculation to calculate the improvement in coverage from using this augmentation.

In [17]:
new_trans = transforms = T.Compose([
    T.RandAugment()
])
score_after, combs_after = idc.calculate(train_loader, test_loader, layer, n, transform=new_trans, layer_input=True,
                                                   attributions_path="attributions.pt",
                                                   centroids_path="centroids.pt")

The total_combs attribute is populated after a calculation, it contains the set of all possible combinations of important neurons.

In [18]:
len(idc.total_combs)

360

We can see from the scores that the the dataset with the augmentation covers more of the important neuron combinations.

In [19]:
print("baseline score: ", score)
print("score with augmentation: ", score_after)

baseline score:  0.3388888888888889
score with augmentation:  0.36944444444444446


Since IDC gives us the sets of combinations that are covered, we can combine them to calculate the coverage of the baseline dataset combined with the transformed dataset.

In [20]:
print("baseline score", score)
print("score with baseline+augmentation", len(combs | combs_after)/len(idc.total_combs))

baseline score 0.3388888888888889
score with baseline+augmentation 0.38055555555555554
