In [None]:
import torch
import torch.nn as nn
import torchvision

import sys
import math

from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.optim as optim

from torchvision import transforms
from sklearn.metrics import accuracy_score

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

from tqdm.notebook import tqdm
import time
import pandas as pd
import numpy as np
import random
import math
import os
import cv2
from PIL import Image

from sklearn.model_selection import train_test_split

In [None]:
def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Returns None when the optimizer has no parameter groups.
    """
    first_group = next(iter(optimizer.param_groups), None)
    if first_group is None:
        return None
    return first_group['lr']

In [None]:
class cfg: 
    # Notebook-wide configuration namespace; attributes are read as cfg.<name>.
    # Later cells also attach cfg.dec_voc_size and cfg.clip and overwrite cfg.max_len.
    valid_size = 0.2  # fraction passed as test_size to train_test_split
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU when available, else CPU
    seed = 1212  # random_state for the train/validation split
    pad_idx = 0  # padding token index (ignored by the loss, masked in attention)
    n_head = 8  # attention heads per multi-head attention layer
    ffn_hidden = 2048  # hidden width of the position-wise feed-forward network
    n_layers = 4  # number of encoder layers and of decoder layers
    max_width = 310  # sizes the encoder positional table (max_width // 4 + 1 positions)
    max_len = 8  # decoder positional-table length; recomputed from the data in a later cell
    drop_prob = 0.1  # dropout probability
    epochs = 20  # training epochs; also used as T_0 of the LR scheduler
    learning_rate = 0.00001 # 0.0001
    lr_min = 0.0000001  # eta_min of the LR scheduler
    batch_size = 64
    num_workers = 4 # set according to your own GPU / CPU environment
    print_step = 50  # log the running loss every print_step training batches
    pt_path = 'model_20epoch.pt'  # checkpoint loaded before training when not None

In [None]:
# CNN encoder code, adapted from:
# https://github.com/ayumiymk/aster.pytorch/blob/be670046c775b54de79766208f0c59321ae1eccf/lib/models/resnet_aster.py#L37

def conv3x3(in_planes, out_planes, stride=1):
    """Build a bias-free 3x3 convolution with padding 1."""
    conv_kwargs = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_kwargs)


def conv1x1(in_planes, out_planes, stride=1):
    """Build a bias-free 1x1 (pointwise) convolution."""
    conv_kwargs = dict(kernel_size=1, stride=stride, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_kwargs)



class AsterBlock(nn.Module):
    """Residual block: 1x1 conv -> BN -> ReLU -> 3x3 conv -> BN, added to a skip path, then ReLU.

    Args:
        inplanes: number of input channels.
        planes: number of output channels.
        stride: stride of the leading 1x1 convolution.
        downsample: optional module applied to the input to produce the skip
            tensor; when None the input itself is used.
    """

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        # Creation order fixes the order in which initial weights are drawn.
        self.conv1 = conv1x1(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        # Main branch.
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Skip branch: identity unless a projection module was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

class ResNet_ASTER(nn.Module):
    """ResNet-style image encoder (ASTER / CRNN backbone) producing a width-wise feature sequence.

    The six residual stages divide the height by 2 each and the width by 4 in
    total (only the first two stages stride along the width). `forward` then
    squeezes the height axis, so the input height must collapse to 1
    (e.g. a 32x100 image yields 25 steps of 512 features).

    Args:
        with_lstm: when True, a 2-layer bidirectional LSTM is run over the
            CNN sequence and its output is returned instead.
        n_group: stored on the instance; not used by this class.
    """

    def __init__(self, with_lstm=False, n_group=1):
        super().__init__()
        self.with_lstm = with_lstm
        self.n_group = n_group

        in_channels = 3
        self.layer0 = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=(3, 3), stride=1, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True))

        self.inplanes = 32
        # (planes, blocks, [stride_h, stride_w]) for layer1..layer6.
        # The trailing comments give the feature map size for a 32x100 input.
        stage_specs = [
            (32, 3, [2, 2]),   # [16, 50]
            (64, 4, [2, 2]),   # [8, 25]
            (128, 6, [2, 1]),  # [4, 25]
            (256, 6, [2, 1]),  # [2, 25]
            (512, 3, [2, 1]),  # [1, 25]
            (512, 3, [2, 1]),  # [1, 25]
        ]
        for stage_no, (planes, blocks, stride) in enumerate(stage_specs, start=1):
            setattr(self, f"layer{stage_no}", self._make_layer(planes, blocks, stride))

        if with_lstm:
            self.rnn = nn.LSTM(512, 256, bidirectional=True, num_layers=2, batch_first=True)
            self.out_planes = 2 * 256
        else:
            self.out_planes = 512

        # No explicit weight initialisation is applied; PyTorch defaults are used.

    def _make_layer(self, planes, blocks, stride):
        """Stack `blocks` AsterBlocks; only the first one strides / changes the channel count."""
        needs_projection = stride != [1, 1] or self.inplanes != planes
        downsample = None
        if needs_projection:
            # Project the skip path so it matches the main branch's shape.
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes, stride),
                nn.BatchNorm2d(planes))

        layers = [AsterBlock(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes
        layers.extend(AsterBlock(self.inplanes, planes) for _ in range(1, blocks))
        return nn.Sequential(*layers)

    def forward(self, x):
        feat = self.layer0(x)
        stages = (self.layer1, self.layer2, self.layer3,
                  self.layer4, self.layer5, self.layer6)
        for stage in stages:
            feat = stage(feat)

        # [N, c, 1, w] -> [N, c, w] -> [N, w, c]
        cnn_feat = feat.squeeze(2).transpose(2, 1)
        if not self.with_lstm:
            return cnn_feat
        rnn_feat, _ = self.rnn(cnn_feat)
        return rnn_feat


In [None]:
# Smoke test: run a random batch through the CNN + LSTM encoder and print the output size.
x = torch.randn(3, 3, 32, 100) # batch,channel,h,w
net = ResNet_ASTER(with_lstm=True)
encoder_feat = net(x)
print(encoder_feat.size()) # 3,w//4,channel

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed (non-learned) sinusoidal positional encoding table.

    encoding[i, j] = sin(i / 10000 ** (j / d_model))        for even j
    encoding[i, j] = cos(i / 10000 ** ((j - 1) / d_model))  for odd j

    The table is registered as a non-persistent buffer: it follows the module
    through `.to()` / dtype casts but is not written to `state_dict()`, so the
    set of checkpoint keys is the same as without the buffer.

    Args:
        d_model: embedding dimension.
        max_len: number of positions to precompute (longest supported sequence).
        device: device on which the table is initially created.
    """

    def __init__(self, d_model, max_len, device): # ex. d_model = 512, max_len = 100
        super(PositionalEncoding, self).__init__()

        # pos -> (max_len, 1), e.g. [[0], [1], ..., [99]]
        pos = torch.arange(0, max_len, device=device).float().unsqueeze(dim=1)

        # _2i -> (ceil(d_model / 2),), e.g. [0, 2, 4, ..., 510]
        _2i = torch.arange(0, d_model, step=2, device=device).float()

        angle = pos / (10000 ** (_2i / d_model))  # (max_len, ceil(d_model / 2))

        encoding = torch.zeros(max_len, d_model, device=device)  # (max_len, d_model)
        encoding[:, 0::2] = torch.sin(angle)
        # For odd d_model there is one fewer odd column than even columns.
        encoding[:, 1::2] = torch.cos(angle[:, : d_model // 2])

        # Buffers never require grad; persistent=False keeps it out of the state_dict.
        self.register_buffer("encoding", encoding, persistent=False)

    def forward(self, x): # x -> (Batch, Length) or (Batch, Length, ...)
        """Return the first `x.size(1)` rows of the table, shape (Length, d_model)."""
        # Only the sequence length of this batch is needed, not the values of x.
        seq_len = x.size(1)
        return self.encoding[:seq_len, :]

In [None]:
class TransformerEmbedding(nn.Module):
    """Decoder input embedding: learned token embedding + positional encoding, then dropout.

    Args:
        vocab_size: number of tokens in the vocabulary.
        d_model: embedding dimension.
        max_len: number of positions in the positional table.
        drop_prob: dropout probability applied to the summed embedding.
        device: device forwarded to the positional encoding.
    """

    def __init__(self, vocab_size, d_model, max_len, drop_prob,device):
        super().__init__()
        self.tok_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = PositionalEncoding(d_model, max_len,device)
        self.drop_out = nn.Dropout(p=drop_prob)

    def forward(self, x): # x -> (Batch, Length) token ids
        token_vectors = self.tok_emb(x)        # (Batch, Length, d_model)
        position_vectors = self.pos_emb(x)     # (Length, d_model)
        # The positional term broadcasts over the batch axis.
        summed = token_vectors + position_vectors
        return self.drop_out(summed)           # (Batch, Length, d_model)

In [None]:
class TransformerEncoderEmbedding(nn.Module):
    """Encoder input embedding: adds positional encoding to CNN features, then dropout.

    The positional table has `max_width // 4 + 1` positions of 512 features,
    because the CNN sequence length is a quarter of the image width.
    """

    def __init__(self, max_width, drop_prob,device):
        super().__init__()
        n_positions = max_width//4+1
        self.pos_emb = PositionalEncoding(512, n_positions,device)
        self.drop_out = nn.Dropout(p=drop_prob)

    def forward(self, x): # x -> (Batch, Length, Embedding)
        position_vectors = self.pos_emb(x)  # (Length, Embedding)
        # The positional term broadcasts over the batch axis.
        with_position = x + position_vectors
        return self.drop_out(with_position)  # (Batch, Length, Embedding)

In [None]:
class ScaleDotProductAttention(nn.Module):
    """Scaled dot-product attention: softmax(q @ k^T / sqrt(d)) @ v."""

    def __init__(self):
        super().__init__()
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, mask=None, inf_value=1e12):
        """Compute attention.

        Args:
            q, k, v: [batch_size, head, length, d_tensor].
            mask: optional tensor broadcastable to [batch_size, head, len_q, len_k];
                positions where it equals False are excluded.
            inf_value: magnitude of the large negative score written to masked positions.

        Returns:
            (output, score): output is [batch, head, len_q, d_tensor];
            score holds the attention weights, [batch, head, len_q, len_k].
        """
        _, _, _, d_tensor = k.size()

        # (B, n_head, L_q, d) @ (B, n_head, d, L_k) -> (B, n_head, L_q, L_k)
        score = torch.matmul(q, k.transpose(2, 3)) / math.sqrt(d_tensor)

        if mask is not None:
            # A very negative score becomes ~0 after softmax; writing 0 would give e^0 = 1.
            blocked = mask == False
            score = score.masked_fill(blocked, -inf_value)

        score = self.softmax(score)
        attended = torch.matmul(score, v)  # (B, n_head, L_q, d_tensor)
        return attended, score

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention: project q/k/v, attend per head, merge heads, project again.

    Args:
        d_model: model (embedding) dimension.
        n_head: number of heads; each head works on d_model // n_head features.
    """

    def __init__(self, d_model, n_head):
        super().__init__()
        self.n_head = n_head
        self.attention = ScaleDotProductAttention()
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_concat = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """q, k, v: [batch, length, d_model]; mask: [batch, 1, len_query, len_key]."""
        # One full-width Linear per input covers all heads at once;
        # the result is then reshaped into heads.
        q_heads = self.split(self.w_q(q))
        k_heads = self.split(self.w_k(k))
        v_heads = self.split(self.w_v(v))

        # The attention weights (second return value) are not used here.
        per_head_out, _weights = self.attention(q_heads, k_heads, v_heads, mask=mask)

        merged = self.concat(per_head_out)   # [batch, length, d_model]
        return self.w_concat(merged)         # [batch, length, d_model]

    def split(self, tensor):
        """[batch, length, d_model] -> [batch, head, length, d_model // head]."""
        batch_size, length, d_model = tensor.size()
        per_head = d_model // self.n_head
        by_head = tensor.view(batch_size, length, self.n_head, per_head)
        return by_head.transpose(1, 2)

    def concat(self, tensor):
        """[batch, head, length, d_tensor] -> [batch, length, head * d_tensor]."""
        batch_size, head, length, d_tensor = tensor.size()
        merged = tensor.transpose(1, 2).contiguous()
        return merged.view(batch_size, length, head * d_tensor)

In [None]:
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to each position: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: input and output feature size.
        hidden: width of the inner layer.
        drop_prob: dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        expanded = self.relu(self.linear1(x))
        return self.linear2(self.dropout(expanded))

In [None]:
class EncoderLayer(nn.Module):
    """Post-norm Transformer encoder layer: self-attention and feed-forward, each with dropout, residual add and LayerNorm."""

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        super().__init__()
        # Self-attention sub-layer.
        self.attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        # Feed-forward sub-layer.
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, src_mask):
        # 1-2. self attention, then dropout + residual + norm
        attended = self.attention(q=x, k=x, v=x, mask=src_mask)
        x = self.norm1(self.dropout1(attended) + x)

        # 3-4. position-wise feed forward, then dropout + residual + norm
        transformed = self.ffn(x)
        return self.norm2(self.dropout2(transformed) + x)

In [None]:
class Encoder(nn.Module):
    """Transformer encoder over CNN features: positional embedding followed by `n_layers` encoder layers (d_model fixed at 512)."""

    def __init__(self, max_width, ffn_hidden, n_head, n_layers, drop_prob,device):
        super().__init__()
        self.emb = TransformerEncoderEmbedding(
            max_width=max_width, drop_prob=drop_prob, device=device)

        stack = []
        for _ in range(n_layers):
            stack.append(EncoderLayer(d_model=512,
                                      ffn_hidden=ffn_hidden,
                                      n_head=n_head,
                                      drop_prob=drop_prob))
        self.layers = nn.ModuleList(stack)

    def forward(self, x, src_mask):
        hidden = self.emb(x)
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, src_mask)
        return hidden

