In [None]:
import os
import time

import numpy as np
import torch

# Identifier of this experiment family; it is embedded in every result file name.
filenum = 19
expid = 1

# Random start-up jitter (0-10 s, truncated to 10 ms) so that runs launched at
# the same moment do not all scan the directory simultaneously.
# np.random.rand() returns a plain float; the former np.random.rand(1) returned
# a 1-element array whose conversion through int() is deprecated in NumPy.
sleeptime = np.random.rand() * 10
time.sleep(int(sleeptime * 100) / 100)

filehead = 'tf_leaky_result_train_mnist' + str(filenum).zfill(3) + '_'

# Reserve the next free experiment id. The marker file is created with mode
# 'x' (exclusive creation), so if another run grabbed the same id between the
# directory scan and the creation we get FileExistsError and simply rescan.
while True:
    suffixes = [j[len(filehead):].split('.')[0] for j in os.listdir('.')
                if j.startswith(filehead) and j.endswith('.log')]
    # Ignore marker-like files whose id part is not a plain number.
    jlist = [int(s) for s in suffixes if s.isdigit()]
    expid = max(jlist) + 1 if jlist else 1
    logname = filehead + str(expid).zfill(3) + '.log'
    try:
        with open(logname, 'x'):
            pass
    except FileExistsError:
        continue
    break

# Same marker content as before: a torch-serialised 0.
torch.save(0, logname)

In [None]:
is_cuda = torch.cuda.is_available()

# Seed unconditionally: torch.manual_seed seeds the CPU generator as well as
# the CUDA ones. Previously the CUDA branch only seeded the CUDA generators,
# leaving CPU-side randomness (DataLoader shuffling, parameters initialised on
# the CPU before being moved) unseeded.
torch.manual_seed(int(expid))

# If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
if is_cuda:
    device = torch.device("cuda")
    torch.cuda.manual_seed_all(int(expid))
    print("GPU is available")
else:
    device = torch.device("cpu")
    print("GPU not available, CPU used")

In [None]:
import torch
import torchvision
import numpy as np

# torch.manual_seed(expid)

# NOTE(review): DOWNLOAD_PATH is defined but the datasets below are rooted at
# './data' -- confirm which location is intended.
DOWNLOAD_PATH = '/data/mnist'
BATCH_SIZE_TRAIN = 100
BATCH_SIZE_TEST = 1000

# Convert images to tensors, then normalise the single channel with the
# given mean / standard deviation.
transform_mnist = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
])

# Training split, served in shuffled mini-batches.
train_set = torchvision.datasets.MNIST(
    root='./data', train=True, transform=transform_mnist, download=True)
train_loader = torch.utils.data.DataLoader(
    dataset=train_set, batch_size=BATCH_SIZE_TRAIN, shuffle=True)

# Test split, also shuffled.
test_set = torchvision.datasets.MNIST(
    root='./data', train=False, transform=transform_mnist, download=True)
test_loader = torch.utils.data.DataLoader(
    dataset=test_set, batch_size=BATCH_SIZE_TEST, shuffle=True)

In [None]:
# evens = list(range(0, len(trainset), 2))
# odds = list(range(1, len(trainset), 2))
# Keep the first 6000 indices of a random permutation of the training set and
# wrap them in a Subset.
# NOTE(review): train_loader is built from the full train_set, not from this
# `trainset` -- confirm whether the subset was meant to be used for training.
trainidx = np.random.permutation(len(train_set))[:6000]
trainset = torch.utils.data.Subset(train_set, trainidx)

