In [2]:
import os
import gc 
import numpy as np
import pandas as pd
from tqdm import tqdm
from random import shuffle, seed
from typing import Union, List, Tuple, Dict

import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchmetrics import CharErrorRate
from torchvision import transforms, models

from PIL import Image
from IPython import display
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings("ignore")


In [3]:
class config:
    """Notebook-wide constants: data locations, preprocessing and training settings."""

    # --- data locations -----------------------------------------------------
    TRAIN_IMAGES = "data/train/train"
    TEST_IMAGES = "data/test/test"
    TRAIN_LABELS = "data/train_labels.csv"

    # --- preprocessing ------------------------------------------------------
    # (height, width) every image is resized to before entering the network.
    INPUT_SIZE = [100, 300]
    # Per-channel statistics passed to transforms.Normalize.
    IMAGENET_MEAN = [0.485, 0.456, 0.406]
    IMAGENET_STD = [0.229, 0.224, 0.225]

    # --- runtime ------------------------------------------------------------
    # Use the GPU when one is visible, otherwise fall back to the CPU.
    DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    SEED = 42
    BATCH_SIZE = 4
    # Class index handed to nn.CTCLoss as the blank symbol.
    BLANK_LABEL = 0


# Reproducibility: pin cuDNN to deterministic kernels, then seed every RNG in use.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

seed(config.SEED)                    # Python's `random` module
np.random.seed(config.SEED)          # NumPy global RNG
torch.manual_seed(config.SEED)       # PyTorch
torch.cuda.manual_seed(config.SEED)  # PyTorch CUDA generator


In [4]:
class LabelConverter:
    """Translate between text labels and the integer class indices used for CTC.

    Index 0 is reserved for the CTC blank; the characters of ``chars`` are
    numbered ``1..len(chars)`` in the order given. Characters are stored in the
    lookup tables as UTF-8 ``bytes``.
    """

    def __init__(self, chars: str):
        """Build the lookup tables.

        Args:
            chars: every character that may appear in a label.
        """
        self.chars = chars
        # Number of real characters; the blank (index 0) is NOT counted here.
        self.chars_dimension = len(chars)
        self.char2idx = {
            char.encode('utf-8'): idx for idx, char in enumerate(chars, start=1)
        }
        self.idx2char = {idx: char for char, idx in self.char2idx.items()}

    def encode(
        self, labels: Union[str, List, Tuple]
    ) -> Tuple[torch.IntTensor, torch.IntTensor]:
        """Encode one label or a batch of labels for ``nn.CTCLoss``.

        Args:
            labels: a single string or a sequence of strings.

        Returns:
            ``(targets, lengths)``: the class indices of all labels concatenated
            into one flat tensor, and the length of each individual label.

        Raises:
            KeyError: if a label contains a character missing from ``char2idx``.
        """
        if isinstance(labels, str):
            labels = [labels]

        lengths = [len(label) for label in labels]
        flat_indices = [
            self.char2idx[char.encode('utf-8')] for char in "".join(labels)
        ]
        return torch.IntTensor(flat_indices), torch.IntTensor(lengths)

    def decode(self, logit: torch.Tensor) -> str:
        """Greedy CTC decoding of a single sample.

        Takes the best class per time step, collapses consecutive repeats of
        the same class, then drops blanks (index 0). A repeated character in
        the text therefore needs a blank between its two occurrences.

        Args:
            logit: tensor indexed as ``(time, 1, classes)``.

        Returns:
            The decoded string (possibly empty).
        """
        best_path = logit.detach().cpu().argmax(2).squeeze(1).tolist()

        decoded = []
        previous = 0  # start as "blank" so the very first real class is emitted
        for idx in best_path:
            # Emit only when the class changes and the new class is not blank.
            # The previous version kept just the first character of every
            # blank-delimited run, silently dropping e.g. the "b" in "a a b".
            if idx != previous and idx != 0:
                decoded.append(self.idx2char[idx].decode('utf-8'))
            previous = idx

        return "".join(decoded)

