In [None]:
!pip install torcheval
!pip install torch_optimizer
!pip install torchmetrics
!pip install torchvision
!pip install lion-pytorch
!pip install timm



In [None]:
import torch
import cv2
import random
import numpy as np
import torchvision
import torchvision.models as models
import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn



from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision import transforms
from tqdm import tqdm

from torch.optim.optimizer import Optimizer
from lion_pytorch import Lion
from torch.optim.lr_scheduler import LambdaLR, CosineAnnealingWarmRestarts, SequentialLR

from torch.cuda.amp import autocast, GradScaler
from torchmetrics import F1Score
from timm.data.mixup import Mixup

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")




In [None]:
class HISTEq:
    def __call__(self, img):
      img_np = np.array(img)
      if img_np.dtype != np.uint8:
         img_np = (img_np * 255).astype(np.uint8)


      if len(img_np.shape) == 2:
        equalized = cv2.equalizeHist(img_np)
      else:
         raise ValueError("Expected single-channel image")

      return transforms.functional.to_pil_image(equalized)




In [None]:
label_counts = {
    1: 38374, 7: 35754, 3: 35143, 0: 34585, 6: 34232, 2: 34203, 8: 33946, 9: 33847, 4: 33535, 5: 31416,
    24: 24983, 40: 24631, 28: 20764, 55: 18262, 47: 15318, 53: 14105, 30: 12602, 18: 11946, 49: 11418,
    39: 10177, 12: 10094, 36: 10033, 29: 9820, 15: 9182, 22: 9002, 43: 8738, 25: 8347, 23: 8237, 10: 6407,
    37: 5159, 21: 5076, 27: 5073, 14: 4934, 34: 4743, 32: 4695, 31: 4637, 13: 4562, 11: 3878, 19: 3762,
    42: 3687, 17: 3152, 52: 2994, 57: 2910, 38: 2854, 56: 2830, 59: 2822, 33: 2771, 50: 2749, 61: 2725,
    44: 2725, 35: 2701, 54: 2699, 58: 2697, 48: 2645, 26: 2605, 41: 2561, 16: 2517, 46: 2491, 20: 2468,
    51: 2448, 60: 2365, 45: 1896
}

def compute_class_weights(label_counts):
    total_sample = sum(label_counts.values())
    num_classes = len(label_counts)


    class_weights = {cls: total_sample / (num_classes * count) for cls, count in label_counts.items()}


    max_weight = max(class_weights.values())
    min_weight = min(class_weights.values())

    class_weights = {cls: 1 + ((weight - min_weight) / (max_weight - min_weight)) for cls, weight in class_weights.items()}

    sorted_classes = sorted(label_counts.keys())
    class_weights_list = [class_weights[cls] for cls in sorted_classes]

    return torch.FloatTensor(class_weights_list).to(device)


class_weights_tensor = compute_class_weights(label_counts)

print("Class Weights Tensor:", class_weights_tensor)


In [None]:


transform_for_Norm = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    HISTEq(),
    transforms.Resize((112, 112)),
    transforms.ToTensor()
])



dataset = datasets.EMNIST(
    root="data",
    split="byclass",
    train=True,
    download=True,
    transform=transform_for_Norm
)

loader = DataLoader(
    dataset,
    batch_size=256,
    num_workers=2,
    shuffle=False
)


mean_sum    = 0.0
sq_mean_sum = 0.0
num_pixels  = 0

for images, _ in loader:

    images = images.to(torch.float32)  # ensure float


    mean_sum    += images.sum(dim=[0, 2, 3])
    sq_mean_sum += (images ** 2).sum(dim=[0, 2, 3])

    num_pixels += images.size(0) * images.size(2) * images.size(3)


mean = mean_sum / num_pixels
var  = (sq_mean_sum / num_pixels) - (mean ** 2)
std  = torch.sqrt(var)

print("Mean:", mean)
print("Std :", std)




In [None]:


mean_value = mean
std_value  = std

train_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    HISTEq(),
    transforms.Resize((112, 112)),
    transforms.RandomHorizontalFlip(p=0.1),
    transforms.RandomRotation(degrees=10),
    transforms.RandomAffine(degrees=2, translate=(0.1, 0.1)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean_value, std=std_value)
])

test_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    HISTEq(),
    transforms.Resize((112, 112)),

    transforms.ToTensor(),
    transforms.Normalize(mean=mean_value, std=std_value)
])








