### 1. 환경 세팅하기
-----

In [None]:
# 관련 모듈들을 다운로드 받고, 주요 폴더를 미리 생성시키기 위해서
!pip install pytorch-lightning torchmetrics plotly gradio opencv-python
!mkdir ./check_points ./lightning_logs

In [None]:
# 학습을 위해서 업로드된 압축파일을 풀기 위해서
!unzip ./Captcha_Train_Set_7000.zip
!rm ./Captcha_Train_Set_7000.zip

### 2. 학습 및 평가를 위한 데이터 로더 생성시키기
-----

In [None]:
import glob
import os
import string

import numpy as np
from PIL import Image
from torch.utils.data.dataset import Dataset
from torchvision.transforms import Compose, ToTensor, Normalize

# Dataset that builds captcha samples from the image files found in a directory
class Captcha_Dataset(Dataset) :
  """Captcha images whose file name (without extension) is the label.

  Attributes:
    CORPUS: every symbol a label may contain (lowercase ASCII letters and digits).
    BOW: symbol -> integer code mapping; code 0 is reserved for "<pad>".
    IMAGE_FILE_PATHS: sorted paths of the "*.jpg" files directly inside `image_dir_path`.
    COMPOSE: ToTensor followed by Normalize(mean=0.5, std=0.5) on each of the three channels.
  """

  def __init__(self, image_dir_path) :
    self.CORPUS = string.ascii_lowercase + string.digits
    self.BOW = self.__make_BOW(self.CORPUS)
    # glob returns files in arbitrary order; sorting makes sample indices (and therefore
    # batch composition and the samples dropped by drop_last) reproducible.
    self.IMAGE_FILE_PATHS = sorted(glob.glob(image_dir_path + "/*.jpg"))
    self.COMPOSE = Compose([
      ToTensor(),
      Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
    ])

  def __len__(self) :
    """Return the number of image files in the dataset."""
    return len(self.IMAGE_FILE_PATHS)

  def __getitem__(self, i) :
    """Return `(DATA, LABEL)` for the i-th image.

    DATA is the transformed image as a float32 numpy array; LABEL is a numpy
    array holding the integer code of every character in the file name.
    Raises KeyError when the file name contains a character outside CORPUS.
    """
    image_path = self.IMAGE_FILE_PATHS[i]
    # The context manager guarantees the file handle is released even if decoding fails.
    with Image.open(image_path) as image_file :
      # Grayscale first, then back to three channels: colour information is discarded
      # while the 3-channel layout expected by Normalize is kept.
      IMAGE = self.COMPOSE(image_file.convert("L").convert("RGB"))
    DATA = np.array(IMAGE).astype(np.float32)
    LABEL = np.array(self.__get_seq(self.__get_Label_From_Image_Path(image_path)))
    return DATA, LABEL

  # Extract the label (file name up to its first ".") from an image path
  def __get_Label_From_Image_Path(self, image_path) :
    # os.path.basename understands the platform's separators; splitting on "/" alone
    # leaves the directory in the label when glob returns backslash-separated paths.
    return os.path.basename(image_path).split(".")[0]

  # Convert a string into the list of integer codes defined by BOW
  def __get_seq(self, letters) :
    return [self.BOW[letter] for letter in letters]

  # Build the symbol -> integer code mapping for the given corpus
  def __make_BOW(self, corpus) :
    bow = {"<pad>":0}

    for letter in corpus :
      # Codes are handed out in first-seen order; repeated symbols keep their first code.
      if letter not in bow :
        bow[letter] = len(bow)

    return bow

In [None]:
# Load the train / validation / test datasets and wrap each one in a data loader
from torch.utils.data.dataloader import DataLoader

TRAIN_DATASET, VALID_DATASET, TEST_DATASET = (
  Captcha_Dataset("./Captcha_Train_Set_7000/train_data"),
  Captcha_Dataset("./Captcha_Train_Set_7000/valid_data"),
  Captcha_Dataset("./Captcha_Train_Set_7000/test_data"),
)

# Every loader yields full batches of 8 (incomplete trailing batches are dropped);
# only the training loader shuffles.
_LOADER_OPTIONS = dict(batch_size=8, drop_last=True)

