<a href="https://colab.research.google.com/github/mjairamchandr21/Low-Resolution-License-Plate-Recognition/blob/main/Computer_Vision_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
import cv2
import json
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm.auto import tqdm
import time

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Alphabet shared by Brazilian and Mercosur plates: the ten digits, then A-Z.
chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
# Slot 0 holds "-", the symbol reserved for the CTC blank; real characters start at 1.
vocab = ["-", *chars]
idx_to_char = dict(enumerate(vocab))
char_to_idx = {symbol: position for position, symbol in idx_to_char.items()}
num_classes = len(vocab)

In [3]:
# Colab-only step: mount Google Drive at /content/drive so that the
# /content/drive/MyDrive/... paths used by the later cells resolve.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
class LPRTrackDataset(Dataset):
    """License-plate tracks stored as ``root_dir/<scenario>/<layout>/<track>/``.

    Every track directory becomes one sample. ``__getitem__`` returns a tuple
    ``(lr, hr, text)``:

    * ``lr``   -- float tensor ``[5, 3, 64, 128]``, low-resolution frames scaled to [0, 1]
    * ``hr``   -- float tensor ``[5, 3, 128, 256]``, high-resolution frames scaled to [0, 1]
    * ``text`` -- upper-cased ``plate_text`` from ``annotation.json`` ("" when the file
      or the key is missing)
    """

    SCENARIOS = ("Scenario-A", "Scenario-B")
    LAYOUTS = ("Mercosur", "Brazilian")
    FRAMES_PER_TRACK = 5

    def __init__(self, root_dir):
        """Index every track directory found under ``root_dir``.

        Missing scenario/layout folders are skipped silently; the number of
        tracks found is printed once at the end.
        """
        self.root_dir = root_dir
        self.samples = []
        for scenario in self.SCENARIOS:
            scenario_path = os.path.join(root_dir, scenario)
            if not os.path.isdir(scenario_path):
                continue
            for layout in self.LAYOUTS:
                layout_path = os.path.join(scenario_path, layout)
                if not os.path.isdir(layout_path):
                    continue
                # os.listdir order is arbitrary; sorting makes sample indices (and
                # therefore any seeded train/val split) stable across machines.
                for track in sorted(os.listdir(layout_path)):
                    track_path = os.path.join(layout_path, track)
                    if os.path.isdir(track_path):
                        self.samples.append({"path": track_path, "scenario": scenario, "layout": layout})
        print(f"Total tracks found: {len(self.samples)}")

    def __len__(self):
        """Number of indexed tracks."""
        return len(self.samples)

    def load_images(self, folder, keyword, size, num_frames=5):
        """Load exactly ``num_frames`` RGB frames whose file name contains ``keyword``.

        Args:
            folder: track directory to read from.
            keyword: lower-case substring a file name must contain ("lr" / "hr").
            size: ``(width, height)`` passed to ``cv2.resize``.
            num_frames: number of frames to return.

        Returns:
            ``uint8`` array of shape ``[num_frames, height, width, 3]``.

        Raises:
            FileNotFoundError: if no matching file in ``folder`` can be decoded.
        """
        names = sorted(f for f in os.listdir(folder) if keyword in f.lower())
        frames = []
        for name in names:
            img = cv2.imread(os.path.join(folder, name))
            if img is None:
                # Unreadable / non-image file: skip it and try the next candidate.
                continue
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            frames.append(cv2.resize(img, size))
            if len(frames) == num_frames:
                break

        if not frames:
            raise FileNotFoundError(f"No readable '{keyword}' images in {folder}")

        # Short tracks are padded by repeating the last frame so that every sample
        # has the same shape (needed for batch collation and the fixed-width SR input).
        while len(frames) < num_frames:
            frames.append(frames[-1])
        return np.stack(frames)

    def __getitem__(self, idx):
        """Return ``(lr, hr, text)`` for the track at position ``idx``."""
        sample = self.samples[idx]
        p = sample["path"]

        # Load the LR and HR frames of the track
        lr = self.load_images(p, "lr", (128, 64), self.FRAMES_PER_TRACK)
        hr = self.load_images(p, "hr", (256, 128), self.FRAMES_PER_TRACK)

        # Load the plate text; tracks without an annotation get an empty label
        text = ""
        ann_path = os.path.join(p, "annotation.json")
        if os.path.exists(ann_path):
            with open(ann_path, encoding="utf-8") as f:
                text = json.load(f).get("plate_text", "").upper()

        # [T, H, W, C] uint8 -> [T, C, H, W] float in [0, 1]
        lr_t = torch.from_numpy(lr).permute(0, 3, 1, 2).float() / 255.0
        hr_t = torch.from_numpy(hr).permute(0, 3, 1, 2).float() / 255.0

        return lr_t, hr_t, text

