In [7]:
import torch
import torch.nn as nn
import torch.optim as optim

from torchvision.datasets import CIFAR10, CIFAR100
from torch.utils.data import DataLoader
from torchvision import transforms

import torch.functional as F
import os

# Data-preprocessing

In [15]:
#dataset = factbook_data(X, y, scale_data=False)
#trainloader = torch.utils.data.DataLoader(dataset, batch_size=10, shuffle=True, num_workers=1)
#testloader = torch.utils.data.DataLoader(dataset, batch_size=10, shuffle=True, num_workers=1)

# MLP layer

In [29]:

# Hyperparameters
cifar_10 = True
cifar_100 = not cifar_10

# The images in CIFAR-10 are of size 3x32x32, i.e. 3-channel color images of 32x32 pixels in size.
input_size = 3*32*32  # Number of input features
# Number of neurons in the hidden layer
hidden_size = 64  

if cifar_10:
    output_size = 10  # Number of output neurons (assuming a regression problem)
elif cifar_100:
    output_size = 100

num_epochs = 5
batch_size = 25
learning_rate = 0.01


In [35]:
# Create the neural network
class MLP(nn.Module):
    def __init__(self, input_size, output_size, \
                 num_layers=2, layer_width=64, \
                 activation=nn.ReLU()):
        #super(Net, self).__init__()
        super(MLP, self).__init__()
        # The last layer must have as many outputs as classes in the dataset!
        # Each target - cat, truck, airplane, etc - goes into its separate output!
        # The images in CIFAR-10 are of size 3x32x32, i.e. 3-channel color images of 32x32 pixels in size.
        # input dims: batch_size * channels (RGB=3) * width * depth
        # hidden states must match input dims: channels (RGB=3) * width * depth -> some hidden states (64)
        # 64 -> output size - 100 classes => output_size = 100
        # self.layers = nn.Sequential(nn.Flatten(),
        #                             nn.Linear(input_size, layer_width),
        #                             nn.ReLU(),
        #                             nn.Linear(layer_width, output_size)
        #                             )
        self.first = nn.Flatten()

        self.layers = nn.ModuleList()
        #self.linear = nn.Linear(input_size, layer_width)
        self.layers.append(nn.Linear(input_size, layer_width))
        for _ in range(1,num_layers):
            self.layers.append(nn.Linear(layer_width,layer_width))
        self.activation = activation

        self.output = nn.Linear(layer_width,output_size)


    def forward(self, x):
        
        x = self.first(x)                   # DIMS: batch, input_size
        print('first size: ', x.size())
        for layer in self.layers[:-1]:
            x = self.activation(layer(x))   # DIMS: batch, layer_width
        #x = self.activation(self.linear(x))
        print('linear size: ',x.size())
        x = self.output(x)                  # DIMS: batch, output_size
        print('output size: ',x.size())
        return x 


In [33]:
# Set fixed random number seed
torch.manual_seed(42)

# Transform to tensor and normalize
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# Prepare CIFAR-10 dataset
# Training set
if cifar_10:
    trainset = CIFAR10(os.getcwd(),
                        download=True, 
                        transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                            shuffle=True, num_workers=1)

    # Testing set
    testset = CIFAR10(root='./',
                    train=False,
                    download=True,
                    transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                            shuffle=False, num_workers=2)
    
    classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
elif cifar_100:
    trainset = CIFAR100(os.getcwd(),
                        download=True, 
                        transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                            shuffle=True, num_workers=1)

    # Testing set
    testset = CIFAR100(root='./',
                    train=False,
                    download=True,
                    transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                            shuffle=False, num_workers=2)
    

Files already downloaded and verified
Files already downloaded and verified


In [36]:
# Initialize the neural network instance
net = MLP(input_size, output_size)
print(net)
# Loss function and optimizer
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    # Print epoch
    print(f'Starting epoch {epoch+1}')

    # Set current loss value
    loss_per_epoch = 0.0
    
    # Iterate over the DataLoader for training data
    for i, data in enumerate(trainloader, 0):

        # Get inputs
        inputs, targets = data

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = net(inputs)

        # Calculate loss
        loss = loss_function(outputs, targets)

        # Backward pass
        loss.backward()

        # Optimization
        optimizer.step()

        # Print progress
        loss_per_epoch += loss.item()
        if i % 500 == 499:
                    print('Loss after mini-batch %5d: %.3f' %
                            (i + 1, loss_per_epoch / 500))
                    # Reset loss after mini-batch
                    loss_per_epoch = 0.0


