In [1]:
import numpy as np
from collections import OrderedDict

import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

from torch.cuda.amp import autocast, GradScaler
from torch.nn.utils import clip_grad_norm_
from tqdm import tqdm
import time

from timm.data import Mixup
import transformers

from einops import rearrange
from einops.layers.torch import Rearrange

from timm.models.layers import DropPath, trunc_normal_, to_2tuple

from sklearn.metrics import confusion_matrix
import pandas as pd
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

  from .autonotebook import tqdm as notebook_tqdm


# 01.Convolutional Token Embedding

In [2]:
class ConvEmbed(nn.Module):
    '''
    img/token map to Conv Embedding
    '''
    
    def __init__(self,
                 patch_size=11, # [11, 7, 7, 3]
                 in_chans=3,   # [3, dim of stage1, dim of stage2]
                 embed_dim=64, # [32, 64, 192, 384]
                 stride=4,     # [6, 4, 4, 2]
                 padding=2,    # [3, 2, 2, 1]
                 norm_layer=None):
        super().__init__()
        self.patch_size = to_2tuple(patch_size)
        
        self.proj = nn.Conv2d(
            in_channels=in_chans,
            out_channels=embed_dim,
            kernel_size=patch_size,
            stride=stride,
            padding=padding
        )
        
        self.norm = norm_layer(embed_dim) if norm_layer else nn.Identity()
        
    def forward(self, x):
        x = self.proj(x)
        
        _, _, H, W = x.shape
        x = rearrange(x, 'b c h w -> b (h w) c')
        x = self.norm(x)
        x = rearrange(x, 'b (h w) c -> b c h w', h=H, w=W)
        return x
    

In [3]:
class LayerNorm(nn.Module):
    r""" LayerNorm that supports two data formats: channels_last (default) or channels_first. 
    The ordering of the dimensions in the inputs. channels_last corresponds to inputs with 
    shape (batch_size, height, width, channels) while channels_first corresponds to inputs 
    with shape (batch_size, channels, height, width).
    """
    def __init__(self, normalized_shape, eps=1e-6, data_format="channels_last"):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(normalized_shape))
        self.bias = nn.Parameter(torch.zeros(normalized_shape))
        self.eps = eps
        self.data_format = data_format
        if self.data_format not in ["channels_last", "channels_first"]:
            raise NotImplementedError 
        self.normalized_shape = (normalized_shape, )
    
    def forward(self, x):
        if self.data_format == "channels_last":
            return F.layer_norm(x, self.normalized_shape, self.weight, self.bias, self.eps)
        elif self.data_format == "channels_first":
            u = x.mean(1, keepdim=True)
            s = (x - u).pow(2).mean(1, keepdim=True)
            x = (x - u) / torch.sqrt(s + self.eps)
            x = self.weight[:, None, None] * x + self.bias[:, None, None]
            return x

