In [1]:
import cv2
import numpy as np
from matplotlib import pyplot as plt
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import torch
import torch.nn as nn
import time
import tqdm
import random
import torch.nn.functional as F
from torchvision import models
from torch.nn import init
from torch.nn.modules.utils import _pair
import math
from timm.models.layers import to_2tuple, trunc_normal_, DropPath
import einops
from thop import profile, clever_format
import gc
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and, when CUDA is
    available, every CUDA device), then puts cuDNN into deterministic,
    non-benchmark mode.

    Args:
        seed: Integer seed applied to all generators.
    """
    # `random` is imported by the notebook, so it has to be seeded as well;
    # otherwise anything drawing from it stays non-reproducible.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False



In [3]:
class DAttention(nn.Module):
    """Deformable multi-head attention over a channels-first feature map.

    Queries are taken at every spatial position of the input. Keys and values
    are computed from features that are bilinearly sampled from the input at a
    (possibly coarser) grid of reference points shifted by offsets predicted
    from the queries.
    """

    def __init__(
        self, n_heads, n_head_channels, n_groups,
        attn_drop, proj_drop, stride, ksize
    ):
        """Build the projections and the offset-prediction branch.

        Args:
            n_heads: Number of attention heads.
            n_head_channels: Channels per head; the layer operates on
                ``n_heads * n_head_channels`` channels in total.
            n_groups: Number of offset groups the channels are split into.
                Must divide the total channel count.
            attn_drop: Dropout probability applied to the attention weights.
            proj_drop: Dropout probability applied after the output projection.
            stride: Stride of the depthwise offset convolution; it determines
                the resolution of the sampling grid.
            ksize: Kernel size of the depthwise offset convolution.

        Raises:
            ValueError: If the total channel count is not divisible by
                ``n_groups``.
        """

        super().__init__()
        self.n_head_channels = n_head_channels
        self.scale = self.n_head_channels ** -0.5
        self.n_heads = n_heads

        self.nc = n_head_channels * n_heads
        self.n_groups = n_groups
        # Fail early: forward() splits the channel axis into exactly n_groups
        # equal parts and would otherwise die later with an opaque shape error.
        if n_groups <= 0 or self.nc % n_groups != 0:
            raise ValueError(
                f"n_heads * n_head_channels ({self.nc}) must be divisible by n_groups ({n_groups})"
            )
        # Number of channels in each group
        self.n_group_channels = self.nc // self.n_groups
        self.ksize = ksize
        # Kernel size of the offset convolution
        kk = self.ksize
        # "Same"-style padding, except for non-overlapping patches (kernel == stride)
        pad_size = kk // 2 if kk != stride else 0

        # Depthwise conv -> LayerNorm -> GELU -> 1x1 conv producing a 2-channel
        # (y, x) offset for every reference point.
        self.conv_offset = nn.Sequential(
            nn.Conv2d(self.n_group_channels, self.n_group_channels, kk, stride, pad_size, groups=self.n_group_channels),
            LayerNormProxy(self.n_group_channels),
            nn.GELU(),
            nn.Conv2d(self.n_group_channels, 2, 1, 1, 0, bias=False)
        )

        self.proj_q = nn.Conv2d(
            self.nc, self.nc,
            kernel_size=1, stride=1, padding=0
        )

        self.proj_k = nn.Conv2d(
            self.nc, self.nc,
            kernel_size=1, stride=1, padding=0
        )

        self.proj_v = nn.Conv2d(
            self.nc, self.nc,
            kernel_size=1, stride=1, padding=0
        )

        self.proj_out = nn.Conv2d(
            self.nc, self.nc,
            kernel_size=1, stride=1, padding=0
        )

        self.proj_drop = nn.Dropout(proj_drop, inplace=True)
        # Not in-place: this dropout is applied directly to the softmax output,
        # which autograd needs unmodified for the softmax backward pass. An
        # in-place dropout with attn_drop > 0 would make backward() fail.
        self.attn_drop = nn.Dropout(attn_drop, inplace=False)


    @torch.no_grad()
    def _get_ref_points(self, H_key, W_key, B, dtype, device):
        """Return the regular reference grid in normalised coordinates.

        Pixel-centre coordinates ``0.5, 1.5, ...`` are divided by ``size - 1``,
        multiplied by 2 and shifted by -1 along each axis.

        Args:
            H_key: Height of the sampling grid.
            W_key: Width of the sampling grid.
            B: Batch size.
            dtype: dtype of the returned tensor.
            device: Device of the returned tensor.

        Returns:
            Tensor of shape ``(B * n_groups, H_key, W_key, 2)`` whose last axis
            is ordered ``(y, x)``. The batch axis is an expanded view.
        """
        # NOTE(review): a grid size of 1 along either axis divides by zero below.
        ref_y, ref_x = torch.meshgrid(
            torch.linspace(0.5, H_key - 0.5, H_key, dtype=dtype, device=device),
            torch.linspace(0.5, W_key - 0.5, W_key, dtype=dtype, device=device),
            indexing='ij'
        )
        ref = torch.stack((ref_y, ref_x), -1)
        ref[..., 1].div_(W_key - 1.0).mul_(2.0).sub_(1.0)
        ref[..., 0].div_(H_key - 1.0).mul_(2.0).sub_(1.0)
        ref = ref[None, ...].expand(B * self.n_groups, -1, -1, -1) # B * g H W 2

        return ref

    def forward(self, x):
        """Apply deformable attention.

        Args:
            x: Tensor of shape ``(B, C, H, W)`` with
                ``C == n_heads * n_head_channels``.

        Returns:
            Tensor of shape ``(B, C, H, W)``.
        """

        B, C, H, W = x.size()
        dtype, device = x.dtype, x.device
        # Queries
        q = self.proj_q(x)
        # Predict offsets per group from the queries
        q_off = einops.rearrange(q, 'b (g c) h w -> (b g) c h w', g=self.n_groups, c=self.n_group_channels)  # (B*g, Cg, H, W)
        offset = self.conv_offset(q_off).contiguous()  # (B*g, 2, Hk, Wk)

        Hk, Wk = offset.size(2), offset.size(3)
        n_sample = Hk * Wk


        offset = einops.rearrange(offset, 'b p h w -> b h w p')  # (B*g, Hk, Wk, 2)
        # Regular reference points
        reference = self._get_ref_points(Hk, Wk, B, dtype, device)

        # Deformed sampling locations, kept inside the valid grid_sample range
        pos = (offset + reference).clamp(-1., +1.)

        # Bilinear interpolation of the input features at the deformed locations
        x_sampled = F.grid_sample(
                input=x.reshape(B * self.n_groups, self.n_group_channels, H, W),
                grid=pos[..., (1, 0)], # y, x -> x, y
                mode='bilinear', align_corners=True) # (B*g, Cg, Hk, Wk)

        # Sampled features, flattened to a single row of n_sample positions
        x_sampled = x_sampled.reshape(B, C, 1, n_sample)

        q = q.reshape(B * self.n_heads, self.n_head_channels, H * W)
        k = self.proj_k(x_sampled).reshape(B * self.n_heads, self.n_head_channels, n_sample)
        v = self.proj_v(x_sampled).reshape(B * self.n_heads, self.n_head_channels, n_sample)

        # Scaled dot-product attention of every query position over the samples
        attn = torch.einsum('b c m, b c n -> b m n', q, k) # (B * n_heads, H*W, n_sample)
        attn = attn.mul(self.scale)
        attn = F.softmax(attn, dim=2)
        attn = self.attn_drop(attn)
        out = torch.einsum('b m n, b c n -> b c m', attn, v) # (B * n_heads, n_head_channels, H*W)


        out = out.reshape(B, C, H, W)
        # Output projection
        y = self.proj_drop(self.proj_out(out))

        return y



In [4]:
class LayerNormProxy(nn.Module):
    """``nn.LayerNorm`` over the channel axis of a ``(b, c, h, w)`` tensor."""

    def __init__(self, dim):
        """Create the wrapped layer norm.

        Args:
            dim: Size of the channel axis that is normalised.
        """
        super().__init__()
        self.norm = nn.LayerNorm(dim)

    def forward(self, x):
        """Normalise ``x`` across channels and return it in ``(b, c, h, w)`` layout."""
        # LayerNorm normalises the trailing axis, so move channels last first.
        channels_last = einops.rearrange(x, 'b c h w -> b h w c')
        normalised = self.norm(channels_last)
        channels_first = einops.rearrange(normalised, 'b h w c -> b c h w')
        return channels_first

In [5]:
class SimpleCNN(nn.Module):
    """Four-stage convolutional classifier followed by a two-layer MLP head.

    Each stage is Conv3x3 -> BatchNorm -> ReLU -> 2x2 MaxPool, so every stage
    halves the spatial size. The head's input size (256 * 8 * 8) corresponds to
    a 3 x 128 x 128 input.
    """

    def __init__(self, num_classes=50):
        """Build the network.

        Args:
            num_classes: Number of output logits.
        """
        super().__init__()

        # (in_channels, out_channels) of each stage, in order:
        # 128x128 -> 64x64 -> 32x32 -> 16x16 -> 8x8 for a 128x128 input.
        stage_channels = ((3, 32), (32, 64), (64, 128), (128, 256))
        stage_layers = []
        for c_in, c_out in stage_channels:
            stage_layers += [
                nn.Conv2d(c_in, c_out, 3, 1, 1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
                nn.MaxPool2d(2, 2, 0),
            ]
        self.cnn = nn.Sequential(*stage_layers)

        self.fc = nn.Sequential(
            nn.Linear(256 * 8 * 8, 1024),
            nn.ReLU(),
            nn.Linear(1024, num_classes),
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        features = self.cnn(x)
        flattened = features.view(features.size(0), -1)
        return self.fc(flattened)

In [6]:
class DATCNN(nn.Module):
    """Three-stage CNN with a residual deformable-attention block before the head.

    The convolutional stem produces 128 channels; ``DAttention`` is applied on
    that feature map, layer-normalised and added back to it, followed by
    BatchNorm -> ReLU -> MaxPool and a two-layer MLP head. The head's input
    size (128 * 8 * 8) corresponds to a 3 x 128 x 128 input.
    """

    def __init__(self, num_classes=50, n_heads=4, stride=1, ksize=3):
        """Build the network.

        Args:
            num_classes: Number of output logits.
            n_heads: Number of attention heads; must divide the 128 feature
                channels evenly.
            stride: Stride of the attention layer's offset convolution.
            ksize: Kernel size of the attention layer's offset convolution.

        Raises:
            ValueError: If ``n_heads`` does not divide 128.
        """
        super(DATCNN, self).__init__()

        # 128 // n_heads would silently truncate and leave the attention layer
        # with fewer than 128 channels, which only fails later inside forward().
        if n_heads <= 0 or 128 % n_heads != 0:
            raise ValueError(f"n_heads must be a positive divisor of 128, got {n_heads}")

        self.cnn = nn.Sequential(
            nn.Conv2d(3, 32, 3, 1, 1), # [32, 128, 128]
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # [32, 64, 64]

            nn.Conv2d(32, 64, 3, 1, 1), # [64, 64, 64]
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # [64, 32, 32]

            nn.Conv2d(64, 128, 3, 1, 1), # [128, 32, 32]
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # [128, 16, 16]
        )
        # stride / ksize are configurable (defaults match the previous
        # hard-coded values) so the offset branch can be varied per experiment.
        self.dat = DAttention(
            n_heads=n_heads, n_head_channels=128 // n_heads, n_groups=2,
            attn_drop=0, proj_drop=0, stride=stride, ksize=ksize,
        )
        self.norm = LayerNormProxy(128)
        self.nn2 = nn.Sequential(
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # [128, 8, 8]
        )
        self.fc = nn.Sequential(
            nn.Linear(128*8*8, 1024),
            nn.ReLU(),
            nn.Linear(1024, num_classes)
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``."""
        out = self.cnn(x)
        # Residual connection around the normalised attention output
        out = self.norm(self.dat(out)) + out
        out = self.nn2(out)
        out = out.contiguous().view(out.size()[0], -1)
        out = self.fc(out)
        return out

