In [1]:

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install timm
!pip install torchprofile



In [3]:
from timm.models import create_model
from timm.models.registry import register_model
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Sequential as Seq, Linear as Lin, Conv2d
from timm.models.layers import DropPath
import numpy as np
from timm.data import IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD
from torchprofile import profile_macs
from timm.optim import create_optimizer
from types import SimpleNamespace
from timm.scheduler import create_scheduler
import torchvision
import torchvision.transforms as transforms
import math
from torch.utils.data import Subset
from sklearn.model_selection import train_test_split
from timm.utils import *
torch.backends.cudnn.benchmark = True
from collections import OrderedDict
import os
from timm.data.dataset import ImageDataset
from datetime import date

device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


#Model classes

In [4]:
def get_1d_sincos_pos_embed_from_grid(embed_dim, pos):
    """
    embed_dim: output dimension for each position
    pos: a list of positions to be encoded: size (M,)
    out: (M, D)
    """
    assert embed_dim % 2 == 0
    omega = np.arange(embed_dim // 2, dtype=np.float)
    omega /= embed_dim / 2.
    omega = 1. / 10000**omega  # (D/2,)

    pos = pos.reshape(-1)  # (M,)
    out = np.einsum('m,d->md', pos, omega)  # (M, D/2), outer product

    emb_sin = np.sin(out) # (M, D/2)
    emb_cos = np.cos(out) # (M, D/2)

    emb = np.concatenate([emb_sin, emb_cos], axis=1)  # (M, D)
    return emb

def get_2d_sincos_pos_embed_from_grid(embed_dim, grid):
    assert embed_dim % 2 == 0

    # use half of dimensions to encode grid_h
    emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0])  # (H*W, D/2)
    emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1])  # (H*W, D/2)

    emb = np.concatenate([emb_h, emb_w], axis=1) # (H*W, D)
    return emb


In [5]:
def get_2d_sincos_pos_embed(embed_dim, grid_size, cls_token=False):
    """
    grid_size: int of the grid height and width
    return:
    pos_embed: [grid_size*grid_size, embed_dim] or [1+grid_size*grid_size, embed_dim] (w/ or w/o cls_token)
    """
    grid_h = np.arange(grid_size, dtype=np.float32)
    grid_w = np.arange(grid_size, dtype=np.float32)
    grid = np.meshgrid(grid_w, grid_h)  # here w goes first
    grid = np.stack(grid, axis=0)

    grid = grid.reshape([2, 1, grid_size, grid_size])
    pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid)
    if cls_token:
        pos_embed = np.concatenate([np.zeros([1, embed_dim]), pos_embed], axis=0)
    return pos_embed

In [6]:
def get_2d_relative_pos_embed(embed_dim, grid_size):
    """
    grid_size: int of the grid height and width
    return:
    pos_embed: [grid_size*grid_size, grid_size*grid_size]
    """
    pos_embed = get_2d_sincos_pos_embed(embed_dim, grid_size)
    relative_pos = 2 * np.matmul(pos_embed, pos_embed.transpose()) / pos_embed.shape[1]
    return relative_pos


In [7]:
def pairwise_distance(x):
    """
    Compute pairwise distance of a point cloud.
    Args:
        x: tensor (batch_size, num_points, num_dims)
    Returns:
        pairwise distance: (batch_size, num_points, num_points)
    """
    with torch.no_grad():
        x_inner = -2*torch.matmul(x, x.transpose(2, 1))
        x_square = torch.sum(torch.mul(x, x), dim=-1, keepdim=True)
        return x_square + x_inner + x_square.transpose(2, 1)


def part_pairwise_distance(x, start_idx=0, end_idx=1):
    """
    Compute pairwise distance of a point cloud.
    Args:
        x: tensor (batch_size, num_points, num_dims)
    Returns:
        pairwise distance: (batch_size, num_points, num_points)
    """
    with torch.no_grad():
        x_part = x[:, start_idx:end_idx]
        x_square_part = torch.sum(torch.mul(x_part, x_part), dim=-1, keepdim=True)
        x_inner = -2*torch.matmul(x_part, x.transpose(2, 1))
        x_square = torch.sum(torch.mul(x, x), dim=-1, keepdim=True)
        return x_square_part + x_inner + x_square.transpose(2, 1)


def xy_pairwise_distance(x, y):
    """
    Compute pairwise distance of a point cloud.
    Args:
        x: tensor (batch_size, num_points, num_dims)
    Returns:
        pairwise distance: (batch_size, num_points, num_points)
    """
    with torch.no_grad():
        xy_inner = -2*torch.matmul(x, y.transpose(2, 1))
        x_square = torch.sum(torch.mul(x, x), dim=-1, keepdim=True)
        y_square = torch.sum(torch.mul(y, y), dim=-1, keepdim=True)
        return x_square + xy_inner + y_square.transpose(2, 1)


In [8]:
def dense_knn_matrix(x, k=16, relative_pos=None):
    """Get KNN based on the pairwise distance.
    Args:
        x: (batch_size, num_dims, num_points, 1)
        k: int
    Returns:
        nearest neighbors: (batch_size, num_points, k) (batch_size, num_points, k)
    """
    with torch.no_grad():
        x = x.transpose(2, 1).squeeze(-1)
        batch_size, n_points, n_dims = x.shape
        ### memory efficient implementation ###
        n_part = 10000
        if n_points > n_part:
            nn_idx_list = []
            groups = math.ceil(n_points / n_part)
            for i in range(groups):
                start_idx = n_part * i
                end_idx = min(n_points, n_part * (i + 1))
                dist = part_pairwise_distance(x.detach(), start_idx, end_idx)
                if relative_pos is not None:
                    dist += relative_pos[:, start_idx:end_idx]
                _, nn_idx_part = torch.topk(-dist, k=k)
                nn_idx_list += [nn_idx_part]
            nn_idx = torch.cat(nn_idx_list, dim=1)
        else:
            dist = pairwise_distance(x.detach())
            if relative_pos is not None:
                dist += relative_pos
            _, nn_idx = torch.topk(-dist, k=k) # b, n, k
        ######
        center_idx = torch.arange(0, n_points, device=x.device).repeat(batch_size, k, 1).transpose(2, 1)
    return torch.stack((nn_idx, center_idx), dim=0)


