In [1]:
import os 

In [4]:
!pip install torch torchvision numpy pillow h5py opencv-python xmltodict



Collecting xmltodict
  Downloading xmltodict-0.14.2-py2.py3-none-any.whl.metadata (8.0 kB)
Downloading xmltodict-0.14.2-py2.py3-none-any.whl (10.0 kB)
Installing collected packages: xmltodict
Successfully installed xmltodict-0.14.2


In [78]:
import os
import glob
import torch
import xml.etree.ElementTree as ET
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
import torch.nn.functional as F

from pathlib import Path
import numpy as np
import math
from itertools import groupby
import h5py
import numpy as np
import unicodedata
import cv2
import torch
from torch import nn
from torchvision.models import resnet50, resnet101
from torch.autograd import Variable
import torchvision

from torch.utils.data import Dataset
import time



In [79]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=128):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


class OCR(nn.Module):

    def __init__(self, vocab_len, hidden_dim, nheads,
                 num_encoder_layers, num_decoder_layers):
        super().__init__()

        # create ResNet-101 backbone
        self.backbone = resnet101()
        del self.backbone.fc

        # create conversion layer
        self.conv = nn.Conv2d(2048, hidden_dim, 1)

        # create a default PyTorch transformer
        self.transformer = nn.Transformer(
            hidden_dim, nheads, num_encoder_layers, num_decoder_layers)

        # prediction heads with length of vocab
        # DETR used basic 3 layer MLP for output
        self.vocab = nn.Linear(hidden_dim,vocab_len)

        # output positional encodings (object queries)
        self.decoder = nn.Embedding(vocab_len, hidden_dim)
        self.query_pos = PositionalEncoding(hidden_dim, .2)

        # spatial positional encodings, sine positional encoding can be used.
        # Detr baseline uses sine positional encoding.
        self.row_embed = nn.Parameter(torch.rand(50, hidden_dim // 2))
        self.col_embed = nn.Parameter(torch.rand(50, hidden_dim // 2))
        self.trg_mask = None
  
    def generate_square_subsequent_mask(self, sz):
        mask = torch.triu(torch.ones(sz, sz), 1)
        mask = mask.masked_fill(mask==1, float('-inf'))
        return mask

    def get_feature(self,x):
        x = self.backbone.conv1(x)
        x = self.backbone.bn1(x)   
        x = self.backbone.relu(x)
        x = self.backbone.maxpool(x)

        x = self.backbone.layer1(x)
        x = self.backbone.layer2(x)
        x = self.backbone.layer3(x)
        x = self.backbone.layer4(x)
        return x


    def make_len_mask(self, inp):
        return (inp == 0).transpose(0, 1)


    def forward(self, inputs, trg):
        # propagate inputs through ResNet-101 up to avg-pool layer
        x = self.get_feature(inputs)

        # convert from 2048 to 256 feature planes for the transformer
        h = self.conv(x)

        # construct positional encodings
        bs,_,H, W = h.shape
        pos = torch.cat([
            self.col_embed[:W].unsqueeze(0).repeat(H, 1, 1),
            self.row_embed[:H].unsqueeze(1).repeat(1, W, 1),
        ], dim=-1).flatten(0, 1).unsqueeze(1)

        # generating subsequent mask for target
        if self.trg_mask is None or self.trg_mask.size(0) != len(trg):
            self.trg_mask = self.generate_square_subsequent_mask(trg.shape[1]).to(trg.device)

        # Padding mask
        trg_pad_mask = self.make_len_mask(trg)

        # Getting postional encoding for target
        trg = self.decoder(trg)
        trg = self.query_pos(trg)
        
        output = self.transformer(pos + 0.1 * h.flatten(2).permute(2, 0, 1), trg.permute(1,0,2), tgt_mask=self.trg_mask, 
                                  tgt_key_padding_mask=trg_pad_mask.permute(1,0))

        return self.vocab(output.transpose(0,1))


def make_model(vocab_len, hidden_dim=256, nheads=4,
                 num_encoder_layers=4, num_decoder_layers=4):
    
    return OCR(vocab_len, hidden_dim, nheads,
                 num_encoder_layers, num_decoder_layers)

In [80]:
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as transforms

class PromitoLipiDataset(Dataset):
    def __init__(self, img_dir, annotation_file, tokenizer, transform=None):
        self.img_dir = img_dir
        self.annotations = load_annotations(annotation_file)  # Load annotations
        self.image_files = list(self.annotations.keys())  # Get image filenames
        self.tokenizer = tokenizer  # Use the Tokenizer
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image_name = self.image_files[idx]
        label_text = self.annotations[image_name]  # Get Bangla text label

        # Load Image
        img_path = os.path.join(self.img_dir, image_name)
        img = Image.open(img_path).convert("RGB")

        if self.transform:
            img = self.transform(img)

        # Convert text to token indices
        y_train = self.tokenizer.encode(label_text)

        # Pad sequence to max length
        y_train = np.pad(y_train, (0, self.tokenizer.maxlen - len(y_train)), mode="constant")

        return img, torch.tensor(y_train, dtype=torch.long)



# Image Transformations for ResNet-101
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


In [81]:
class Tokenizer:
    """Handles text-token conversion for Bangla OCR."""

    def __init__(self, class_mapping, max_text_length=128):
        self.PAD_TK, self.UNK_TK, self.SOS_TK, self.EOS_TK = "¶", "¤", "SOS", "EOS"
        
        # Convert class_mapping to a list of characters
        self.chars = [self.PAD_TK, self.UNK_TK, self.SOS_TK, self.EOS_TK] + list(class_mapping.values())
        self.vocab_size = len(self.chars)
        self.maxlen = max_text_length

        # Create character-to-index and index-to-character mappings
        self.char_to_idx = {c: i for i, c in enumerate(self.chars)}
        self.idx_to_char = {i: c for c, i in self.char_to_idx.items()}

    def encode(self, text):
        """Encodes Bangla text into token indices."""
        text = ['SOS'] + list(text) + ['EOS']
        encoded = [self.char_to_idx.get(c, self.char_to_idx["¤"]) for c in text]
        return np.array(encoded)

    def decode(self, tokens):
        """Decodes token indices back to Bangla text."""
        text = "".join([self.idx_to_char.get(i, "") for i in tokens])
        return text.replace("SOS", "").replace("EOS", "").replace("¶", "").replace("¤", "")



# # Bangla Charset (Letters, Digits, and Common Symbols)
# bangla_charset = "অইঈউঊঋএঐওঔকখগঘঙচছজঝঞটঠডঢণতথদধনপফবভমযরলশষসহড়ঢ়য়ৎঃংঁ" \
#                  "০১২৩৪৫৬৭৮৯" \
#                  "ািীেু্র্যক্ষন্তত্রঙ্গস্থস্বক্তস্তন্দচ্ছদ্ধন্ত্রত্তষ্টন্নল্পম্পূৃৈৌ।"

class_mapping = {
    0: 'blank', 1: 'অ', 2: 'ই', 3: 'ঈ', 4: 'উ', 5: 'ঊ', 6: 'ঋ', 7: 'এ', 8: 'ঐ', 9: 'ও', 10: 'ঔ',
    11: 'ক', 12: 'খ', 13: 'গ', 14: 'ঘ', 15: 'ঙ', 16: 'চ', 17: 'ছ', 18: 'জ', 19: 'ঝ', 20: 'ঞ', 21: 'ট',
    22: 'ঠ', 23: 'ড', 24: 'ঢ', 25: 'ণ', 26: 'ত', 27: 'থ', 28: 'দ', 29: 'ধ', 30: 'ন', 31: 'প', 32: 'ফ',
    33: 'ব', 34: 'ভ', 35: 'ম', 36: 'য', 37: 'র', 38: 'ল', 39: 'শ', 40: 'ষ', 41: 'স', 42: 'হ', 43: 'ড়',
    44: 'ঢ়', 45: 'য়', 46: 'ৎ', 47: 'ঃ', 48: 'ং', 49: 'ঁ', 50: '০', 51: '১', 52: '২', 53: '৩', 54: '৪',
    55: '৫', 56: '৬', 57: '৭', 58: '৮', 59: '৯', 60: 'া', 61: 'ি', 62: 'ী', 63: 'ে', 64: 'ু', 65: 'faka',
    66: '্র', 67: '্য', 68: 'ক্ষ', 69: 'ন্ত', 70: 'ত্র', 71: 'ঙ্গ', 72: 'স্থ', 73: 'স্ব', 74: 'ক্ত',
    75: 'স্ত', 76: 'ন্দ', 77: 'চ্ছ', 78: 'দ্ধ', 79: 'ন্ত্র', 80: 'ফাকা', 81: 'ত্ত', 82: 'ষ্ট', 83: 'ন্ন',
    84: 'ল্প', 85: 'ম্প', 86: 'faka', 87: 'ূ', 88: 'ৃ', 89: 'ৈ', 90: 'faka', 91: 'ৌ', 92: '।'
}

# Initialize Tokenizer with Bangla characters
tokenizer = Tokenizer(class_mapping)



In [82]:
# Encoding Example
text = "বাংলা ভাষা"
encoded_text = tokenizer.encode(text)
print("Encoded:", encoded_text)

# Decoding Example
decoded_text = tokenizer.decode(encoded_text)
print("Decoded:", decoded_text)


Encoded: [ 2 37 64 52 42 64  1 38 64 44 64  3]
Decoded: বাংলাভাষা


In [83]:
import os

def load_annotations(annotation_file):
    """Loads annotations from a text file and converts class indices to Bangla text."""
    annotations = {}

    # Check if the file exists
    if not os.path.exists(annotation_file):
        print(f"❌ Error: Annotation file not found at {annotation_file}")
        return annotations  # Return empty dictionary

    with open(annotation_file, "r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split(":")
            if len(parts) != 2:
                print(f"⚠️ Warning: Skipping malformed line: {line.strip()}")
                continue  # Skip corrupt lines

            image_name = parts[0].strip()  # Extract image filename

            try:
                # Convert index numbers to actual Bangla text
                label_indices = list(map(int, parts[1].strip().split()))
                label_text = "".join(class_mapping.get(idx, "") for idx in label_indices)
                annotations[image_name] = label_text  # Store result
            except ValueError:
                print(f"⚠️ Warning: Skipping line with invalid numbers: {line.strip()}")

    return annotations

# Load annotations from file
annotation_file_path = "/kaggle/input/promitilipi/imageannotationsid_train.txt"
annotations = load_annotations(annotation_file_path)


In [84]:
batch_size = 16

# Example Usage:
dataset = PromitoLipiDataset(
    img_dir="/kaggle/input/promitilipi/promitilipi/preprocessed_images",
    annotation_file=annotation_file_path,
    tokenizer=tokenizer,  # Pass the tokenizer
    transform=transforms.Compose([transforms.ToTensor()])
)

# Check some samples
for i in range(5):
    img, label = dataset[i]
    print(f"Sample {i+1}: {tokenizer.decode(label.numpy())}")


Sample 1: উপকরণ
Sample 2: উপবন
Sample 3: ঊহয
Sample 4: উদাস
Sample 5: উপজািত


In [85]:
from torch.utils.data import random_split

# Define dataset
# dataset = PromitoLipiDataset(
#     img_dir="/kaggle/input/promitilipi/PromitoLipi2/PromitoLipi2/WordImages(bmp)",
#     xml_dir="/kaggle/input/promitilipi/PromitoLipi2/PromitoLipi2/WordAnnotations(xml)",
#     tokenizer=tokenizer,
#     transform=transform
# )

# Define split sizes (80% Train, 10% Validation, 10% Test)
train_size = int(0.8 * len(dataset))
val_size = int(0.1 * len(dataset))
test_size = len(dataset) - train_size - val_size  # Ensures total remains same

# Randomly split dataset
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Create DataLoaders
batch_size = 16

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Print dataset sizes
print(f"Training Set: {len(train_dataset)} samples")
print(f"Validation Set: {len(val_dataset)} samples")
print(f"Test Set: {len(test_dataset)} samples")


Training Set: 7864 samples
Validation Set: 983 samples
Test Set: 983 samples


In [86]:
# Define the class mapping








In [87]:
tokenizer.vocab_size

97

In [88]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = make_model(vocab_len=tokenizer.vocab_size).to(device)


In [89]:
class LabelSmoothing(nn.Module):
    "Implement label smoothing."
    def __init__(self, size, padding_idx=0, smoothing=0.0):
        super(LabelSmoothing, self).__init__()
        self.criterion = nn.KLDivLoss(size_average=False)
        self.padding_idx = padding_idx
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing
        self.size = size
        self.true_dist = None
        
    def forward(self, x, target):
        assert x.size(1) == self.size
        true_dist = x.data.clone()
        true_dist.fill_(self.smoothing / (self.size - 2))
        true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
        true_dist[:, self.padding_idx] = 0
        mask = torch.nonzero(target.data == self.padding_idx)
        if mask.dim() > 0:
            true_dist.index_fill_(0, mask.squeeze(), 0.0)
        self.true_dist = true_dist
        return self.criterion(x, Variable(true_dist, requires_grad=False))


In [90]:
criterion = LabelSmoothing(size=tokenizer.vocab_size, padding_idx=0, smoothing=0.1).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001, weight_decay=0.0004)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.95)


In [95]:
def train(model, criterion, optimiser, scheduler,dataloader):
 
    model.train()
    total_loss = 0
    for batch, (imgs, labels_y,) in enumerate(dataloader):
          imgs = imgs.to(device)
          labels_y = labels_y.to(device)
    
          optimiser.zero_grad()
          output = model(imgs.float(),labels_y.long()[:,:-1])
 
          norm = (labels_y != 0).sum()
          loss = criterion(output.log_softmax(-1).contiguous().view(-1, tokenizer.vocab_size), labels_y[:,1:].contiguous().view(-1).long()) / norm
 
          loss.backward()
          torch.nn.utils.clip_grad_norm_(model.parameters(), 0.2)
          optimizer.step()
          total_loss += loss.item() * norm
 
    return total_loss / len(dataloader)
 
from jiwer import cer

def evaluate(model, criterion, dataloader, tokenizer):
    model.eval()
    epoch_loss = 0
    total_cer = 0  # Track total CER
    total_samples = 0

    with torch.no_grad():
        for batch, (imgs, labels_y,) in enumerate(dataloader):
            imgs = imgs.to(device)
            labels_y = labels_y.to(device)

            # Model prediction
            output = model(imgs.float(), labels_y.long()[:, :-1])

            # Compute loss
            norm = (labels_y != 0).sum()
            loss = criterion(
                output.log_softmax(-1).contiguous().view(-1, tokenizer.vocab_size),
                labels_y[:, 1:].contiguous().view(-1).long()
            ) / norm

            epoch_loss += loss.item() * norm

            # Convert model output to text (Decoding)
            predicted_tokens = output.argmax(-1).cpu().numpy()  # Get best predictions
            predicted_texts = [tokenizer.decode(pred) for pred in predicted_tokens]  # Convert to text
            actual_texts = [tokenizer.decode(label.cpu().numpy()) for label in labels_y[:, 1:]]  # Ground truth text

            # Compute CER for this batch
            batch_cer = sum(cer(a, b) for a, b in zip(actual_texts, predicted_texts)) / len(actual_texts)
            total_cer += batch_cer
            total_samples += 1

    # Average CER across all validation samples
    avg_cer = total_cer / total_samples if total_samples > 0 else 0
    return epoch_loss / len(dataloader), avg_cer

     

In [94]:
!pip install jiwer


Collecting jiwer
  Downloading jiwer-3.1.0-py3-none-any.whl.metadata (2.6 kB)
Collecting click>=8.1.8 (from jiwer)
  Downloading click-8.1.8-py3-none-any.whl.metadata (2.3 kB)
Collecting rapidfuzz>=3.9.7 (from jiwer)
  Downloading rapidfuzz-3.12.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Downloading jiwer-3.1.0-py3-none-any.whl (22 kB)
Downloading click-8.1.8-py3-none-any.whl (98 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.2/98.2 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading rapidfuzz-3.12.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m57.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25hInstalling collected packages: rapidfuzz, click, jiwer
  Attempting uninstall: click
    Found existing installation: click 8.1.7
    Uninstalling click-8.1.7:
      Successfully uninstalled click-8.1.7
Successfully installe

In [None]:
#train model
 
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

target_path = "/kaggle/working/best_model.pth"  # ✅ Define the path


best_valid_loss = np.inf
c = 0
for epoch in range(200):
    print(f'Epoch: {epoch+1:02}, Learning Rate: {scheduler.get_last_lr()}')

    start_time = time.time()

    train_loss = train(model, criterion, optimizer, scheduler, train_loader)
    valid_loss, valid_cer = evaluate(model, criterion, val_loader, tokenizer)  # ✅ Now returns CER

    epoch_mins, epoch_secs = epoch_time(start_time, time.time())

    c += 1
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), target_path)
        c = 0

    if c > 4:
        scheduler.step()
        c = 0

    print(f'Time: {epoch_mins}m {epoch_secs}s')
    print(f'Train Loss: {train_loss:.3f}')
    print(f'Val   Loss: {valid_loss:.3f}')
    print(f'CER   Score: {valid_cer:.3f}')  # ✅ Display Character Error Rate

Epoch: 01, Learning Rate: [0.0001]
Time: 0m 58s
Train Loss: 104.137
Val   Loss: 97.942
CER   Score: 0.689
Epoch: 02, Learning Rate: [0.0001]
Time: 1m 12s
Train Loss: 100.143
Val   Loss: 92.267
CER   Score: 1.414
Epoch: 03, Learning Rate: [0.0001]
Time: 0m 57s
Train Loss: 95.871
Val   Loss: 87.957
CER   Score: 0.735
Epoch: 04, Learning Rate: [0.0001]
Time: 0m 58s
Train Loss: 91.931
Val   Loss: 86.311
CER   Score: 0.563
Epoch: 05, Learning Rate: [0.0001]
Time: 0m 57s
Train Loss: 88.519
Val   Loss: 84.072
CER   Score: 0.735
Epoch: 06, Learning Rate: [0.0001]
Time: 1m 12s
Train Loss: 85.431
Val   Loss: 81.932
CER   Score: 0.735
Epoch: 07, Learning Rate: [0.0001]
Time: 1m 0s
Train Loss: 82.582
Val   Loss: 79.282
CER   Score: 0.540
Epoch: 08, Learning Rate: [0.0001]
Time: 0m 57s
Train Loss: 79.191
Val   Loss: 78.191
CER   Score: 0.490
Epoch: 09, Learning Rate: [0.0001]
Time: 0m 58s
Train Loss: 77.045
Val   Loss: 77.129
CER   Score: 0.464
Epoch: 10, Learning Rate: [0.0001]
Time: 1m 3s
Train L