In [None]:
class DecoderLayer(nn.Module):
    """Post-norm Transformer decoder layer.

    Sub-layers, each followed by dropout, a residual add and LayerNorm:
    masked self-attention, encoder-decoder attention (skipped when `enc`
    is None), and a position-wise feed-forward network.
    """

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        super().__init__()
        # Masked self-attention sub-layer.
        self.self_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        # Encoder-decoder (cross) attention sub-layer.
        self.enc_dec_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

        # Feed-forward sub-layer.
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, dec, enc, trg_mask, src_mask):
        # 1-2. self attention over the target, then dropout + residual + norm
        self_attended = self.self_attention(q=dec, k=dec, v=dec, mask=trg_mask)
        x = self.norm1(self.dropout1(self_attended) + dec)

        # 3-4. attend over the encoder output (queries come from the decoder)
        if enc is not None:
            cross_attended = self.enc_dec_attention(q=x, k=enc, v=enc, mask=src_mask)
            x = self.norm2(self.dropout2(cross_attended) + x)

        # 5-6. position-wise feed forward, then dropout + residual + norm
        transformed = self.ffn(x)
        return self.norm3(self.dropout3(transformed) + x)

In [None]:
class Decoder(nn.Module):
    """Transformer decoder: token + positional embedding, `n_layers` decoder layers, and a linear head over the vocabulary."""

    def __init__(self, dec_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device):
        super().__init__()
        self.emb = TransformerEmbedding(vocab_size=dec_voc_size,
                                        d_model=d_model,
                                        max_len=max_len,
                                        drop_prob=drop_prob,
                                        device=device)

        stack = []
        for _ in range(n_layers):
            stack.append(DecoderLayer(d_model=d_model,
                                      ffn_hidden=ffn_hidden,
                                      n_head=n_head,
                                      drop_prob=drop_prob))
        self.layers = nn.ModuleList(stack)

        # Maps each decoder state to per-token logits.
        self.linear = nn.Linear(d_model, dec_voc_size)

    def forward(self, trg, src, trg_mask, src_mask):
        hidden = self.emb(trg)
        for decoder_layer in self.layers:
            hidden = decoder_layer(hidden, src, trg_mask, src_mask)
        return self.linear(hidden)