In [5]:
target = pd.read_csv(config.TRAIN_LABELS)
# Alphabet = every distinct character that occurs in any training label.
# sorted() is essential: iteration order of a set of strings changes between
# interpreter sessions (hash randomisation), which would give each character a
# different class index after a kernel restart and make saved weights decode
# to the wrong characters.
chars = "".join(sorted(set("".join(str(label) for label in target.Expected.tolist()))))

label_converter = LabelConverter(chars)

In [6]:
class OCR_Dataset_Train(Dataset):
    """Labelled OCR images, split 80/20 into a training and a validation part.

    Both parts are built from the same folder and CSV. The file list is sorted
    and then shuffled with a private, seeded RNG, so two instances created with
    the same seed see the same permutation and their slices never overlap.
    (Shuffling with the shared global RNG gave each instance a different
    permutation, so validation images leaked into the training slice.)

    Each item is ``(image_tensor, label, image_path)``.
    """

    def __init__(
        self,
        path_images: str,
        path_labels: str,
        train: bool = True,
        split_seed: Union[int, None] = None,
    ):
        """
        Args:
            path_images: folder containing the image files.
            path_labels: CSV with an ``Id`` column (file name) and an
                ``Expected`` column (label).
            train: ``True`` keeps the first 80% of the shuffled files,
                ``False`` keeps the remaining 20%.
            split_seed: seed for the split permutation; defaults to
                ``config.SEED``.

        Raises:
            KeyError: if a file in ``path_images`` has no row in the CSV.
        """
        # Local import: the notebook's import cell only exposes shuffle/seed.
        from random import Random

        self.path_images = path_images
        self.path_labels = path_labels
        self._transform = None  # built lazily on first __getitem__

        # sorted() removes the dependence on os.listdir's arbitrary order.
        files = sorted(os.listdir(path_images))
        if split_seed is None:
            split_seed = config.SEED
        Random(split_seed).shuffle(files)

        # One dictionary lookup per file instead of a full-frame boolean mask
        # per file (which was quadratic in the number of images).
        # drop_duplicates keeps the first row per Id, like `.values[0]` did.
        target = pd.read_csv(self.path_labels).drop_duplicates("Id")
        label_by_id = dict(zip(target["Id"], target["Expected"]))

        missing = [name for name in files if name not in label_by_id]
        if missing:
            raise KeyError(
                f"{len(missing)} image(s) have no label in {path_labels}, e.g. {missing[0]!r}"
            )

        images = [os.path.join(path_images, name) for name in files]
        labels = [str(label_by_id[name]) for name in files]

        split_at = int(0.8 * len(images))
        if train:  # 80%
            self.images, self.labels = images[:split_at], labels[:split_at]
        else:  # 20%
            self.images, self.labels = images[split_at:], labels[split_at:]

    def _get_transform(self):
        """Return the preprocessing pipeline, creating it on first use."""
        if self._transform is None:
            self._transform = transforms.Compose(
                [
                    transforms.ToTensor(),
                    transforms.Resize((config.INPUT_SIZE[0], config.INPUT_SIZE[1])),
                    transforms.Normalize(config.IMAGENET_MEAN, config.IMAGENET_STD),
                ]
            )
        return self._transform

    def __getitem__(self, index):
        """Load, convert to RGB, resize and normalise the image at ``index``."""
        path_img, label = self.images[index], self.labels[index]
        # The context manager releases the file handle as soon as the pixel
        # data has been copied by convert().
        with Image.open(path_img) as raw_image:
            rgb_image = raw_image.convert("RGB")
        img = self._get_transform()(rgb_image)

        return img, label, path_img

    def __len__(self):
        return len(self.images)

In [7]:
# Both splits read the same folder and CSV; the `train` flag selects the slice.
train_data, test_data = (
    OCR_Dataset_Train(config.TRAIN_IMAGES, config.TRAIN_LABELS, train=is_train_split)
    for is_train_split in (True, False)
)