In [13]:
class AttentionConv(nn.Module):
    def __init__(self,
                 dim=64,        # [32,64,192,384]
                 num_heads=4,   # [1,3,6,9]
                 qkv_bias=False,
                 attn_drop=0.,
                 proj_drop=0.,
                 kernel_size=3,
                 padding_q=1,
                 padding_kv=1,
                 stride_q=1,
                 stride_kv=2,
                 **kwargs
                 ):
        super().__init__()
        self.stride_q = stride_q
        self.stride_kv = stride_kv
        self.dim = dim
        self.num_heads = num_heads        
        self.scale = dim ** -0.5
        
        self.conv_proj_q = self._build_projection(dim,
                                                  kernel_size,
                                                  padding_q,
                                                  stride_q,
                                                  )
        self.conv_proj_k = self._build_projection(dim,
                                                  kernel_size,
                                                  padding_kv,
                                                  stride_kv,
                                                  )
        
        self.conv_proj_v = self._build_projection(dim,
                                                  kernel_size,
                                                  padding_kv,
                                                  stride_kv,
                                                  )
        
        self.attn_drop = nn.Dropout(attn_drop)
        self.linear_proj_last = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)        
        
    def _build_projection(self,
                          dim,
                          kernel_size,
                          padding,
                          stride,
                          ):
        
        proj = nn.Sequential(OrderedDict([
            ('depthwise', nn.Conv2d(
                dim,
                dim,
                kernel_size=kernel_size,
                padding=padding,
                stride=stride,
                bias=False,
                groups=dim)),
            # ('rearrange', Rearrange('b c h w -> b (h w) c')),
            ('ln', nn.LayerNorm(dim))
            ('pointwise1', nn.Conv2d(
                dim,
                dim*4,
                kernel_size=1)),
            ('pointwise2', nn.Conv2d(
                dim*4,
                dim,
                kernel_size=1)),
            ('rearrange', Rearrange('b c h w -> b (h w) c')),

        ]))
        
        return proj
    
    def forward(self, x, h, w):
        x = rearrange(x, 'b (h w) c -> b c h w', h=h, w=w)
        
        q = self.conv_proj_q(x)
        k = self.conv_proj_k(x)
        v = self.conv_proj_v(x)
        
        q = rearrange(q, 'b t (h d) -> b h t d', h=self.num_heads)
        k = rearrange(k, 'b t (h d) -> b h t d', h=self.num_heads)
        v = rearrange(v, 'b t (h d) -> b h t d', h=self.num_heads)
        
        attn_score = torch.einsum('bhlk,bhtk->bhlt', [q, k]) * self.scale
        attn = self.attn_drop(F.softmax(attn_score, dim=-1))
        
        x = torch.matmul(attn, v)
        batch_size, num_heads, seq_length, depth = x.size()
        x = x.view(batch_size, seq_length, num_heads * depth)
        
        x = self.proj_drop(self.linear_proj_last(x))
        
        return x


  ('ln', nn.LayerNorm(dim))


In [14]:
# transformer block에 작은 스케일 인자 곱하기
class LayerScale(nn.Module):
    def __init__(self, dim, init_values=1e-5):
        super().__init__()
        self.gamma = nn.Parameter(init_values * torch.ones((dim)))

    def forward(self, x):
        return self.gamma * x

In [15]:
class Block(nn.Module):
    
    def __init__(self,
                 dim,
                 num_heads,
                 mlp_ratio=4.,
                 qkv_bias=False,
                 drop=0.,
                 attn_drop=0.,
                 drop_path=0.,
                 act_layer=nn.GELU,
                 norm_layer=nn.LayerNorm,
                 **kwargs
                ):
        super().__init__()
        
        self.norm1 = norm_layer(dim)
        self.ls1 = LayerScale(dim)
        self.attn = AttentionConv(dim=dim,
                                  num_heads=num_heads,
                                  qkv_bias=qkv_bias,
                                  attn_drop=attn_drop,
                                  proj_drop=drop,
                                  **kwargs)        
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
        self.norm2 = norm_layer(dim)
        self.ls2 = LayerScale(dim)
        mlp_hidden_dim = int(dim*mlp_ratio)
        self.mlp = nn.Sequential(
            nn.Linear(dim, mlp_hidden_dim),
            act_layer(),
            nn.Linear(mlp_hidden_dim, dim),
            nn.Dropout(drop),
        )
        
    def forward(self, x, h, w):
        res = x
        x = self.norm1(x)
        attn = self.attn(x, h, w)
        x = res + self.drop_path(self.ls1(attn))
        x = x + self.drop_path(self.ls2(self.mlp(self.norm2(x))))
        return x

In [16]:
test_img = torch.Tensor(np.zeros((2,3,224,224))) # B, C, H, W

block = Block(dim=64,
              num_heads=4)

  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))
  ('ln', nn.LayerNorm(dim))


TypeError: 'tuple' object is not callable

In [17]:
# Stage 1 

## Patch Embedding
convembed = ConvEmbed(patch_size=7, stride=4, padding=2)
stage1_img = convembed(test_img)