In [7]:
def calc_complexity(model, inputs):
    """Print the operation and parameter counts that ``thop`` reports for a model.

    Args:
        model: Module to profile.
        inputs: Single input tensor passed to the model's forward during profiling.
    """
    # profile() takes the forward arguments as a tuple, hence the 1-tuple.
    raw_counts = profile(model, inputs=(inputs,))
    # Turn the raw numbers into human-readable strings with three decimals.
    flops, params = clever_format(list(raw_counts), "%.3f")
    for line in (f"FLOPs: {flops}", f"Params: {params}"):
        print(line)

In [8]:
def load_img(f):
    """Load every image named in a list file together with its label.

    Each non-blank line of the list file has the form ``<image path> <label>``;
    the label is the last whitespace-separated field, so the path itself may
    contain spaces.

    Args:
        f: Path of the list file.

    Returns:
        ``(imgs, lab)`` where ``imgs`` is a list with one ``cv2.imread`` result
        per line (images are not resized, so they may differ in shape) and
        ``lab`` is a 1-D ``np.uint8`` array of the labels.

    Raises:
        ValueError: If a line has no label field, or a label does not fit in
            uint8 (0..255).
        OSError: If ``cv2.imread`` cannot read an image.
    """
    imgs, lab = [], []
    # The context manager closes the list file even when an image fails to load.
    with open(f) as list_file:
        for line_no, line in enumerate(list_file, start=1):
            if not line.strip():
                continue  # tolerate blank / trailing empty lines
            fields = line.rsplit(maxsplit=1)
            if len(fields) != 2:
                raise ValueError(f"{f}:{line_no}: expected '<path> <label>', got {line!r}")
            fn, label_text = fields
            label = int(label_text)
            # Labels are returned as uint8; refuse values that would not fit
            # instead of letting them wrap around or overflow.
            if not 0 <= label <= 255:
                raise ValueError(f"{f}:{line_no}: label {label} does not fit in uint8")

            im1 = cv2.imread(fn)
            # cv2.imread signals failure by returning None rather than raising.
            if im1 is None:
                raise OSError(f"{f}:{line_no}: could not read image {fn!r}")

            imgs.append(im1)
            lab.append(label)

    # Same progress output as before: index of the last loaded entry.
    if imgs:
        print(len(imgs) - 1)

    lab = np.asarray(lab, np.uint8)
    return imgs, lab