In [None]:
class CNN_Transformer(nn.Module):
    """Image-to-text model: ResNet(+LSTM) features -> Transformer encoder -> Transformer decoder.

    Args:
        pad_idx: token index treated as padding when building masks.
        dec_voc_size: decoder vocabulary size.
        n_head, ffn_hidden, n_layers, drop_prob: Transformer hyper-parameters.
        max_width: sizes the encoder positional table.
        max_len: sizes the decoder positional table.
        device: device used for the positional tables and the causal mask.
    """

    def __init__(self, pad_idx, dec_voc_size, n_head, max_width, max_len,
                ffn_hidden, n_layers, drop_prob,device):
        super().__init__()
        self.pad_idx = pad_idx
        self.device = device
        self.cnn_encoder = ResNet_ASTER(with_lstm=True)
        self.encoder = Encoder(max_width=max_width,
                               ffn_hidden=ffn_hidden,
                               n_head=n_head,
                               n_layers=n_layers,
                               drop_prob=drop_prob,
                               device=device)
        self.decoder = Decoder(dec_voc_size=dec_voc_size,
                               max_len=max_len,
                               d_model=512,
                               ffn_hidden=ffn_hidden,
                               n_head=n_head,
                               n_layers=n_layers,
                               drop_prob=drop_prob,
                               device=device)

    def forward(self, src, trg, src_mask_base):
        """Teacher-forced forward pass.

        Args:
            src: images, [Batch, channel, h, w].
            trg: target token ids, [Batch, Length].
            src_mask_base: [Batch, w//4]; entries equal to pad_idx mark padded columns.

        Returns:
            The decoder output for every target position.
        """
        features = self.cnn_encoder(src)  # [Batch, w//4, 512]

        # [batch, 1, len_src, len_src]
        src_mask = self.make_pad_mask(src_mask_base, src_mask_base)
        # [batch, 1, len_trg, len_src]
        src_trg_mask = self.make_pad_mask(trg, src_mask_base)
        # Padding mask [batch, 1, len_trg, len_trg] combined with the causal
        # mask [len_trg, len_trg], which broadcasts over batch.
        trg_mask = self.make_pad_mask(trg, trg) * self.make_no_peak_mask(trg, trg)

        memory = self.encoder(features, src_mask)  # (batch, length, d_model)
        return self.decoder(trg, memory, trg_mask, src_trg_mask)

    def make_pad_mask(self, q, k): # q, k -> [Batch, Length]
        """Return a bool mask [batch, 1, len_q, len_k], True where neither the query nor the key is padding."""
        key_valid = k.ne(self.pad_idx)[:, None, None, :]     # batch x 1 x 1 x len_k
        query_valid = q.ne(self.pad_idx)[:, None, :, None]   # batch x 1 x len_q x 1
        # Broadcasting expands both operands to batch x 1 x len_q x len_k.
        return key_valid & query_valid

    def make_no_peak_mask(self, q, k): # q, k -> [Batch, Length]
        """Return a lower-triangular bool mask [len_q, len_k] that blocks attention to later positions."""
        rows, cols = q.size(1), k.size(1)
        lower_triangle = torch.ones(rows, cols).tril()
        return lower_triangle.bool().to(self.device)