In [5]:
class MultiFrameSR(nn.Module):
    """Multi-frame 2x super-resolution network.

    The frames of a track are stacked along the channel axis, passed through
    two 3x3 convolutions, upsampled 2x with PixelShuffle and projected back to
    ``in_channels`` with a sigmoid so the output lies in [0, 1].

    Args:
        num_frames: number of frames per track fused by the first convolution.
        in_channels: channels per frame (3 for RGB).
    """

    def __init__(self, num_frames=5, in_channels=3):
        super().__init__()
        self.num_frames = num_frames
        self.in_channels = in_channels
        # All frames are fused at once: num_frames * in_channels input planes.
        self.conv1 = nn.Conv2d(num_frames * in_channels, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 64, 3, padding=1)
        self.up = nn.Sequential(
            nn.Conv2d(64, 256, 3, padding=1),
            nn.PixelShuffle(2),  # 256 channels -> 64 channels at twice the height/width
            nn.ReLU()
        )
        self.out = nn.Conv2d(64, in_channels, 3, padding=1)

    def forward(self, x):
        """Fuse a stack of frames into one super-resolved image.

        Args:
            x: tensor ``[B, T, C, H, W]`` with ``T * C == num_frames * in_channels``.

        Returns:
            Tensor ``[B, in_channels, 2H, 2W]`` with values in [0, 1].

        Raises:
            ValueError: if ``x`` is not 5-D or its frame/channel count does not
                match what the network was built for.
        """
        if x.dim() != 5:
            raise ValueError(f"expected input of shape [B, T, C, H, W], got {tuple(x.shape)}")
        B, T, C, H, W = x.shape
        if T * C != self.conv1.in_channels:
            raise ValueError(
                f"expected {self.num_frames} frames x {self.in_channels} channels "
                f"({self.conv1.in_channels} planes), got {T} x {C}"
            )
        x = x.reshape(B, T * C, H, W)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.up(x)
        return torch.sigmoid(self.out(x))