TRAIN_LOADER = DataLoader(TRAIN_DATASET, shuffle=True, **_LOADER_OPTIONS)
VALID_LOADER = DataLoader(VALID_DATASET, shuffle=False, **_LOADER_OPTIONS)
TEST_LOADER = DataLoader(TEST_DATASET, shuffle=False, **_LOADER_OPTIONS)

### 3. 신경망 모델 생성하기
-----

In [None]:
import pytorch_lightning as pl
import torchmetrics

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim.adam import Adam


# Basic ResNet-style module that shrinks the feature map
class Basic_ResNet_Downsample_Layer(nn.Module) :
  """Residual block whose main path and shortcut both downsample the input.

  Main path: Conv(3x5, stride (2, 1)) -> BN -> ReLU -> Conv(3x3, padding 1) -> BN.
  Shortcut:  Conv(3x5, stride (2, 1)).
  The two results are summed and passed through a final ReLU.

  Attribute names and creation order are part of the checkpoint layout
  (state_dict keys), so they must not be renamed or reordered.
  """

  def __init__(self, in_channels, out_channels) :
    super().__init__()

    # The main path's first conv and the shortcut share this geometry so that
    # their outputs have identical shapes and can be added together.
    shrink_options = dict(kernel_size=(3, 5), stride=(2, 1))

    self.CONV_1 = nn.Conv2d(in_channels, out_channels, **shrink_options)
    self.CONV_2 = nn.Conv2d(out_channels, out_channels, kernel_size=(3, 3), padding=1)

    self.BN_1 = nn.BatchNorm2d(out_channels)
    self.BN_2 = nn.BatchNorm2d(out_channels)

    self.LERU = nn.ReLU()

    self.CONV_DOWN_SAMPLE = nn.Conv2d(in_channels, out_channels, **shrink_options)

    main_path_modules = [self.CONV_1, self.BN_1, self.LERU, self.CONV_2, self.BN_2]
    self.LAYER_SEQ = nn.Sequential(*main_path_modules)

  def forward(self, x) :
    """Return ReLU(main_path(x) + shortcut(x))."""
    main_path = self.LAYER_SEQ(x)
    shortcut = self.CONV_DOWN_SAMPLE(x)
    return self.LERU(main_path + shortcut)

# Basic ResNet-style module that keeps the feature-map size unchanged
class Basic_ResNet_Maintain_Layer(nn.Module) :
  """Residual block: Conv(3x3, padding 1) -> BN -> ReLU, plus an identity skip.

  The skip connection is added after the ReLU, with no activation on the sum.
  Because the input is added back unchanged, `in_channels` must equal
  `out_channels` for the addition to be shape-compatible.
  """

  def __init__(self, in_channels, out_channels) :
    super().__init__()

    self.CONV = nn.Conv2d(in_channels, out_channels, kernel_size=(3, 3), padding=1)
    self.BN = nn.BatchNorm2d(out_channels)
    self.LERU = nn.ReLU()

    self.LAYER_SEQ = nn.Sequential(self.CONV, self.BN, self.LERU)

  def forward(self, x) :
    """Return LAYER_SEQ(x) + x."""
    transformed = self.LAYER_SEQ(x)
    return transformed + x


# Basic module that wraps self-attention in a skip connection
class Basic_Self_Attension_Skip_Layer(nn.Module) :
  """Self-attention -> BatchNorm1d -> residual add.

  ATTENTION is an 8-head MultiheadAttention over `input_size` features.

  NOTE: BN is BatchNorm1d(num_features=8), which normalises axis 1 of a 3-D
  input. CRNN_Module.forward feeds this layer tensors permuted to
  (sequence, batch, features), so axis 1 is the batch axis: the layer only
  accepts batches of exactly 8 samples. Changing this would alter the
  parameter shapes stored in existing checkpoints.
  """

  def __init__(self, input_size) :
    super().__init__()

    self.ATTENTION = torch.nn.MultiheadAttention(input_size, 8)
    self.BN = nn.BatchNorm1d(num_features=8)

  def forward(self, x) :
    """Return BN(self_attention(x)) + x."""
    residual = x
    # Query, key and value are all the same tensor (self-attention);
    # the returned attention weights are not needed.
    attended = self.ATTENTION(x, x, x)[0]
    normalized = self.BN(attended)
    return normalized + residual