# Only the training batches are reshuffled every epoch.
train_loader = DataLoader(
    train_data,
    batch_size=config.BATCH_SIZE,
    shuffle=True,
)
test_loader = DataLoader(
    test_data,
    batch_size=config.BATCH_SIZE,
    shuffle=False,
)

In [8]:
# Number of samples in the (train, validation) splits.
tuple(len(split) for split in (train_data, test_data))

(220800, 55200)

In [15]:
class FeatureExtractor(nn.Module):
    """Frozen ResNet-18 backbone that turns an image batch into a feature sequence.

    The backbone (ResNet-18 without its last two children) is followed by an
    average pool that collapses the height to 1 and a 1x1 convolution that
    treats the feature-map width as channels and maps it to ``output_len``.
    """

    def __init__(self, input_size, output_len):
        """
        Args:
            input_size: ``(height, width)`` of the images fed to the network.
            output_len: number of output channels of the 1x1 projection, i.e.
                the size of the last axis returned by ``forward``.
        """
        super().__init__()

        h, w = input_size
        resnet = getattr(models, 'resnet18')('ResNet18_Weights.DEFAULT')
        self.cnn = nn.Sequential(*list(resnet.children())[:-2])

        # The backbone is used as a fixed feature extractor.
        for param in self.cnn.parameters():
            param.requires_grad = False

        # Probe the backbone once to learn the width of its feature map.
        # This must run in eval mode: in training mode BatchNorm updates its
        # running statistics on every forward pass (even under no_grad), so
        # probing with random noise would corrupt the pretrained statistics.
        was_training = self.cnn.training
        self.cnn.eval()
        with torch.no_grad():
            latent_space_width = self.cnn(torch.zeros(1, 3, h, w)).shape[3]
        self.cnn.train(was_training)

        self.pool = nn.AdaptiveAvgPool2d((1, latent_space_width))
        self.proj = nn.Conv2d(latent_space_width, output_len, kernel_size=1)
        # Channel count of the backbone output, read from its last BatchNorm.
        self.num_output_features = self.cnn[-1][-1].bn2.num_features

    def forward(self, x):
        """Return features indexed as ``(batch, 1, channels, output_len)``."""
        features = self.cnn(x)                                # (B, C, H', W')
        features = self.pool(features)                        # (B, C, 1, W')
        features = features.permute(0, 3, 2, 1).contiguous()  # (B, W', 1, C)
        features = self.proj(features)                        # (B, output_len, 1, C)
        features = features.permute(0, 2, 3, 1).contiguous()  # (B, 1, C, output_len)
        return features
    

class SequencePredictor(nn.Module):
    """GRU followed by a linear layer that scores every class at every time step."""

    def __init__(
        self,
        input_size,
        hidden_size,
        num_layers,
        num_classes,
        dropout=0.3,
        bidirectional=True,
    ):
        """
        Args:
            input_size: feature size of each time step fed to the GRU.
            hidden_size: GRU hidden size (per direction).
            num_layers: number of stacked GRU layers.
            num_classes: number of scores produced per time step.
            dropout: dropout passed to ``nn.GRU``.
            bidirectional: whether the GRU runs in both directions.
        """
        super().__init__()

        self.num_classes = num_classes
        self.rnn = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            bidirectional=bidirectional,
        )
        # A bidirectional GRU concatenates both directions in its output.
        direction_count = 2 if bidirectional else 1
        self.fc = nn.Linear(
            in_features=direction_count * hidden_size,
            out_features=num_classes,
        )

    def forward(self, x):
        """Map ``(batch, 1, features, time)`` to ``(time, batch, num_classes)``."""
        sequence = x.squeeze(1).permute(2, 0, 1)  # -> (time, batch, features)

        direction_count = 2 if self.rnn.bidirectional else 1
        initial_hidden = torch.zeros(
            self.rnn.num_layers * direction_count,
            sequence.size(1),
            self.rnn.hidden_size,
        ).to(sequence.device)

        hidden_states, _ = self.rnn(sequence, initial_hidden)
        return self.fc(hidden_states)