In [None]:
# Load train.csv and add a column holding each label split into a list of characters.
df = pd.read_csv('./train.csv')
df['label_split']=df['label'].apply(lambda x : list(x))
df

In [None]:
# The single-character samples in the provided training data cover every character that appears
# in the train/test data, so they are all placed in the training set up front.
df['len'] = df['label'].str.len()
train_v1 = df[df['len']==1]

In [None]:
cfg.max_len = max(df['len'])+2 # longest label plus one extra position at the front and one at the back
print(cfg.max_len)

In [None]:
# Split the samples with two or more characters into train (80%) / validation (20%).
# NOTE(review): df['len'] is only passed as a second array to be split (its two halves are
# discarded); it is not passed as `stratify=`, so the split is not stratified by label
# length — confirm whether that was the intent.
df = df[df['len']>1]
train_v2, val, _, _ = train_test_split(df, df['len'], test_size=cfg.valid_size, random_state=cfg.seed)

In [None]:
# Concatenate the single-character samples with the training part of the split
# to form the final training set.
train = pd.concat([train_v1, train_v2])
print(len(train), len(val))

In [None]:
# Build the character vocabulary from the training labels:
# join all labels into one string, then take its sorted unique characters.
train_gt = "".join(train['label'])
letters = sorted(set(train_gt))
print(len(letters))

