In [109]:
import random
import pandas as pd
import numpy as np
import os
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision.models import resnet18
from torchvision import transforms

from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import warnings
warnings.filterwarnings(action='ignore') 
import cv2
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2


In [110]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

cuda


In [111]:
CFG = {
    'IMG_HEIGHT_SIZE':64,
    'IMG_WIDTH_SIZE':224,
    'EPOCHS':80,
    'LEARNING_RATE':1e-3,
    'BATCH_SIZE':64,
    'NUM_WORKERS':4, # 본인의 GPU, CPU 환경에 맞게 설정
    'SEED':41
}

In [112]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

In [113]:
os.getcwd()

'/home/gsc/dacon_KYOWON/Dacon_KYOWON'

In [114]:
#window
# base_dir = 'D:/Dacon_KYOWON/Dacon_KYOWON'
# data_dir = "D:/Dacon_KYOWON/open"
#ubuntu
base_dir = '/home/gsc/dacon_KYOWON/Dacon_KYOWON'
data_dir = '/home/gsc/dacon_KYOWON/open'


In [115]:
df = pd.read_csv(f'{data_dir}/train.csv')

In [116]:
# 제공된 학습데이터 중 1글자 샘플들의 단어사전이 학습/테스트 데이터의 모든 글자를 담고 있으므로 학습 데이터로 우선 배치
df['len'] = df['label'].str.len()
train_v1 = df[df['len']==1]

In [117]:
# 제공된 학습데이터 중 2글자 이상의 샘플들에 대해서 단어길이를 고려하여 Train (80%) / Validation (20%) 분할
df = df[df['len']>1]
train_v2, val, _, _ = train_test_split(df, df['len'], test_size=0.2, random_state=CFG['SEED'])

데이터 길이를 1,2,3,4로 나눠보는 것은 어떨까

In [118]:
# 학습 데이터로 우선 배치한 1글자 샘플들과 분할된 2글자 이상의 학습 샘플을 concat하여 최종 학습 데이터로 사용
train = pd.concat([train_v1, train_v2])
print(len(train), len(val))

66251 10637


In [119]:
# 학습 데이터로부터 단어 사전(Vocabulary) 구축
train_gt = [gt for gt in train['label']]
train_gt = "".join(train_gt)
letters = sorted(list(set(list(train_gt))))
print(len(letters))

2349


In [120]:
vocabulary = ["-"] + letters
print(len(vocabulary))
idx2char = {k:v for k,v in enumerate(vocabulary, start=0)}
char2idx = {v:k for k,v in idx2char.items()}

2350


In [121]:
class CustomDataset(Dataset):
    def __init__(self, img_path_list, label_list,transforms=None, train_mode=True):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms
        self.train_mode = train_mode
        
    def __len__(self):
        return len(self.img_path_list)
    
    def __getitem__(self, index):
        img_path = self.img_path_list[index]
        image = cv2.imread(f"{data_dir}/{img_path.split('/')[-2]}/{img_path.split('/')[-1]}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            image = self.transforms(image=image)['image']
           
        if self.label_list is not None:
            text = self.label_list[index]
            return image, text
        else:
            return image
  

In [123]:
def resize_transform(height,width, state='train'):
    if state == 'train':
        transform = A.Compose([
                                #A.HorizontalFlip(p=0.2),
                                #A.VerticalFlip(p=0.2),
                                A.Rotate(limit=[-45,45], p=1),
                                #A.RandomRotate90(p=0.2),
                                A.Resize(height,width),
                                #A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
                                #A.RandomResizedCrop(height=height, width=width, scale=(0.3, 1.0)),
                                A.ToGray(),
                                A.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
                                ToTensorV2(),
                                ])
    else:
        transform = A.Compose([
                            A.Resize(height,width),
                            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
                            #A.RandomResizedCrop(height=CFG['IMG_SIZE'], width=CFG['IMG_SIZE'], scale=(0.3, 1.0)),               
                            ToTensorV2()
                            ])

    return transform

In [124]:
train_transform = resize_transform(CFG['IMG_HEIGHT_SIZE'],CFG['IMG_WIDTH_SIZE'])
test_transform = resize_transform(CFG['IMG_HEIGHT_SIZE'],CFG['IMG_WIDTH_SIZE'],'test')

In [125]:
img_path = train['label'].values
print(img_path)
#print(f"{data_dir}/{img_path.split('/')[-2]}/{img_path.split('/')[-1]}")

['머' '써' '빈' ... '계속' '단계' '손수']


In [126]:
train_dataset = CustomDataset(train['img_path'].values, train['label'].values, train_transform)
train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=CFG['NUM_WORKERS'])