In [9]:
def create_model(model_name, n_heads=4, stride=1, ksize=3):
    """Build one of the notebook's 50-class models by name.

    Args:
        model_name: ``"DATCNN"``, ``"SimpleCNN"`` or ``"ResNet34"``.
        n_heads: Number of attention heads (DATCNN only).
        stride: Stride of the attention layer's offset convolution (DATCNN only).
        ksize: Kernel size of the attention layer's offset convolution (DATCNN only).

    Returns:
        The freshly initialised model (on the default device).

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name == "DATCNN":
        model = DATCNN(num_classes=50, n_heads=n_heads)
        # DATCNN is built here from num_classes / n_heads only, so its attention
        # layer carries DATCNN's own default stride and kernel size. Rebuild
        # that layer with the same configuration but the requested stride /
        # ksize; previously these two arguments were silently ignored.
        built = model.dat
        model.dat = DAttention(
            n_heads=built.n_heads,
            n_head_channels=built.n_head_channels,
            n_groups=built.n_groups,
            attn_drop=built.attn_drop.p,
            proj_drop=built.proj_drop.p,
            stride=stride,
            ksize=ksize,
        )
    elif model_name == "SimpleCNN":
        model = SimpleCNN(num_classes=50)
    elif model_name == "ResNet34":
        model = models.resnet34()
        num_ftrs = model.fc.in_features
        # Replace the final layer so it outputs one logit per class
        model.fc = nn.Linear(num_ftrs, 50)
    else:
        # The old branch did print(error) with `error` undefined (NameError).
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected 'DATCNN', 'SimpleCNN' or 'ResNet34'"
        )

    return model

In [10]:
from PIL import Image

class ImgDataset(Dataset):
    """Dataset over an indexable collection of samples with optional labels."""

    def __init__(self, x, y=None, transform=None):
        """Store the samples, labels and transform.

        Args:
            x: Indexable collection of samples.
            y: Optional labels; converted to a ``torch.LongTensor`` when given.
            transform: Optional callable applied to each sample on access.
        """
        self.x = x
        self.y = torch.LongTensor(y) if y is not None else None
        self.transform = transform

    def __len__(self):
        """Number of samples."""
        return len(self.x)

    def __getitem__(self, index):
        """Return ``(sample, label)``, or just ``sample`` when no labels were given."""
        sample = self.x[index]
        if self.transform is not None:
            sample = self.transform(sample)
        if self.y is None:
            return sample
        return sample, self.y[index]



In [11]:
def train(model, train_loader, val_loader, eval_time, num_epoch, n_train, n_val, lr, device):
    """Train a classifier with Adam and cross-entropy, printing metrics.

    Args:
        model: Module to train; expected to already live on ``device``.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        eval_time: Validate and print metrics every ``eval_time`` epochs.
        num_epoch: Number of epochs to run.
        n_train: Number of training samples (denominator of the train metrics).
        n_val: Number of validation samples (denominator of the val metrics).
        lr: Adam learning rate.
        device: Device the batches are moved to.
    """
    loss = nn.CrossEntropyLoss()  # classification task
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epoch):
        print(epoch)

        epoch_start_time = time.time()
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0

        model.train()  # enable training behaviour (dropout, batch-norm updates)
        for inputs, labels in train_loader:
            optimizer.zero_grad()

            train_pred = model(inputs.to(device))
            # Predictions and labels must be on the same device
            batch_loss = loss(train_pred, labels.to(device))
            batch_loss.backward()
            optimizer.step()

            train_acc += np.sum(np.argmax(train_pred.detach().cpu().numpy(), axis=1) == labels.numpy())
            # CrossEntropyLoss returns the batch *mean*; weight it by the batch
            # size so that dividing by n_train below gives the per-sample mean
            # (summing batch means and dividing by the sample count under-reports
            # the loss by roughly the batch size).
            train_loss += batch_loss.item() * labels.size(0)

        if epoch % eval_time == 0:
            model.eval()
            with torch.no_grad():
                for inputs, labels in val_loader:
                    val_pred = model(inputs.to(device))
                    batch_loss = loss(val_pred, labels.to(device))

                    val_acc += np.sum(np.argmax(val_pred.cpu().numpy(), axis=1) == labels.numpy())
                    val_loss += batch_loss.item() * labels.size(0)

                # Report the epoch's metrics
                print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f | Val Acc: %3.6f loss: %3.6f' % \
                    (epoch + 1, num_epoch, time.time()-epoch_start_time, \
                     train_acc/n_train, train_loss/n_train, val_acc/n_val, val_loss/n_val))


                print("Train/epoch",  epoch)
                print("Train/acc", train_acc/n_train)
                print("Train/loss", train_loss/n_train)
                print("Val/epoch", epoch)
                print("Val/acc", val_acc/n_val)
                print("Val/loss", val_loss/n_val)


In [12]:
def test(model, test_loader, n_test, device):
    """Evaluate a classifier on the test set and print the metrics.

    Prints accuracy, weighted precision / recall / F1 (via scikit-learn) and the
    mean cross-entropy loss per sample.

    Args:
        model: Module to evaluate; expected to already live on ``device``.
        test_loader: Iterable of ``(inputs, labels)`` batches.
        n_test: Number of test samples (denominator of the reported loss).
        device: Device the batches are moved to.
    """
    model.eval()
    loss = nn.CrossEntropyLoss()  # classification task

    all_labels = []
    all_preds = []
    test_loss = 0.0
    with torch.no_grad():
        for inputs, labels in test_loader:
            test_pred = model(inputs.to(device))
            batch_loss = loss(test_pred, labels.to(device))
            # CrossEntropyLoss returns the batch mean; weight by batch size so
            # the total divided by n_test is the per-sample mean.
            test_loss += batch_loss.item() * labels.size(0)

            _, preds = torch.max(test_pred, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())


        accuracy = accuracy_score(all_labels, all_preds)
        precision = precision_score(all_labels, all_preds, average='weighted')
        recall = recall_score(all_labels, all_preds, average='weighted')
        f1 = f1_score(all_labels, all_preds, average='weighted')

        print("--------Test result-------------")
        print(f'Accuracy: {accuracy:.4f}')
        print(f'Precision: {precision:.4f}')
        print(f'Recall: {recall:.4f}')
        print(f'F1-score: {f1:.4f}')
        # True division: the former `//` floored the value to a whole number.
        print(f'Loss: {test_loss / n_test:.4f}')
        print("--------------------------------")


In [None]:
# Experiment 5-1: sweep the number of attention heads
def experiment_5_1():
    """Train and test a DATCNN for each attention head count in [1, 2, 4, 8].

    Loads the train / val / test image lists, builds the data loaders once and
    then, for every head count, reports model complexity, trains and tests.
    """
    # Hyperparameters
    #############
    eval_time = 1
    num_epoch = 30
    img_size = 144
    input_size = 128
    batch_size = 128
    lr = 0.001
    model_name = "DATCNN"
    stride = 4
    ksize = 7
    seed = 42
    #############
    set_seed(seed)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    x, y = load_img('train.txt')
    vx, vy = load_img('val.txt')
    tx, ty = load_img('test.txt')
    print("--1--")
    # Training transform: includes data augmentation
    train_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((img_size, img_size)),  # resize
        transforms.RandomRotation(degrees=30),  # rotate
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # translate
        transforms.RandomCrop(input_size),  # random crop
        transforms.RandomHorizontalFlip(),  # horizontal flip
        transforms.ToTensor(),  # convert to tensor
    ])
    # Evaluation transform: no augmentation
    test_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((img_size, img_size)),  # resize
        transforms.CenterCrop(input_size),  # centre crop
        transforms.ToTensor(),  # convert to tensor
    ])
    # Data loaders
    train_set = ImgDataset(x, y, train_transform)
    val_set = ImgDataset(vx, vy, test_transform)
    test_set = ImgDataset(tx, ty, test_transform)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, worker_init_fn=lambda _: np.random.seed(seed), num_workers=4)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=4)
    del x, vx, tx

    n_train, n_val, n_test = len(train_set), len(val_set), len(test_set)

    # n_heads must divide the 128 feature channels
    for n_heads in [1, 2, 4, 8]:
        print("-----number of heads = "+str(n_heads)+"-----")
        # Re-seed per configuration so every run starts from the same RNG state
        # and its result does not depend on the runs that came before it.
        set_seed(seed)
        model = create_model(model_name, n_heads=n_heads, stride=stride, ksize=ksize).to(device)
        # Report complexity on a single dummy input
        inputs = torch.randn(1, 3, input_size, input_size)
        calc_complexity(model, inputs.to(device))

        train(model, train_loader, val_loader, eval_time, num_epoch, n_train, n_val, lr, device)
        test(model, test_loader, n_test, device)

        del model
        gc.collect()
    
        
        
# Run the head-count sweep (one DATCNN per entry of [1, 2, 4, 8], 30 epochs each).
experiment_5_1()   


In [None]:
# Experiment 5-2: sweep (stride, ksize) at a fixed number of heads
def experiment_5_2(n_heads):
    """Train and test a DATCNN for each (stride, ksize) pair at a fixed head count.

    The pairs are (8, 9), (4, 7), (2, 5) and (1, 3). Data is loaded and the
    loaders are built once; for every pair the model complexity is reported,
    then the model is trained and tested.

    Args:
        n_heads: Number of attention heads used for every configuration.
    """
    # Hyperparameters
    #############
    eval_time = 1
    num_epoch = 30
    img_size = 144
    input_size = 128
    batch_size = 128
    lr = 0.001
    model_name = "DATCNN"
    seed = 42
    #############
    set_seed(seed)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    x, y = load_img('train.txt')
    vx, vy = load_img('val.txt')
    tx, ty = load_img('test.txt')
    print("--1--")
    # Training transform: includes data augmentation
    train_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((img_size, img_size)),  # resize
        transforms.RandomRotation(degrees=30),  # rotate
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # translate
        transforms.RandomCrop(input_size),  # random crop
        transforms.RandomHorizontalFlip(),  # horizontal flip
        transforms.ToTensor(),  # convert to tensor
    ])
    # Evaluation transform: no augmentation
    test_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((img_size, img_size)),  # resize
        transforms.CenterCrop(input_size),  # centre crop
        transforms.ToTensor(),  # convert to tensor
    ])
    # Data loaders
    train_set = ImgDataset(x, y, train_transform)
    val_set = ImgDataset(vx, vy, test_transform)
    test_set = ImgDataset(tx, ty, test_transform)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, worker_init_fn=lambda _: np.random.seed(seed), num_workers=4)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=4)
    del x, vx, tx

    n_train, n_val, n_test = len(train_set), len(val_set), len(test_set)

    strides = [8, 4, 2, 1]
    ksizes = [9, 7, 5, 3]
    for stride, ksize in zip(strides, ksizes):
        print("----- (stride, ksize) = ("+str(stride)+", "+str(ksize)+") -----")
        # Re-seed per configuration so every run starts from the same RNG state
        # and its result does not depend on the runs that came before it.
        set_seed(seed)
        model = create_model(model_name, n_heads=n_heads, stride=stride, ksize=ksize).to(device)
        # Report complexity on a single dummy input
        inputs = torch.randn(1, 3, input_size, input_size)
        calc_complexity(model, inputs.to(device))

        train(model, train_loader, val_loader, eval_time, num_epoch, n_train, n_val, lr, device)
        test(model, test_loader, n_test, device)
        del model
        gc.collect()
        
        
# Run the (stride, ksize) sweep with 4 attention heads.
experiment_5_2(n_heads=4)   


In [None]:
# Experiment 3-1: compare ResNet34, DATCNN and SimpleCNN
def experiment_3_1(n_heads, stride, ksize):
    """Train and test ResNet34, DATCNN and SimpleCNN on the same data.

    Data is loaded and the loaders are built once; for every model the
    complexity is reported, then the model is trained and tested.

    Args:
        n_heads: Number of attention heads passed to ``create_model``.
        stride: Offset-convolution stride passed to ``create_model``.
        ksize: Offset-convolution kernel size passed to ``create_model``.
    """
    # Hyperparameters
    #############
    eval_time = 1
    num_epoch = 30
    img_size = 144
    input_size = 128
    batch_size = 128
    lr = 0.001
    seed = 42
    #############
    set_seed(seed)
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    x, y = load_img('train.txt')
    vx, vy = load_img('val.txt')
    tx, ty = load_img('test.txt')
    print("--1--")
    # Training transform: includes data augmentation
    train_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((img_size, img_size)),  # resize
        transforms.RandomRotation(degrees=30),  # rotate
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # translate
        transforms.RandomCrop(input_size),  # random crop
        transforms.RandomHorizontalFlip(),  # horizontal flip
        transforms.ToTensor(),  # convert to tensor
    ])
    # Evaluation transform: no augmentation
    test_transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((img_size, img_size)),  # resize
        transforms.CenterCrop(input_size),  # centre crop
        transforms.ToTensor(),  # convert to tensor
    ])
    # Data loaders
    train_set = ImgDataset(x, y, train_transform)
    val_set = ImgDataset(vx, vy, test_transform)
    test_set = ImgDataset(tx, ty, test_transform)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, worker_init_fn=lambda _: np.random.seed(seed), num_workers=4)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=4)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=4)
    del x, vx, tx

    n_train, n_val, n_test = len(train_set), len(val_set), len(test_set)

    for model_name in ["ResNet34", "DATCNN", "SimpleCNN"]:
        print("-----model = "+model_name+"-----")
        # Re-seed per model so every run starts from the same RNG state and
        # its result does not depend on the models trained before it.
        set_seed(seed)
        model = create_model(model_name, n_heads=n_heads, stride=stride, ksize=ksize).to(device)
        # Report complexity on a single dummy input
        inputs = torch.randn(1, 3, input_size, input_size)
        calc_complexity(model, inputs.to(device))

        train(model, train_loader, val_loader, eval_time, num_epoch, n_train, n_val, lr, device)
        test(model, test_loader, n_test, device)
        del model
        gc.collect()
    
        
        
# Run the three-model comparison with the attention settings used for DATCNN.
experiment_3_1(n_heads=8, stride=4, ksize=7)   
