In [3]:
import torch 
from torchvision.datasets import OxfordIIITPet
import matplotlib.pyplot as plt 
from random import random 




  from .autonotebook import tqdm as notebook_tqdm


In [4]:
import torch 
from torch import nn
from einops.layers.torch import Rearrange
from torch import Tensor
from torchvision.transforms import Resize, ToTensor
from torchvision.transforms.functional import to_pil_image

In [5]:
to_tensor = [Resize((144, 144)), ToTensor()]

class Compose(object):
    def __init__(self, transforms): 
        self.transforms = transforms 
        
    def __call__(self, image, target): 
        for t in self.transforms: 
            image = t(image)
        return image, target 


dataset = OxfordIIITPet(root="./", download=True, transforms=Compose(to_tensor))

def show_images(images, num_samples=40, cols=8): 
    plt.figure(figsize=(15,15))
    idx = int(len(dataset)/num_samples)
    for i, img in enumerate(images):
        if i % idx == 0:
            plt.subplot(int(num_samples/cols) + 1, cols, int(i/idx) + 1)
            plt.imshow(to_pil_image(img[0]))
    
# show_images(dataset)

In [6]:
class PatchEmbedding(nn.Module): 
    def __init__(self, in_channels=3, patch_size =8, emb_size=128): 
        self.patch_size = patch_size
        super().__init__()
        self.projection = nn.Sequential(
            Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1 = patch_size, p2 =patch_size),
            nn.Linear(patch_size * patch_size * in_channels, emb_size) 
        )
    
    def forward(self, x: Tensor) -> Tensor: 
        x = self.projection(x)
        return x



sample_datapoint = torch.unsqueeze(dataset[0][0], 0)
print (f'initial shape : {sample_datapoint.shape}')
embedding = PatchEmbedding()(sample_datapoint)
print (f'Patches shape : {embedding.shape}')


initial shape : torch.Size([1, 3, 144, 144])
Patches shape : torch.Size([1, 324, 128])


In [7]:
from einops import rearrange

class Attention(nn.Module): 
    def __init__(self, dim, n_heads, dropout): 
        super().__init__()
        self.n_heads = n_heads 
        self.attn = torch.nn.MultiheadAttention(embed_dim=dim, 
                                                num_heads=n_heads,
                                                dropout=dropout)
        self.q = torch.nn.Linear(dim, dim)
        self.k = torch.nn.Linear(dim, dim)
        self.v = torch.nn.Linear(dim, dim)
        
    def forward(self, x): 
        q = self.q(x)
        k = self.k(x)
        v = self.v(x)
        attn_output, attn_output_weights = self.attn(x,x,x)
        return attn_output 
    
attn = Attention(128, 4, 0)(torch.ones((1,5,128)))
attn.shape
        

torch.Size([1, 5, 128])

In [9]:
class PreNorm(nn.Module): 
    def __init__(self, dim, fn): 
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn 
    
    def forward(self, x, **kwargs): 
        return self.fn(self.norm(x), **kwargs)

norm = PreNorm(128, Attention(dim=128, n_heads=4, dropout=0.))
norm(torch.ones((1,5,128))).shape

torch.Size([1, 5, 128])

In [10]:
class FeedForward(nn.Sequential): 
    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__(
            nn.Linear(dim, hidden_dim), 
            nn.GELU(), 
            nn.Dropout(dropout), 
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout)
        )

ff = FeedForward(dim=128, hidden_dim=256)
ff(torch.ones((1,5,128))).shape
        
    

torch.Size([1, 5, 128])

In [11]:
class ResidualAdd(nn.Module): 
    def __init__(self, fn): 
        super().__init__()
        self.fn = fn 
    
    def forward(self, x, **kwargs): 
        res = x 
        x = self.fn(x, **kwargs)
        x += res 
        return x 

residual_attn = ResidualAdd(Attention(dim=128, n_heads=4, dropout=0.))
residual_attn(torch.ones((1,5,128))).shape


        

torch.Size([1, 5, 128])

In [15]:
from einops import repeat