In [None]:
def data_loaders():


  train_dataset = datasets.EMNIST(root="data", split="byclass", train=True, transform=train_transform, download=True)
  test_dataset = datasets.EMNIST(root="data", split="byclass", train=False, transform= test_transform, download=True)


  train_loader = DataLoader(
        train_dataset,
        batch_size=512,
        shuffle=True,
        drop_last=True,
        num_workers=10,
        pin_memory=True,
        persistent_workers=True,
        prefetch_factor=6
    )
  test_loader = DataLoader(
        test_dataset,
        batch_size=512,
        shuffle=False,
        num_workers=10,
        pin_memory=True,
        persistent_workers=True,
        prefetch_factor=6
    )

  return train_loader,test_loader



In [None]:


class EMNISTModelB3(nn.Module):
    def __init__(self, num_classes=62):
        super().__init__()

        self.model = models.efficientnet_b3(pretrained=True)

        self.model.features[0][0] = nn.Conv2d(1, 40, kernel_size=3, stride=2, padding=1, bias=False)


        self.model.classifier = nn.Sequential(
            nn.Linear(1536, 768),
            nn.ReLU(),
            nn.Linear(768, num_classes)
        )

    def forward(self, x):
        return self.model(x)






In [None]:
def train_epoch(model, train_loader, criterion, optimizer, scheduler, scaler, mixup_fn, epoch):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    batch_iterator = tqdm(train_loader, desc=f"Epoch {epoch+1}/{CONFIG['epochs']}")
    optimizer.zero_grad()

    for i, (data, target) in enumerate(batch_iterator):
        data, target = data.to(device), target.to(device)
        data, target = mixup_fn(data, target)

        with autocast():
            outputs = model(data)

            log_probs = F.log_softmax(outputs, dim=1)
            loss = criterion(log_probs, target)

        # Gradient accumulation
        loss = loss / CONFIG['accumulation_steps']
        scaler.scale(loss).backward()

        if (i + 1) % CONFIG['accumulation_steps'] == 0:
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()



        total_loss += loss.item() * CONFIG['accumulation_steps']
        _, predicted = torch.max(outputs, 1)
        _, target_hard = torch.max(target, 1)
        correct += (predicted == target_hard).sum().item()
        total += target_hard.size(0)

        batch_iterator.set_postfix({
            "loss": loss.item() * CONFIG['accumulation_steps'],
            "acc": correct / total
        })

    return total_loss / len(train_loader), correct / total





In [None]:
def validate(model, test_loader, criterion, f1_metric):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)

            targets_one_hot = F.one_hot(targets, num_classes=62).type(torch.float32)
            loss = criterion(F.log_softmax(outputs, dim=1), targets_one_hot)

            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            f1_metric.update(predicted, targets)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)

    f1 = f1_metric.compute().item()
    f1_metric.reset()

    return total_loss / len(test_loader), correct / total, f1

In [None]:
CONFIG = {

    'epochs': 15,
    'accumulation_steps': 2

}

In [None]:
def main():
    model = EMNISTModelB3(num_classes=62).to(device)


    train_loader, test_loader = data_loaders()


    mixup_fn = Mixup(
        mixup_alpha=0.22,
        cutmix_alpha=0.62,
        prob=0.8,
        switch_prob=0.5,
        mode='batch',
        label_smoothing=0.1,
        num_classes= 62
    )

    warmup_epochs = 5

    f1_metric = F1Score(task="multiclass", num_classes= 62, average='weighted').to(device)
    optimizer = Lion(model.parameters(), lr=0.0003, weight_decay=0.01)

    warmup_lambda = lambda epoch: float(epoch + 1) / warmup_epochs
    warmup_scheduler = LambdaLR(optimizer, lr_lambda=warmup_lambda)

    cosine_scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=8, T_mult=1, eta_min=1e-6)

    scheduler = SequentialLR(optimizer, schedulers=[warmup_scheduler, cosine_scheduler], milestones=[warmup_epochs])
    criterion = nn.KLDivLoss(weight = class_weights_tensor,reduction='batchmean')

    scaler = GradScaler()



    # Training loop
    best_f1 = 0
    for epoch in range(CONFIG['epochs']):
        train_loss, train_acc = train_epoch(
            model, train_loader, criterion, optimizer,
            scheduler, scaler, mixup_fn, epoch
        )

        test_loss, test_acc, f1 = validate(model, test_loader, criterion, f1_metric)

        print(f"Epoch {epoch+1}/{CONFIG['epochs']} | "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}, "
              f"F1 Score: {f1:.4f}")


        scheduler.step()



if __name__ == "__main__":
    main()