In [1]:
# code modified based on https://github.com/MarkPotanin/DigitalPeter/blob/main/baseline.ipynb
# used PyTorch instead of TensorFlow
import os
from os.path import join
from collections import Counter
import numpy as np
import matplotlib.pyplot as plt
import cv2
import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
# dataset downloaded from https://drive.google.com/file/d/1Qki21iEcg_iwMo3kWuaHi5AlxxpLKpof/view
image_dir = 'peter/images'
trans_dir = 'peter/words'
english = set(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'm', 'n', 'o', 'p', 'r', 's', 't', 'u', 'w'])

In [3]:
def text_to_labels(text, letters):
    return [letters.index(c) for c in text]

def process_texts(image_dir, trans_dir):
    lens = []
    lines = []
    names = []
    letters = ''
    all_files = set(os.listdir(trans_dir))
    for filename in os.listdir(image_dir):
        if filename.endswith('.jpg'):
            name = filename[:-4]
            txt_filepath = join(trans_dir, name + '.txt')
            if name + '.txt' not in all_files:
                continue
            try:
                with open(txt_filepath, 'r', encoding='utf-8') as file:
                    data = file.read().strip()
                if not data or set(data).intersection(english):
                    continue
                lines.append(data)
                names.append(filename)
                lens.append(len(data))
                letters += data
            except Exception as e:
                print(f"Error reading {txt_filepath}: {e}")
                continue
    print(f'Max string length: {max(Counter(lens).keys())}')
    return names, lines, Counter(letters)

names, lines, cnt = process_texts(image_dir, trans_dir)
letters = sorted(list(set(cnt.keys())))
print('Unique characters in train:', ' '.join(letters))

# reduced resized image size from (128, 1024) to (64, 512)
def process_image(img):
    w, h, _ = img.shape
    new_w = 64
    new_h = int(h * (new_w / w))
    img = cv2.resize(img, (new_h, new_w))
    w, h, _ = img.shape
    img = img.astype('float32')
    if w < 64:
        add_zeros = np.full((64-w, h, 3), 255)
        img = np.concatenate((img, add_zeros))
    if h < 512:
        add_zeros = np.full((w, 512-h, 3), 255)
        img = np.concatenate((img, add_zeros), axis=1)
    if h > 512 or w > 64:
        img = cv2.resize(img, (512, 64))
    img = cv2.subtract(255, img) / 255
    return img

def generate_data(lines, names, image_dir, letters):
    data_images = []
    data_labels = []
    data_input_length = []
    data_label_length = []
    data_original_text = []
    max_label_len = 0
    for line, name in tqdm.tqdm(zip(lines, names), total=len(names)):
        try:
            img = cv2.imread(join(image_dir, name))
            if img is None:
                print(f"Failed to load image: {name}")
                continue
            img = process_image(img)
            label = text_to_labels(line, letters)
            data_images.append(img)
            data_labels.append(label)
            data_input_length.append(255)
            data_label_length.append(len(line))
            data_original_text.append(line)
            max_label_len = max(max_label_len, len(line))
        except Exception as e:
            print(f"Error processing {name}: {e}")
            continue
    return data_images, data_labels, data_input_length, data_label_length, data_original_text, max_label_len

Max string length: 71
Unique characters in train:   ( ) + / 0 1 2 3 4 5 6 7 8 9 [ ] i k l | × ǂ а б в г д е ж з и й к л м н о п р с т у ф х ц ч ш щ ъ ы ь э ю я ѣ – ⊕ ⊗


In [4]:
lines_train, names_train = [], []
lines_val, names_val = [], []

for num, (line, name) in enumerate(zip(lines, names)):
    if num % 15 == 0:
        lines_val.append(line)
        names_val.append(name)
    else:
        lines_train.append(line)
        names_train.append(name)

In [5]:
train_images, train_labels, train_input_length, train_label_length, train_original_text, train_max_label_len = generate_data(lines_train, names_train, image_dir, letters)
val_images, val_labels, val_input_length, val_label_length, val_original_text, val_max_label_len = generate_data(lines_val, names_val, image_dir, letters)

100%|██████████████████████████████████████████████████████████████████████████████| 5705/5705 [01:13<00:00, 77.14it/s]
100%|████████████████████████████████████████████████████████████████████████████████| 408/408 [00:05<00:00, 73.23it/s]


In [6]:
max_label_len = max(train_max_label_len, val_max_label_len)
print(f"Max label length: {max_label_len}")

Max label length: 71


In [7]:
def pad_sequences(labels, maxlen, value):
    return np.array([label + [value] * (maxlen - len(label)) for label in labels])

