In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt

In [2]:
"""class CaptchaDataset(Dataset):
    def __init__(self, image_paths, labels, img_width=200, img_height=50, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.img_width = img_width
        self.img_height = img_height
        self.transform = transform

        # Create character to index mapping
        # Start from index 1, leave 0 for CTC blank
        self.characters = sorted(list(set(char for label in labels for char in label)))
        print(self.characters)
        self.char_to_idx = {char: idx + 1 for idx, char in enumerate(self.characters)}
        self.idx_to_char = {idx + 1: char for idx, char in enumerate(self.characters)}
        # Add blank token
        self.idx_to_char[0] = ''

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('L')

        if self.transform:
            image = self.transform(image)

        label = self.labels[idx]
        label_indices = [self.char_to_idx[char] for char in label]

        return {
            'image': image,
            'label': torch.tensor(label_indices, dtype=torch.long),
            'label_length': torch.tensor(len(label_indices), dtype=torch.long),
            'text': label
        }

    def __len__(self):
        return len(self.image_paths)
"""
class CaptchaDataset(Dataset):
    """CAPTCHA images paired with their text labels, encoded for CTC training.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of label strings, aligned with ``image_paths``.
            Every non-alphanumeric character is stripped from each label.
        img_width: Stored on the instance; not used by this class itself.
        img_height: Stored on the instance; not used by this class itself.
        transform: Optional callable applied to the grayscale PIL image.
        char_to_idx: Optional mapping character -> class index.
        idx_to_char: Optional mapping class index -> character.

    If either mapping is omitted, both are built from the (filtered) labels,
    numbering characters from 1 so that index 0 stays free for the CTC blank.
    """

    def __init__(self, image_paths, labels, img_width=200, img_height=50, transform=None,
                 char_to_idx=None, idx_to_char=None):
        self.image_paths = image_paths
        # Filter out non-alphanumeric characters from labels
        self.labels = [''.join(char for char in label if char.isalnum()) for label in labels]
        self.img_width = img_width
        self.img_height = img_height
        self.transform = transform

        if char_to_idx is None or idx_to_char is None:
            # Only create mappings if not provided
            self.characters = sorted(set(char for label in self.labels for char in label))
            self.char_to_idx = {char: idx + 1 for idx, char in enumerate(self.characters)}
            self.idx_to_char = {idx + 1: char for idx, char in enumerate(self.characters)}
            # Index 0 decodes to the empty string (CTC blank)
            self.idx_to_char[0] = ''
        else:
            self.char_to_idx = char_to_idx
            self.idx_to_char = idx_to_char
            # Keep `characters` defined on this path as well, so the attribute
            # exists no matter how the dataset was constructed.
            self.characters = sorted(char_to_idx)

    def __len__(self):
        """Return the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            dict with keys ``'image'`` (transformed grayscale image),
            ``'label'`` (1-D long tensor of class indices), ``'label_length'``
            (0-D long tensor) and ``'text'`` (the filtered label string).

        Raises:
            KeyError: If the label contains a character missing from
                ``char_to_idx``.
        """
        img_path = self.image_paths[idx]
        # The context manager releases the file handle as soon as the pixel
        # data has been converted; convert() returns an independent image.
        with Image.open(img_path) as handle:
            image = handle.convert('L')
        if self.transform:
            image = self.transform(image)

        label = self.labels[idx]
        try:
            label_indices = [self.char_to_idx[char] for char in label]
        except KeyError as err:
            # Same exception type as before, but say which sample is at fault.
            raise KeyError(
                f"Label {label!r} for {img_path!r} contains character "
                f"{err.args[0]!r} that is missing from char_to_idx"
            ) from err

        return {
            'image': image,
            'label': torch.tensor(label_indices, dtype=torch.long),
            'label_length': torch.tensor(len(label_indices), dtype=torch.long),
            'text': label
        }