In [None]:
# Peek at the first ten characters of the sorted vocabulary.
print(letters[:10])

In [None]:
# Indices 0, 1 and 2 are reserved for the special tokens; characters follow from index 3.
vocabulary = ['pad', 'start', 'end', *letters]
print(len(vocabulary))
idx2char = dict(enumerate(vocabulary))
char2idx = {token: index for index, token in idx2char.items()}

# The decoder output layer needs one logit per vocabulary entry.
cfg.dec_voc_size = len(vocabulary)

In [None]:
# Show the first four vocabulary entries, one per line.
print(*(idx2char[index] for index in range(4)), sep='\n')

In [None]:
# Encode every label as a list of token ids wrapped in the start and end tokens.
# .assign() builds a new frame instead of writing a column into `val`, which is a
# slice produced by train_test_split and can trigger pandas' SettingWithCopyWarning.
def _encode_label(chars):
    """Map a list of characters to [start] + character ids + [end]."""
    return [char2idx['start']] + [char2idx[ch] for ch in chars] + [char2idx['end']]

train = train.assign(label_data=train['label_split'].apply(_encode_label))
val = val.assign(label_data=val['label_split'].apply(_encode_label))

In [None]:
# Display the training frame, including the new label_data column.
train

In [None]:
# Display the validation frame, including the new label_data column.
val

In [None]:
class CustomDataset(Dataset):
    """Dataset of image files with optional token-id labels.

    Args:
        img_path_list: sequence of image file paths.
        label_list: sequence of token-id lists aligned with the paths, or None
            when no labels are available.
        train_mode: True applies the augmenting transform, False only
            normalises and converts to a tensor.
    """

    def __init__(self, img_path_list, label_list, train_mode=True):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.train_mode = train_mode

    def __len__(self):
        return len(self.img_path_list)

    def __getitem__(self, index):
        """Return `(image, LongTensor(label))`, or just `image` when there are no labels."""
        image = Image.open(self.img_path_list[index]).convert('RGB')

        transform = self.train_transform if self.train_mode else self.test_transform
        image = transform(image)

        if self.label_list is None:
            return image
        return image, torch.LongTensor(self.label_list[index])

    # Image Augmentation
    def train_transform(self, image):
        """Random brightness/contrast and rotation, then normalise and convert to a tensor."""
        # No resize step: images keep their own size.
        pipeline = A.Compose(
            [
                A.RandomBrightnessContrast(brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2), p=0.5),
                A.Rotate(limit=(-15,15), p=0.5, border_mode=cv2.BORDER_REPLICATE),
                A.Normalize(mean=(0.8624, 0.8624, 0.8624), std=(0.19037, 0.19037, 0.19037),max_pixel_value=255.0, p=1.0),
                ToTensorV2(p=1.0),
            ],
            p=1.0,
        )
        augmented = pipeline(image=np.array(image))
        return augmented['image']

    def test_transform(self, image):
        """Normalise and convert to a tensor, without augmentation."""
        # No resize step: images keep their own size.
        pipeline = A.Compose(
            [
                A.Normalize(mean=(0.8624, 0.8624, 0.8624), std=(0.19037, 0.19037, 0.19037),max_pixel_value=255.0, p=1.0),
                ToTensorV2(p=1.0),
            ],
            p=1.0,
        )
        converted = pipeline(image=np.array(image))
        return converted['image']

