# In [None]:
import os
import torch
import torch.nn as nn
import numpy as np
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image

# Define the character to index mapping
amharic_characters = ' ሀሁሂሃሄህሆሇለሉሊላሌልሎሏሐሑሒሓሔሕሖሗመሙሚማሜምሞሟሠሡሢሣሤሥሦሧረሩሪራሬርሮሯሰሱሲሳሴስሶሷሸሹሺሻሼሽሾሿቀቁቂቃቄቅቆቇቐቑቒቓቔቕቖቘቚቛቜቝበቡቢባቤብቦቧቨቩቪቫቬቭቮቯተቱቲታቴትቶቷቸቹቺቻቼችቾቿኀኁኂኃኄኅኆኇነኑኒናኔንኖኗኘኙኚኛኜኝኞኟአኡኢኣኤእኦኧከኩኪካኬክኮኯኰኲኳኴኵኸኹኺኻኼኽኾዀዂዃዄዅወዉዊዋዌውዎዏዐዑዒዓዔዕዖዘዙዚዛዜዝዞዟዠዡዢዣዤዥዦዧየዩዪያዬይዮዯደዱዲዳዴድዶዷዸዹዺዻዼዽዾዿጀጁጂጃጄጅጆጇገጉጊጋጌግጎጏጐጒጓጔጕጘጙጚጛጜጝጞጟጠጡጢጣጤጥጦጧጨጩጪጫጬጭጮጯጰጱጲጳጴጵጶጷጸጹጺጻጼጽጾጿፀፁፂፃፄፅፆፇፈፉፊፋፌፍፎፏፐፑፒፓፔፕፖፗ፩፪፫፬፭፮፯፰፱፲፳፴፵፶፷፸፹፺፼፠፡።፣፥፦፧፨()[]'
# Real characters are numbered from 1; index 0 is kept free for padding.
char_to_idx = {symbol: position for position, symbol in enumerate(amharic_characters, start=1)}
char_to_idx['<PAD>'] = 0
# Reverse lookup used to turn predicted indices back into text.
idx_to_char = {position: symbol for symbol, position in char_to_idx.items()}