class OCRModel(nn.Module):
    """CNN feature extractor followed by a Transformer encoder and a linear classifier.

    The CNN halves the spatial size twice and then averages the height away, so
    an image of width ``W_in`` becomes a sequence of ``W_in // 4`` feature
    vectors of size 256. The output is time-major, as expected by ``nn.CTCLoss``.
    """

    def __init__(self, num_classes):
        super().__init__()
        feature_layers = [
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(128, 256, 3, padding=1),
            nn.ReLU(),
            # Average over the full height, leave the width untouched
            nn.AdaptiveAvgPool2d((1, None)),
        ]
        self.cnn = nn.Sequential(*feature_layers)

        encoder_layer = nn.TransformerEncoderLayer(d_model=256, nhead=8)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=3)

        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map images ``[B, 3, H, W_in]`` to class scores ``[W_in // 4, B, num_classes]``."""
        features = self.cnn(x)                # [B, 256, 1, W]
        sequence = features.squeeze(2)        # [B, 256, W]
        sequence = sequence.permute(2, 0, 1)  # [W, B, 256] -- sequence-first for the encoder
        encoded = self.transformer(sequence)
        return self.fc(encoded)

In [6]:
# Initialization: data locations, loaders, networks, optimizer and loss functions.
root = "/content/drive/MyDrive/train"
save_path = "/content/drive/MyDrive/LR_LPR_SR_Models"
os.makedirs(save_path, exist_ok=True)

full_ds = LPRTrackDataset(root)

# 90/10 train/validation split. The generator is seeded so the split is the same
# after every kernel restart; otherwise validation accuracy (and the "best"
# checkpoint chosen from it) is not comparable between runs.
n_train = int(len(full_ds) * 0.9)
n_val = len(full_ds) - n_train
train_ds, val_ds = random_split(
    full_ds, [n_train, n_val], generator=torch.Generator().manual_seed(42)
)

# Two worker processes per loader; set num_workers=0 if reading from Drive
# through worker processes proves unreliable.
train_loader = DataLoader(train_ds, batch_size=16, shuffle=True, num_workers=2)
val_loader = DataLoader(val_ds, batch_size=16, shuffle=False, num_workers=2)

sr_net = MultiFrameSR().to(device)
ocr_net = OCRModel(num_classes).to(device)

# A single optimizer updates both networks so the SR and OCR losses train them jointly.
optimizer = torch.optim.Adam(list(sr_net.parameters()) + list(ocr_net.parameters()), lr=1e-4)
criterion_sr = nn.MSELoss()
criterion_ocr = nn.CTCLoss(blank=0)  # index 0 of the vocabulary is the blank symbol

best_loss = float('inf')
# The training cell's save logic compares validation accuracy against best_acc,
# so it has to exist before that cell runs.
best_acc = float('-inf')



Total tracks found: 20000


  self.transformer = nn.TransformerEncoder(


In [7]:
def calculate_accuracy(logits, targets_text):
    """Exact-match accuracy, in percent, of greedy CTC decoding.

    Args:
        logits: tensor ``[W, B, NumClasses]`` from the OCR model (time-major).
        targets_text: sequence of ``B`` ground-truth strings, e.g. ``["ABC1234", ...]``.

    Returns:
        Percentage (0-100) of sequences whose decoded text equals the target.
        An empty batch yields ``0.0`` instead of dividing by zero.
    """
    if len(targets_text) == 0:
        return 0.0

    # Softmax is monotonic, so the arg-max of the raw logits is already the most
    # likely class at each time step.
    best_paths = torch.argmax(logits, dim=2).transpose(0, 1).tolist()  # [B, W]

    correct = 0
    for i, path in enumerate(best_paths):
        # Greedy CTC decoding: collapse repeated indices, then drop blanks (index 0).
        decoded = []
        prev_idx = 0
        for char_idx in path:
            if char_idx != 0 and char_idx != prev_idx:
                decoded.append(idx_to_char[char_idx])
            prev_idx = char_idx

        if "".join(decoded) == targets_text[i]:
            correct += 1

    return (correct / len(targets_text)) * 100

In [None]:
num_epochs = 1
# Best validation accuracy seen so far. It must be defined before the save check
# below reads it; -inf guarantees the first epoch always writes a checkpoint.
best_acc = float('-inf')

for epoch in range(num_epochs):
    # --- TRAINING PHASE ---
    sr_net.train(); ocr_net.train()
    train_loss, train_acc = 0.0, 0.0
    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]")

    # `step` counts finished batches explicitly: tqdm only refreshes `pbar.n`
    # periodically, so it is not a reliable divisor for running averages.
    for step, (lr_imgs, hr_imgs, texts) in enumerate(pbar, start=1):
        # Only the first HR frame of each track serves as the SR target.
        lr_imgs, hr_target = lr_imgs.to(device), hr_imgs[:, 0].to(device)

        # Forward
        sr_out = sr_net(lr_imgs)
        logits = ocr_net(sr_out)  # [W, B, num_classes]

        # Loss Calculation
        loss_sr = criterion_sr(sr_out, hr_target)
        # CTC takes all labels concatenated into one 1-D tensor plus per-sample lengths.
        # dtype is forced to long so an empty label does not turn into a float tensor.
        targets = torch.cat([
            torch.tensor([char_to_idx[c] for c in t], dtype=torch.long) for t in texts
        ]).to(device)
        t_lens = torch.tensor([len(t) for t in texts], dtype=torch.long)
        i_lens = torch.full((logits.size(1),), logits.size(0), dtype=torch.long)
        loss_ocr = criterion_ocr(logits.log_softmax(2), targets, i_lens, t_lens)

        # Reconstruction loss dominates; the recognition loss is down-weighted to 0.1.
        total_loss = loss_sr + (0.1 * loss_ocr)

        # Optimize
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        # Metrics
        train_loss += total_loss.item()
        train_acc += calculate_accuracy(logits, texts)

        pbar.set_postfix({
            "Loss": f"{(train_loss / step):.4f}",
            "Acc": f"{(train_acc / step):.2f}%"
        })

    # --- VALIDATION PHASE ---
    sr_net.eval(); ocr_net.eval()
    val_loss, val_acc = 0.0, 0.0
    vbar = tqdm(val_loader, desc=f"Epoch {epoch+1} [Val]")

    with torch.no_grad():
        for step, (lr_imgs, hr_imgs, texts) in enumerate(vbar, start=1):
            lr_imgs, hr_target = lr_imgs.to(device), hr_imgs[:, 0].to(device)
            sr_out = sr_net(lr_imgs)
            logits = ocr_net(sr_out)

            # The validation "Loss" is the SR reconstruction loss only.
            val_loss += criterion_sr(sr_out, hr_target).item()
            val_acc += calculate_accuracy(logits, texts)

            vbar.set_postfix({
                "Loss": f"{(val_loss / step):.4f}",
                "Acc": f"{(val_acc / step):.2f}%"
            })

    # --- SAVE LOGIC ---
    # Checkpoints are selected on validation accuracy (mean of the per-batch accuracies).
    avg_val_acc = val_acc / max(len(val_loader), 1)
    if avg_val_acc > best_acc:
        best_acc = avg_val_acc
        torch.save(sr_net.state_dict(), f"{save_path}/best_sr.pth")
        torch.save(ocr_net.state_dict(), f"{save_path}/best_ocr.pth")
        print(f"⭐ New Best Accuracy: {best_acc:.2f}%")

Epoch 1/1 [Train]:   0%|          | 0/1125 [00:00<?, ?it/s]

In [None]:
def generate_submission(test_path, sr_model, ocr_model, out_path="submission.txt", num_frames=5):
    """Run SR + OCR on every track under ``test_path`` and write the submission file.

    One line is written per track directory, in sorted order, formatted as
    ``<track_id>,<plate_text>;<confidence>`` where the confidence is the mean
    softmax probability of the emitted characters (0.0 if nothing was emitted).

    Args:
        test_path: directory containing one sub-directory per test track.
        sr_model: super-resolution network taking ``[1, num_frames, 3, 64, 128]``.
        ocr_model: recognition network returning time-major class scores.
        out_path: where the submission file is written.
        num_frames: number of LR frames fed to ``sr_model`` per track.

    Tracks with unreadable frames are handled instead of aborting the run:
    unreadable files are skipped, short tracks are padded by repeating the last
    frame, and a track with no readable frame gets an empty prediction with
    confidence 0.0 so that every track still has a line in the output.
    """
    sr_model.eval(); ocr_model.eval()
    results = []
    tracks = sorted(t for t in os.listdir(test_path) if os.path.isdir(os.path.join(test_path, t)))

    with torch.no_grad():
        for tid in tqdm(tracks, desc="Final Inference"):
            # Load images
            imgs = []
            tp = os.path.join(test_path, tid)
            for f in sorted(f for f in os.listdir(tp) if "lr" in f.lower()):
                img = cv2.imread(os.path.join(tp, f))
                if img is None:
                    # cv2.imread signals a decode failure with None; skip that file.
                    continue
                imgs.append(cv2.resize(cv2.cvtColor(img, cv2.COLOR_BGR2RGB), (128, 64)))
                if len(imgs) == num_frames:
                    break

            res_txt = ""; confs = []
            if imgs:
                # The SR network needs a fixed number of frames.
                while len(imgs) < num_frames:
                    imgs.append(imgs[-1])

                x = torch.from_numpy(np.array(imgs)).permute(0, 3, 1, 2).float().unsqueeze(0).to(device) / 255.0
                sr_img = sr_model(x)
                logits = ocr_model(sr_img)          # [W, 1, num_classes]
                probs = F.softmax(logits, dim=2)

                # Decode & Confidence. Index the single batch element explicitly
                # rather than squeeze(), which would also drop a length-1 time axis.
                best_p = torch.argmax(probs, dim=2)[:, 0].cpu().numpy()
                conf_p = torch.max(probs, dim=2)[0][:, 0].cpu().numpy()

                # Greedy CTC decoding: collapse repeats, drop blanks (index 0).
                prev = 0
                for i, c in enumerate(best_p):
                    if c != 0 and c != prev:
                        res_txt += idx_to_char[int(c)]
                        confs.append(conf_p[i])
                    prev = c

            score = np.mean(confs) if confs else 0.0
            results.append(f"{tid},{res_txt};{score:.4f}")

    with open(out_path, "w") as f:
        f.write("\n".join(results))
    print("Submission File Created!")

# Usage: run inference on the test tracks stored on Drive with the in-memory
# sr_net / ocr_net (no checkpoint is reloaded here); writes submission.txt.
generate_submission("/content/drive/MyDrive/Pa7a3Hin-test-public", sr_net, ocr_net)

In [None]:
# Shell step: pack submission.txt from the working directory into submission.zip.
!zip submission.zip submission.txt