In [2]:
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

import math

In [3]:
class LIFNeuron(nn.Module):
    def __init__(self, threshold, decay):
        super().__init__()
        self.threshold = threshold
        self.decay = decay
        self.voltage = 0

    def forward(self, x):
        spike = th.zeros_like(x)
        self.voltage = self.voltage * self.decay * (1. - spike) + x
        spike[self.voltage > self.threshold] = 1.
        self.voltage[self.voltage > self.threshold] = 0

        return spike

In [4]:
class SpikeSelfAttention(nn.Module):
    def __init__(self, d_model, nhead, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.nhead = nhead
        self.dropout = dropout

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)

        self.neuron = LIFNeuron(threshold=1.0, decay=0.9)

    def forward(self, x):
        q = self.neuron(self.query(x))
        k = self.neuron(self.key(x))
        v = self.neuron(self.value(x))

        attn_output_weights = F.softmax(
            q @ k.transpose(-2, -1) / math.sqrt(self.d_model), dim=-1)
        attn_output = attn_output_weights @ v
        attn_output = self.neuron(attn_output)

        return attn_output

In [5]:
class PatchEmbedding(nn.Module):
    def __init__(self, in_channels=3, patch_size=16, emb_size=768):
        super().__init__()
        self.patch_size = patch_size
        self.projection = nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size)

    def forward(self, x):
        x = self.projection(x)
        return x.flatten(2).transpose(1, 2)

In [6]:
class PositionalEncoding(nn.Module):
    def __init__(self, emb_size, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Compute the positional encodings once in log space.
        pe = th.zeros(max_len, emb_size)
        position = th.arange(0, max_len, dtype=th.float).unsqueeze(1)
        div_term = th.exp(th.arange(0, emb_size, 2).float() * - (math.log(10000.0) / emb_size))
        pe[:, 0::2] = th.sin(position * div_term)
        pe[:, 1::2] = th.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

In [7]:
class SpikingTransformerEncoderLayer(nn.TransformerEncoderLayer):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        super().__init__(d_model, nhead, dim_feedforward, dropout)
        self.self_attn = SpikeSelfAttention(d_model, nhead, dropout)

    def forward(self, src, src_key_padding_mask=None):
        src2 = self.self_attn(src)
        src = src + self.dropout1(src2)
        src = self.norm1(src)
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        return src
        
class SpikingTransformerEncoder(nn.TransformerEncoder):
    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__(encoder_layer, num_layers, norm)

    def forward(self, src, src_key_padding_mask=None):
        """Pass the input through the encoder layers in turn."""
        output = src

        for mod in self.layers:
            output = mod(output, src_key_padding_mask=src_key_padding_mask)

        if self.norm is not None:
            output = self.norm(output)

        return output

In [17]:
from torch.utils.checkpoint import checkpoint

class SpikingViT(nn.Module):
    def __init__(self, in_channels, patch_size, emb_size, img_size, num_layers, num_heads, num_classes):
        super().__init__()
        self.patch_embedding = PatchEmbedding(in_channels, patch_size, emb_size)
        self.pos_embedding = PositionalEncoding(emb_size, max_len=(img_size//patch_size)**2)
        self.transformer_encoder_layers = nn.ModuleList([SpikingTransformerEncoderLayer(emb_size, num_heads) for _ in range(num_layers)])
        self.classifier = nn.Linear(emb_size, num_classes)

    def forward(self, x):
        x = self.patch_embedding(x)
        x = self.pos_embedding(x)
        for layer in self.transformer_encoder_layers:
            x = checkpoint(layer, x)
        x = x.mean(dim=1)
        x = self.classifier(x)
        return x


In [18]:
transform = transforms.Compose(
    [transforms.Resize((32, 32)),
     transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

trainset = torchvision.datasets.CIFAR10(root='./Data', train=True, download=True, transform=transform)
trainloader = DataLoader(trainset, batch_size=32, shuffle=True, num_workers=1)

testset = torchvision.datasets.CIFAR10(root='./Data', train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=32, shuffle=False, num_workers=1)

device = th.device("cuda:1" if th.cuda.is_available() else "cpu")

Files already downloaded and verified
Files already downloaded and verified


In [19]:
import torch

print("Available GPUs: ", torch.cuda.device_count())

Available GPUs:  2


In [20]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

In [21]:
from torch.cuda.amp import autocast, GradScaler

device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
model = SpikingViT(3, 16, 768, 32, 6, 8, 10).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
scaler = GradScaler()

for epoch in range(10):  # loop over the dataset multiple times
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, labels)
        
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # print statistics
        running_loss += loss.item()
        if i % 2000 == 1999:    # print every 2000 mini-batches
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 2000))
            running_loss = 0.0

print('Finished Training')


RuntimeError: The size of tensor a (32) must match the size of tensor b (16) at non-singleton dimension 0

In [None]:
correct = 