def xy_dense_knn_matrix(x, y, k=16, relative_pos=None):
    """Get KNN based on the pairwise distance.
    Args:
        x: (batch_size, num_dims, num_points, 1)
        k: int
    Returns:
        nearest neighbors: (batch_size, num_points, k) (batch_size, num_points, k)
    """
    with torch.no_grad():
        x = x.transpose(2, 1).squeeze(-1)
        y = y.transpose(2, 1).squeeze(-1)
        batch_size, n_points, n_dims = x.shape
        dist = xy_pairwise_distance(x.detach(), y.detach())
        if relative_pos is not None:
            dist += relative_pos
        _, nn_idx = torch.topk(-dist, k=k)
        center_idx = torch.arange(0, n_points, device=x.device).repeat(batch_size, k, 1).transpose(2, 1)
    return torch.stack((nn_idx, center_idx), dim=0)


In [9]:
def act_layer(act, inplace=False, neg_slope=0.2, n_prelu=1):
    # activation layer

    act = act.lower()
    if act == 'relu':
        layer = nn.ReLU(inplace)
    elif act == 'leakyrelu':
        layer = nn.LeakyReLU(neg_slope, inplace)
    elif act == 'prelu':
        layer = nn.PReLU(num_parameters=n_prelu, init=neg_slope)
    elif act == 'gelu':
        layer = nn.GELU()
    elif act == 'hswish':
        layer = nn.Hardswish(inplace)
    else:
        raise NotImplementedError('activation layer [%s] is not found' % act)
    return layer


def norm_layer(norm, nc):
    # normalization layer 2d
    norm = norm.lower()
    if norm == 'batch':
        layer = nn.BatchNorm2d(nc, affine=True)
    elif norm == 'instance':
        layer = nn.InstanceNorm2d(nc, affine=False)
    else:
        raise NotImplementedError('normalization layer [%s] is not found' % norm)
    return layer

In [10]:
class DenseDilated(nn.Module):
    """
    Find dilated neighbor from neighbor list

    edge_index: (2, batch_size, num_points, k)
    """
    def __init__(self, k=9, dilation=1, stochastic=False, epsilon=0.0):
        super(DenseDilated, self).__init__()
        self.dilation = dilation
        self.stochastic = stochastic
        self.epsilon = epsilon
        self.k = k

    def forward(self, edge_index):
        if self.stochastic:
            if torch.rand(1) < self.epsilon and self.training:
                num = self.k * self.dilation
                randnum = torch.randperm(num)[:self.k]
                edge_index = edge_index[:, :, :, randnum]
            else:
                edge_index = edge_index[:, :, :, ::self.dilation]
        else:
            edge_index = edge_index[:, :, :, ::self.dilation]
        return edge_index

In [11]:
class DenseDilatedKnnGraph(nn.Module):
    """
    Find the neighbors' indices based on dilated knn
    """
    def __init__(self, k=9, dilation=1, stochastic=False, epsilon=0.0):
        super(DenseDilatedKnnGraph, self).__init__()
        self.dilation = dilation
        self.stochastic = stochastic
        self.epsilon = epsilon
        self.k = k
        self._dilated = DenseDilated(k, dilation, stochastic, epsilon)

    def forward(self, x, y=None, relative_pos=None):
        if y is not None:
            #### normalize
            x = F.normalize(x, p=2.0, dim=1)
            y = F.normalize(y, p=2.0, dim=1)
            ####
            edge_index = xy_dense_knn_matrix(x, y, self.k * self.dilation, relative_pos)
        else:
            #### normalize
            x = F.normalize(x, p=2.0, dim=1)
            ####
            edge_index = dense_knn_matrix(x, self.k * self.dilation, relative_pos)
        return self._dilated(edge_index)