## Attention with Convolution
b, c, h, w = stage1_img.shape
stage1_img = rearrange(stage1_img, 'b c h w -> b (h w) c')
stage1_img = block(stage1_img, h=h, w=w)
stage1_img = rearrange(stage1_img, 'b (h w) c -> b c h w', h=h, w=w)

## Check Result
print(f'stage 1 | img shape: {test_img.shape} → Conv Embed Shape: {stage1_img.shape}')

RuntimeError: Given normalized_shape=[64], expected input with shape [*, 64], but got input of size[2, 3584, 56]

In [None]:
# Stage 2 

## Patch Embedding
convembed = ConvEmbed(patch_size=3, in_chans=64, stride=2, padding=1)
stage2_img = convembed(stage1_img)

## Attention with Convolution
b, c, h, w = stage2_img.shape
stage2_img = rearrange(stage2_img, 'b c h w -> b (h w) c')
stage2_img = block(stage2_img, h=h, w=w)
stage2_img = rearrange(stage2_img, 'b (h w) c -> b c h w', h=h, w=w)

## Check Result
print(f'stage 2 | img shape: {stage1_img.shape} → Conv Embed Shape: {stage2_img.shape}')

In [None]:
# Stage 3 

## Patch Embedding
convembed = ConvEmbed(patch_size=3, in_chans=64, stride=2, padding=1)
stage3_img = convembed(stage2_img)

## Attention with Convolution
b, c, h, w = stage3_img.shape
stage3_img = rearrange(stage3_img, 'b c h w -> b (h w) c')
stage3_img = block(stage3_img, h=h, w=w)
stage3_img = rearrange(stage3_img, 'b (h w) c -> b c h w', h=h, w=w)

## Check Result
print(f'stage 3 | img shape: {stage2_img.shape} → Conv Embed Shape: {stage3_img.shape}')

In [None]:
class VisionTransformer(nn.Module):
    def __init__(self,
                 patch_size=16,
                 patch_stride=16,
                 patch_padding=0,
                 in_chans=3,
                 embed_dim=768,
                 depth=12,
                 num_heads=12,
                 mlp_ratio=4.,
                 qkv_bias=False,
                 drop_rate=0.,
                 attn_drop_rate=0.,
                 drop_path_rate=0.,
                 act_layer=nn.GELU,
                 norm_layer=nn.LayerNorm,
                 init='trunc_norm',
                 **kwargs
                 ):
        
        super().__init__()

        self.patch_embed = ConvEmbed(
            patch_size=patch_size,
            in_chans=in_chans,
            stride=patch_stride,
            padding=patch_padding,
            embed_dim=embed_dim,
            norm_layer=norm_layer
        )

        self.pos_drop = nn.Dropout(p=drop_rate)

        self.blocks = nn.ModuleList([
            Block(
                dim=embed_dim,
                num_heads=num_heads,
                mlp_ratio=mlp_ratio,
                qkv_bias=qkv_bias,
                drop=drop_rate,
                attn_drop=attn_drop_rate,
                drop_path=drop_path_rate,
                act_layer=act_layer,
                norm_layer=norm_layer,
                **kwargs
            ) for _ in range(depth)
        ])

        if init == 'xavier':
            self.apply(self._init_weights_xavier)
        else:
            self.apply(self._init_weights_trunc_normal)

    def _init_weights_trunc_normal(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=0.02)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, (nn.LayerNorm, nn.BatchNorm2d)):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def _init_weights_xavier(self, m):
        if isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, (nn.LayerNorm, nn.BatchNorm2d)):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def forward(self, x):
        x = self.patch_embed(x)
        _, _, H, W = x.size()

        x = rearrange(x, 'b c h w -> b (h w) c')
        x = self.pos_drop(x)

        for _, blk in enumerate(self.blocks):
            x = blk(x, H, W)
        x = rearrange(x, 'b (h w) c -> b c h w', h=H, w=W)
        return x