# Process is complete.
print('Training process has finished.')


MLP(
  (first): Flatten(start_dim=1, end_dim=-1)
  (layers): ModuleList(
    (0): Linear(in_features=3072, out_features=64, bias=True)
    (1): Linear(in_features=64, out_features=64, bias=True)
  )
  (activation): ReLU()
  (output): Linear(in_features=64, out_features=10, bias=True)
)
Starting epoch 1
first size:  torch.Size([25, 3072])
linear size:  torch.Size([25, 64])
output size:  torch.Size([25, 10])
first size:  torch.Size([25, 3072])
linear size:  torch.Size([25, 64])
output size:  torch.Size([25, 10])
first size:  torch.Size([25, 3072])
linear size:  torch.Size([25, 64])
output size:  torch.Size([25, 10])
first size:  torch.Size([25, 3072])
linear size:  torch.Size([25, 64])
output size:  torch.Size([25, 10])
first size:  torch.Size([25, 3072])
linear size:  torch.Size([25, 64])
output size:  torch.Size([25, 10])
first size:  torch.Size([25, 3072])
linear size:  torch.Size([25, 64])
output size:  torch.Size([25, 10])
first size:  torch.Size([25, 3072])
linear size:  torch.Size

In [37]:
# Make predictions
with torch.no_grad():
    new_input = torch.randn(1, 3, 32, 32)
    predicted_output = net(new_input)
    print("Predicted output:", predicted_output)

first size:  torch.Size([1, 3072])
linear size:  torch.Size([1, 64])
output size:  torch.Size([1, 10])
Predicted output: tensor([[-0.2455, -0.5381,  1.0658,  0.7840,  0.7311,  1.9463,  1.6289,  0.9477,
         -0.3077, -0.0538]])


# Attention layer

In [38]:
class SelfAttention(nn.Module):
    def __init__(self, input_dim):
        super(SelfAttention, self).__init__()
        self.query = nn.Linear(input_dim, input_dim, bias=False)        # CHECK [batch_size, seq_length, input_dim]
        self.key = nn.Linear(input_dim, input_dim, bias=False)          # CHECK [batch_size, seq_length, input_dim]
        self.value = nn.Linear(input_dim, input_dim, bias=False)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x): # x.shape CHECK (batch_size, seq_length, input_dim)
        query = self.query(x)
        key = self.key(x)
        value = self.value(x)

        attention_scores = torch.matmul(query, key.transpose(-2, -1)) / torch.sqrt(torch.tensor(key.shape[-1]))
        attention_weights = self.softmax(attention_scores)

        output = torch.matmul(attention_weights, value)
        return output


class SelfAttentionNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(SelfAttentionNet, self).__init__()
        self.embedding = nn.Linear(input_dim, hidden_dim)

        self.attention = SelfAttention(hidden_dim)
        
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = self.embedding(x)
        x = self.attention(x)
        x = self.fc(x)
        
        return x

In [39]:
# Create an instance of the SelfAttentionNet
model = SelfAttentionNet(input_dim=10, hidden_dim=32, output_dim=5)

# Input data
input_data = torch.randn(16, 10)

# Forward pass
output = model(input_data)

# Freeze MLP parameters

In [None]:
# Freeze all the layers of the pre-trained model
for param in model.parameters():
    param.requires_grad = False

# Freezing only specific layers in the network :
for name, param in model.named_parameters():
    if 'conv1' in name or 'layer1' in name:
        param.requires_grad = False

In [None]:
class Attention_MLP(nn.Module):
    def __init__(self, layer_widths, hidden_dim, output_dim):
        super(Attention_MLP, self).__init__()

        # self.embedding = nn.Linear(layer_widths, hidden_dim)
        self.attention = SelfAttention(layer_widths, )
        
        self.MLP = MLP(input_size=1, output_size=2, num_layers=1, layer_width=2)

        # self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):

        x = self.embedding(x)
        # x for attention is 2D at each layer: 
        # (layers_width layer i, layers_width layer i+1)
        x = self.attention(x)
        x = self.fc(x)

        return x