In [None]:
def collate_fn(batch):
    """Collate `(image, target)` samples into one padded batch.

    Images are right-padded along the width (by replicating the last column)
    to the batch's largest width rounded up to a multiple of 4. Targets are
    right-padded with 0.

    Returns:
        (images, targets, pad_widths): the stacked image tensor, the padded
        target tensor, and a list with the number of padded columns per image.
    """
    images = [sample[0] for sample in batch]
    targets = [sample[1] for sample in batch]

    # Common width: widest image, rounded up to the next multiple of 4.
    widest = max((img.size(2) for img in images), default=0)
    remainder = widest % 4
    if remainder:
        widest += 4 - remainder

    padded_images, pad_widths = [], []
    for img in images:
        extra = widest - img.size(2)
        padded_images.append(
            torch.nn.functional.pad(img, (0, extra, 0, 0), mode='replicate', value=0))
        pad_widths.append(extra)

    targets = pad_sequence(targets, batch_first=True, padding_value=0)
    return torch.stack(padded_images), targets, pad_widths

In [None]:
def collate_fn_test(batch):
    """Collate label-less image samples into one padded batch.

    Images are right-padded along the width (by replicating the last column)
    to the batch's largest width rounded up to a multiple of 4.

    Returns:
        (images, pad_widths): the stacked image tensor and a list with the
        number of padded columns per image.
    """
    # Common width: widest image, rounded up to the next multiple of 4.
    widest = max((img.size(2) for img in batch), default=0)
    remainder = widest % 4
    if remainder:
        widest += 4 - remainder

    padded_images, pad_widths = [], []
    for img in batch:
        extra = widest - img.size(2)
        padded_images.append(
            torch.nn.functional.pad(img, (0, extra, 0, 0), mode='replicate', value=0))
        pad_widths.append(extra)

    return torch.stack(padded_images), pad_widths

In [None]:
# Training loader: augmented images, reshuffled every epoch.
train_dataset = CustomDataset(train['img_path'].values, train['label_data'].values, True)
train_loader = DataLoader(train_dataset, batch_size = cfg.batch_size, shuffle=True, num_workers=cfg.num_workers,collate_fn=collate_fn)

# Validation loader with labels (used for the loss). It is not shuffled, so the batches,
# and therefore the reported validation loss, are identical from epoch to epoch.
val_dataset = CustomDataset(val['img_path'].values, val['label_data'].values, False)
val_loader = DataLoader(val_dataset, batch_size = cfg.batch_size, shuffle=False, num_workers=cfg.num_workers,collate_fn=collate_fn)

# Validation loader without labels (used for decoding). It must keep the frame order so
# that predictions line up with val['label'].
val_test_dataset = CustomDataset(val['img_path'].values, None, False)
val_test_loader = DataLoader(val_test_dataset, batch_size = cfg.batch_size, shuffle=False, num_workers=cfg.num_workers,collate_fn=collate_fn_test)

In [None]:
# Sanity-check one training batch. Only the shapes are reported: printing the raw image
# tensor floods the output, and chaining print() calls in a tuple makes the cell
# display (None, None, None).
image_batch, text_batch, pad_mask_batch = next(iter(train_loader))
print(image_batch.size())
print(text_batch.size())
print(len(pad_mask_batch))

In [None]:
# Build the model from the notebook configuration and move it to the target device.
model = CNN_Transformer(
    pad_idx=cfg.pad_idx,
    dec_voc_size=cfg.dec_voc_size,
    n_head=cfg.n_head,
    max_width=cfg.max_width,
    max_len=cfg.max_len,
    ffn_hidden=cfg.ffn_hidden,
    n_layers=cfg.n_layers,
    drop_prob=cfg.drop_prob,
    device=cfg.device,
)
model = model.to(cfg.device)

# Optimise with Adam.
optimizer = optim.Adam(model.parameters(), lr=cfg.learning_rate)

# Padding positions in the target do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=cfg.pad_idx)

# Cosine annealing with warm restarts; one cycle spans cfg.epochs epochs.
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer=optimizer,
    T_0=cfg.epochs,
    eta_min=cfg.lr_min,
)

In [None]:
# Optionally resume from a saved checkpoint. map_location remaps the stored tensors to
# cfg.device, so a checkpoint saved on a GPU also loads on a CPU-only machine.
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
if cfg.pt_path is not None:
    model.load_state_dict(torch.load(cfg.pt_path, map_location=cfg.device))