In [None]:
class ConvolutionalVisionTransformer(nn.Module):
    def __init__(self,
                 in_chans=3,
                 num_classes=100,
                 act_layer=nn.GELU,
                 norm_layer=nn.LayerNorm,
                 init='trunc_norm',
                 spec=None):
        super().__init__()
        self.num_classes = num_classes

        self.num_stages = spec['NUM_STAGES']
        self.stages = nn.ModuleList()
        for i in range(self.num_stages):
            kwargs = {
                'patch_size': spec['PATCH_SIZE'][i],
                'patch_stride': spec['PATCH_STRIDE'][i],
                'patch_padding': spec['PATCH_PADDING'][i],
                'embed_dim': spec['DIM_EMBED'][i],
                'depth': spec['DEPTH'][i],
                'num_heads': spec['NUM_HEADS'][i],
                'mlp_ratio': spec['MLP_RATIO'][i],
                'qkv_bias': spec['QKV_BIAS'][i],
                'drop_rate': spec['DROP_RATE'][i],
                'attn_drop_rate': spec['ATTN_DROP_RATE'][i],
                'drop_path_rate': spec['DROP_PATH_RATE'][i],
                'kernel_size': spec['KERNEL_QKV'][i],
                'padding_q': spec['PADDING_Q'][i],
                'padding_kv': spec['PADDING_KV'][i],
                'stride_q': spec['STRIDE_Q'][i],
                'stride_kv': spec['STRIDE_KV'][i],
            }

            stage = VisionTransformer(
                in_chans=in_chans,
                init=init,
                act_layer=act_layer,
                norm_layer=norm_layer,
                **kwargs
            )
            
            self.stages.append(stage)

            in_chans = spec['DIM_EMBED'][i]

        dim_embed = spec['DIM_EMBED'][-1]
        self.norm = norm_layer(dim_embed)
        self.pooler = nn.AdaptiveAvgPool1d(1)

        # Classifier head
        self.head = nn.Linear(dim_embed, num_classes) if num_classes > 0 else nn.Identity()
        trunc_normal_(self.head.weight, std=0.02)

    def forward_features(self, x):
        for stage in self.stages:
            x = stage(x)

        x = rearrange(x, 'b c h w -> b (h w) c') # (B, L, C)
        x = self.norm(x)                         # (B, L, C)
        x = self.pooler(x.transpose(1,2))        # (B, C, 1)
        x = torch.flatten(x, 1)                  # (B, C)
        # x = torch.mean(x, dim=1)

        return x

    def forward(self, x):
        x = self.forward_features(x)
        x = self.head(x)

        return x

In [None]:
class QuickGELU(nn.Module):
    def forward(self, x: torch.Tensor):
        return x * torch.sigmoid(1.702 * x)

In [None]:
spec = {
    'NUM_STAGES': 3,
    'PATCH_SIZE': [7,3,3],
    'PATCH_STRIDE': [4,2,2],
    'PATCH_PADDING': [2,1,1],
    'DIM_EMBED': [64,192,384],
    'DEPTH': [1,4,16],
    'NUM_HEADS': [4,8,12],   # original : [1,3,6]
    'MLP_RATIO': [4.,4.,4.],
    'QKV_BIAS': [True, True, True],
    'DROP_RATE': [0.,0.,0.],
    'ATTN_DROP_RATE': [0.,0.,0.],
    'DROP_PATH_RATE': [0.,0.,0.1],
    'KERNEL_QKV': [3,3,3],
    'PADDING_Q': [1,1,1],
    'PADDING_KV': [1,1,1],
    'STRIDE_Q': [1,1,1],
    'STRIDE_KV': [2,2,2],
}

spec

In [None]:
model = ConvolutionalVisionTransformer(act_layer=QuickGELU, spec=spec)

test_result = model(test_img)
test_img.shape, test_result.shape, test_result

In [None]:
from torchsummary import summary

summary(model.cuda(), (3, 224, 224))

In [None]:
# Transforms 정의하기
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224, scale=(0.8,1), interpolation=transforms.InterpolationMode.LANCZOS),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    transforms.RandomErasing(p=0.9, scale=(0.02, 0.33)),
])

test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