class OCR_Model(nn.Module):
    """Complete recogniser: CNN feature extractor followed by a recurrent predictor.

    ``forward`` returns per-time-step class scores (raw, un-normalised) from
    the sequence predictor.
    """

    def __init__(
        self,
        cnn_input_size=config.INPUT_SIZE,
        cnn_output_len=256,
        # +1 for the CTC blank: LabelConverter numbers the characters
        # 1..chars_dimension and index 0 is the blank, so the network needs
        # chars_dimension + 1 outputs. With only chars_dimension outputs the
        # last character's index fell outside the logits and could neither be
        # predicted nor used as a valid CTC target.
        # NOTE: checkpoints saved with the old (too small) head will not load.
        num_classes=label_converter.chars_dimension + 1,
        rnn_hidden_size=128,
        rnn_num_layers=2,
        rnn_dropout=0.3,
        rnn_bidirectional=True
        ):
        """
        Args:
            cnn_input_size: ``(height, width)`` of the input images.
            cnn_output_len: output length requested from the feature extractor.
            num_classes: number of output classes, blank included.
            rnn_hidden_size: hidden size of the GRU.
            rnn_num_layers: number of GRU layers.
            rnn_dropout: dropout inside the GRU.
            rnn_bidirectional: whether the GRU is bidirectional.
        """
        super().__init__()

        self.features_extractor = FeatureExtractor(input_size=cnn_input_size, output_len=cnn_output_len)
        self.sequence_predictor = SequencePredictor(
            input_size=self.features_extractor.num_output_features,
            hidden_size=rnn_hidden_size,
            num_layers=rnn_num_layers,
            num_classes=num_classes,
            dropout=rnn_dropout,
            bidirectional=rnn_bidirectional
            )

    def forward(self, x):
        """Run the feature extractor and then the sequence predictor on ``x``."""
        features = self.features_extractor(x)
        sequence = self.sequence_predictor(features)
        return sequence



In [16]:
# Network and loss. zero_infinity=True zeroes infinite CTC losses.
model = OCR_Model().to(device=config.DEVICE)
criterion = nn.CTCLoss(blank=config.BLANK_LABEL, zero_infinity=True)

# SGD with Nesterov momentum and a small weight decay.
lr, weight_decay, momentum = 0.002, 1e-5, 0.9
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=lr,
    momentum=momentum,
    weight_decay=weight_decay,
    nesterov=True,
)

In [17]:
def plot_loss(epoch: int, train_loss: List, test_loss: List):
    """Redraw the train/test loss curves side by side, replacing the previous output.

    Each panel's title shows the epoch and the mean of the last 100 recorded
    losses of that curve.
    """
    display.clear_output(True)
    fig, axes = plt.subplots(1, 2, figsize=(14, 6))

    titles = (
        f"Epoch:{epoch} // Train Loss:{np.mean(train_loss[-100:]):.5f}",
        f"Epoch:{epoch} // Test Loss:{np.mean(test_loss[-100:]):.5f}",
    )
    for axis, history, title in zip(axes, (train_loss, test_loss), titles):
        axis.plot(history)
        axis.set_title(title)

    plt.show()