In [None]:
# Worked example of the width mask: for each padded-column count, the mask has
# max_len // 4 entries, with 1 for real feature columns and 0 for padded ones.
a = [2, 4, 1, 5, 7, 10]
max_len = 12
batch_img_mask = []
for pad_len in a:
    img_mask = [1] * (max_len // 4 - pad_len // 4) + [0] * (pad_len // 4)
    batch_img_mask.append(img_mask)
print(batch_img_mask)

In [None]:
def train(model, iterator, optimizer, criterion, clip,epoch, device):
    """Run one training epoch and return the mean batch loss.

    Args:
        model: network called as `model(image, decoder_input, width_mask)`.
        iterator: loader yielding `(images, targets, pad_widths)` batches.
        optimizer: optimizer stepped once per batch.
        criterion: loss applied to flattened logits and flattened targets.
        clip: maximum gradient norm for clipping.
        epoch: zero-based epoch index, used for the fractional scheduler step.
        device: device the batch tensors are moved to.

    Notes:
        Reads the notebook-level names `scheduler`, `cfg` and `get_lr`.
        Defining this function rebinds the notebook-level name `train`, which
        earlier cells use for the training DataFrame.
    """
    model.train()
    epoch_loss = 0
    iters = len(iterator)
    for i, batch in enumerate(tqdm(iterator)):
        image, text, pad_mask = batch[0].to(device), batch[1].to(device), batch[2]

        # Width mask over the CNN feature columns (one per 4 pixels):
        # 1 for real columns, 0 for columns that come from width padding.
        n_cols = image.size(-1) // 4
        batch_img_mask = torch.LongTensor(
            [[1] * (n_cols - pad_len // 4) + [0] * (pad_len // 4) for pad_len in pad_mask]
        ).to(device)

        optimizer.zero_grad()

        # Teacher forcing: feed all tokens but the last, predict all tokens but the first.
        decoder_input, target = text[:, :-1], text[:, 1:]
        output = model(image, decoder_input, batch_img_mask)  # [Batch, trg_len - 1, output_dim]
        logits = output.contiguous().view(-1, output.shape[-1])  # [Batch*(trg_len-1), output_dim]
        loss = criterion(logits, target.contiguous().view(-1))

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        # Fractional epoch so the schedule advances within the epoch.
        scheduler.step(epoch+i/iters)

        epoch_loss += loss.item()
        if i%cfg.print_step==0:
            now_lr = get_lr(optimizer)
            print(f'step : {i/cfg.print_step+1}, loss : {epoch_loss/(i+1)}, now_lr : {now_lr}')
    return epoch_loss / len(iterator)

In [None]:
def evaluate(model, iterator, criterion,device):
    """Compute the mean teacher-forced batch loss over `iterator`, without gradient tracking.

    Args:
        model: network called as `model(image, decoder_input, width_mask)`.
        iterator: loader yielding `(images, targets, pad_widths)` batches.
        criterion: loss applied to flattened logits and flattened targets.
        device: device the batch tensors are moved to.
    """
    model.eval()  # evaluation mode
    epoch_loss = 0

    with torch.no_grad():
        for batch in tqdm(iterator):
            image, text, pad_mask = batch[0].to(device), batch[1].to(device), batch[2]

            # Width mask over the CNN feature columns (one per 4 pixels):
            # 1 for real columns, 0 for columns that come from width padding.
            n_cols = image.size(-1) // 4
            batch_img_mask = torch.LongTensor(
                [[1] * (n_cols - pad_len // 4) + [0] * (pad_len // 4) for pad_len in pad_mask]
            ).to(device)

            # Feed all tokens but the last; the targets drop the start token.
            output = model(image, text[:, :-1], batch_img_mask)  # [Batch, trg_len - 1, output_dim]
            logits = output.contiguous().view(-1, output.shape[-1])  # [Batch*(trg_len-1), output_dim]
            target = text[:, 1:].contiguous().view(-1)  # [Batch*(trg_len-1)]

            # Accumulate the loss between model output and target tokens.
            epoch_loss += criterion(logits, target).item()

    return epoch_loss / len(iterator)

In [None]:
def test(model, iterator,device):
    """Greedy-decode every batch of `iterator` and return the predicted strings.

    Args:
        model: a CNN_Transformer-like model exposing `cnn_encoder`, `encoder`,
            `decoder`, `make_pad_mask` and `make_no_peak_mask`.
        iterator: loader yielding `(images, pad_widths)` batches.
        device: device the batch tensors are moved to.

    Returns:
        One decoded string per image, in iteration order.

    Notes:
        Reads the notebook-level names `cfg` (for `max_len`) and `idx2char`.
        Token ids 0, 1 and 2 (pad / start / end) terminate a word.
    """
    model.eval()  # evaluation mode

    answer_list = []
    with torch.no_grad():
        for batch in tqdm(iterator):
            image = batch[0].to(device)  # Batch, channel, height, width
            pad_mask = batch[1]

            src = model.cnn_encoder(image)  # [Batch, w//4, 512]

            # Width mask over the CNN feature columns (one per 4 pixels):
            # 1 for real columns, 0 for columns that come from width padding.
            n_cols = image.size(-1) // 4
            batch_img_mask = torch.LongTensor(
                [[1] * (n_cols - pad_len // 4) + [0] * (pad_len // 4) for pad_len in pad_mask]
            ).to(device)

            src_mask = model.make_pad_mask(batch_img_mask, batch_img_mask)
            enc_src = model.encoder(src, src_mask)

            batch_size = image.size(0)
            # Every sequence starts with the start token (index 1).
            trg_tensor = torch.ones(batch_size, 1, dtype=torch.long, device=device)  # Batch, 1
            # Tracks which sequences have already produced a terminating token.
            finished = torch.zeros(batch_size, dtype=torch.bool, device=device)

            for _ in range(cfg.max_len):  # upper bound on the decoded length
                src_trg_mask = model.make_pad_mask(trg_tensor, batch_img_mask)  # [batch, 1, len_trg, len_src]
                trg_mask = model.make_pad_mask(trg_tensor, trg_tensor) * model.make_no_peak_mask(trg_tensor, trg_tensor)

                output = model.decoder(trg_tensor, enc_src, trg_mask, src_trg_mask)

                # Keep only the prediction for the last position and append it.
                pred_token = output.argmax(-1)[:, -1].unsqueeze(1)  # [Batch, 1]
                trg_tensor = torch.cat([trg_tensor, pred_token], dim=-1)

                # Tokens after the first special token are discarded below, so
                # decoding can stop once every sequence has emitted one.
                finished |= pred_token.squeeze(1) <= 2
                if finished.all():
                    break

            # Convert token ids back to characters, skipping the leading start token.
            for word in trg_tensor.tolist():
                spel_list = []
                for idx in word[1:]:
                    if idx in (0, 1, 2):
                        break
                    spel_list.append(idx2char[idx])
                answer_list.append("".join(spel_list))
    return answer_list

In [None]:
def epoch_time(start_time, end_time):
    """Split the elapsed time between two timestamps (in seconds) into whole minutes and whole seconds."""
    elapsed = end_time - start_time
    minutes = int(elapsed / 60)
    seconds = int(elapsed - 60 * minutes)
    return minutes, seconds

In [None]:
# Main training loop: each epoch trains, computes the validation loss, greedy-decodes the
# validation set for exact-match accuracy, and checkpoints the best model by each metric.
cfg.clip = 5  # maximum gradient norm passed to train()
best_accuracy = 0
best_valid_loss = float('inf')

for epoch in range(cfg.epochs):
    start_time = time.time() # record the start time

    train_loss = train(model, train_loader, optimizer, criterion, cfg.clip, epoch, cfg.device)
    valid_loss = evaluate(model, val_loader, criterion, cfg.device)

    # val_test_loader is not shuffled, so predictions line up with val['label'].
    answer_list = test(model, val_test_loader, cfg.device)
    accuracy = accuracy_score(val['label'],answer_list)

    if best_accuracy < accuracy:
        best_accuracy = accuracy
        torch.save(model.state_dict(), 'model_best_accuracy.pt')
    
    end_time = time.time() # record the end time
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'model_best_loss.pt')

    print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f}')
    print(f'\tValidation Loss: {valid_loss:.3f}')
    print(f'\tAccuracy : {accuracy:.5f}')

In [None]:
# Restore the weights with the best validation loss. map_location remaps the stored tensors
# to cfg.device, so a checkpoint saved on a GPU also loads on a CPU-only machine.
model.load_state_dict(torch.load('model_best_loss.pt', map_location=cfg.device))

In [None]:
# Build an unshuffled, label-less loader over the images listed in test.csv.
test_df= pd.read_csv('./test.csv')

test_dataset = CustomDataset(test_df['img_path'].values, None, False)
test_loader = DataLoader(test_dataset, batch_size = cfg.batch_size, shuffle=False, num_workers=cfg.num_workers,collate_fn=collate_fn_test)


In [None]:
# Greedy-decode the test set; one predicted string per row of test_df.
answer_list = test(model, test_loader, cfg.device)

In [None]:
# Print all predicted strings.
print(answer_list)

In [None]:
# Write the predictions into the label column of the sample submission.
# NOTE(review): assumes sample_submission.csv lists rows in the same order as test.csv — confirm.
submit = pd.read_csv('./sample_submission.csv')
submit['label'] = answer_list

In [None]:
# Display the filled-in submission frame.
submit

In [None]:
# Save the submission without the index column.
submit.to_csv('./cnn_transformer_config1.csv', index=False)