class ViT(nn.Module): 
    def __init__(self, ch=3, img_size=144, patch_size=4, emb_dim=32,
                 n_layers=6, out_dim=37, dropout=0.1, heads=2): 
        super(ViT, self).__init__()
        
        # Attributes 
        self.channels = ch
        self.height = img_size 
        self.width = img_size
        self.patch_size = patch_size
        self.n_layers = n_layers 
        
        # patching 
        self.patch_embedding = PatchEmbedding(in_channels=ch,
                                              patch_size=patch_size, 
                                              emb_size=emb_dim)

        # learnabale parameters 
        num_patches = (img_size // patch_size) ** 2
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches+1, emb_dim))
        self.cls_token= nn.Parameter(torch.rand(1,1, emb_dim))
        
        # Transformer Encoder 
        self.layers = nn.ModuleList([])
        
        for _ in range(n_layers):
            transformer_block = nn.Sequential(
                ResidualAdd(PreNorm(emb_dim, Attention(emb_dim, n_heads=heads, dropout=dropout))),
                            ResidualAdd(PreNorm(emb_dim, FeedForward(emb_dim, emb_dim, dropout=dropout))))
            self.layers.append(transformer_block)
            
        # classification head 
        self.head = nn.Sequential(nn.LayerNorm(emb_dim), nn.Linear(emb_dim, emb_dim))
    

    def forward(self, img): 
        # get patch embedding vectors 
        x = self.patch_embedding(img)
        b,n,_ = x.shape 
        
        # add cls token to inputs 
        cls_tokens = repeat(self.cls_token, '1 1 d -> b 1 d', b=b)
        x = torch.cat([cls_tokens, x], dim=1)
        x += self.pos_embedding[:, :(n+1)]
        
        # Transformer layers 
        for i in range(self.n_layers): 
            x = self.layers[i](x)
        
        # output based on classifciation token
        return self.head(x[:, 0, :])
            
model = ViT()
print (model)
model(torch.ones((1,3,144,144)))

            
        

ViT(
  (patch_embedding): PatchEmbedding(
    (projection): Sequential(
      (0): Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1=4, p2=4)
      (1): Linear(in_features=48, out_features=32, bias=True)
    )
  )
  (layers): ModuleList(
    (0): Sequential(
      (0): ResidualAdd(
        (fn): PreNorm(
          (norm): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
          (fn): Attention(
            (attn): MultiheadAttention(
              (out_proj): NonDynamicallyQuantizableLinear(in_features=32, out_features=32, bias=True)
            )
            (q): Linear(in_features=32, out_features=32, bias=True)
            (k): Linear(in_features=32, out_features=32, bias=True)
            (v): Linear(in_features=32, out_features=32, bias=True)
          )
        )
      )
      (1): ResidualAdd(
        (fn): PreNorm(
          (norm): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
          (fn): FeedForward(
            (0): Linear(in_features=32, out_features=3

tensor([[ 0.6879, -0.4708, -0.3587, -0.7444, -0.9594,  0.2227, -0.4656,  0.2082,
         -0.4802, -1.4774, -0.7473, -0.1310,  1.0424, -0.5546,  0.3352, -0.0805,
          0.2552,  2.2155, -1.5980, -0.1756,  0.3760,  0.6216,  0.1511, -0.8476,
          0.3163, -0.2990, -0.5371,  0.1352, -0.1488,  0.1135,  0.0959, -0.1505]],
       grad_fn=<AddmmBackward0>)

In [16]:
from torch.utils.data import DataLoader
from torch.utils.data import random_split

train_split = int (0.8* len(dataset))
train, test = random_split(dataset, [train_split, len(dataset)-train_split])

In [21]:
train_loader = DataLoader(train, batch_size=32, shuffle=True)
test_loader = DataLoader(test, batch_size=32, shuffle=True)




In [26]:
import torch.optim as optim 
import numpy as np 


def get_device(): 
    return torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

device = get_device()
model = ViT().to(device)
optimizer = optim.Adam(model.parameters(), lr =0.001)
criterion = nn.CrossEntropyLoss()

for epoch in range(100): 
    epoch_losses = []
    model.train()
    
    for step, (inputs, labels) in enumerate(train_loader): 
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        predictions = model(inputs)
        loss = criterion(predictions, labels)
        loss.backward()
        optimizer.step()
        epoch_losses.append(loss.item())
        
        if epoch % 5 == 0:
            print (f'<<< Epoch : {epoch}, loss : {np.nean(epoch_losses)} >>>')
            epoch_losses_test = [] 

            correct = 0 
            for iter, (inputs, labels) in enumerate(test_loader):
                inputs, labels = inputs.to(device), labels.to(device)
                predictions = model(inputs)
                loss = criterion(predictions, labels)
                epoch_losses_test.append(loss.item())
                print (f'<<< Epoch : {epoch}, test loss : {np.mean(epoch_losses_test)}')
                
                if np.argmax(predictions) == labels : 
                    correct +=1 
                print ("Accuracy : {correct/len(labels)}")
                
        

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.

In [27]:
device = get_device()

In [28]:
device

device(type='cuda')