data_dir = '../data/sports'
batch_size = 256

train_path = data_dir+'/train'
valid_path = data_dir+'/valid'
test_path = data_dir+'/test'

# dataset load
train_data = ImageFolder(train_path, transform=train_transform)
valid_data = ImageFolder(valid_path, transform=test_transform)
test_data = ImageFolder(test_path, transform=test_transform)

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

In [None]:
device = 'cuda:3'
max_norm = 1.0 

model.to(device)
model_path = '../models/cvt/model.pth'

In [None]:
mixup_fn = Mixup(mixup_alpha=.7, 
                cutmix_alpha=1., 
                prob=.7, 
                switch_prob=0.5, 
                mode='batch',
                label_smoothing=.1,
                num_classes=100)

epochs = 200

criterion = nn.CrossEntropyLoss(label_smoothing=0.)

In [None]:
optimizer = optim.AdamW(model.parameters())
warmup_steps = int(len(train_loader)*(epochs)*0.1)
train_steps = len(train_loader)*(epochs)
scheduler = transformers.get_cosine_schedule_with_warmup(optimizer, 
                                                        num_warmup_steps=warmup_steps, 
                                                        num_training_steps=train_steps,
                                                        num_cycles=0.5)

In [None]:
training_time = 0
losses = []
val_losses = []
lrs = []
best_loss = float('inf')

# GradScaler 초기화
scaler = GradScaler()

for epoch in range(epochs):
    model.train()
    start_time = time.time()
    running_loss = 0.0
    pbar = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch + 1}")
    
    for _, data in pbar:
        inputs, labels = data[0].to(device), data[1].to(device)
        inputs, labels = mixup_fn(inputs, labels)
        optimizer.zero_grad()

        # AutoCast 적용
        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
        # 스케일링된 그라디언트 계산
        scaler.scale(loss).backward()

        # 그라디언트 클리핑 전에 스케일링 제거
        scaler.unscale_(optimizer)
        clip_grad_norm_(model.parameters(), max_norm=max_norm)

        # 옵티마이저 스텝 및 스케일러 업데이트
        scaler.step(optimizer)
        scaler.update()
        scheduler.step()
            
        lr = optimizer.param_groups[0]["lr"]
        lrs.append(lr)
        running_loss += loss.item()

    epoch_loss = running_loss / len(train_loader)
    losses.append(epoch_loss)        

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for data in valid_loader:
            inputs, labels = data[0].to(device), data[1].to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            
    val_loss /= len(valid_loader)
    val_losses.append(val_loss)
    
    # 모델 저장
    if val_loss < best_loss:
        best_loss = val_loss
        vit_save = True
        if vit_save:
            torch.save(model.state_dict(), model_path)

    epoch_duration = time.time() - start_time
    training_time += epoch_duration
    
    text = f'\tLoss: {epoch_loss}, Val Loss: {val_loss}, LR: {lr}, Duration: {epoch_duration:.2f} sec'
    
    if vit_save:
        text += f' - model saved!'
        vit_save = False

    print(text)
        
text = f"Epoch 당 평균 소요시간 : {training_time / epochs:.2f}초"      
print(text)

In [None]:
# 예측 수행 및 레이블 저장
all_preds = []
all_labels = []
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 혼동 행렬 생성
cm = confusion_matrix(all_labels, all_preds)

# 예측과 실제 레이블
y_true = all_labels  # 실제 레이블
y_pred = all_preds  # 모델에 의해 예측된 레이블

# 전체 데이터셋에 대한 정확도
accuracy = accuracy_score(y_true, y_pred)

# 평균 정밀도, 리콜, F1-Score ('weighted')
precision, recall, f1_score, _ = precision_recall_fscore_support(y_true, y_pred, average='weighted')

# 판다스 데이터프레임으로 결과 정리
performance_metrics = pd.DataFrame({
    'Metric': ['Accuracy', 'Precision', 'Recall', 'F1 Score'],
    'Value': [accuracy, precision, recall, f1_score]
})

# 데이터프레임 출력
performance_metrics