In [12]:
class BasicConv(Seq):
    def __init__(self, channels, act='relu', norm=None, bias=True, drop=0.):
        m = []
        for i in range(1, len(channels)):
            m.append(Conv2d(channels[i - 1], channels[i], 1, bias=bias, groups=4))
            if norm is not None and norm.lower() != 'none':
                m.append(norm_layer(norm, channels[-1]))
            if act is not None and act.lower() != 'none':
                m.append(act_layer(act))
            if drop > 0:
                m.append(nn.Dropout2d(drop))

        super(BasicConv, self).__init__(*m)

        self.reset_parameters()

    def reset_parameters(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d) or isinstance(m, nn.InstanceNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()


In [13]:
def batched_index_select(x, idx):
    r"""fetches neighbors features from a given neighbor idx

    Args:
        x (Tensor): input feature Tensor
                :math:`\mathbf{X} \in \mathbb{R}^{B \times C \times N \times 1}`.
        idx (Tensor): edge_idx
                :math:`\mathbf{X} \in \mathbb{R}^{B \times N \times l}`.
    Returns:
        Tensor: output neighbors features
            :math:`\mathbf{X} \in \mathbb{R}^{B \times C \times N \times k}`.
    """
    batch_size, num_dims, num_vertices_reduced = x.shape[:3]
    _, num_vertices, k = idx.shape
    idx_base = torch.arange(0, batch_size, device=idx.device).view(-1, 1, 1) * num_vertices_reduced
    idx = idx + idx_base
    idx = idx.contiguous().view(-1)

    x = x.transpose(2, 1)
    feature = x.contiguous().view(batch_size * num_vertices_reduced, -1)[idx, :]
    feature = feature.view(batch_size, num_vertices, k, num_dims).permute(0, 3, 1, 2).contiguous()
    return feature


In [14]:
class MRConv2d(nn.Module):
    """
    Max-Relative Graph Convolution (Paper: https://arxiv.org/abs/1904.03751) for dense data type
    """
    def __init__(self, in_channels, out_channels, act='relu', norm=None, bias=True):
        super(MRConv2d, self).__init__()
        self.nn = BasicConv([in_channels*2, out_channels], act, norm, bias)

    def forward(self, x, edge_index, y=None):
        x_i = batched_index_select(x, edge_index[1])
        if y is not None:
            x_j = batched_index_select(y, edge_index[0])
        else:
            x_j = batched_index_select(x, edge_index[0])
        x_j, _ = torch.max(x_j - x_i, -1, keepdim=True)
        b, c, n, _ = x.shape
        x = torch.cat([x.unsqueeze(2), x_j.unsqueeze(2)], dim=2).reshape(b, 2 * c, n, _)
        return self.nn(x)

In [15]:
class EdgeConv2d(nn.Module):
    """
    Edge convolution layer (with activation, batch normalization) for dense data type
    """
    def __init__(self, in_channels, out_channels, act='relu', norm=None, bias=True):
        super(EdgeConv2d, self).__init__()
        self.nn = BasicConv([in_channels * 2, out_channels], act, norm, bias)

    def forward(self, x, edge_index, y=None):
        x_i = batched_index_select(x, edge_index[1])
        if y is not None:
            x_j = batched_index_select(y, edge_index[0])
        else:
            x_j = batched_index_select(x, edge_index[0])
        max_value, _ = torch.max(self.nn(torch.cat([x_i, x_j - x_i], dim=1)), -1, keepdim=True)
        return max_value


In [16]:
class GraphSAGE(nn.Module):
    """
    GraphSAGE Graph Convolution (Paper: https://arxiv.org/abs/1706.02216) for dense data type
    """
    def __init__(self, in_channels, out_channels, act='relu', norm=None, bias=True):
        super(GraphSAGE, self).__init__()
        self.nn1 = BasicConv([in_channels, in_channels], act, norm, bias)
        self.nn2 = BasicConv([in_channels*2, out_channels], act, norm, bias)

    def forward(self, x, edge_index, y=None):
        if y is not None:
            x_j = batched_index_select(y, edge_index[0])
        else:
            x_j = batched_index_select(x, edge_index[0])
        x_j, _ = torch.max(self.nn1(x_j), -1, keepdim=True)
        return self.nn2(torch.cat([x, x_j], dim=1))

In [17]:
class GINConv2d(nn.Module):
    """
    GIN Graph Convolution (Paper: https://arxiv.org/abs/1810.00826) for dense data type
    """
    def __init__(self, in_channels, out_channels, act='relu', norm=None, bias=True):
        super(GINConv2d, self).__init__()
        self.nn = BasicConv([in_channels, out_channels], act, norm, bias)
        eps_init = 0.0
        self.eps = nn.Parameter(torch.Tensor([eps_init]))

    def forward(self, x, edge_index, y=None):
        if y is not None:
            x_j = batched_index_select(y, edge_index[0])
        else:
            x_j = batched_index_select(x, edge_index[0])
        x_j = torch.sum(x_j, -1, keepdim=True)
        return self.nn((1 + self.eps) * x + x_j)

In [18]:
class GraphConv2d(nn.Module):
    """
    Static graph convolution layer
    """
    def __init__(self, in_channels, out_channels, conv='edge', act='relu', norm=None, bias=True):
        super(GraphConv2d, self).__init__()
        if conv == 'edge':
            self.gconv = EdgeConv2d(in_channels, out_channels, act, norm, bias)
        elif conv == 'mr':
            self.gconv = MRConv2d(in_channels, out_channels, act, norm, bias)
        elif conv == 'sage':
            self.gconv = GraphSAGE(in_channels, out_channels, act, norm, bias)
        elif conv == 'gin':
            self.gconv = GINConv2d(in_channels, out_channels, act, norm, bias)
        else:
            raise NotImplementedError('conv:{} is not supported'.format(conv))

    def forward(self, x, edge_index, y=None):
        return self.gconv(x, edge_index, y)

In [19]:
class DyGraphConv2d(GraphConv2d):
    """
    Dynamic graph convolution layer
    """
    def __init__(self, in_channels, out_channels, kernel_size=9, dilation=1, conv='edge', act='relu',
                 norm=None, bias=True, stochastic=False, epsilon=0.0, r=1):
        super(DyGraphConv2d, self).__init__(in_channels, out_channels, conv, act, norm, bias)
        self.k = kernel_size
        self.d = dilation
        self.r = r
        self.dilated_knn_graph = DenseDilatedKnnGraph(kernel_size, dilation, stochastic, epsilon)

    def forward(self, x, relative_pos=None):
        B, C, H, W = x.shape
        y = None
        if self.r > 1:
            y = F.avg_pool2d(x, self.r, self.r)
            y = y.reshape(B, C, -1, 1).contiguous()
        x = x.reshape(B, C, -1, 1).contiguous()
        edge_index = self.dilated_knn_graph(x, y, relative_pos)
        x = super(DyGraphConv2d, self).forward(x, edge_index, y)
        return x.reshape(B, -1, H, W).contiguous()

In [20]:
class Grapher(nn.Module):
    """
    Grapher module with graph convolution and fc layers
    """
    def __init__(self, in_channels, kernel_size=9, dilation=1, conv='edge', act='relu', norm=None,
                 bias=True,  stochastic=False, epsilon=0.0, r=1, n=196, drop_path=0.0, relative_pos=False):
        super(Grapher, self).__init__()
        self.channels = in_channels
        self.n = n
        self.r = r
        self.fc1 = nn.Sequential(
            nn.Conv2d(in_channels, in_channels, 1, stride=1, padding=0),
            nn.BatchNorm2d(in_channels),
        )
        self.graph_conv = DyGraphConv2d(in_channels, in_channels * 2, kernel_size, dilation, conv,
                              act, norm, bias, stochastic, epsilon, r)
        self.fc2 = nn.Sequential(
            nn.Conv2d(in_channels * 2, in_channels, 1, stride=1, padding=0),
            nn.BatchNorm2d(in_channels),
        )
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
        self.relative_pos = None
        if relative_pos:
            print('using relative_pos')
            relative_pos_tensor = torch.from_numpy(np.float32(get_2d_relative_pos_embed(in_channels,
                int(n**0.5)))).unsqueeze(0).unsqueeze(1)
            relative_pos_tensor = F.interpolate(
                    relative_pos_tensor, size=(n, n//(r*r)), mode='bicubic', align_corners=False)
            self.relative_pos = nn.Parameter(-relative_pos_tensor.squeeze(1), requires_grad=False)

    def _get_relative_pos(self, relative_pos, H, W):
        if relative_pos is None or H * W == self.n:
            return relative_pos
        else:
            N = H * W
            N_reduced = N // (self.r * self.r)
            return F.interpolate(relative_pos.unsqueeze(0), size=(N, N_reduced), mode="bicubic").squeeze(0)

    def forward(self, x):
        _tmp = x
        x = self.fc1(x)
        B, C, H, W = x.shape
        relative_pos = self._get_relative_pos(self.relative_pos, H, W)
        x = self.graph_conv(x, relative_pos)
        x = self.fc2(x)
        x = self.drop_path(x) + _tmp
        return x


In [21]:
class Downsample(nn.Module):
    """ Convolution-based downsample
    """
    def __init__(self, in_dim=3, out_dim=768):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_dim, out_dim, 3, stride=2, padding=1),
            nn.BatchNorm2d(out_dim),
        )

    def forward(self, x):
        x = self.conv(x)
        return x

In [22]:
class FFN(nn.Module):
    def __init__(self, in_features, hidden_features=None, out_features=None, act='relu', drop_path=0.0):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Sequential(
            nn.Conv2d(in_features, hidden_features, 1, stride=1, padding=0),
            nn.BatchNorm2d(hidden_features),
        )
        self.act = act_layer(act)
        self.fc2 = nn.Sequential(
            nn.Conv2d(hidden_features, out_features, 1, stride=1, padding=0),
            nn.BatchNorm2d(out_features),
        )
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()

    def forward(self, x):
        shortcut = x
        x = self.fc1(x)
        x = self.act(x)
        x = self.fc2(x)
        x = self.drop_path(x) + shortcut
        return x#.reshape(B, C, N, 1)



In [23]:
class Stem(nn.Module):
    """ Image to Visual Embedding
    Overlap: https://arxiv.org/pdf/2106.13797.pdf
    """
    def __init__(self, img_size=224, in_dim=3, out_dim=768, act='relu'):
        super().__init__()
        self.convs = nn.Sequential(
            nn.Conv2d(in_dim, out_dim//2, 3, stride=2, padding=1),
            nn.BatchNorm2d(out_dim//2),
            act_layer(act),
            nn.Conv2d(out_dim//2, out_dim, 3, stride=2, padding=1),
            nn.BatchNorm2d(out_dim),
            act_layer(act),
            nn.Conv2d(out_dim, out_dim, 3, stride=1, padding=1),
            nn.BatchNorm2d(out_dim),
        )

    def forward(self, x):
        x = self.convs(x)
        return x


In [24]:
class DeepGCN(torch.nn.Module):
    def __init__(self, opt):
        super(DeepGCN, self).__init__()
        print(opt)
        k = opt.k
        act = opt.act
        norm = opt.norm
        bias = opt.bias
        epsilon = opt.epsilon
        stochastic = opt.use_stochastic
        conv = opt.conv
        emb_dims = opt.emb_dims
        drop_path = opt.drop_path

        blocks = opt.blocks
        self.n_blocks = sum(blocks)
        channels = opt.channels
        reduce_ratios = [4, 2, 1, 1]
        dpr = [x.item() for x in torch.linspace(0, drop_path, self.n_blocks)]  # stochastic depth decay rule
        num_knn = [int(x.item()) for x in torch.linspace(k, k, self.n_blocks)]  # number of knn's k
        max_dilation = 49 // max(num_knn)

        self.stem = Stem(out_dim=channels[0], act=act)
        self.pos_embed = nn.Parameter(torch.zeros(1, channels[0], 224//4, 224//4))
        HW = 224 // 4 * 224 // 4

        self.backbone = nn.ModuleList([])
        idx = 0
        for i in range(len(blocks)):
            if i > 0:
                self.backbone.append(Downsample(channels[i-1], channels[i]))
                HW = HW // 4
            for j in range(blocks[i]):
                self.backbone += [
                    Seq(Grapher(channels[i], num_knn[idx], min(idx // 4 + 1, max_dilation), conv, act, norm,
                                    bias, stochastic, epsilon, reduce_ratios[i], n=HW, drop_path=dpr[idx],
                                    relative_pos=True),
                          FFN(channels[i], channels[i] * 4, act=act, drop_path=dpr[idx])
                         )]
                idx += 1
        self.backbone = Seq(*self.backbone)

        self.prediction = Seq(nn.Conv2d(channels[-1], 1024, 1, bias=True),
                              nn.BatchNorm2d(1024),
                              act_layer(act),
                              nn.Dropout(opt.dropout),
                              # nn.Conv2d(1024, 512, 1, bias=True),
                              # nn.BatchNorm2d(512),
                              # act_layer(act),
                              # nn.Dropout(opt.dropout),
                              # nn.Conv2d(512, 256, 1, bias=True),
                              # nn.BatchNorm2d(256),
                              # act_layer(act),
                              # nn.Dropout(opt.dropout),
                              nn.Conv2d(1024, opt.n_classes, 1, bias=True))
        self.model_init()

    def model_init(self):
        for m in self.modules():
            if isinstance(m, torch.nn.Conv2d):
                torch.nn.init.kaiming_normal_(m.weight)
                m.weight.requires_grad = True
                if m.bias is not None:
                    m.bias.data.zero_()
                    m.bias.requires_grad = True

    def forward(self, inputs):
        x = self.stem(inputs) + self.pos_embed
        B, C, H, W = x.shape
        for i in range(len(self.backbone)):
            x = self.backbone[i](x)

        x = F.adaptive_avg_pool2d(x, 1)
        return self.prediction(x).squeeze(-1).squeeze(-1)

#MOdel Define

In [25]:
def _cfg(url='', **kwargs):
    return {
        'url': url,
        'num_classes': 7, 'input_size': (3, 224, 224), 'pool_size': None,
        'crop_pct': .9, 'interpolation': 'bicubic',
        'mean': IMAGENET_DEFAULT_MEAN, 'std': IMAGENET_DEFAULT_STD,
        'first_conv': 'patch_embed.proj', 'classifier': 'head',
        **kwargs
    }

default_cfgs = {
    'vig_224_gelu': _cfg(
        mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5),
    ),
    'vig_b_224_gelu': _cfg(
        crop_pct=0.95, mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5),
    ),
}


@register_model
def pvig_s_224_gelu(pretrained=False, **kwargs):
    class OptInit:
        def __init__(self, num_classes=7, drop_path_rate=0.0, **kwargs):
            self.k = 9 # neighbor num (default:9)
            self.conv = 'mr' # graph conv layer {edge, mr}
            self.act = 'gelu' # activation layer {relu, prelu, leakyrelu, gelu, hswish}
            self.norm = 'batch' # batch or instance normalization {batch, instance}
            self.bias = True # bias of conv layer True or False
            self.dropout = 0.0 # dropout rate
            self.use_dilation = True # use dilated knn or not
            self.epsilon = 0.2 # stochastic epsilon for gcn
            self.use_stochastic = False # stochastic for gcn, True or False
            self.drop_path = drop_path_rate
            self.blocks = [2,2,6,2] # number of basic blocks in the backbone
            self.channels = [80, 160, 400, 640] # number of channels of deep features
            self.n_classes = num_classes # Dimension of out_channels
            self.emb_dims = 1024 # Dimension of embeddings

    opt = OptInit(**kwargs)
    model = DeepGCN(opt)
    model.default_cfg = default_cfgs['vig_224_gelu']
    return model

In [26]:
torch.manual_seed(42)


<torch._C.Generator at 0x7c952aed5670>

In [27]:
folder_path  = '/content/drive/MyDrive/emotion/'
date = date.today()
results_main_path = os.path.join(folder_path,'{}'.format(date))

try:
    os.mkdir(results_main_path)
    print('[Success] Created folder in results_main_path')
except Exception as e:
    print(e)

[Errno 17] File exists: '/content/drive/MyDrive/emotion/2023-09-13'


In [28]:
model = create_model(
        'pvig_s_224_gelu',
        pretrained=False,
        num_classes=7,
        drop_rate=0.0,
        drop_connect_rate=None,  # DEPRECATED, use drop_path
        drop_path_rate=None,
        drop_block_rate=None,
        global_pool=None,
        bn_tf=None,
        bn_momentum=0.9,
        bn_eps=None,
        checkpoint_path='')

<__main__.pvig_s_224_gelu.<locals>.OptInit object at 0x7c946818b8b0>
using relative_pos
using relative_pos


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  omega = np.arange(embed_dim // 2, dtype=np.float)


using relative_pos
using relative_pos
using relative_pos
using relative_pos
using relative_pos
using relative_pos
using relative_pos
using relative_pos
using relative_pos
using relative_pos


In [29]:
input_size = [1, 3, 224, 224]
input = torch.randn(input_size)#.cuda()

In [30]:
model.eval()
#macs = profile_macs(model, input)
model.train()

DeepGCN(
  (stem): Stem(
    (convs): Sequential(
      (0): Conv2d(3, 40, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (1): BatchNorm2d(40, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): GELU(approximate='none')
      (3): Conv2d(40, 80, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
      (4): BatchNorm2d(80, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): GELU(approximate='none')
      (6): Conv2d(80, 80, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (7): BatchNorm2d(80, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
  )
  (backbone): Sequential(
    (0): Sequential(
      (0): Grapher(
        (fc1): Sequential(
          (0): Conv2d(80, 80, kernel_size=(1, 1), stride=(1, 1))
          (1): BatchNorm2d(80, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
        (graph_conv): DyGraphConv2d(
          (gconv): MRConv2d(
            (nn): BasicConv(
      

In [31]:
args = SimpleNamespace()
args.weight_decay = 0.05
args.lr = 2e-3
args.opt = 'adamw' #'lookahead_adam' to use `lookahead`
args.momentum = 0.9
args.sched='cosine'
args.epochs=100
args.min_lr=1e-5
args.warmup_lr=1e-2
args.warmup_epochs=5
args.cooldown_epochs=3
args.drop_path=0.1
args.b=32
args.output=''
args.opt_eps=1e-8 #epsilon vlaue fo optimizer
args.mixup=0.8
args.cutmix=1.0
args.model_ema=True
args.model_ema_decay=0.99996
args.aa='rand-m9-mstd0.5-inc1'  #Use AutoAugment policy
args.color_jitter=0.4 #Color jitter factor
args.repeated_aug=True
args.remode='pixel' #Random erase mode
args.reprob=0.25  #Random erase prob
args.amp=False #NVIDIA Pex
args.eval_metric='top1'
args.local_rank=0
args.model='vig_s_224_gelu'


In [32]:
optimizer = create_optimizer(args, model)
lr_scheduler, num_epochs = create_scheduler(args, optimizer)

#data loading and training

In [33]:
data_path  = '/content/drive/MyDrive/emotion/'
data_set_path='/content/drive/MyDrive/emotion/kdef_dataset'
# train_path=os.path.join(data_path,'phishIRIS/train')
# test_path=os.path.join(data_path,'phishIRIS/val')

In [34]:


# transform=transforms.Compose([transforms.ToTensor(),transforms.Resize(size=(224,224),antialias=True)])

# trainSet=torchvision.datasets.ImageFolder(root=data_path, split='train' transform=transform)
# trainLoader=torch.utils.data.DataLoader(trainSet, batch_size=batch_size,shuffle=True,num_workers=2)

# testSet=torchvision.datasets.STL10(root='./data', split='test', download=True, transform=transform)

# testLoader=torch.utils.data.DataLoader(testSet, batch_size=batch_size,shuffle=True,num_workers=2)

In [35]:
def train_val_dataset(dataset, val_split=0.25):
    train_idx, val_idx = train_test_split(list(range(len(dataset))), test_size=val_split)
    datasets = {}
    datasets['train'] = Subset(dataset, train_idx)
    datasets['val'] = Subset(dataset, val_idx)
    return datasets

In [36]:


# batch_size=args.b
# # trainSet=ImageDataset(train_path)
# # testset=ImageDataset(test_path)
# transform=transforms.Compose([transforms.ToTensor(),transforms.Resize(size=(224,224),antialias=True),transforms.RandomHorizontalFlip(p=0.5),transforms.RandomVerticalFlip(p=0.6)])

# trainSet=torchvision.datasets.ImageFolder(root=train_path,  transform=transform)
# testSet=torchvision.datasets.ImageFolder(root=test_path, transform=transform)

# trainLoader=torch.utils.data.DataLoader(trainSet, batch_size=batch_size,shuffle=True,num_workers=1)
# testLoader=torch.utils.data.DataLoader(testSet, batch_size=batch_size,shuffle=True,num_workers=1)

In [37]:

batch_size=args.b
# trainSet=ImageDataset(train_path)
# testset=ImageDataset(test_path)
transform=transforms.Compose([transforms.ToTensor(),transforms.Resize(size=(224,224),antialias=True)])

dataSet=torchvision.datasets.ImageFolder(root=data_set_path,  transform=transform)

print(len(dataSet))
dataSets = train_val_dataset(dataSet)
print(len(dataSets['train']))
print(len(dataSets['val']))
# The original dataset is available in the Subset class
print(dataSets['train'].dataset)

dataloaders = {x:torch.utils.data.DataLoader(dataSets[x],batch_size, shuffle=True, num_workers=4) for x in ['train','val']}
x,y = next(iter(dataloaders['train']))
print(x.shape, y.shape)
#dataLoader=torch.utils.data.DataLoader(dataSet, batch_size=batch_size,shuffle=True,num_workers=1)



3588
2691
897
Dataset ImageFolder
    Number of datapoints: 3588
    Root location: /content/drive/MyDrive/emotion/kdef_dataset
    StandardTransform
Transform: Compose(
               ToTensor()
               Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
           )
torch.Size([32, 3, 224, 224]) torch.Size([32])


In [38]:
trainLoader=dataloaders['train']
testLoader=dataloaders['val']

In [39]:
print(len(trainLoader.dataset))
print(len(testLoader.dataset))


2691
897


In [40]:
train_features, train_labels = next(iter(trainLoader))
print(f"Feature batch shape: {train_features.size()}")
print(f"Labels batch shape: {train_labels.size()}")

Feature batch shape: torch.Size([32, 3, 224, 224])
Labels batch shape: torch.Size([32])


In [41]:
print(dataSet.class_to_idx)

{'AF': 0, 'AN': 1, 'DI': 2, 'HA': 3, 'NE': 4, 'SA': 5, 'SU': 6}


In [42]:
#print(testSet.class_to_idx)

In [43]:
if(torch.cuda.is_available()):
  print(torch.cuda.get_device_name(0))
  model.cuda()

Tesla T4


In [44]:


if(torch.cuda.is_available()):
  train_loss_fn = nn.CrossEntropyLoss().cuda()
  validate_loss_fn = nn.CrossEntropyLoss().cuda()
else:
  train_loss_fn = nn.CrossEntropyLoss()
  validate_loss_fn = nn.CrossEntropyLoss()

In [45]:
# model_ema = ModelEma(
#             model,
#             decay=args.model_ema_decay,
#             device='cpu' if args.model_ema_force_cpu else '',
#             resume=args.resume)


In [46]:
eval_metric = args.eval_metric
best_metric = None
best_epoch = None
saver = None
output_dir = results_main_path
output_dir=''
decreasing = True if eval_metric == 'loss' else False
saver = CheckpointSaver(
            model=model, optimizer=optimizer, args=args,
            checkpoint_dir=output_dir, recovery_dir=output_dir, decreasing=decreasing)

In [47]:
def train_epoch(
        epoch, model, loader, optimizer, loss_fn, args,
        lr_scheduler=None, saver=None, output_dir='',
        loss_scaler=None, model_ema=None, mixup_fn=None):
  #print("came")
  model.train()
  losses_m = AverageMeter()

  second_order = hasattr(optimizer, 'is_second_order') and optimizer.is_second_order
  num_updates = epoch * len(loader)
  last_idx = len(loader) - 1
  for batch_idx, (input, target) in enumerate(loader):
    last_batch = batch_idx == last_idx
    #print(last_batch)
    if(torch.cuda.is_available()):
      input, target = input.cuda(), target.cuda()

    output = model(input)
    input, target = input.cpu(), target.cpu()
    output=output.cpu()

    loss = loss_fn(output, target)
    losses_m.update(loss.item(), input.size(0))
    optimizer.zero_grad()
    loss.backward(create_graph=second_order)
    optimizer.step()
    lr_scheduler.step_update(num_updates=num_updates, metric=losses_m.avg)
    if last_batch:
      lrl = [param_group['lr'] for param_group in optimizer.param_groups]
      lr = sum(lrl) / len(lrl)

      print(
                    'Train: {} [{:>4d}/{} ({:>3.0f}%)]  '
                    'Loss: {loss.val:>9.6f} ({loss.avg:>6.4f})  '
                    'LR: {lr:.3e}  '.format(
                        epoch,
                        batch_idx, len(loader),
                        100. * batch_idx / last_idx,
                        loss=losses_m,
                        lr=lr))

  return OrderedDict([('loss', losses_m.avg)])

In [48]:
def validate(model, loader, loss_fn):

    losses_m = AverageMeter()
    top1_m = AverageMeter()
    top5_m = AverageMeter()

    model.eval()

    last_idx = len(loader) - 1
    with torch.no_grad():
        for batch_idx, (input, target) in enumerate(loader):
            last_batch = batch_idx == last_idx
            if(torch.cuda.is_available()):
              input, target = input.cuda(), target.cuda()

            output = model(input)

            if isinstance(output, (tuple, list)):
                output = output[0]

            loss = loss_fn(output, target)
            acc1, acc5 = accuracy(output, target, topk=(1, 5))

            reduced_loss = loss.data

            losses_m.update(reduced_loss.item(), input.size(0))
            top1_m.update(acc1.item(), output.size(0))
            top5_m.update(acc5.item(), output.size(0))

            if last_batch:
              print('Loss: {loss.val:>7.4f} ({loss.avg:>6.4f})  '
                    'Acc@1: {top1.val:>7.4f} ({top1.avg:>7.4f})  '
                    'Acc@5: {top5.val:>7.4f} ({top5.avg:>7.4f})'.format(
                        loss=losses_m, top1=top1_m, top5=top5_m))

    metrics = OrderedDict([('loss', losses_m.avg), ('top1', top1_m.avg), ('top5', top5_m.avg)])

    return metrics

In [50]:
train_losses = []
valid_losses = []
top1_accuracies = []


for epoch in range(0, num_epochs):
    # if args.distributed:
    #   loader_train.sampler.set_epoch(epoch)
  train_metrics = train_epoch(epoch, model, trainLoader, optimizer, train_loss_fn, args,
                lr_scheduler=lr_scheduler)

  eval_metrics = validate(model, testLoader, validate_loss_fn)

  train_losses.append(train_metrics['loss'])
  valid_losses.append(eval_metrics['loss'])

  top1_accuracies.append(eval_metrics['top1'])



  lr_scheduler.step(epoch + 1, eval_metrics[eval_metric])
  update_summary(
                epoch, train_metrics, eval_metrics, os.path.join(output_dir, 'summary.csv'),
                write_header=best_metric is None)


                # save proper checkpoint with eval metric
  save_metric = eval_metrics[eval_metric]
  best_metric, best_epoch = saver.save_checkpoint(epoch, metric=save_metric)


Loss:  1.9011 (2.1855)  Acc@1:  0.0000 (15.2731)  Acc@5: 100.0000 (73.3556)
Loss:  2.2310 (3.2590)  Acc@1:  0.0000 (16.0535)  Acc@5: 100.0000 (69.5652)
Loss:  1.8128 (2.7816)  Acc@1:  0.0000 (15.2731)  Acc@5: 100.0000 (75.1394)
Loss:  1.9699 (2.1504)  Acc@1:  0.0000 (14.6042)  Acc@5: 100.0000 (73.3556)
Loss:  2.0731 (2.3054)  Acc@1:  0.0000 (15.4961)  Acc@5: 100.0000 (72.0178)
Loss:  1.9760 (2.1479)  Acc@1:  0.0000 (15.9420)  Acc@5:  0.0000 (72.4638)


KeyboardInterrupt: ignored

In [None]:
# from google.colab import files
# files.download('./model_best.pth.tar')

In [None]:
import matplotlib.pyplot as plt
num_epochs = num_epochs
def loss_curves(epochs, training_epoch_loss,valid_epoch_loss):
    plt.title('Mean Squared Error v Epochs', fontsize=20)
    plt.xlabel('No. of Epochs', fontsize=18)
    plt.ylabel('Mean Squared Error', fontsize=16)

    plt.plot(epochs, training_epoch_loss,label="Training Loss")
    plt.plot(epochs, valid_epoch_loss,label="Validation Loss")
    plt.legend()
#     plt.savefig('Model_3_128_AFLw_Test_ds.jpg')

def accuracy_curves(epochs, val_accuracy):
    plt.title('Validation Accuracy v Epochs', fontsize=20)
    plt.xlabel('No. of Epochs', fontsize=18)
    plt.ylabel('Validation Accuracy', fontsize=16)

    plt.plot(epochs, val_accuracy,label="Validation Accuracy")

    plt.legend()
#     plt.savefig('Model_3_128_AFLw_Test_ds.jpg')



In [None]:
loss_curves(np.linspace(0, num_epochs, num_epochs).astype(int),train_losses,valid_losses)

In [None]:
accuracy_curves(np.linspace(0, num_epochs, num_epochs).astype(int),top1_accuracies)

In [None]:
model.eval()

In [None]:
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow


In [None]:
def testModel(model,loader,load_model_path):

  #checkpoint = torch.load(load_model_path)
  #model.load_state_dict(checkpoint['state_dict'])
  #model.load_state_dict(checkpoint)
  #optimizer.load_state_dict(checkpoint['optimizer'])
  model.eval()

  # for batch_idx, (input, target) in enumerate(loader):
  #   print(batch_idx)
  #   if(torch.cuda.is_available()):
  #     input, target = input.cuda(), target.cuda()

  #   output = model(input)

  #   if isinstance(output, (tuple, list)):
  #     output = output[0]
  #   print(output)

  subset_indices = [700] # select your indices here as a list
  subset = torch.utils.data.Subset(loader, subset_indices)
  testloader_subset = torch.utils.data.DataLoader(subset, batch_size=1, num_workers=0, shuffle=False)

  input, target = next(iter(testloader_subset))
  plt.imshow(input[0].permute(1,2,0),interpolation='none')
  plt.show()

  if(torch.cuda.is_available()):
    input, target = input.cuda(), target.cuda()





  output = model(input)

  if isinstance(output, (tuple, list)):
    output = output[0]
  print(output)
  print("Predicted label: ",torch.argmax(output).item())
  print("Real Label: ",target.item())
  print(loader.class_to_idx)


In [None]:
#load_model_path=os.path.join(data_path,"2023-05-12","model_saved.pt")
#load_model_path='./last.pth.tar'
load_model_path=os.path.join("model_best.pth.tar")

checkpoint = torch.load(load_model_path)

torch.save(model.state_dict(),os.path.join(results_main_path,"model_saved.pt"))
#model.load_state_dict(checkpoint)
model.load_state_dict(checkpoint['state_dict'])

optimizer.load_state_dict(checkpoint['optimizer'])
model.eval()

#testModel(model,dataSet,load_model_path)

In [None]:
model.cpu()

In [None]:
from sklearn.metrics import confusion_matrix,accuracy_score
import seaborn as sn
import pandas as pd

y_pred = []
y_true = []

# iterate over test data
for input, target in testLoader:
  # if(torch.cuda.is_available()):
  #   input, target = input.cuda(), target.cuda()

  output = model(input)


  #print(output)
  #output = torch.argmax(output).data.cpu().numpy()
  #input.cpu()
  output = (torch.max(output, 1)[1]).data.numpy()
  y_pred.extend(output) # Save Prediction
  #print(y_pred)
  labels = target.data.cpu().numpy()

  y_true.extend(labels) # Save Truth
  #print(y_true)
  #print(y_true)
# constant for classes
classes = ('Benign','Phishing')

# Build confusion matrix
cf_matrix = confusion_matrix(y_true, y_pred)

acc=accuracy_score(y_pred,y_true)
print(acc)


In [None]:

df_cm = pd.DataFrame(cf_matrix / np.sum(cf_matrix, axis=1)[:, None], index = [i for i in classes],
                     columns = [i for i in classes])
plt.figure(figsize = (12,7))
sn.heatmap(df_cm, annot=True)
plt.savefig('output.png')

In [None]:

TN = cf_matrix[0][0]
FN = cf_matrix[1][0]
TP = cf_matrix[1][1]
FP = cf_matrix[0][1]

print(TN,FN,TP,FP)

In [None]:

plt.figure(figsize = (12,7))
plt.show()

In [None]:
plt.show()

In [None]:
plt.savefig('output.png')

In [None]:
torch.cuda.empty_cache()

In [None]:
from prettytable import PrettyTable

def count_parameters(model):
    table = PrettyTable(["Modules", "Parameters"])
    total_params = 0
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad: continue
        param = parameter.numel()
        table.add_row([name, param])
        total_params+=param
    print(table)
    print(f"Total Trainable Params: {total_params}")
    return total_params

count_parameters(model)

In [None]:
pip install torchvision

In [None]:
from torch.utils.mobile_optimizer import optimize_for_mobile

In [None]:
example = torch.randn(64,3,224,224)

traced_script_module = torch.jit.trace(model, example)


traced_script_module_optimized = optimize_for_mobile(traced_script_module)



In [None]:
traced_script_module.save(os.path.join(results_main_path,"model_jit.pt"))

In [None]:
traced_script_module_optimized._save_for_lite_interpreter(os.path.join(results_main_path,"model.ptl"))

In [None]:
traced_script_module_optimized=torch.jit.load(os.path.join(results_main_path,"model.ptl"))

In [None]:
from sklearn.metrics import confusion_matrix,accuracy_score
import seaborn as sn
import pandas as pd

y_pred = []
y_true = []

# iterate over test data
for input, target in testLoader:
  # if(torch.cuda.is_available()):
  #   input, target = input.cuda(), target.cuda()

  output = traced_script_module_optimized(input)


  #print(output)
  #output = torch.argmax(output).data.cpu().numpy()
  #input.cpu()
  output = (torch.max(output, 1)[1]).data.numpy()
  y_pred.extend(output) # Save Prediction
  #print(y_pred)
  labels = target.data.cpu().numpy()

  y_true.extend(labels) # Save Truth
  #print(y_true)
  #print(y_true)
# constant for classes
classes = ('Benign','Phishing')

# Build confusion matrix
cf_matrix = confusion_matrix(y_true, y_pred)

acc=accuracy_score(y_pred,y_true)
print(acc)

In [None]:
def testModel(model,loader,load_model_path,indice):

  #checkpoint = torch.load(load_model_path)
  #model.load_state_dict(checkpoint['state_dict'])
  #model.load_state_dict(checkpoint)
  #optimizer.load_state_dict(checkpoint['optimizer'])
  #model.eval()

  # for batch_idx, (input, target) in enumerate(loader):
  #   print(batch_idx)
  #   if(torch.cuda.is_available()):
  #     input, target = input.cuda(), target.cuda()

  #   output = model(input)

  #   if isinstance(output, (tuple, list)):
  #     output = output[0]
  #   print(output)

  subset_indices = [indice] # select your indices here as a list
  subset = torch.utils.data.Subset(loader, subset_indices)
  testloader_subset = torch.utils.data.DataLoader(subset, batch_size=1, num_workers=0, shuffle=False)

  input, target = next(iter(testloader_subset))
  plt.imshow(input[0].permute(1,2,0),interpolation='none')
  plt.show()

  # if(torch.cuda.is_available()):
  #   input, target = input.cuda(), target.cuda()



  print(type(input))
  output = model(input)

  if isinstance(output, (tuple, list)):
    output = output[0]
  print(output)
  print("Predicted label: ",torch.argmax(output).item())
  print("Real Label: ",target.item())
  print(loader.class_to_idx)

In [None]:
testModel(traced_script_module_optimized,dataSet,load_model_path,2000)

In [None]:
import torch
import os

from torchvision import transforms
from PIL import Image

In [None]:
path="benign_test"
name='00000.PNG'