# Define the dataset class
class OCRDataset(Dataset):
    """Dataset of text-line images paired with their transcriptions.

    Images and labels are matched purely by position after sorting each
    directory listing, so both folders must hold the same number of files
    and their names must sort in the same order.

    Args:
        image_folder: Directory containing the input images.
        label_folder: Directory containing one text file per image.
        transform: Optional callable applied to the grayscale PIL image.
        max_label_length: Fixed length every encoded label is truncated or
            zero-padded to.
        char_map: Optional character -> index mapping. When omitted, the
            module-level ``char_to_idx`` table is used.
    """

    def __init__(self, image_folder, label_folder, transform=None, max_label_length=75, char_map=None):
        self.image_folder = image_folder
        self.label_folder = label_folder
        self.transform = transform
        self.max_label_length = max_label_length
        self.char_map = char_to_idx if char_map is None else char_map
        self.image_files = sorted(os.listdir(image_folder))
        self.label_files = sorted(os.listdir(label_folder))

    def __len__(self):
        """Return the number of image files found in ``image_folder``."""
        return len(self.image_files)

    def _encode_label(self, text):
        """Map ``text`` to a list of exactly ``max_label_length`` indices.

        Characters missing from the mapping are dropped rather than encoded
        as 0: index 0 is the padding value, so a 0 in the middle of a label
        would be indistinguishable from padding and would corrupt any
        length computed by counting non-zero entries.
        """
        looked_up = (self.char_map.get(ch, 0) for ch in text)
        encoded = [index for index in looked_up if index != 0]
        encoded = encoded[:self.max_label_length]
        return encoded + [0] * (self.max_label_length - len(encoded))

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        ``image`` is the grayscale image after the optional transform and
        ``label`` is a ``torch.long`` tensor of length ``max_label_length``.
        """
        img_path = os.path.join(self.image_folder, self.image_files[idx])
        label_path = os.path.join(self.label_folder, self.label_files[idx])

        # The context manager releases the file handle once the pixel data
        # has been converted.
        with Image.open(img_path) as img:
            image = img.convert('L')
        if self.transform:
            image = self.transform(image)

        # Explicit UTF-8: the labels are Amharic text, and the platform
        # default encoding is not guaranteed to decode them.
        with open(label_path, 'r', encoding='utf-8') as f:
            text = f.read().strip()

        return image, torch.tensor(self._encode_label(text), dtype=torch.long)

# Define the model
class OCRModel(nn.Module):
    """Convolutional feature extractor followed by two bidirectional LSTMs.

    Args:
        num_classes: Number of character classes; the output layer emits
            ``num_classes + 1`` scores per time step.
        rnn_size: Hidden size of each LSTM direction.
    """

    def __init__(self, num_classes, rnn_size=128):
        super().__init__()

        # Stage 1: halves both height and width.
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Stages 2-4: the (2, 1) pooling windows shrink height only.
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 1))
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 1))
        self.conv5 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.batch_norm1 = nn.BatchNorm2d(512)
        self.conv6 = nn.Conv2d(512, 512, kernel_size=3, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(512)
        self.pool4 = nn.MaxPool2d(kernel_size=(2, 1))

        # Final unpadded 2x2 convolution.
        self.conv7 = nn.Conv2d(512, 512, kernel_size=2)

        # Sequence model over the width axis, then per-step classifier.
        self.blstm1 = nn.LSTM(512, rnn_size, bidirectional=True, batch_first=True)
        self.blstm2 = nn.LSTM(rnn_size*2, rnn_size, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(rnn_size*2, num_classes + 1)

    def forward(self, x):
        """Return per-time-step class scores laid out as [batch, width, classes]."""
        relu = torch.relu

        feats = self.pool1(relu(self.conv1(x)))
        feats = self.pool2(relu(self.conv2(feats)))
        feats = self.pool3(relu(self.conv4(relu(self.conv3(feats)))))
        feats = self.batch_norm1(relu(self.conv5(feats)))
        feats = self.batch_norm2(relu(self.conv6(feats)))
        feats = relu(self.conv7(self.pool4(feats)))

        # Remove the height axis (only squeezed when it is 1, as it is for
        # 32-pixel-high inputs) and reorder to [batch, width, channels].
        sequence = feats.squeeze(2).permute(0, 2, 1)
        sequence, _ = self.blstm1(sequence)
        sequence, _ = self.blstm2(sequence)
        return self.fc(sequence)

# Define the CTC loss function
def ctc_loss(preds, labels, input_lengths, label_lengths):
    """Compute the CTC loss for a batch of raw model outputs.

    Args:
        preds: Unnormalised scores laid out as [N, T, C].
        labels: Target index sequences.
        input_lengths: Number of valid time steps for each sample.
        label_lengths: Number of valid target entries for each sample.

    Returns:
        The scalar loss produced by ``nn.CTCLoss`` with index 0 as the blank
        and infinite losses zeroed out.
    """
    # CTCLoss consumes time-major log-probabilities: [T, N, C]
    log_probs = preds.log_softmax(2).permute(1, 0, 2)
    criterion = nn.CTCLoss(blank=0, zero_infinity=True)
    return criterion(log_probs, labels, input_lengths, label_lengths)
def collate_fn(batch):
    """Merge ``(image, label)`` samples into batched tensors.

    Images are stacked along a new leading axis; labels are copied into a
    zero-initialised ``torch.long`` matrix as wide as the longest label.
    """
    images, labels = zip(*batch)
    stacked_images = torch.stack(images, dim=0)

    # Right-pad every label with zeros up to the longest one in the batch
    widest = max(map(len, labels))
    padded_labels = torch.zeros((len(labels), widest), dtype=torch.long)
    for row, sequence in enumerate(labels):
        padded_labels[row, :len(sequence)] = sequence

    return stacked_images, padded_labels
# Decode predictions
def decode_predictions(preds, idx_to_char, blank=0):
    """Greedy (best-path) CTC decoding of a batch of model outputs.

    For every sample the highest-scoring class is taken at each time step,
    runs of the same class are collapsed to a single occurrence, and blank
    entries are removed. Collapsing has to happen before blanks are dropped
    so that a blank between two identical classes still yields a doubled
    character.

    Args:
        preds: Scores laid out as [N, T, C].
        idx_to_char: Mapping from class index to its character.
        blank: Class index used as the CTC blank.

    Returns:
        A list with one decoded string per sample.
    """
    best_path = preds.argmax(2).tolist()  # Index of the max score per step
    pred_strings = []
    for frame_indices in best_path:
        chars = []
        previous = blank
        for index in frame_indices:
            # Emit only on a change of class, and never for the blank
            if index != previous and index != blank:
                chars.append(idx_to_char[index])
            previous = index
        pred_strings.append(''.join(chars))
    return pred_strings

# Preprocessing pipeline: single channel, resized to 32x128, converted to a
# tensor and normalised with mean 0.5 / std 0.5.
transform = transforms.Compose(
    [
        transforms.Grayscale(num_output_channels=1),
        transforms.Resize((32, 128)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ]
)

# Training and validation splits share the same preprocessing.
train_dataset = OCRDataset(
    image_folder='trains/images',
    label_folder='trains/labels',
    transform=transform,
)
val_dataset = OCRDataset(
    image_folder='vals/images',
    label_folder='vals/labels',
    transform=transform,
)

# Only the training data is shuffled.
train_loader = DataLoader(
    train_dataset,
    batch_size=128,
    shuffle=True,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=128,
    shuffle=False,
    collate_fn=collate_fn,
)

# Initialize the model, optimizer, and loss function
num_classes = len(amharic_characters)  # Adjust this based on your dataset
model = OCRModel(num_classes)
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# Training and evaluation loop
num_epochs = 5
best_val_loss = float('inf')
best_model_path = 'model.pth'  # Checkpoint file for the best validation loss

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    train_loss = 0

    for images, labels in train_loader:
        optimizer.zero_grad()
        preds = model(images)
        batch_size = preds.size(0)
        # Every sample uses the full output sequence length
        input_lengths = torch.full(size=(batch_size,), fill_value=preds.size(1), dtype=torch.long)
        # True label length = number of non-padding (non-zero) entries per row
        label_lengths = (labels != 0).sum(dim=1)

        loss = ctc_loss(preds, labels, input_lengths, label_lengths)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Average over the number of batches
    train_loss /= len(train_loader)
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {train_loss:.4f}')

    # ---- Validation pass (no gradient tracking) ----
    model.eval()
    val_loss = 0

    with torch.no_grad():
        for images, labels in val_loader:
            preds = model(images)
            batch_size = preds.size(0)
            input_lengths = torch.full(size=(batch_size,), fill_value=preds.size(1), dtype=torch.long)
            label_lengths = (labels != 0).sum(dim=1)

            loss = ctc_loss(preds, labels, input_lengths, label_lengths)
            val_loss += loss.item()

    val_loss /= len(val_loader)
    print(f'Validation Loss: {val_loss:.4f}')

    # Save the best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Write to best_model_path; the previous name `best_model` was never
        # defined and raised NameError on the first improvement.
        torch.save(model.state_dict(), best_model_path)
        print(f'Saved best model with validation loss: {val_loss:.4f}')