In [5]:
class FixedWeightMLPAttention(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size, fixed_layers, attention_dim):
        super(FixedWeightMLPAttention, self).__init__()

        self.fixed_layers = fixed_layers
        self.attention_dim = attention_dim

        # Create MLP layers with fixed weights
        self.mlp = nn.Sequential(
            nn.Linear(input_size, hidden_sizes[0]),
            nn.ReLU(),
            *[nn.Sequential(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]), nn.ReLU()) for i in range(fixed_layers-1)]
        )

        Let's select entries to the matrix as the top-k connections!!!! Maybe even top 1?????

        # Create attention mechanism
        self.wq = nn.Linear(hidden_sizes[0], attention_dim)  # Assuming the input is the first layer's output
        self.wk = nn.Linear(hidden_sizes[0], attention_dim)
        self.wv = nn.Linear(attention_dim, hidden_sizes[-1])

        # Create final layer with dynamic weights
        self.final_layer = nn.Linear(hidden_sizes[-1], output_size)

    def forward(self, x):
        # Pass input through fixed MLP layers, storing intermediate outputs
        intermediate_outputs = [x]
        for layer in self.mlp:
            x = layer(x)
            intermediate_outputs.append(x)

        # Create matrices for each layer
        matrices = [torch.stack([x.unsqueeze(0) for x in intermediate_outputs[i:]], dim=0)
                    for i in range(len(intermediate_outputs))]

        # Calculate query, key, and value vectors from the matrices
        q = self.wq(matrices[0])
        k = self.wk(matrices[0])
        v = matrices[0]

        # Compute scaled dot product attention
        attn = torch.matmul(q, k.transpose(-2, -1)) / torch.sqrt(torch.tensor(attention_dim, dtype=torch.float))
        attn = torch.softmax(attn, dim=-1)

        # Apply attention to value vectors
        x = torch.matmul(attn, v)

        # Pass through final layer
        x = self.final_layer(x)

        return x

In [6]:
# Create model instance
model = FixedWeightMLPAttention(input_size=10, hidden_sizes=[20, 20, 20], output_size=5, fixed_layers=2, attention_dim=20)

# Input data
input_data = torch.randn(32, 10)

# Forward pass
output = model(input_data)

RuntimeError: stack expects each tensor to be equal size, but got [1, 32, 10] at entry 0 and [1, 32, 20] at entry 1

# Network visualization

https://www.appsilon.com/post/visualize-pytorch-neural-networks

### tensorboard

In [109]:
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter("torchlogs/")
model = Net()
writer.add_graph(model, inputs)
writer.close()

In [16]:
model = SelfAttentionNet(input_dim=10, hidden_dim=32, output_dim=5)
input_data = torch.randn(16, 10)

writer = SummaryWriter("torchlogs_attention/")
writer.add_graph(model, input_data)
writer.close()

  attention_scores = torch.matmul(query, key.transpose(-2, -1)) / torch.sqrt(torch.tensor(key.shape[-1]))
  attention_scores = torch.matmul(query, key.transpose(-2, -1)) / torch.sqrt(torch.tensor(key.shape[-1]))


### netron

In [20]:
input_names = ["input"]
output_names = ["output"]

model = SelfAttentionNet(input_dim=10, hidden_dim=32, output_dim=5)
input_data = torch.randn(16, 10)

torch.onnx.export(model, input_data, "model_attention.onnx", input_names=input_names, output_names=output_names)

  attention_scores = torch.matmul(query, key.transpose(-2, -1)) / torch.sqrt(torch.tensor(key.shape[-1]))
  attention_scores = torch.matmul(query, key.transpose(-2, -1)) / torch.sqrt(torch.tensor(key.shape[-1]))


In [110]:
input_names = ["cifar_100"]
output_names = ["output"]

torch.onnx.export(net, inputs, "mlp_cifar_100.onnx", input_names=input_names, output_names=output_names)