In [19]:
class Recognition:
    """Training and evaluation driver for the CTC-based OCR model.

    Bundles the model, optimizer, loss and label converter and runs every
    batch on ``config.DEVICE``.
    """

    def __init__(
        self,
        model: "OCR_Model",
        optimizer: torch.optim.SGD,
        criterion: nn.CTCLoss,
        label_converter: "LabelConverter",
        epochs: int = 50,
    ):
        """
        Args:
            model: network returning scores indexed as (time, batch, classes).
            optimizer: optimizer updating ``model``'s parameters.
            criterion: CTC loss.
            label_converter: converts text labels to CTC targets.
            epochs: number of passes over the training loader in ``fit``.
        """
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion
        self.label_converter = label_converter
        self.epochs = epochs
        self.device = config.DEVICE

    def calculate_loss(self, output: torch.Tensor, target: Tuple):
        """Compute the CTC loss of one batch.

        Args:
            output: raw model scores; axis 0 is used as the sequence length of
                every sample, axis 1 as the batch size, axis 2 as the classes.
            target: the text labels of the batch.

        Returns:
            The scalar loss tensor.
        """
        seq_len, batch_size = output.size(0), output.size(1)
        output_lengths = torch.full(size=(batch_size,), fill_value=seq_len, dtype=torch.int32)
        target_encoded, target_lengths = self.label_converter.encode(target)
        # nn.CTCLoss expects log-probabilities, not raw scores. log_softmax is
        # a no-op on inputs that are already normalised, so this is safe even
        # if a model normalises its own output.
        log_probs = output.log_softmax(2)
        return self.criterion(log_probs, target_encoded, output_lengths, target_lengths)

    def fit(
        self, train_loader: DataLoader, test_loader: DataLoader
    ) -> Tuple[Dict, List[float], List[float]]:
        """Train for ``self.epochs`` epochs, evaluating and checkpointing after each.

        After every epoch the loss curves are redrawn and the model, optimizer
        and label mapping are written to ``model.pth``.

        Returns:
            ``(results, train_history_loss, test_history_loss)`` where
            ``results`` is the output of the last ``evaluate`` call (empty
            lists when ``epochs`` is 0).
        """
        train_history_loss = []
        test_history_loss = []
        # Defined up front so the return below is valid even with zero epochs.
        results = {"predict": [], "target": [], "image": []}

        for epoch in tqdm(range(self.epochs), total=self.epochs):
            gc.collect()
            torch.cuda.empty_cache()

            self.model.train()
            for image, target, _ in train_loader:
                self.optimizer.zero_grad()

                image = image.to(device=self.device)
                output = self.model(image)

                train_loss = self.calculate_loss(output, target)
                # Skip batches whose loss is NaN instead of poisoning the weights.
                if torch.isnan(train_loss):
                    continue
                train_history_loss.append(train_loss.item())
                train_loss.backward()

                nn.utils.clip_grad_norm_(self.model.parameters(), 1)

                self.optimizer.step()

            results, test_history_loss = self.evaluate(test_loader, test_history_loss)
            plot_loss(epoch + 1, train_history_loss, test_history_loss)

            # Store the mapping of the converter this engine actually trained
            # with (not whatever global happens to be called label_converter).
            saving = {
                'state_dict': self.model.state_dict(),
                'optimizer': self.optimizer.state_dict(),
                'char2idx': self.label_converter.char2idx,
                'idx2char': self.label_converter.idx2char,
                }
            torch.save(saving, 'model.pth')
            # A one-line summary; printing `results` itself would dump every
            # stored score tensor of the evaluation set into the cell output.
            print(f"Epoch {epoch + 1}: evaluated {len(results['predict'])} batches, checkpoint saved to model.pth")

        return results, train_history_loss, test_history_loss

    def evaluate(
        self, test_loader: DataLoader, test_history_loss: List[float]
    ) -> Tuple[Dict, List[float]]:
        """Run the model over ``test_loader`` without gradient tracking.

        Args:
            test_loader: loader yielding ``(image, target, path)`` batches.
            test_history_loss: list the batch losses are appended to (mutated
                in place and also returned).

        Returns:
            ``(outputs, test_history_loss)``; ``outputs`` holds, per batch, the
            raw scores on the CPU (``"predict"``), the labels (``"target"``)
            and the image paths (``"image"``). Batches with a NaN loss are
            left out.
        """
        outputs = {"predict": [], "target": [], "image": []}

        self.model.eval()
        with torch.no_grad():
            for image, target, path_image in test_loader:
                image = image.to(device=self.device)
                output = self.model(image)

                test_loss = self.calculate_loss(output, target)
                if torch.isnan(test_loss):
                    continue
                test_history_loss.append(test_loss.item())

                outputs["predict"].append(output.detach().cpu())
                outputs["target"].append(target)
                outputs["image"].append(path_image)

        return outputs, test_history_loss