In [3]:
class OCRModel(nn.Module):
    """CNN + bidirectional LSTM recogniser that emits per-column CTC log-probabilities.

    Args:
        num_chars: Number of real character classes. One extra output is added
            for the CTC blank.
        img_width: Width of the input images in pixels.
        img_height: Height of the input images in pixels.

    The input is a ``(batch, 1, img_height, img_width)`` tensor. The two 2x2
    max-pools shrink both spatial dimensions by 4, and every remaining image
    column becomes one time step, so the output has shape
    ``(batch, img_width // 4, num_chars + 1)``.
    """

    def __init__(self, num_chars, img_width=200, img_height=50):
        super(OCRModel, self).__init__()

        # CNN layers with batch normalization
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )

        # Calculate features shape after CNN
        feature_h = img_height // 4
        feature_w = img_width // 4

        # Dense layer after CNN
        self.dense1 = nn.Sequential(
            nn.Linear(feature_h * 64, 128),
            # dense1 receives (batch, feature_w, 128), so BatchNorm1d treats
            # the feature_w axis as its channel axis. This was hard-coded to
            # 50, which only matched the default img_width of 200.
            nn.BatchNorm1d(feature_w),
            nn.ReLU(),
            nn.Dropout(0.2)
        )

        # Bidirectional LSTM layers
        # NOTE: per the PyTorch docs, `dropout` only acts between stacked
        # layers, so it has no effect on these single-layer LSTMs (PyTorch
        # emits a warning). Kept unchanged to avoid altering the module config.
        self.lstm1 = nn.LSTM(128, 128, bidirectional=True, batch_first=True, dropout=0.25)
        self.lstm2 = nn.LSTM(256, 64, bidirectional=True, batch_first=True, dropout=0.25)

        # Final dense layer
        self.dense2 = nn.Linear(128, num_chars + 1)  # +1 for CTC blank

        # Initialize weights
        self._initialize_weights()

    def _initialize_weights(self):
        """Kaiming-initialise conv/linear weights; reset BatchNorm2d to identity."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return log-softmax scores of shape ``(batch, time, num_chars + 1)``."""
        # CNN feature extraction
        x = self.cnn(x)
        batch_size, channels, height, width = x.size()

        # Move width to the front so each image column becomes one time step,
        # then flatten (channels, height) into that step's feature vector.
        x = x.permute(0, 3, 1, 2)
        x = x.contiguous().view(batch_size, width, height * channels)

        # Dense layer
        x = self.dense1(x)

        # LSTM layers
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)

        # Final dense layer
        x = self.dense2(x)
        x = nn.functional.log_softmax(x, dim=2)

        return x


In [4]:
def train_batch(model, batch, criterion, optimizer, device):
    """Perform one optimisation step on a single batch.

    Args:
        model: Network whose output is indexed as (batch, time, classes).
        batch: Dict with tensor entries 'image', 'label' and 'label_length'.
        criterion: Loss called as criterion(log_probs, labels, input_lengths, label_lengths).
        optimizer: Optimizer bound to the model's parameters.
        device: Device the batch tensors are moved to.

    Returns:
        The loss for this batch as a Python float.
    """
    model.train()
    optimizer.zero_grad()

    inputs, targets, target_lengths = (
        batch[key].to(device) for key in ('image', 'label', 'label_length')
    )

    # Forward pass; every sample is given the full output sequence length
    outputs = model(inputs)
    output_lengths = torch.full(
        (inputs.size(0),), outputs.size(1), dtype=torch.long, device=device
    )

    # The criterion receives the output with the time axis first
    step_loss = criterion(outputs.transpose(0, 1), targets, output_lengths, target_lengths)

    # Backward pass and parameter update
    step_loss.backward()
    optimizer.step()

    return step_loss.item()


In [5]:
def decode_predictions(log_probs, idx_to_char, blank=0):
    """Greedy (best-path) CTC decoding of a batch of score sequences.

    Args:
        log_probs: Tensor indexed as (batch, time, classes).
        idx_to_char: Mapping from class index to character.
        blank: Class index of the CTC blank; defaults to 0, matching the
            previously hard-coded value.

    Returns:
        A list with one decoded string per batch element.

    Raises:
        KeyError: If a predicted non-blank index is missing from ``idx_to_char``.
    """
    # Convert once to nested Python lists instead of indexing the tensor
    # element by element inside the loop.
    best_paths = torch.argmax(log_probs, dim=2).tolist()
    batch_texts = []

    for path in best_paths:
        text = []
        previous = None
        for index in path:
            # Collapse runs of the same index, then drop blanks
            if index != previous and index != blank:
                text.append(idx_to_char[index])
            previous = index
        batch_texts.append(''.join(text))

    return batch_texts