In [8]:
train_padded_label = pad_sequences(train_labels, max_label_len, len(letters))
val_padded_label = pad_sequences(val_labels, max_label_len, len(letters))

In [9]:
train_images = np.array(train_images)
val_images = np.array(val_images)
train_input_length = np.array(train_input_length)
val_input_length = np.array(val_input_length)
train_label_length = np.array(train_label_length)
val_label_length = np.array(val_label_length)

In [10]:
class CRNNDataset(Dataset):
    def __init__(self, images, labels, input_lengths, label_lengths):
        self.images = images
        self.labels = labels
        self.input_lengths = input_lengths
        self.label_lengths = label_lengths

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx].transpose(2, 0, 1)
        return (
            torch.FloatTensor(image),
            torch.LongTensor(self.labels[idx]),
            torch.LongTensor([self.input_lengths[idx]]),
            torch.LongTensor([self.label_lengths[idx]])
        )

In [11]:
train_dataset = CRNNDataset(train_images, train_padded_label, train_input_length, train_label_length)
val_dataset = CRNNDataset(val_images, val_padded_label, val_input_length, val_label_length)
train_loader = DataLoader(train_dataset, batch_size=60, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=60, shuffle=False)

In [12]:
# reduced number of convolutional layers from 7 to 6
# reduced number of units in each layer by half
class CRNN(nn.Module):
    def __init__(self, num_chars):
        super(CRNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool1 = nn.MaxPool2d((2, 2), stride=2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool2 = nn.MaxPool2d((2, 2), stride=2)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        self.pool3 = nn.MaxPool2d((4, 1), padding=(2, 0))
        self.conv4 = nn.Conv2d(128, 256, 3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.conv5 = nn.Conv2d(256, 256, 3, padding=1)
        self.bn5 = nn.BatchNorm2d(256)
        self.pool5 = nn.MaxPool2d((4, 1), padding=(2, 0))
        self.conv6 = nn.Conv2d(256, 256, 2)
        self.gru1 = nn.GRU(256, 128, bidirectional=True, batch_first=True, dropout=0.2)
        self.gru2 = nn.GRU(256, 128, bidirectional=True, batch_first=True, dropout=0.2)
        self.fc = nn.Linear(256, num_chars + 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = F.relu(self.conv3(x))
        x = self.pool3(x)
        x = F.relu(self.conv4(x))
        x = self.bn4(x)
        x = F.relu(self.conv5(x))
        x = self.bn5(x)
        x = self.pool5(x)
        x = F.relu(self.conv6(x))
        x = x.squeeze(2)
        x = x.permute(0, 2, 1)
        x, _ = self.gru1(x)
        x, _ = self.gru2(x)
        x = self.fc(x)
        return x

In [13]:
def decode_batch(preds, letters):
    preds = F.softmax(preds, dim=2)
    preds = torch.argmax(preds, dim=2)
    batch_size = preds.size(0)
    decoded = []
    for i in range(batch_size):
        seq = []
        prev = -1
        for t in range(preds.size(1)):
            curr = preds[i, t].item()
            if curr != prev and curr != len(letters):
                seq.append(curr)
            prev = curr
        decoded.append([c for c in seq if c != -1])
    return decoded

In [14]:
class EarlyStopping:
    def __init__(self, patience=20, verbose=False):
        self.patience = patience
        self.verbose = verbose
        self.best_loss = float('inf')
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
                if self.verbose:
                    print("Early stopping triggered")

class ReduceLROnPlateau:
    def __init__(self, optimizer, factor=0.7, patience=5, min_lr=1e-5):
        self.optimizer = optimizer
        self.factor = factor
        self.patience = patience
        self.min_lr = min_lr
        self.best_loss = float('inf')
        self.counter = 0

    def __call__(self, val_loss):
        if val_loss < self.best_loss:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.counter = 0
                for param_group in self.optimizer.param_groups:
                    param_group['lr'] = max(param_group['lr'] * self.factor, self.min_lr)
                print(f"Reduced learning rate to {self.optimizer.param_groups[0]['lr']}")

class ModelCheckpoint:
    def __init__(self, filepath, verbose=1):
        self.filepath = filepath
        self.verbose = verbose
        self.best_acc = -float('inf')

    def __call__(self, val_acc, model):
        if val_acc > self.best_acc:
            self.best_acc = val_acc
            torch.save(model.state_dict(), self.filepath)
            if self.verbose:
                print(f"Saved model to {self.filepath} with val_acc: {val_acc:.4f}")

In [15]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CRNN(len(letters)).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)
ctc_loss = nn.CTCLoss(blank=len(letters), zero_infinity=True)

filepath = "model.pth"
es = EarlyStopping(patience=20, verbose=True)
reduce_lr = ReduceLROnPlateau(optimizer, factor=0.7, patience=5, min_lr=1e-5)
checkpoint = ModelCheckpoint(filepath, verbose=1)

# reduced maximum number of epochs from 300 to 50
epochs = 50
history = {'loss': [], 'val_loss': [], 'acc': [], 'val_acc': []}



In [16]:
for epoch in range(epochs):
    model.train()
    train_loss = 0
    train_correct = 0
    train_total = 0
    for batch in train_loader:
        images, labels, input_lengths, label_lengths = [x.to(device) for x in batch]
        optimizer.zero_grad()
        outputs = model(images)
        outputs = F.log_softmax(outputs, dim=2)
        T = outputs.size(1)
        batch_size = images.size(0)
        input_lengths_tensor = torch.full((batch_size,), T, dtype=torch.long, device=device)
        loss = ctc_loss(outputs.permute(1, 0, 2), labels, input_lengths_tensor, label_lengths)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        
        preds = decode_batch(outputs, letters)
        for pred, true_label, true_len in zip(preds, labels.cpu().numpy(), label_lengths.cpu().numpy()):
            true_len = true_len[0]
            true_label = true_label[:true_len].tolist()
            pred_text = ''.join(letters[c] for c in pred if c < len(letters))
            true_text = ''.join(letters[c] for c in true_label if c < len(letters))
            train_correct += 1 if pred_text == true_text else 0
            train_total += 1

    train_loss /= len(train_loader)
    train_acc = train_correct / train_total if train_total > 0 else 0

    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for batch in val_loader:
            images, labels, input_lengths, label_lengths = [x.to(device) for x in batch]
            outputs = model(images)
            outputs = F.log_softmax(outputs, dim=2)
            T = outputs.size(1)
            batch_size = images.size(0)
            input_lengths_tensor = torch.full((batch_size,), T, dtype=torch.long, device=device)
            loss = ctc_loss(outputs.permute(1, 0, 2), labels, input_lengths_tensor, label_lengths)
            val_loss += loss.item()
            
            preds = decode_batch(outputs, letters)
            for pred, true_label, true_len in zip(preds, labels.cpu().numpy(), label_lengths.cpu().numpy()):
                true_len = true_len[0]
                true_label = true_label[:true_len].tolist()
                pred_text = ''.join(letters[c] for c in pred if c < len(letters))
                true_text = ''.join(letters[c] for c in true_label if c < len(letters))
                val_correct += 1 if pred_text == true_text else 0
                val_total += 1

    val_loss /= len(val_loader)
    val_acc = val_correct / val_total if val_total > 0 else 0

    history['loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['acc'].append(train_acc)
    history['val_acc'].append(val_acc)

    print(f"Epoch {epoch+1:05d}: loss: {train_loss:.4f} - acc: {train_acc:.4f} - val_loss: {val_loss:.4f} - val_acc: {val_acc:.4f} - lr: {optimizer.param_groups[0]['lr']:.6f}")

    checkpoint(val_acc, model)
    es(val_loss)
    reduce_lr(val_loss)
    if es.early_stop:
        print("Stopping training due to early stopping")
        break

Epoch 00001: loss: 4.3199 - acc: 0.0000 - val_loss: 3.3967 - val_acc: 0.0000 - lr: 0.001000
Saved model to model.pth with val_acc: 0.0000
Epoch 00002: loss: 3.3724 - acc: 0.0000 - val_loss: 3.3648 - val_acc: 0.0000 - lr: 0.001000
Epoch 00003: loss: 3.3301 - acc: 0.0000 - val_loss: 3.3772 - val_acc: 0.0000 - lr: 0.001000
Epoch 00004: loss: 3.3112 - acc: 0.0000 - val_loss: 3.3545 - val_acc: 0.0000 - lr: 0.001000
Epoch 00005: loss: 3.2892 - acc: 0.0000 - val_loss: 3.3391 - val_acc: 0.0000 - lr: 0.001000
Epoch 00006: loss: 3.2557 - acc: 0.0000 - val_loss: 3.3507 - val_acc: 0.0000 - lr: 0.001000
Epoch 00007: loss: 3.1762 - acc: 0.0000 - val_loss: 3.2782 - val_acc: 0.0000 - lr: 0.001000
Epoch 00008: loss: 3.0515 - acc: 0.0000 - val_loss: 3.2096 - val_acc: 0.0000 - lr: 0.001000
Epoch 00009: loss: 2.7765 - acc: 0.0000 - val_loss: 2.7754 - val_acc: 0.0000 - lr: 0.001000
Epoch 00010: loss: 1.9831 - acc: 0.0025 - val_loss: 2.0157 - val_acc: 0.0025 - lr: 0.001000
Saved model to model.pth with val_

In [17]:
model.load_state_dict(torch.load(filepath))
model.eval()

CRNN(
  (conv1): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool1): MaxPool2d(kernel_size=(2, 2), stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool2): MaxPool2d(kernel_size=(2, 2), stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool3): MaxPool2d(kernel_size=(4, 1), stride=(4, 1), padding=(2, 0), dilation=1, ceil_mode=False)
  (conv4): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn4): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv5): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn5): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool5): MaxPool2d(kernel_size=(4, 1), stride=(4, 1), padding=(2, 0), dilation=1, ceil_mode=False)
  (conv6): Conv2d(256, 256, kernel_size=(

In [18]:
# used instead of editdistance
def levenshtein_distance(str1, str2):
    dp = [[0] * (len(str2) + 1) for _ in range(len(str1) + 1)]
    
    for i in range(len(str1) + 1):
        dp[i][0] = i
    for j in range(len(str2) + 1):
        dp[0][j] = j
    
    for i in range(1, len(str1) + 1):
        for j in range(1, len(str2) + 1):
            cost = 0 if str1[i-1] == str2[j-1] else 1
            dp[i][j] = min(
                dp[i-1][j] + 1,
                dp[i][j-1] + 1,
                dp[i-1][j-1] + cost
            )
    return dp[len(str1)][len(str2)]

In [19]:
num_char_err = 0
num_char_total = 0
num_string_ok = 0
num_string_total = 0
word_eds = []
word_true_lens = []
print('Ground truth -> Recognized')

with torch.no_grad():
    for i in range(len(val_images)):
        image = torch.FloatTensor(val_images[i].transpose(2, 0, 1)).unsqueeze(0).to(device)
        output = model(image)
        pred = decode_batch(output, letters)[0]
        pred_text = ''.join(letters[c] for c in pred if c < len(letters))
        true_text = val_original_text[i]
        dist = levenshtein_distance(pred_text, true_text)
        num_char_err += dist
        num_char_total += len(true_text)
        num_string_ok += 1 if pred_text == true_text else 0
        num_string_total += 1
        pred_words = pred_text.split()
        true_words = true_text.split()
        word_eds.append(levenshtein_distance(pred_words, true_words))
        word_true_lens.append(len(true_words))
        print(f"[{'OK' if dist==0 else f'ERR:{dist}'}] \"{true_text}\" -> \"{pred_text}\"")

Ground truth -> Recognized
[ERR:7] "iз питербурха въ 14 д апъ петръ" -> "iз питербурхавъ 72дипу петръ"
[ERR:3] "с полною мочью к фельтъмаршалу" -> "сполною мочью к фелтъ маршалу"
[ERR:6] "того + обѣщаем + за сие" -> "тогоно вѣшаем + засие"
[ERR:1] "рала порутчика рансова таким обра" -> "рала порутчикарансова таким обра"
[ERR:2] "петръ" -> "готръ"
[ERR:4] "ношениям к сенату к москвѣ" -> "не ения ксенату к москвѣ"
[ERR:2] "тѣ мѣста не отдават никому" -> "тѣмѣста не от дават никому"
[ERR:3] "с корабля выборха" -> "скорабля выборлн"
[ERR:1] "вят тавары + тогда i пошлину брат" -> "вят тавары i тогда i пошлину брат"
[ERR:1] "дабы мы здѣсь кредиту не" -> "дабы мыздѣсь кредиту не"
[ERR:6] "всѣ руже положа в пълѣнъ отдалис а" -> "вбѣруж е положа впълѣнъ отдали ал"
[ERR:4] "те время до въремени забывая бога" -> "те время довѣремени за бываябога"
[ERR:13] "выдайте на оплату долгоф к юрю трубецко" -> "въ дайте нао  тудолгоф к ретосецко"
[ERR:2] "на них въ сенат" -> "наних въсенат"
[ERR:6] "без въс

In [20]:
char_error_rate = num_char_err / num_char_total
word_error_rate = sum(word_eds) / sum(word_true_lens)
string_accuracy = num_string_ok / num_string_total
print(f'Character error rate: {char_error_rate*100:.2f}%. Word error rate: {word_error_rate*100:.2f}%. String accuracy: {string_accuracy*100:.2f}%.')

Character error rate: 16.32%. Word error rate: 66.51%. String accuracy: 10.54%.