# Main network module that stacks the basic modules on top of each other
class CRNN_Module(pl.LightningModule) :
  """Captcha recogniser: ResNet feature extractor -> self-attention -> per-step classifier.

  The network is trained with CTC loss (class 0 is the blank) and decoded
  greedily. Attribute names and creation order define the checkpoint layout
  and must stay as they are.

  Args:
    bow: symbol -> integer code mapping. Code 0 must be the blank/"<pad>" entry
      and the codes must follow the dict's insertion order (0, 1, 2, ...),
      because REV_BOW is built from the key order.
  """

  def __init__(self, bow) :
    super().__init__()
    self.BOW = bow
    # Index -> symbol lookup table, derived from the insertion order of `bow`.
    self.REV_BOW = list(self.BOW.keys())

    # One initial downsample block, then 47 more blocks: every 12th one
    # (indices 12, 24, 36) downsamples again, the others keep the size.
    resnet_layers = []
    resnet_layers.append(Basic_ResNet_Downsample_Layer(in_channels=3, out_channels=32))
    for layer_index in range(1, 47+1) :
      if layer_index%12 == 0 : resnet_layers.append(Basic_ResNet_Downsample_Layer(in_channels=32, out_channels=32))
      else : resnet_layers.append(Basic_ResNet_Maintain_Layer(in_channels=32, out_channels=32))
    resnet_layers.append(nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(2, 5)))
    self.RESNET_LAYER_SEQ = nn.Sequential(*resnet_layers)


    attenion_layers = [Basic_Self_Attension_Skip_Layer(32) for _ in range(5)]
    self.SELF_ATTENSION_SKIP_LAYER_SEQ = nn.Sequential(*attenion_layers)


    self.FC_1 = nn.Linear(32, 64)
    self.FC_2 = nn.Linear(64, len(self.BOW))
    self.LERU = nn.ReLU()

    self.FC_LAYER_SEQ = nn.Sequential(
        self.FC_1, self.LERU, self.FC_2
    )

  def forward(self, x) :
    """Return per-step log-probabilities of shape (steps, batch, len(BOW)).

    `x` is a batch of 3-channel images (presumably (batch, 3, H, W) -- confirm
    against the data pipeline).
    """
    x = self.RESNET_LAYER_SEQ(x)
    # Flatten the spatial axes into one sequence axis: (batch, 32, steps) ...
    x = x.view(x.shape[0], 32, -1)
    # ... then move it to the front: (steps, batch, 32), the layout used by
    # MultiheadAttention (batch_first=False) and by CTC loss.
    x = x.permute(2, 0, 1)

    x = self.SELF_ATTENSION_SKIP_LAYER_SEQ(x)

    x = self.FC_LAYER_SEQ(x)

    x = F.log_softmax(x, dim=-1)
    return x


  def training_step(self, batch, batch_idx) :
    """Compute and log the CTC loss and exact-match accuracy of one training batch."""
    X, Y = batch
    # A single forward pass feeds both the loss and the accuracy metric. Running the
    # network a second time just for the metric would double the compute and update
    # the BatchNorm running statistics twice per step.
    PREDS = self(X)
    CTC_LOSS = self.__ctc_Loss(PREDS, Y)
    ACCURACY = self.__letters_Accuracy(self.__log_Probs_To_Letters(PREDS), Y)
    self.log('train_ctc_loss', CTC_LOSS, on_epoch=True, prog_bar=True)
    self.log('train_accuracy', ACCURACY, on_epoch=True, prog_bar=True)
    return CTC_LOSS

  def validation_step(self, batch, batch_idx) :
    """Log the exact-match accuracy of one validation batch."""
    self.log('valid_accuracy', self.__accuracy(batch), on_epoch=True, prog_bar=True)

  def test_step(self, batch, batch_idx) :
    """Log the exact-match accuracy of one test batch."""
    self.log('test_accuracy', self.__accuracy(batch), on_epoch=True, prog_bar=True)

  def predict_step(self, batch, batch_idx) :
    """Return the decoded string for every image in the batch (targets are ignored)."""
    X, Y = batch
    return self.__pred_To_Letters(X)


  def configure_optimizers(self) :
    """Use Adam with a learning rate of 1e-4 over all parameters."""
    return Adam(self.parameters(), lr=1e-4)


  # Return the CTC loss of a batch
  def __forward_To_CTC_Loss(self, batch) :
    X, Y = batch
    return self.__ctc_Loss(self(X), Y)

  # Return the CTC loss between log-probabilities (steps, batch, classes) and targets (batch, target_len)
  def __ctc_Loss(self, preds, y) :
    # Every sample uses all output steps and every target row is fully populated,
    # so both length vectors are constant.
    PREDS_SIZE = torch.full((preds.size(1),), preds.size(0), dtype=torch.int32)
    TARGET_SIZE = torch.full((y.size(0),), y.size(1), dtype=torch.int32)
    return F.ctc_loss(preds, y, PREDS_SIZE, TARGET_SIZE, blank=0)

  # Return the exact-match accuracy of a batch
  def __accuracy(self, batch) :
    X, Y = batch
    return self.__letters_Accuracy(self.__pred_To_Letters(X), Y)

  # Return the fraction of predicted strings that equal their target string
  def __letters_Accuracy(self, pred_letters, y) :
    if not pred_letters :
      return 0.0
    Y_LETTERS = self.__y_To_Letters(y)
    correct_count = sum(pred == target for pred, target in zip(pred_letters, Y_LETTERS))
    return correct_count/len(pred_letters)

  # Run the network on `x` and return the decoded string of every sample
  def __pred_To_Letters(self, x) :
    return self.__log_Probs_To_Letters(self(x))

  # Greedy-decode log-probabilities (steps, batch, classes) into one string per sample
  def __log_Probs_To_Letters(self, preds) :
    # argmax gives (steps, batch); transpose to (batch, steps) and move everything to
    # Python lists in one transfer instead of synchronising once per element.
    BEST_CODES = torch.argmax(preds, dim=-1).transpose(1, 0).tolist()
    return [self.__predict_To_Letters(self.__process_Model_Predict(codes)) for codes in BEST_CODES]

  # Collapse a greedy CTC path: merge consecutive repeats, then drop blanks (code 0)
  def __process_Model_Predict(self, pred_argmax) :
    codes = pred_argmax.tolist() if torch.is_tensor(pred_argmax) else pred_argmax

    pred_letters = []
    # Starting from "blank" lets the first real symbol through and makes an
    # empty sequence decode to an empty list.
    prev_code = 0
    for code in codes :
      if code != 0 and code != prev_code :
        pred_letters.append(code)
      prev_code = code
    return pred_letters

  # Convert a list of predicted codes into a string
  def __predict_To_Letters(self, pred) :
    return "".join([self.REV_BOW[code] for code in pred])

  # Convert the target code matrix into a list of strings
  def __y_To_Letters(self, y) :
    rows = y.tolist() if torch.is_tensor(y) else y
    return ["".join([self.REV_BOW[code] for code in y_each]) for y_each in rows]