In [6]:
def _run_ctc_epoch(model, loader, criterion, device, optimizer=None, max_grad_norm=5.0):
    """Run one pass over ``loader`` and return the mean per-batch CTC loss.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch (gradients clipped to ``max_grad_norm``); without one the
    model is put in eval mode and gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)
    running_loss = 0.0

    with torch.set_grad_enabled(training):
        for batch in loader:
            images = batch['image'].to(device)
            labels = batch['label'].to(device)
            label_lengths = batch['label_length'].to(device)

            log_probs = model(images)
            # Every sample is given the full output sequence length
            input_lengths = torch.full(
                (images.size(0),), log_probs.size(1), dtype=torch.long, device=device
            )
            loss = criterion(log_probs.transpose(0, 1), labels, input_lengths, label_lengths)

            if training:
                optimizer.zero_grad()
                loss.backward()
                # Gradient clipping
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_grad_norm)
                optimizer.step()

            running_loss += loss.item()

    return running_loss / len(loader)


def train_model(model, train_loader, val_loader, num_epochs=100, device='cuda',
                early_stopping_patience=10, checkpoint_path='best_model.pth'):
    """Train ``model`` with CTC loss, LR scheduling, checkpointing and early stopping.

    Args:
        model: Network whose output is indexed as (batch, time, classes).
        train_loader: Iterable of batch dicts ('image', 'label', 'label_length').
        val_loader: Iterable of batch dicts used for validation.
        num_epochs: Maximum number of epochs.
        device: Device to train on.
        early_stopping_patience: Stop after this many consecutive epochs
            without a new best validation loss.
        checkpoint_path: Where the state dict of the best model is saved.

    Raises:
        ValueError: If either loader yields no batches (the mean loss would
            otherwise be a division by zero).
    """
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError("train_loader and val_loader must each contain at least one batch")

    criterion = nn.CTCLoss(zero_infinity=True, blank=0)  # Set blank index to 0
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

    model = model.to(device)
    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(num_epochs):
        train_loss = _run_ctc_epoch(model, train_loader, criterion, device, optimizer=optimizer)
        val_loss = _run_ctc_epoch(model, val_loader, criterion, device)

        # Learning rate scheduling
        scheduler.step(val_loss)

        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
        else:
            patience_counter += 1
            if patience_counter >= early_stopping_patience:
                print(f'Early stopping triggered after {epoch+1} epochs')
                break


# Fetch the captcha_images_v2 archive and unpack it into the working directory
# (these are shell commands run through the notebook).
!curl -LO https://github.com/AakashKumarNain/CaptchaCracker/raw/master/captcha_images_v2.zip
!unzip -qq captcha_images_v2.zip

import kagglehub

# Download latest version
# NOTE(review): `path` is only printed in the cells shown here; the training
# cell reads from hard-coded directories instead — confirm how this download
# ends up in the directory that cell uses.
path = kagglehub.dataset_download("resppony/6-digit-alphanumeric-captcha-dataset")

print("Path to dataset files:", path)

In [7]:
def custom_collate_fn(batch):
    """
    Custom collate function to handle variable length sequences.

    Samples are ordered by label length (longest first) and every label is
    right-padded with zeros up to the longest label in the batch.

    Args:
        batch: List of dictionaries containing 'image', 'label', 'label_length', and 'text'

    Returns:
        Dict with stacked 'image', 'label' and 'label_length' tensors and a
        list of 'text' strings, all in the sorted order.
    """
    # Sort into a new list: the previous in-place sort reordered the
    # caller's list as a side effect.
    ordered = sorted(batch, key=lambda item: len(item['label']), reverse=True)

    # Get maximum sequence length in this batch
    max_length = len(ordered[0]['label'])

    images = []
    labels = []
    label_lengths = []
    texts = []

    for item in ordered:
        curr_label = item['label']
        missing = max_length - len(curr_label)
        if missing > 0:
            # Pad with zeros (blank token); new_zeros keeps the label's dtype/device
            curr_label = torch.cat([curr_label, curr_label.new_zeros(missing)])
        images.append(item['image'])
        labels.append(curr_label)
        label_lengths.append(item['label_length'])
        texts.append(item['text'])

    return {
        'image': torch.stack(images),
        'label': torch.stack(labels),
        'label_length': torch.stack(label_lengths),
        'text': texts
    }

def create_datasets(images, labels, batch_size=16, train_size=0.9, transform=None,
                    char_to_idx=None, idx_to_char=None):
    """
    Randomly split the samples and wrap them in DataLoaders using custom_collate_fn.

    Args:
        images: Sequence of image paths.
        labels: Sequence of label strings aligned with ``images``.
        batch_size: Batch size for both loaders.
        train_size: Fraction of the samples assigned to the training split.
        transform: Image transform; defaults to Resize((50, 200)) + ToTensor().
        char_to_idx: Optional character -> index mapping shared by both splits.
        idx_to_char: Optional index -> character mapping shared by both splits.

    Returns:
        (train_loader, val_loader, char_to_idx, idx_to_char)

    Raises:
        ValueError: If ``images`` and ``labels`` differ in length.
    """
    if len(images) != len(labels):
        raise ValueError(
            f"images and labels must have the same length, got {len(images)} and {len(labels)}"
        )

    if char_to_idx is None or idx_to_char is None:
        # Backward compatibility: this function used to read the notebook-level
        # mappings implicitly, so fall back to them when they exist.
        notebook_scope = globals()
        char_to_idx = notebook_scope.get('char_to_idx')
        idx_to_char = notebook_scope.get('idx_to_char')
    if char_to_idx is None or idx_to_char is None:
        # No mappings anywhere: derive one vocabulary from ALL labels so the
        # train and validation splits are guaranteed to share it.
        vocabulary_source = CaptchaDataset(images, labels)
        char_to_idx = vocabulary_source.char_to_idx
        idx_to_char = vocabulary_source.idx_to_char

    # Split data; plain ints avoid indexing the lists with 0-dim tensors
    shuffled = torch.randperm(len(images)).tolist()
    n_train = int(len(images) * train_size)
    train_indices = shuffled[:n_train]
    val_indices = shuffled[n_train:]

    # If no transform provided, create default transform
    if transform is None:
        transform = transforms.Compose([
            transforms.Resize((50, 200)),
            transforms.ToTensor(),
        ])

    # Create datasets
    train_dataset = CaptchaDataset(
        [images[i] for i in train_indices],
        [labels[i] for i in train_indices],
        transform=transform,
        char_to_idx=char_to_idx,
        idx_to_char=idx_to_char
    )

    val_dataset = CaptchaDataset(
        [images[i] for i in val_indices],
        [labels[i] for i in val_indices],
        transform=transform,
        char_to_idx=char_to_idx,
        idx_to_char=idx_to_char
    )

    # Create dataloaders with custom collate function
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=custom_collate_fn
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=custom_collate_fn
    )

    return train_loader, val_loader, train_dataset.char_to_idx, train_dataset.idx_to_char

In [8]:
# Fixed vocabulary: the ten digits plus upper- and lower-case ASCII letters,
# kept in sorted order so the index assignment is stable.
characters = sorted(
    [str(digit) for digit in range(10)]
    + [chr(code) for code in range(ord('A'), ord('Z') + 1)]
    + [chr(code) for code in range(ord('a'), ord('z') + 1)]
)

# Characters are numbered from 1; index 0 is the blank token and decodes to ''
idx_to_char = dict(enumerate(characters, start=1))
char_to_idx = {char: idx for idx, char in idx_to_char.items()}
idx_to_char[0] = ''


In [9]:
# def combine_datasets(old_data_dir, new_data_dir, batch_size=16, train_size=0.9):
#     # Load original dataset paths and labels
#     old_images = sorted(list(map(str, list(Path(old_data_dir).glob("*.png")))))
#     old_labels = [img.split(os.path.sep)[-1].split(".png")[0] for img in old_images]

#     # Load new dataset paths and labels
#     new_images = sorted(list(map(str, list(Path(new_data_dir).glob("*.png")))))
#     new_labels = [img.split(os.path.sep)[-1].split(".png")[0] for img in new_images]

#     # print(f"Number of images found in new_data_dir: {len(new_images)}")
#     # print(f"New data directory path: {new_data_dir}")
    
#     # Combine datasets
#     #all_images = old_images + new_images
#     #all_labels = old_labels + new_labels

#     # Create datasets with combined data
#     transform = transforms.Compose([
#         transforms.Resize((50, 200)),
#         transforms.ToTensor(),
#     ])

#     return create_datasets(new_images, new_labels, batch_size, train_size, transform)

# Note: the data directories contain both .jpg and .png images, so the loader below globs both extensions.

In [10]:
def combine_datasets(old_data_dir, new_data_dir, batch_size=16, train_size=0.9):
    """Collect the .png/.jpg images of two directories and build loaders from them.

    The label of an image is its file name up to the first '.'. Images of the
    old directory come first, followed by those of the new directory; each
    group is sorted by path.

    Returns:
        Whatever create_datasets returns for the combined samples.
    """

    def _scan(data_dir):
        # Sorted image paths of one directory together with their labels
        root = Path(data_dir)
        found = sorted(
            str(entry) for pattern in ("*.png", "*.jpg") for entry in root.glob(pattern)
        )
        names = [os.path.basename(entry).partition(".")[0] for entry in found]
        return found, names

    old_images, old_labels = _scan(old_data_dir)
    new_images, new_labels = _scan(new_data_dir)

    # Transformation
    resize_to_tensor = transforms.Compose([
        transforms.Resize((50, 200)),
        transforms.ToTensor(),
    ])

    return create_datasets(
        old_images + new_images,
        old_labels + new_labels,
        batch_size,
        train_size,
        resize_to_tensor,
    )


In [11]:
"""def update_model_for_new_chars(model_path, new_num_chars):
    model = OCRModel(num_chars=len(char_to_idx))
    model.load_state_dict(torch.load(model_path))

    # Create new final layer with expanded character set
    old_dense2 = model.dense2
    model.dense2 = nn.Linear(128, new_num_chars + 1)  # +1 for CTC blank

    # Copy over existing weights for previously known characters
    with torch.no_grad():
        model.dense2.weight[:old_dense2.weight.shape[0]] = old_dense2.weight
        model.dense2.bias[:old_dense2.bias.shape[0]] = old_dense2.bias

    return model
    """
def update_model_for_new_chars(model_path, char_to_idx):
    """Load a saved OCRModel state dict and transfer it to a model sized for ``char_to_idx``.

    Args:
        model_path: Path to a state dict saved from an OCRModel.
        char_to_idx: Character -> index mapping of the target character set.

    Returns:
        A new OCRModel whose CNN, dense1 and LSTM weights are copied from the
        checkpoint. The output-layer rows that exist in both models are copied;
        any additional rows are freshly initialised.
    """
    # NOTE(review): OCRModel already adds one output for the CTC blank, so
    # passing len(char_to_idx) + 1 gives the layer one spare class. This is
    # kept as-is because the previous code loaded checkpoints with
    # OCRModel(num_chars=63) for a 62-character mapping, i.e. existing
    # checkpoints have this shape.
    num_char = len(char_to_idx) + 1
    print(num_char)

    # map_location lets a checkpoint written on a GPU load on a CPU-only machine
    state = torch.load(model_path, map_location='cpu')

    # Size the old model from the checkpoint itself instead of a hard-coded 63
    old_outputs = state['dense2.weight'].shape[0]
    old_model = OCRModel(num_chars=old_outputs - 1)  # the model adds the blank output back
    old_model.load_state_dict(state)

    # Create new model with the requested character set
    new_model = OCRModel(num_chars=num_char)

    # Copy all layers except the last dense layer
    for layer_name in ('cnn', 'dense1', 'lstm1', 'lstm2'):
        getattr(new_model, layer_name).load_state_dict(getattr(old_model, layer_name).state_dict())

    new_outputs = new_model.dense2.out_features
    shared = min(old_outputs, new_outputs)
    with torch.no_grad():
        # Copy weights for the classes both models know (including blank)
        new_model.dense2.weight[:shared] = old_model.dense2.weight[:shared]
        new_model.dense2.bias[:shared] = old_model.dense2.bias[:shared]

        # Initialise rows for new classes only when there are any; this
        # avoids running the initialiser on a zero-element slice.
        if shared < new_outputs:
            nn.init.kaiming_normal_(new_model.dense2.weight[shared:], mode='fan_out', nonlinearity='relu')
            nn.init.constant_(new_model.dense2.bias[shared:], 0)

    return new_model

In [15]:

# # Set device
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# print(f"Using device: {device}")

# # Load and combine datasets
# train_loader, val_loader, char_to_idx, idx_to_char = combine_datasets(
#     old_data_dir="./captcha_images_v2/",
#     new_data_dir="./datasets/combined_labeled/",
#     batch_size=16
# )

# print(type(char_to_idx))
# # Update model with new character set
# model = update_model_for_new_chars('best_model.pth', char_to_idx)  # +1 for blank token
# model = model.to(device)
# # Continue training/content/best_model.pth
# train_model(
#     model=model,
#     train_loader=train_loader,
#     val_loader=val_loader,
#     num_epochs=30,  # Fewer epochs since model is pre-trained
#     device=device
# )




# Use the GPU when one is available and fall back to the CPU otherwise.
# Defining `device` here also provides the variable that the single-image
# inference cell passes to predict_single_image.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load and combine datasets
train_loader, val_loader, char_to_idx, idx_to_char = combine_datasets(
    old_data_dir="./captcha_images_v2/",
    new_data_dir="./datasets/combined_labeled/",
    batch_size=16
)

# Update model with new character set and train
# NOTE: train_model saves its best checkpoint to 'best_model.pth' as well, so
# the file loaded here is overwritten during training.
model = update_model_for_new_chars('best_model.pth', char_to_idx)
model = model.to(device)
train_model(model, train_loader, val_loader, num_epochs=30, device=device)

63
Epoch 1/30:
Train Loss: 1.4144, Val Loss: 0.7780
Epoch 2/30:
Train Loss: 0.6597, Val Loss: 0.5203
Epoch 3/30:
Train Loss: 0.4751, Val Loss: 0.4015
Epoch 4/30:
Train Loss: 0.3626, Val Loss: 0.3355
Epoch 5/30:
Train Loss: 0.2917, Val Loss: 0.2830
Epoch 6/30:
Train Loss: 0.2437, Val Loss: 0.2516
Epoch 7/30:
Train Loss: 0.2080, Val Loss: 0.2271
Epoch 8/30:
Train Loss: 0.1774, Val Loss: 0.2105
Epoch 9/30:
Train Loss: 0.1546, Val Loss: 0.2021
Epoch 10/30:
Train Loss: 0.1307, Val Loss: 0.1812
Epoch 11/30:
Train Loss: 0.1185, Val Loss: 0.1896
Epoch 12/30:
Train Loss: 0.1057, Val Loss: 0.1647
Epoch 13/30:
Train Loss: 0.0932, Val Loss: 0.1725
Epoch 14/30:
Train Loss: 0.0777, Val Loss: 0.1742
Epoch 15/30:
Train Loss: 0.0720, Val Loss: 0.1644
Epoch 16/30:
Train Loss: 0.0671, Val Loss: 0.1695
Epoch 17/30:
Train Loss: 0.0611, Val Loss: 0.1536
Epoch 18/30:
Train Loss: 0.0533, Val Loss: 0.1361
Epoch 19/30:
Train Loss: 0.0457, Val Loss: 0.1428
Epoch 20/30:
Train Loss: 0.0442, Val Loss: 0.1405
Epoch 

# After training, single image inference




In [13]:
def decode_batch_predictions(model, images, idx_to_char, device='cuda'):
    """Run the model on a batch of images and return the decoded strings.

    The model is switched to eval mode and the forward pass runs with
    gradients disabled; decoding is delegated to decode_predictions.
    """
    model.eval()
    with torch.no_grad():
        scores = model(images.to(device))
    return decode_predictions(scores, idx_to_char)

In [18]:
def predict_single_image(image_path, model, transform, idx_to_char, device='cuda'):
    """Return the decoded text for the image stored at ``image_path``.

    The image is read as grayscale, passed through ``transform`` and given a
    leading batch dimension before being handed to decode_batch_predictions.
    """
    grayscale = Image.open(image_path).convert('L')
    single_batch = transform(grayscale).unsqueeze(0)
    decoded = decode_batch_predictions(model, single_batch, idx_to_char, device)
    return decoded[0]

# Example usage for single image prediction:

transform = transforms.Compose([
    transforms.Resize((50, 200)),
    transforms.ToTensor(),
])

# Take the device from the model's own parameters so this cell does not
# depend on a `device` variable having been defined by another cell.
device = next(model.parameters()).device

image_path = "./dataset2/validation/validation/008549.babde59a74748445130ddd0732a44dff.jpg" # Replace with your image path
prediction = predict_single_image(image_path, model, transform, idx_to_char, device)
print(f"Predicted text: {prediction}")

Predicted text: 008549


# Manual Saving

In [16]:
# Save the whole model object rather than only its weights.
# NOTE(review): loading this file presumably requires the OCRModel class to
# be importable at load time — confirm before relying on it.
torch.save(model, 'full_captcha_model.pth')

# Or save just the model state dictionary (recommended)
torch.save(model.state_dict(), 'captcha_model_state.pth')

# NOTE(review): 'best_model.pth' is also where train_model stores its
# best-validation checkpoint, so this line replaces that checkpoint with the
# model's current weights — confirm this is intended.
torch.save(model.state_dict(), 'best_model.pth')