In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install captum

Collecting captum
  Downloading captum-0.6.0-py3-none-any.whl (1.3 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/1.3 MB[0m [31m6.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m21.9 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: captum
Successfully installed captum-0.6.0


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from captum.attr import LayerIntegratedGradients
from captum.attr import LayerConductance
from captum.attr import LayerDeepLift
import pickle
import os
import csv

In [None]:
method_names = ["LayerConductance", "LayerIntegratedGradients", "LayerDeepLift"]
train_size=30000
test_size=1000

# Model structure defination

In [None]:
class NaiveInception(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(NaiveInception, self).__init__()

        self.conv1x1 = nn.Conv2d(in_channels, 16, kernel_size=1)
        self.conv1x1_3x3 = nn.Sequential(
            nn.Conv2d(in_channels, 8, kernel_size=1),
            nn.Conv2d(8, 16, kernel_size=3, padding=1)
        )
        self.conv1x1_5x5 = nn.Sequential(
            nn.Conv2d(in_channels, 8, kernel_size=1),
            nn.Conv2d(8, 16, kernel_size=5, padding=2)
        )
        self.pool = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)

        self.fc = nn.Sequential(
            nn.Linear(38416, 256),
            nn.ReLU(),
            nn.Linear(256, num_classes)  # Number of output classes
        )

    def forward(self, x):
        x1 = self.conv1x1(x)
        x2 = self.conv1x1_3x3(x)
        x3 = self.conv1x1_5x5(x)
        x4 = self.pool(x)
        x = torch.cat((x1, x2, x3, x4), dim=1)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

# FLOP Count

get size of the input data

In [None]:
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])

train_subset = datasets.MNIST('.', train=True, download=True, transform=transform)
test_subset = datasets.MNIST('.', train=False, download=True, transform=transform)

# Define the size of the random subsets
train_indices = torch.randperm(len(train_subset))[:train_size]
test_indices = torch.randperm(len(test_subset))[:test_size]

train_loader = torch.utils.data.DataLoader(train_subset, batch_size=64, sampler=torch.utils.data.SubsetRandomSampler(train_indices))
test_loader = torch.utils.data.DataLoader(test_subset, batch_size=16, sampler=torch.utils.data.SubsetRandomSampler(test_indices))
for input_data in train_loader:
  print(input_data[0].size())
  break

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:00<00:00, 118474972.48it/s]

Extracting ./MNIST/raw/train-images-idx3-ubyte.gz to ./MNIST/raw






Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 34511593.68it/s]


Extracting ./MNIST/raw/train-labels-idx1-ubyte.gz to ./MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 188346398.23it/s]


Extracting ./MNIST/raw/t10k-images-idx3-ubyte.gz to ./MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 3544284.42it/s]

Extracting ./MNIST/raw/t10k-labels-idx1-ubyte.gz to ./MNIST/raw






torch.Size([64, 1, 28, 28])


calculate flops for each layer

In [None]:
def count_flops(model, input_tensor):
    flops = {}  # Use a dictionary to store FLOPs for each layer
    hooks = []  # We'll use hooks to count FLOPs for each layer

    def hook_fn(module, input, output):
        if isinstance(module, nn.Conv2d):
            # Calculate FLOPs for Conv2d layer
            input_shape = input[0].shape
            kernel_shape = module.weight.shape
            flops[module] = int(input_shape[1] * kernel_shape[2] * kernel_shape[3] * kernel_shape[0] * input_shape[2] * input_shape[3] / module.groups)

    def register_hooks(module):
        # Register hooks for all Conv2d layers
        for layer in module.children():
            if isinstance(layer, nn.Conv2d):
                hook = layer.register_forward_hook(hook_fn)
                hooks.append(hook)
            elif isinstance(layer, nn.Sequential):
                register_hooks(layer)

    # Register hooks for all layers in the model
    register_hooks(model)

    # Run input through the model
    with torch.no_grad():
        model(input_tensor)

    # Remove the hooks
    for hook in hooks:
        hook.remove()

    return flops

# Example usage:
input_tensor = torch.randn(1, 1, 28, 28)  # Example input tensor with shape (batch_size, channels, height, width)
model = NaiveInception(in_channels=1, num_classes=10)  # Create an instance of the NaiveInception model
flops = count_flops(model, input_tensor)

# Print FLOPs for each layer
for layer, flop_count in flops.items():
    print(f"{layer.__class__.__name__}: {flop_count} FLOPs")


Conv2d: 12544 FLOPs
Conv2d: 6272 FLOPs
Conv2d: 903168 FLOPs
Conv2d: 6272 FLOPs
Conv2d: 2508800 FLOPs


# Train all models and record attributions

get model

In [None]:
# Create and train the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Create an instance of the Inception-like 3-stack model
in_channels = 1  # Input channels (e.g., for RGB images)
num_classes = 10  # Number of classes in your classification task
model = NaiveInception(in_channels, num_classes).to(device)

get train process

In [None]:
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])

train_subset = datasets.MNIST('.', train=True, download=True, transform=transform)
test_subset = datasets.MNIST('.', train=False, download=True, transform=transform)