### 4. 신경망 학습하기
-----

In [None]:
# Launch TensorBoard on the lightning_logs/ directory so training progress can be monitored
%load_ext tensorboard
%tensorboard --logdir lightning_logs/

In [None]:
# Train the network, keeping a checkpoint of the model with the highest validation accuracy
from pytorch_lightning.callbacks import ModelCheckpoint

if torch.cuda.is_available() :
  DEVICE = "cuda"
else :
  DEVICE = "cpu"

# Checkpoints are ranked by the "valid_accuracy" metric logged in validation_step.
CHECK_POINT_CALLBACK = ModelCheckpoint(
  dirpath="./check_points",
  filename="crnn-{epoch:02d}-{valid_accuracy:.2f}",
  monitor="valid_accuracy",
  mode="max",
)
LOGGER = pl.loggers.TensorBoardLogger(save_dir='lightning_logs', name='CRNN_MODULE_LOGS')
TRAINER = pl.Trainer(
  max_epochs=1000,
  accelerator=DEVICE,
  callbacks=[CHECK_POINT_CALLBACK],
  logger=LOGGER,
)


CRNN_MODULE = CRNN_Module(bow=TRAIN_DATASET.BOW)
TRAINER.fit(CRNN_MODULE, train_dataloaders=TRAIN_LOADER, val_dataloaders=VALID_LOADER)

### 5. 모델 테스트하기
-----

In [None]:
# Report the trained model's accuracy on the train, validation and test data loaders
CRNN_MODULE_TEST = CRNN_Module.load_from_checkpoint(
  "/content/Resnet_With_Attention_Complete_Train-0.9992_Valid-0.9841_Test-0.9870.ckpt",
  bow=TRAIN_DATASET.BOW,
)

if torch.cuda.is_available() :
  DEVICE = "cuda"
