<a href="https://colab.research.google.com/github/shiyuhu1933/EC-523-final-project/blob/main/ResSaNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
import torch
import os
import h5py
import numpy as np
import torch.nn as nn

drive.mount('/content/gdrive')

DATASET_GOOGLE_DRIVE_PATH = '/content/gdrive/MyDrive/Deep_Learning /data_hdf5_random_with_aug'


def get_file_name(files_list):
  with open(files_list) as f:
    return [line.rstrip()[:] for line in f]

class FaceMaskData(torch.utils.data.Dataset):

  def __init__(self, mode, config, transform=None):

    self.data_dir = DATASET_GOOGLE_DRIVE_PATH
    self.transform = transform

    if mode == 'train':
      self.files = get_file_name(os.path.join(self.data_dir, 'train_files.txt'))
    else:
      self.files = get_file_name(os.path.join(self.data_dir, 'test_files.txt'))

    image = []
    label = []

    for dataset in self.files:
      path = os.path.join(self.data_dir, dataset)
      self.file = h5py.File(path, 'r')
      self.total_num_imgs, self.H, self.W, self.C = self.file['image'].shape
      image.append(self.file['image'][:])
      label.append(self.file['labels'][:])

    self.image= np.vstack(image)
    self.label = np.vstack(label)

    self.num_images = len(self.image) 
    self.num_classes = len(np.unique(self.label))

  def __getitem__(self, index):
    """Return one image and its corresponding attribute label."""
    image = self.image[index]
    label = self.label[index]
    if self.transform:
        image = self.transform(image)
    return image, torch.FloatTensor(label)

  def __len__(self):
    return self.num_images
      
  def get_num_class(self):
    return self.num_classes

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [2]:
def get_configuration(repeat_num, batch_size):
    config = {}

    config['device'] = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    config['batch_size'] = batch_size
    config['num_workers'] = 1
    config['repeat_num'] = repeat_num
    config['lr'] = 0.0001

    return config


def image_to_rgb(images):
  imgs = []
  for i in range(len(images)):
    image_r = torch.unsqueeze(images[i,:,:,0], 0)
    image_g = torch.unsqueeze(images[i,:,:,1], 0)
    image_b = torch.unsqueeze(images[i,:,:,2], 0)
    image_split_rgb = torch.cat((image_r, image_g, image_b), 0)
    image_split_rgb = torch.unsqueeze(image_split_rgb, 0)
    imgs.append(image_split_rgb)
  return torch.cat(imgs)


def print_network(model, name):
  """Print out the network information."""
  num_params = 0
  print("\n")
  print("model name", name)
  print(model)
  num_params = sum([par.numel() for par in model.parameters()])
  print("The number of parameters: {}".format(num_params))

def save_model(model, step):
  save_model_path = "/content/gdrive/MyDrive/Deep Learning /models"
  model_path = os.path.join(save_model_path, 'model-{}.ckpt'.format(step))
  torch.save(model, model_path)
  print('Saved model checkpoints into {}...'.format(save_model_path))

def load_model(iters):
  path = "/content/gdrive/MyDrive/Deep Learning /models"
  model_path = os.path.join(path, 'model-{}.ckpt'.format(iters))
  model = torch.load(model_path)
  return model

In [3]:
import torch.nn as nn
import torchvision.models as models

config = get_configuration(repeat_num=10000, batch_size=16)
dataset = FaceMaskData('train', config)
data_loader = torch.utils.data.DataLoader(dataset=dataset,
                              batch_size=config['batch_size'],
                              shuffle=True, 
                              num_workers=config['num_workers'])
num_classes = dataset.get_num_class()

In [18]:
import torch.nn as nn

class IBSA_Block(nn.Module):
  def __init__(self, dim_in, dim_out, stride=1):
    super().__init__()

    self.dim_in, self.dim_out = dim_in, dim_out
    self.bn1 = nn.BatchNorm2d(dim_in, affine=True, track_running_stats=True)
    self.bn2 = nn.BatchNorm2d(dim_out, affine=True, track_running_stats=True)
    self.conv = nn.Conv2d(dim_in, dim_out, kernel_size=3, stride=stride)
    self.mhsa = nn.MultiheadAttention(dim_in,num_heads=512)
    self.prelu = nn.PReLU()
  
  def forward(self, input):
    identity = input

    output = self.bn1(input)
    output = self.conv(output)
    output = self.bn2(output)
    output = self.prelu(output)
    output = self.mhsa(output)
    output = self.bn2(output)

    output += identity
    
    return output

In [19]:
import math

import torch
import torch.nn as nn
import torch.nn.functional as F