In [None]:
import torch.nn as nn
class ViT(nn.Module):
    """Vision-Transformer classifier with a LeakyReLU MLP head.

    The image is cut into square patches, each patch is linearly embedded, a
    class token is prepended, a positional embedding is added and the sequence
    is passed through `Transformer`. The class-token output feeds a two-layer
    MLP head whose result is divided by sqrt(1 + alpha**2).

    All arguments are keyword-only.

    Args:
        image_size: side length of the (square) input image.
        patch_size: side length of a patch; must divide `image_size`.
        num_classes: number of output logits.
        dim: embedding width of the transformer.
        depth: number of transformer layers.
        heads: number of attention heads.
        mlp_dim: hidden width passed to the transformer's feed-forward blocks.
        hidden_dim: hidden width of the classification head.
        alpha: negative slope of the head's LeakyReLU; also used for the
            output scaling.
        channels: number of image channels.
        dim_head: per-head width of the attention layers.

    The module places itself on the notebook-level `device` on construction.
    """
    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, hidden_dim, alpha, channels=3, dim_head = 64):
        super().__init__()
        assert image_size % patch_size == 0, 'image dimensions must be divisible by the patch size'
        num_patches = (image_size // patch_size) ** 2
        patch_dim = channels * patch_size ** 2
        self.alpha = alpha

        self.patch_size = patch_size

        # Keep these as genuine nn.Parameter attributes. Calling .to(device)
        # on a Parameter returns a plain (non-leaf) tensor whenever a copy is
        # made (e.g. CPU -> CUDA), which is then neither registered with the
        # module nor seen by the optimizer. The whole module is moved below.
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        self.patch_to_embedding = nn.Linear(patch_dim, dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.transformer = Transformer(dim, depth, heads, dim_head, mlp_dim)

        self.to_cls_token = nn.Identity()

        self.mlp_head = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.LeakyReLU(negative_slope = self.alpha),
            nn.Linear(hidden_dim, num_classes)
        )

        # Move everything (parameters included) in one go; Module.to keeps
        # parameters registered. Done before the custom init so that the head
        # is still initialised on `device`, as before.
        self.to(device)

        # Head initialisation: zero bias, N(0, 2 / out_features) weights.
        for m in self.mlp_head.modules():
            if isinstance(m, nn.Linear):
                nn.init.constant_(m.bias, 0)
                nn.init.normal_(m.weight, mean=0, std=np.sqrt(2./m.out_features))

    def forward(self, img, mask=None):
        """Return class logits for a batch of images.

        Args:
            img: image batch laid out as (batch, channels, height, width), as
                required by the rearrange pattern below.
            mask: accepted for interface compatibility; not used.
        """
        p = self.patch_size

        # Follow the module's own device rather than the global one, so the
        # model keeps working if it is moved after construction.
        target_device = self.pos_embedding.device

        # Split into p x p patches and flatten each patch into one vector.
        x = rearrange(img, 'b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1=p, p2=p).to(target_device)
        x = self.patch_to_embedding(x)

        # Prepend one class token per sample, then add positional embeddings.
        cls_tokens = self.cls_token.expand(img.shape[0], -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)
        x = x + self.pos_embedding
        x = self.transformer(x)

        # Classify from the class-token position only.
        x = self.to_cls_token(x[:, 0])
        return self.mlp_head(x)/np.sqrt(1+self.alpha*self.alpha)

In [None]:
class Transformer(nn.Module):
    """Stack of `depth` residual (Attention, FeedForward) pairs followed by a LayerNorm.

    Args:
        dim: feature width of the token sequence.
        depth: number of (attention, feed-forward) pairs.
        heads: number of attention heads per layer.
        dim_head: width of each attention head.
        mlp_dim: hidden width of each feed-forward block.
        dropout: dropout probability forwarded to both sub-blocks.
    """
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout = 0.):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        # One inner ModuleList per layer: [attention, feed-forward].
        stacked = [
            nn.ModuleList([
                Attention(dim, heads = heads, dim_head = dim_head, dropout = dropout),
                FeedForward(dim, mlp_dim, dropout = dropout),
            ])
            for _ in range(depth)
        ]
        self.layers = nn.ModuleList(stacked)

    def forward(self, x):
        """Apply every layer with residual connections, then the final norm."""
        hidden = x
        for block in self.layers:
            attention, feed_forward = block
            hidden = attention(hidden) + hidden
            hidden = feed_forward(hidden) + hidden
        return self.norm(hidden)

In [None]:
from einops import rearrange, repeat
from einops.layers.torch import Rearrange

# helpers

def pair(t):
    """Return `t` unchanged if it is a tuple, otherwise the 2-tuple `(t, t)`."""
    if isinstance(t, tuple):
        return t
    return (t, t)

# classes

class FeedForward(nn.Module):
    """LayerNorm -> Linear -> GELU -> Dropout -> Linear -> Dropout.

    Args:
        dim: input and output feature width.
        hidden_dim: width of the intermediate linear layer.
        dropout: dropout probability used after the activation and after the
            output projection.
    """
    def __init__(self, dim, hidden_dim, dropout = 0.):
        super().__init__()
        stages = [
            nn.LayerNorm(dim),
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Run `x` through the sequential stack."""
        out = self.net(x)
        return out

class Attention(nn.Module):
    """Multi-head scaled dot-product self-attention with an input LayerNorm.

    Args:
        dim: feature width of the incoming tokens.
        heads: number of attention heads.
        dim_head: width of each head.
        dropout: dropout probability applied to the attention weights and
            after the output projection.

    The output projection is skipped (identity) only when there is a single
    head whose width equals `dim`.
    """
    def __init__(self, dim, heads = 8, dim_head = 64, dropout = 0.):
        super().__init__()
        inner_dim = heads * dim_head
        passthrough = heads == 1 and dim_head == dim

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.norm = nn.LayerNorm(dim)

        self.attend = nn.Softmax(dim = -1)
        self.dropout = nn.Dropout(dropout)

        # A single bias-free projection produces queries, keys and values.
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)

        if passthrough:
            self.to_out = nn.Identity()
        else:
            self.to_out = nn.Sequential(
                nn.Linear(inner_dim, dim),
                nn.Dropout(dropout)
            )

    def forward(self, x):
        """Return the attended sequence for the token batch `x`."""
        normed = self.norm(x)

        # Split the joint projection in three and expose the head axis.
        q, k, v = (
            rearrange(part, 'b n (h d) -> b h n d', h = self.heads)
            for part in self.to_qkv(normed).chunk(3, dim = -1)
        )

        scores = torch.matmul(q, k.transpose(-1, -2)) * self.scale
        weights = self.dropout(self.attend(scores))

        # Weighted sum of values, then merge the heads back together.
        merged = rearrange(torch.matmul(weights, v), 'b h n d -> b n (h d)')
        return self.to_out(merged)

In [None]:
def train_epoch(model, optimizer, data_loader, loss_history):
    """Train `model` for one full pass over `data_loader`.

    Each batch is moved to the notebook-level `device`, the negative
    log-likelihood of the log-softmaxed model output is back-propagated and
    one optimizer step is taken.

    Args:
        model: module to train; switched to training mode.
        optimizer: optimizer that updates the model.
        data_loader: iterable of (data, target) batches exposing `.dataset`.
        loss_history: list that receives the batch loss every 100 batches.

    Returns:
        The same `loss_history` list, after the whole epoch has run.
    """
    total_samples = len(data_loader.dataset)
    model.train()

    for i, (data, target) in enumerate(data_loader):
        optimizer.zero_grad()
        data, target = data.to(device), target.to(device)
        output = F.log_softmax(model(data), dim=1)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()

        # Progress report and loss sample every 100 batches.
        if i % 100 == 0:
            print('[' +  '{:5}'.format(i * len(data)) + '/' + '{:5}'.format(total_samples) +
                  ' (' + '{:3.0f}'.format(100 * i / len(data_loader)) + '%)]  Loss: ' +
                  '{:6.4f}'.format(loss.item()))
            loss_history.append(loss.item())

    # Return only after every batch has been processed. The return used to sit
    # inside the loop, which ended the epoch after the first batch and
    # returned None for an empty loader.
    return loss_history

In [None]:
def evaluate(model, data_loader, loss_history):
    """Evaluate `model` on `data_loader` and report loss and accuracy.

    Args:
        model: module to evaluate; switched to eval mode.
        data_loader: iterable of (data, target) batches exposing `.dataset`.
        loss_history: list that receives one entry per call -- the list of
            summed negative log-likelihood losses, one value per batch.

    Returns:
        The same `loss_history` list.
    """
    model.eval()

    total_samples = len(data_loader.dataset)
    correct_samples = 0
    total_loss = 0
    loss_list = []

    with torch.no_grad():
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)
            output = F.log_softmax(model(data), dim=1)
            batch_loss = F.nll_loss(output, target, reduction='sum').item()
            _, pred = torch.max(output, dim=1)

            total_loss += batch_loss
            loss_list.append(batch_loss)
            # .item() keeps the counter a Python int; otherwise it silently
            # becomes a device tensor and the accuracy below is computed in
            # float32 tensor arithmetic.
            correct_samples += pred.eq(target).sum().item()

    # Guard against an empty dataset instead of raising ZeroDivisionError.
    if total_samples:
        avg_loss = total_loss / total_samples
        accuracy = 100.0 * correct_samples / total_samples
    else:
        avg_loss = accuracy = float('nan')

    loss_history.append(loss_list)
    print('\nAverage test loss: ' + '{:.4f}'.format(avg_loss) +
          '  Accuracy:' + '{:5}'.format(correct_samples) + '/' +
          '{:5}'.format(total_samples) + ' (' +
          '{:4.2f}'.format(accuracy) + '%)\n')
    return loss_history

In [None]:
N_EPOCHS = 400

import time
import torch.optim as optim
import torch.nn.functional as F
# LeakyReLU negative slopes to sweep over; one model is trained per value.
alphalist = [-2,-1,0,0.01]
# Per-alpha histories, keyed by the alpha value.
train_losses, test_losses = {}, {}

# Architecture settings shared by every run of the sweep.
vit_config = dict(image_size=28, patch_size=7, num_classes=10, channels=1,
                  dim=64, depth=6, heads=8, mlp_dim=8192, hidden_dim=8192)

for alpha in alphalist:
    start_time = time.time()
    model = ViT(alpha=alpha, **vit_config)
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    train_loss_history = []
    test_loss_history = []
    for epoch in range(1, N_EPOCHS + 1):
        print('Epoch:', epoch)
        # One training call followed by one evaluation on the test loader.
        train_loss_history = train_epoch(model, optimizer, train_loader, train_loss_history)
        test_loss_history = evaluate(model, test_loader, test_loss_history)

    train_losses[alpha] = train_loss_history
    test_losses[alpha] = test_loss_history
    elapsed = time.time() - start_time
    print('Execution time:', '{:5.2f}'.format(elapsed), 'seconds')


In [None]:
# Persist the per-alpha training histories next to the run's marker file.
train_result_path = 'tf_leaky_result_train_mnist' + str(filenum).zfill(3) + '_' + str(expid).zfill(3) + '.pt'
torch.save(train_losses, train_result_path)

In [None]:
# Persist the per-alpha test histories under the matching "test" file name.
test_result_path = 'tf_leaky_result_test_mnist' + str(filenum).zfill(3) + '_' + str(expid).zfill(3) + '.pt'
torch.save(test_losses, test_result_path)