else :
  DEVICE = "cpu"
TRAINER = pl.Trainer(accelerator=DEVICE)

# The final call stays a bare expression so the notebook still shows its result.
TRAINER.test(CRNN_MODULE_TEST, dataloaders=TRAIN_LOADER)
TRAINER.test(CRNN_MODULE_TEST, dataloaders=VALID_LOADER)
TRAINER.test(CRNN_MODULE_TEST, dataloaders=TEST_LOADER)

### 6. 모델을 사용하는 사용자 인터페이스 생성하기
-----

In [None]:
from torch.utils.data import TensorDataset, DataLoader
import numpy as np

# Wrapper that runs captcha images through a trained checkpoint in a consistent way
class Predict_Captcha_Label :
  """Loads a CRNN_Module checkpoint and predicts the text of captcha images.

  Args:
    model_path: path of the checkpoint to load.
    bow: symbol -> integer code mapping the checkpoint was trained with.
  """

  # The network only accepts batches of exactly this size: its attention blocks apply
  # BatchNorm1d(num_features=8) along the batch axis. Inputs are padded up to a multiple.
  BATCH_SIZE = 8

  def __init__(self, model_path, bow) :
    self.MODEL = CRNN_Module.load_from_checkpoint(model_path, bow=bow)
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
    self.TRAINER = pl.Trainer(accelerator=DEVICE, enable_progress_bar=False, logger=False)

  # Apply the same preprocessing as the training dataset: grayscale -> RGB -> tensor -> normalize
  def __preprocessing_Image(self, image) :
    USER_IMAGE = Image.fromarray(np.uint8(image)).convert("L").convert("RGB")
    USER_IMAGE = Compose([
      ToTensor(),
      Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
    ])(USER_IMAGE)
    return USER_IMAGE

  # Repeat the first item until the list length is a multiple of BATCH_SIZE
  def __pad_To_Batch_Multiple(self, items) :
    if not items :
      raise ValueError("at least one image is required")
    shortfall = -len(items) % self.BATCH_SIZE
    return items + [items[0]] * shortfall

  # Preprocess the images and wrap them in a loader that yields only full batches
  def __convert_Image_To_Data_Loader(self, images) :
    user_images = self.__pad_To_Batch_Multiple([self.__preprocessing_Image(image) for image in images])

    USER_IMAGES = torch.stack(user_images)
    # The targets are dummies; predict_step ignores them.
    USER_IMAGE_DATASET = TensorDataset(USER_IMAGES, torch.zeros(len(user_images)))
    USER_IMAGE_LOADER = DataLoader(USER_IMAGE_DATASET, batch_size=self.BATCH_SIZE)
    return USER_IMAGE_LOADER

  # Return the predicted text of a single image
  def predict_Captcha_Label(self, image) :
    DATA_LOADER = self.__convert_Image_To_Data_Loader([image])
    # First batch, first sample: the remaining samples are padding.
    PRED = self.TRAINER.predict(self.MODEL, dataloaders=DATA_LOADER)[0][0]
    return PRED

  # Return the predicted texts of several images, in input order
  def predict_Captcha_Labels(self, images) :
    images = list(images)
    if not images :
      return []

    DATA_LOADER = self.__convert_Image_To_Data_Loader(images)
    BATCH_PREDS = self.TRAINER.predict(self.MODEL, dataloaders=DATA_LOADER)
    # Flatten the per-batch results and cut off the predictions made for padding.
    PREDS = [label for batch_preds in BATCH_PREDS for label in batch_preds]
    return PREDS[:len(images)]

In [None]:
# Serve a small web UI that recognises the captcha text in an uploaded image
import gradio as gr
from torch.utils.data import TensorDataset, DataLoader
import numpy as np

PREDICT_CAPTCHA_LABEL = Predict_Captcha_Label(
  "/content/Resnet_With_Attention_Complete_Train-0.9992_Valid-0.9841_Test-0.9870.ckpt",
  bow=TRAIN_DATASET.BOW,
)

def cpatcha_Recognition(user_image):
  """Return the text predicted for the image uploaded through the UI."""
  predicted_text = PREDICT_CAPTCHA_LABEL.predict_Captcha_Label(user_image)
  return predicted_text

CAPTCHA_UI = gr.Interface(fn=cpatcha_Recognition, inputs="image", outputs="text")
CAPTCHA_UI.launch(debug=True)