# Define the size of the random subsets
train_indices = torch.randperm(len(train_subset))[:train_size]
test_indices = torch.randperm(len(test_subset))[:test_size]

train_loader = torch.utils.data.DataLoader(train_subset, batch_size=64, sampler=torch.utils.data.SubsetRandomSampler(train_indices))
test_loader = torch.utils.data.DataLoader(test_subset, batch_size=16, sampler=torch.utils.data.SubsetRandomSampler(test_indices))

# Create and train the model on the GPU
optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()


train and eval function

In [None]:
def train(epoch):
    model.train()
    train_loss = 0
    correct = 0
    total = 0

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = output.max(1)
        total += target.size(0)
        correct += predicted.eq(target).sum().item()

    train_accuracy = 100. * correct / total
    print(f"Epoch {epoch}: Train Loss = {train_loss / len(train_loader):.4f}, Train Accuracy = {train_accuracy:.2f}%")


def test(epoch):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            test_loss += loss.item()
            _, predicted = output.max(1)
            total += target.size(0)
            correct += predicted.eq(target).sum().item()

    test_accuracy = 100. * correct / total
    print(f"Epoch {epoch}: Test Loss = {test_loss / len(test_loader):.4f}, Test Accuracy = {test_accuracy:.2f}%")

functions for calculate attribution

In [None]:
def calculate_attribution_one_layer(model, layer, input_data, target_class, method):
    methods = {"LayerConductance": LayerConductance, "LayerIntegratedGradients": LayerIntegratedGradients, "LayerDeepLift": LayerDeepLift}
    using_function = methods[method](model, layer)
    attribution = using_function.attribute(input_data, target=target_class)

    return attribution

In [None]:
def get_all_attributions_one_input(model, input_data, target_class, method):
    all_attributions = {}
    for name, layer in model.named_children():
      print(name)
      all_attributions[layer] = []
      layer_attributions=calculate_attribution_one_layer(model, layer, input_data, target_class, method)
      all_attributions[layer]=layer_attributions
    return all_attributions

In [None]:
def calculate_and_save_attributions(model, model_index, sample_input, target_class, method_names):
    # Ensure sample_input has gradients
    sample_input.requires_grad_()

    # Set device for model and input tensor
    device = torch.device("cpu")
    sample_input = sample_input.to(device)
    model.to(device)


    # Calculate and print importance scores for each layer and neuron
    for method in method_names:
        print(method)
        attributions = get_all_attributions_one_input(model, sample_input, target_class, method=method)
        attribution_sum = 0

        for i in attributions:
            attribution_sum = attribution_sum + attributions[i].sum().item()

        for i in attributions:
            a = attributions[i].sum().item() / attribution_sum
            formatted_number = f"{a:.2f}"
            print(formatted_number)
            print(attributions[i].shape)
            print()

        print("\n\n")


train, save, and recored attribution info of models

In [None]:
for i in range(5):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    in_channels = 1  # Input channels (e.g., for RGB images)
    num_classes = 10  # Number of classes in your classification task

    model = NaiveInception(in_channels, num_classes).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(3):
      train(epoch)
      test(epoch)
    model_index="test"+str(i)
    sample_input, target_class=None, None
    for batch_idx, (data, target) in enumerate(test_loader):
      sample_input, target_class = data, target
      break
    calculate_and_save_attributions(model, model_index, sample_input, target_class, method_names)

Epoch 0: Train Loss = 4.6206, Train Accuracy = 52.63%
Epoch 0: Test Loss = 1.0425, Test Accuracy = 66.50%
Epoch 1: Train Loss = 0.7652, Train Accuracy = 75.08%
Epoch 1: Test Loss = 0.6453, Test Accuracy = 82.60%
Epoch 2: Train Loss = 0.4758, Train Accuracy = 86.83%
Epoch 2: Test Loss = 0.4790, Test Accuracy = 88.00%
LayerConductance
conv1x1
conv1x1_3x3
conv1x1_5x5
pool
fc
0.19
torch.Size([16, 16, 28, 28])

0.06
torch.Size([16, 16, 28, 28])

0.18
torch.Size([16, 16, 28, 28])

0.06
torch.Size([16, 1, 28, 28])

0.51
torch.Size([16, 10])




LayerIntegratedGradients
conv1x1
conv1x1_3x3
conv1x1_5x5
pool
fc
0.21
torch.Size([16, 16, 28, 28])

0.10
torch.Size([16, 16, 28, 28])

0.24
torch.Size([16, 16, 28, 28])

0.09
torch.Size([16, 1, 28, 28])

0.35
torch.Size([16, 10])




LayerDeepLift
conv1x1
conv1x1_3x3
conv1x1_5x5
pool
fc
0.20
torch.Size([16, 16, 28, 28])

0.06
torch.Size([16, 16, 28, 28])

0.18
torch.Size([16, 16, 28, 28])

0.06
torch.Size([16, 1, 28, 28])

0.50
torch.Size([16, 10])