In [21]:
# Train for 30 epochs; keep the last evaluation output and both loss histories.
engine = Recognition(
    model=model,
    optimizer=optimizer,
    criterion=criterion,
    label_converter=label_converter,
    epochs=30,
)
results, train_loss, test_loss = engine.fit(
    train_loader=train_loader,
    test_loader=test_loader,
)

In [23]:
# One row per validation image: decoded prediction, ground-truth label, file path.
# Each stored prediction is a whole batch, so samples are sliced out along axis 1.
df_results = pd.DataFrame(
    [
        [
            label_converter.decode(predicts[:, idx, :].unsqueeze(1)),
            targets[idx],
            images[idx],
        ]
        for predicts, targets, images in zip(
            results["predict"], results["target"], results["image"]
        )
        for idx in range(len(images))
    ],
    columns=["predict", "target", "image"],
)

In [12]:
class OCR_Dataset_Test(Dataset):
    """Unlabelled images of one folder; each item is ``(image_tensor, image_path)``."""

    def __init__(self, path_images: str):
        self.path_images = path_images
        # Full path of every entry in the folder, in os.listdir order.
        self.images = [
            os.path.join(path_images, file_name)
            for file_name in os.listdir(path_images)
        ]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        """Load the image at ``index`` as RGB, then tensorise, resize and normalise it."""
        path_img = self.images[index]
        rgb_image = Image.open(path_img).convert("RGB")

        height, width = config.INPUT_SIZE[0], config.INPUT_SIZE[1]
        pipeline = transforms.Compose(
            [
                transforms.ToTensor(),
                transforms.Resize((height, width)),
                transforms.Normalize(config.IMAGENET_MEAN, config.IMAGENET_STD),
            ]
        )

        return pipeline(rgb_image), path_img

In [13]:
# Unlabelled test images, served in a fixed order.
data = OCR_Dataset_Test(path_images=config.TEST_IMAGES)
loader = DataLoader(
    data,
    batch_size=config.BATCH_SIZE,
    shuffle=False,
)

In [23]:
checkpoint = torch.load("model.pth", map_location=torch.device(config.DEVICE))

# Decode with the exact character <-> index mapping the weights were trained
# with. The checkpoint stores it; a mapping rebuilt in a new session is not
# guaranteed to number the characters the same way. Fall back to the current
# mapping only for checkpoints that lack these keys.
label_converter.char2idx = checkpoint.get("char2idx", label_converter.char2idx)
label_converter.idx2char = checkpoint.get("idx2char", label_converter.idx2char)

model = OCR_Model().to(device=config.DEVICE)
model.load_state_dict(checkpoint["state_dict"])

<All keys matched successfully>

In [22]:
# Collect [path, decoded text] for every test image.
df_results = []
model.eval()
with torch.no_grad():
    for image, path_image in loader:
        image = image.to(device=config.DEVICE)
        output = model(image).detach().cpu()

        # The scores hold the whole batch on axis 1; decode sample by sample.
        df_results.extend(
            [path, label_converter.decode(output[:, idx, :].unsqueeze(1))]
            for idx, path in enumerate(path_image)
        )

In [65]:
df_results = pd.DataFrame(df_results, columns=["Path", "Predicted"])
# os.path.basename understands the separator of the platform that built the
# paths; splitting on "\\" only worked on Windows and left the full path as the
# Id elsewhere (which then broke the int() conversion below).
df_results["Id"] = df_results["Path"].apply(os.path.basename)
# Numeric part of the file name, used only to order the rows.
df_results["RN"] = df_results["Id"].apply(lambda name: int(os.path.splitext(name)[0]))
df_results = df_results.sort_values("RN").reset_index(drop=True)

In [66]:
# Keep only the submission columns, with "Predicted" placed last.
predicts = df_results[["Predicted"]]
df_results = pd.concat(
    [df_results.drop(columns=["Path", "RN", "Predicted"]), predicts],
    axis=1,
)

In [68]:
# to_csv does not create missing folders, so make sure the target exists.
os.makedirs("result", exist_ok=True)
df_results.to_csv("result/submission.csv", index=False)