val_dataset = CustomDataset(val['img_path'].values, val['label'].values,test_transform)
val_loader = DataLoader(val_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=CFG['NUM_WORKERS'])

In [127]:
image_batch, text_batch = iter(train_loader).next()
print(image_batch.size(), text_batch)

torch.Size([64, 3, 64, 224]) ('신비', '그나마', '씻기다', '쉴', '일반', '셋', '수입되다', '세', '달리다', '뜁', '선원', '추가', '대량', '예매하다', '콩', '쯩', '못', '사모님', '뱀', '호주머니', '젤', '벌', '불', '베개', '수도권', '회복되다', '날씨', '유산', '견해', '체조', '얘', '튀김', '삼계탕', '어려움', '세', '발자국', '터', '함께', '스타', '샛', '참여하다', '븐', '학생증', '신인', '신청', '양복', '추진하다', '집중하다', '구분되다', '기타', '지구', '꽐', '명예', '대륙', '천장', '신', '칡', '창조', '걋', '나무', '향', '여행사', '강', '흄')


In [128]:
from torchsummary import summary as summary_

In [129]:
# resnet_test = resnet18(pretrained=True)
# resnet_test.cuda()
# summary_(resnet_test,(3,64,224))

In [130]:
import torchvision.models as models
eff_test = models.efficientnet_b5(pretrained=True)
eff_test.cuda()
summary_(eff_test,(3,64,224))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 48, 32, 112]           1,296
       BatchNorm2d-2          [-1, 48, 32, 112]              96
              SiLU-3          [-1, 48, 32, 112]               0
            Conv2d-4          [-1, 48, 32, 112]             432
       BatchNorm2d-5          [-1, 48, 32, 112]              96
              SiLU-6          [-1, 48, 32, 112]               0
 AdaptiveAvgPool2d-7             [-1, 48, 1, 1]               0
            Conv2d-8             [-1, 12, 1, 1]             588
              SiLU-9             [-1, 12, 1, 1]               0
           Conv2d-10             [-1, 48, 1, 1]             624
          Sigmoid-11             [-1, 48, 1, 1]               0