def conv_bn_act(in_, out_, kernel_size,
                stride=1, groups=1, bias=True,
                eps=1e-3, momentum=0.01):
    return nn.Sequential(
        SamePadConv2d(in_, out_, kernel_size, stride, groups=groups, bias=bias),
        nn.BatchNorm2d(out_, eps, momentum),
        Swish()
    )


class SamePadConv2d(nn.Conv2d):

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, dilation=1, groups=1, bias=True, padding_mode="zeros"):
        super().__init__(in_channels, out_channels, kernel_size, stride, 0, dilation, groups, bias, padding_mode)

    def get_pad_odd(self, in_, weight, stride, dilation):
        effective_filter_size_rows = (weight - 1) * dilation + 1
        out_rows = (in_ + stride - 1) // stride
        padding_needed = max(0, (out_rows - 1) * stride + effective_filter_size_rows - in_)
        padding_rows = max(0, (out_rows - 1) * stride + (weight - 1) * dilation + 1 - in_)
        rows_odd = (padding_rows % 2 != 0)
        return padding_rows, rows_odd

    def forward(self, x):
        padding_rows, rows_odd = self.get_pad_odd(x.shape[2], self.weight.shape[2], self.stride[0], self.dilation[0])
        padding_cols, cols_odd = self.get_pad_odd(x.shape[3], self.weight.shape[3], self.stride[1], self.dilation[1])

        if rows_odd or cols_odd:
            x = F.pad(x, [0, int(cols_odd), 0, int(rows_odd)])

        return F.conv2d(x, self.weight, self.bias, self.stride,
                        padding=(padding_rows // 2, padding_cols // 2),
                        dilation=self.dilation, groups=self.groups)


class Swish(nn.Module):
    def forward(self, x):
        return x * torch.sigmoid(x)


class Flatten(nn.Module):
    def forward(self, x):
        return x.view(x.shape[0], -1)


class SEModule(nn.Module):
    def __init__(self, in_, squeeze_ch):
        super().__init__()
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(in_, squeeze_ch, kernel_size=1, stride=1, padding=0, bias=True),
            Swish(),
            nn.Conv2d(squeeze_ch, in_, kernel_size=1, stride=1, padding=0, bias=True),
        )

    def forward(self, x):
        return x * torch.sigmoid(self.se(x))


class DropConnect(nn.Module):
    def __init__(self, ratio):
        super().__init__()
        self.ratio = 1.0 - ratio

    def forward(self, x):
        if not self.training:
            return x

        random_tensor = self.ratio
        random_tensor += torch.rand([x.shape[0], 1, 1, 1], dtype=torch.float, device=x.device)
        random_tensor.requires_grad_(False)
        return x / self.ratio * random_tensor.floor()

In [20]:
class MBConv(nn.Module):
    def __init__(self, in_, out_, expand=1,
                  stride=1, 
                 se_ratio=0.1, dc_ratio=0.2):
        super().__init__()
        mid_ = in_ * expand
        self.expand_conv = conv_bn_act(in_, mid_, kernel_size=1, bias=False) if expand != 1 else nn.Identity()

        self.depth_wise_conv = conv_bn_act(mid_, mid_,
                                           kernel_size=3, stride=stride,
                                           groups=mid_, bias=False)

        self.se = SEModule(mid_, int(in_ * se_ratio)) if se_ratio > 0 else nn.Identity()

        self.project_conv = nn.Sequential(
            SamePadConv2d(mid_, out_, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_, 1e-3, 0.01)
        )

        
        self.dropconnect = nn.Identity()

    def forward(self, inputs):
        expand = self.expand_conv(inputs)
        x = self.depth_wise_conv(expand)
        x = self.se(x)
        x = self.project_conv(x)
        if self.skip:
            x = self.dropconnect(x)
            x = x + inputs
        return x


In [7]:
from SEblock import SE_block
from SE_Ibasic import SE_IBasicBlock
from IBasic_Block import IBasic_Block

In [21]:

class ResSaNet50(nn.Module):
  def __init__(self, dim_conv_1=64, 
               dim_stage_1=64,  
               dim_stage_2=128,
               dim_stage_3=256, 
               dim_stage_4=512, 
               dim_FC=512, num_classes=num_classes
               ):
    super().__init__()

    layers = []
    layers.append(nn.Conv2d(3, dim_conv_1, kernel_size=1, stride=2, padding=3))
    for i in range(3):
      layers.append(nn.Conv2d(dim_conv_1, dim_conv_1, kernel_size=3, padding=1))
    layers.append(nn.Conv2d(dim_conv_1, dim_stage_1, kernel_size=2))
    for i in range(3):
      layers.append(IBasic_Block(dim_stage_1, dim_stage_1))
    layers.append(nn.Conv2d(dim_stage_1, dim_stage_2, kernel_size=2))
    for i in range(4):
      layers.append(SE_IBasicBlock(dim_stage_2, dim_stage_2))
    layers.append(nn.Conv2d(dim_stage_2, dim_stage_3, kernel_size=2))
    for i in range(14):
      layers.append(SE_IBasicBlock(dim_stage_3, dim_stage_3))
    layers.append(nn.Conv2d(dim_stage_3, dim_stage_4, kernel_size=2))
    # for i in range(3):
    #   layers.append(IBasic_Transformer(dim_in_stage_4, dim_out_stage_4))
    # layers.append(nn.Linear(dim_in_FC, num_classes))

    self.main = nn.Sequential(*layers)

    k=1
    self.MBConv = MBConv(dim_stage_4, dim_stage_4)

    self.IBSA = IBSA_Block(dim_stage_4, dim_stage_4)
    self.conv = nn.Conv2d(dim_stage_4, dim_FC, kernel_size=1)
    self.fc = nn.Linear(dim_FC, num_classes)

  def forward(self, x):
    output = self.main(x)
    output = self.IBSA(output)
    output += self.MBConv(output)
    output = self.fc(self.conv(output))
    return output

In [9]:
model = ResSaNet50().cuda()
print_network(model, 'ResSaNet50')



model name ResSaNet50
ResSaNet50(
  (main): Sequential(
    (0): Conv2d(3, 64, kernel_size=(1, 1), stride=(2, 2), padding=(3, 3))
    (1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): Conv2d(64, 64, kernel_size=(2, 2), stride=(1, 1))
    (5): IBasic_Block(
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (prelu): PReLU(num_parameters=1)
    )
    (6): IBasic_Block(
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1

In [22]:
import torch
import torch.nn as nn
import math

class ArcFaceloss(nn.Module):

    def __init__(self, s=45.0, m=0.1, weight = None):
        super(ArcFaceloss, self).__init__()
      
        self.weight = weight
        self.s = s
        self.cosm = math.cos(m)
        self.sinm = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m
    
    def forward(self, input, labels):
        
        cost = F.normalize(input)
        sint = torch.sqrt(1.0 - torch.square(cost))
        cosmt = self.s * (self.cosm * cost - self.sinm * sint)
        k = torch.where(cost > self.th, cosmt, self.s * (cost - self.mm))
        
        label = torch.zeros_like(cost)
        label.scatter_(1,labels.view(-1,1).long(),1)
        output = (1 - label) * self.s * cost + label * k
        
        cross_entropy = nn.CrossEntropyLoss()
        output = cross_entropy(output, labels)
        # print(output)

        return output

In [23]:
import time
from torch import nn, optim
import torchvision.models as models
import torch.nn.functional as F
import datetime

model = ResSaNet50().cuda()

criterion = ArcFaceloss().cuda()

optimizer = optim.SGD(model.parameters(), lr=0.05, momentum =0.9, weight_decay=5e-4)

Epochs = 30

loss = []

print('Start training ...')
start_time = time.time()
for epoch in range(1, Epochs+1):
  model.train()

  loss_acc = 0 
  for i, data in enumerate(data_loader, 0):
    images, labels = data
    images, labels = images.float().cuda(), labels.cuda()
    images = image_to_rgb(images)
    labels = labels.type(torch.LongTensor).cuda()
    labels = labels.squeeze(-1)
    # print(images.size())
    # print(labels.size())
    logits = model(images)
    # print(logits.size())
    loss = criterion(logits, labels)
    loss_acc += loss.item()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  loss_acc /= Epochs
  et = time.time() - start_time
  et = str(datetime.timedelta(seconds=et))[:-7]
  log = "Elapsed [{}], Iteration [{}/{}]".format(et, epoch, Epochs)
  log += ', Loss = %.8f'%(loss_acc)
  print(log)


Start training ...


TypeError: ignored

In [None]:
dataset_test = FaceMaskData('test', config)
test_loader = torch.utils.data.DataLoader(dataset=dataset_test,
                              batch_size=config['batch_size'],
                              shuffle=True, 
                              num_workers=config['num_workers'])
def test_on_resnet(model, test_loader):
    
    ## -- ! code required  
    num_correct = 0
    total = 0
    

    with torch.no_grad():
        for i, data in enumerate(test_loader, 0):
            images, labels = data

            images, labels = images.float().cuda(), labels.cuda()
            images = image_to_rgb(images)
            labels = labels.type(torch.LongTensor).cuda()
            labels = labels.squeeze(-1)
        
            logits = model(images)
            _, predicted = torch.max(logits.data, 1)
            total += labels.size(0)
            num_correct += (predicted == labels).sum().item()
    acc = 100 * num_correct / total         
    return acc

acc = test_on_resnet(model, test_loader)
print('Accuracy of the network on the 10000 test images: %2f %%' % (acc))