SqueezeExcitation-12          [-1, 48, 32, 112]               0
           Conv2d-13          [-1, 24, 32, 112]           1,152
      BatchNorm2d-14          [-1, 24, 

In [131]:
class RecognitionModel_eff(nn.Module):
    def __init__(self, num_chars=len(char2idx), rnn_hidden_size=256):
        super(RecognitionModel_eff, self).__init__()
        self.num_chars = num_chars
        self.rnn_hidden_size = rnn_hidden_size
        
        # CNN Backbone = 사전학습된 resnet18 활용
        # https://arxiv.org/abs/1512.03385
        effnet = models.efficientnet_b5(pretrained=True)
        # CNN Feature Extract
        effnet_modules = list(effnet.features)[:-3]
        self.feature_extract = nn.Sequential(
            *effnet_modules,
            nn.Conv2d(176, 256, kernel_size=(3,6), stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True)
        )

        self.linear1 = nn.Linear(1024, rnn_hidden_size)
        
        # RNN
        self.rnn = nn.RNN(input_size=rnn_hidden_size, 
                            hidden_size=rnn_hidden_size,
                            bidirectional=True, 
                            batch_first=True)
        self.linear2 = nn.Linear(self.rnn_hidden_size*2, num_chars)
        
        #LSTM
        self.lstm = nn.LSTM(input_size=rnn_hidden_size,
                           hidden_size= rnn_hidden_size,
                           bidirectional = True,
                           batch_first=True)
        
        #GRU
        self.gru = nn.GRU(input_size=rnn_hidden_size,
                           hidden_size= rnn_hidden_size,
                           bidirectional = True,
                           batch_first=True)
        
    def forward(self, x):
        # CNN
        x = self.feature_extract(x) # [batch_size, channels, height, width]
        x = x.permute(0, 3, 1, 2) # [batch_size, width, channels, height]
         
        batch_size = x.size(0)
        T = x.size(1)
        x = x.view(batch_size, T, -1) # [batch_size, T==width, num_features==channels*height]
        x = self.linear1(x)
        #print(x.shape)
        # RNN
        #x, hidden = self.rnn(x)
        
        #LSTM
        x, (hidden,_) = self.lstm(x)
        #GRU
        #x, (hidden,_) = self.gru(x)
        
        output = self.linear2(x)
        output = output.permute(1, 0, 2) # [T==10, batch_size, num_classes==num_features]
        
        return output

In [132]:
RecognitionModel_eff_test = RecognitionModel_eff()
RecognitionModel_eff_test.cuda()
#summary_(RecognitionModel_eff_test,(3,64,224))
#RecognitionModel_eff_test.state_dict().keys()

RecognitionModel_eff(
  (feature_extract): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 48, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(48, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): MBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(48, 48, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=48, bias=False)
            (1): BatchNorm2d(48, eps=0.001, momentum=0.01, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
          (1): SqueezeExcitation(
            (avgpool): AdaptiveAvgPool2d(output_size=1)
            (fc1): Conv2d(48, 12, kernel_size=(1, 1), stride=(1, 1))
            (fc2): Conv2d(12, 48, kernel_size=(1, 1), stride=(1, 1))
            (activation): SiLU(inplace=True)
            (scale_activation): Sigmoid()
          )
          (2)

In [133]:
# RecognitionModel_test = RecognitionModel()
# RecognitionModel_test.cuda()
# summary_(RecognitionModel_test,(3,64,224))

In [134]:
# class RecognitionModel(nn.Module):
#     def __init__(self, num_chars=len(char2idx), rnn_hidden_size=256):
#         super(RecognitionModel, self).__init__()
#         self.num_chars = num_chars
#         self.rnn_hidden_size = rnn_hidden_size
        
#         # CNN Backbone = 사전학습된 resnet18 활용
#         # https://arxiv.org/abs/1512.03385
#         resnet = resnet18(pretrained=True)
#         # CNN Feature Extract
#         resnet_modules = list(resnet.children())[:-3]
#         self.feature_extract = nn.Sequential(
#             *resnet_modules,
#             nn.Conv2d(256, 256, kernel_size=(3,6), stride=1, padding=1),
#             nn.BatchNorm2d(256),
#             nn.ReLU(inplace=True)
#         )

#         self.linear1 = nn.Linear(1024, rnn_hidden_size)
        
#         # RNN
#         self.rnn = nn.RNN(input_size=rnn_hidden_size, 
#                             hidden_size=rnn_hidden_size,
#                             bidirectional=True, 
#                             batch_first=True)
#         self.linear2 = nn.Linear(self.rnn_hidden_size*2, num_chars)
        
        
#     def forward(self, x):
#         # CNN
#         x = self.feature_extract(x) # [batch_size, channels, height, width]
#         x = x.permute(0, 3, 1, 2) # [batch_size, width, channels, height]
         
#         batch_size = x.size(0)
#         T = x.size(1)
#         x = x.view(batch_size, T, -1) # [batch_size, T==width, num_features==channels*height]
#         x = self.linear1(x)
        
#         # RNN
#         x, hidden = self.rnn(x)
        
#         output = self.linear2(x)
#         output = output.permute(1, 0, 2) # [T==10, batch_size, num_classes==num_features]
        
#         return output

In [135]:
criterion = nn.CTCLoss(blank=0) # idx 0 : '-'

In [136]:
def encode_text_batch(text_batch):
    text_batch_targets_lens = [len(text) for text in text_batch]
    text_batch_targets_lens = torch.IntTensor(text_batch_targets_lens)
    
    text_batch_concat = "".join(text_batch)
    text_batch_targets = [char2idx[c] for c in text_batch_concat]
    text_batch_targets = torch.IntTensor(text_batch_targets)
    
    return text_batch_targets, text_batch_targets_lens

In [137]:
def compute_loss(text_batch, text_batch_logits):
    """
    text_batch: list of strings of length equal to batch size
    text_batch_logits: Tensor of size([T, batch_size, num_classes])
    """
    text_batch_logps = F.log_softmax(text_batch_logits, 2) # [T, batch_size, num_classes]  
    text_batch_logps_lens = torch.full(size=(text_batch_logps.size(1),), 
                                       fill_value=text_batch_logps.size(0), 
                                       dtype=torch.int32).to(device) # [batch_size] 

    text_batch_targets, text_batch_targets_lens = encode_text_batch(text_batch)
    loss = criterion(text_batch_logps, text_batch_targets, text_batch_logps_lens, text_batch_targets_lens)

    return loss

In [138]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model.to(device)
    
    best_loss = 999999
    best_model = None
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for image_batch, text_batch in tqdm(iter(train_loader)):
            image_batch = image_batch.to(device)
            
            optimizer.zero_grad()
            
            text_batch_logits = model(image_batch)
            loss = compute_loss(text_batch, text_batch_logits)
            
            loss.backward()
            optimizer.step()
            
            train_loss.append(loss.item())
        
        _train_loss = np.mean(train_loss)
        
        _val_loss = validation(model, val_loader, device)
        print(f'Epoch : [{epoch}] Train CTC Loss : [{_train_loss:.5f}] Val CTC Loss : [{_val_loss:.5f}]')
        
        if scheduler is not None:
            scheduler.step(_val_loss)
        
        if best_loss > _val_loss:
            best_loss = _val_loss
            best_model = model
    
    return best_model

In [139]:
def validation(model, val_loader, device):
    model.eval()
    val_loss = []
    with torch.no_grad():
        for image_batch, text_batch in tqdm(iter(val_loader)):
            image_batch = image_batch.to(device)
            
            text_batch_logits = model(image_batch)
            loss = compute_loss(text_batch, text_batch_logits)
            
            val_loss.append(loss.item())
    
    _val_loss = np.mean(val_loss)
    return _val_loss

In [140]:
import gc
gc.collect()
torch.cuda.empty_cache()

In [None]:
model = RecognitionModel_eff()
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2,threshold_mode='abs',min_lr=1e-8, verbose=True)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

  0%|          | 0/1036 [00:00<?, ?it/s]

In [None]:
torch.save(infer_model,'b5_80_lstm_gray.pt')

In [None]:
# model = RecognitionModel()
# model.eval()
# optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2,threshold_mode='abs',min_lr=1e-8, verbose=True)

# infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

In [None]:
test = pd.read_csv('../open/test.csv')

In [None]:
test_dataset = CustomDataset(test['img_path'].values, None,test_transform)
test_loader = DataLoader(test_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=CFG['NUM_WORKERS'])

In [None]:
def decode_predictions(text_batch_logits):
    text_batch_tokens = F.softmax(text_batch_logits, 2).argmax(2) # [T, batch_size]
    text_batch_tokens = text_batch_tokens.numpy().T # [batch_size, T]

    text_batch_tokens_new = []
    for text_tokens in text_batch_tokens:
        text = [idx2char[idx] for idx in text_tokens]
        text = "".join(text)
        text_batch_tokens_new.append(text)

    return text_batch_tokens_new

def inference(model, test_loader, device):
    model.eval()
    preds = []
    with torch.no_grad():
        for image_batch in tqdm(iter(test_loader)):
            image_batch = image_batch.to(device)
            
            text_batch_logits = model(image_batch)
            
            text_batch_pred = decode_predictions(text_batch_logits.cpu())
            
            preds.extend(text_batch_pred)
    return preds

In [None]:
predictions = inference(infer_model, test_loader, device)

In [None]:
# 샘플 별 추론결과를 독립적으로 후처리
def remove_duplicates(text):
    if len(text) > 1:
        letters = [text[0]] + [letter for idx, letter in enumerate(text[1:], start=1) if text[idx] != text[idx-1]]
    elif len(text) == 1:
        letters = [text[0]]
    else:
        return ""
    return "".join(letters)

def correct_prediction(word):
    parts = word.split("-")
    parts = [remove_duplicates(part) for part in parts]
    corrected_word = "".join(parts)
    return corrected_word

In [None]:
submit = pd.read_csv('../open/sample_submission.csv')
submit['label'] = predictions
submit['label'] = submit['label'].apply(correct_prediction)

In [None]:
submit.to_csv('./submission_80_eff_lstm_gray.csv', index=False)

In [None]:
sub_text = pd.read_csv('./submission_80_eff_lstm_gray.csv')
sub_text

In [None]:
# cutmix사용 혹은 ATTENTION 구조 사용