In [1]:
import os
import random
import glob 
import pandas as pd
import string
import collections
import time

from tqdm import tqdm

from PIL import Image

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torchvision.transforms as T
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torch.optim as optim

import matplotlib.pyplot as plt
import numpy as np

import models.lossnet as lossnet
import models.crnn as crnn
from config import *
from sampler import SubsetSequentialSampler

In [2]:
# Directory holding the captcha PNGs; each file name (minus extension) is the label.
path = './Large_Captcha_Dataset'
# Prefer the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# This image is deliberately excluded from the dataset.
# NOTE(review): presumably the file is corrupt or mislabeled -- confirm.
excluded_image = os.path.join(path, '4q2wA.png')
# Filter instead of list.remove() so the cell does not raise ValueError when
# the excluded file is not present in the dataset directory.
data = [p for p in glob.glob(os.path.join(path, '*.png')) if p != excluded_image]

In [3]:
# Alphabet of characters that may appear in a captcha label.
all_letters = string.ascii_uppercase + string.ascii_lowercase + string.digits

# Character -> integer id. Ids start at 1, so 0 is never assigned to a
# character (predict() treats 0 as "no character" and pads with it).
mapping = {char: idx for idx, char in enumerate(all_letters, start=1)}
# Integer id -> character (inverse lookup of `mapping`).
mapping_inv = {idx: char for char, idx in mapping.items()}

num_class = len(mapping)

In [4]:
# NOTE: `images` and `labels` are not used by this cell; they are kept in case
# other cells rely on these module-level names.
images = []
labels = []

# Column-oriented records: one file name and one encoded label per image.
datas = collections.defaultdict(list)
for d in data:
    # os.path.basename honors the platform's path separator, whereas splitting
    # on '/' leaves the directory prefix in the name when the separator is '\\'.
    x = os.path.basename(d)
    datas['image'].append(x)
    # The label is the file name up to the first '.', encoded character by
    # character through `mapping`.
    datas['label'].append([mapping[ch] for ch in x.split('.')[0]])
df = pd.DataFrame(datas)
# df.head()

In [5]:
class CaptchaDataset:
    """Map-style dataset over a DataFrame of captcha file names and labels.

    Each row of ``df`` must provide an ``'image'`` entry (file name, resolved
    against the module-level ``path`` directory) and a ``'label'`` entry (a
    sequence of integer character ids).
    """

    def __init__(self, df, transform=None):
        """Store the frame and an optional callable applied to each image."""
        self.transform = transform
        self.df = df

    def __len__(self):
        """Return the number of rows (samples) in the frame."""
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is opened with PIL and converted to single-channel
        grayscale (mode ``'L'``); the label becomes an int32 tensor. When a
        transform was supplied it is applied to the image only.
        """
        row = self.df.iloc[idx]
        image_file = os.path.join(path, row['image'])
        grayscale = Image.open(image_file).convert('L')
        target = torch.tensor(row['label'], dtype=torch.int32)

        if self.transform is None:
            return grayscale, target
        return self.transform(grayscale), target

In [6]:
# Hold out 20% of the rows for evaluation. No random_state is passed, so the
# split changes between runs unless NumPy's global RNG is seeded elsewhere.
df_train, df_test = train_test_split(df, shuffle=True, test_size=0.2)

# Both pipelines are identical: convert the PIL image to a tensor, then
# normalize its single grayscale channel.
# NOTE(review): the mean/std constants are hard-coded; confirm they were
# computed for this dataset.
train_transform = T.Compose([T.ToTensor(), T.Normalize([0.4914], [0.2023])])
test_transform = T.Compose([T.ToTensor(), T.Normalize([0.4914], [0.2023])])

# `trainset` and `unlabeledset` wrap the same training rows; they differ only
# in which transform object they hold.
trainset = CaptchaDataset(df_train, transform=train_transform)
unlabeledset = CaptchaDataset(df_train, transform=test_transform)
testset = CaptchaDataset(df_test, transform=test_transform)

In [7]:
# Loss Prediction Loss
def LossPredLoss(input, target, margin=1.0, reduction='mean'):
    """Pairwise margin ranking loss between predicted and true losses.

    The batch (size 2B) is folded into B pairs by subtracting the flipped
    batch from itself: pair i compares element i with element 2B-1-i. For each
    pair the loss is ``max(0, margin - s * (input_i - input_j))`` where ``s``
    is +1 when ``target_i > target_j`` and -1 otherwise.

    Args:
        input: Tensor of predicted losses; its first dimension must be even.
        target: Tensor of true losses laid out like ``input``. It is detached,
            so no gradient flows through it.
        margin: Ranking margin.
        reduction: ``'mean'`` averages over the B pairs; ``'none'`` returns
            the per-pair losses.

    Returns:
        A scalar tensor for ``'mean'``, or a tensor with ``len(input) // 2``
        leading entries for ``'none'``.

    Raises:
        AssertionError: If the batch size is odd.
        NotImplementedError: If ``reduction`` is not ``'mean'`` or ``'none'``.
    """
    assert len(input) % 2 == 0, 'the batch size is not even.'
    # Fail fast with the intended exception; previously the exception object
    # was created but never raised, which surfaced later as UnboundLocalError.
    if reduction not in ('mean', 'none'):
        raise NotImplementedError(f'unsupported reduction: {reduction!r}')

    # [l_1 - l_2B, l_2 - l_2B-1, ... , l_B - l_B+1], where batch_size = 2B
    input = (input - input.flip(0))[:len(input) // 2]
    target = (target - target.flip(0))[:len(target) // 2]
    target = target.detach()

    # Sign indicator defined by the authors: +1 where target > 0, else -1.
    one = 2 * torch.sign(torch.clamp(target, min=0)) - 1

    loss = torch.clamp(margin - one * input, min=0)
    if reduction == 'mean':
        # Note that the size of input is already halved.
        loss = torch.sum(loss) / input.size(0)

    return loss

In [8]:
def get_uncertainty(models, criterion, unlabeled_loader):
    """Predict a loss value for every sample yielded by ``unlabeled_loader``.

    Args:
        models: Dict with a ``'backbone'`` module, called as
            ``backbone(inputs, labels, criterion)`` and returning a 3-tuple
            whose second element is the feature input for ``'module'``, and a
            ``'module'`` that maps those features to one value per sample.
        criterion: Passed through to the backbone unchanged.
        unlabeled_loader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        1-D CPU tensor with the predicted losses, in loader order. Empty when
        the loader yields no batches.
    """
    backbone = models['backbone']
    loss_module = models['module']
    backbone.eval()
    loss_module.eval()

    # Run on the device the backbone lives on instead of assuming CUDA, so the
    # function also works on CPU-only machines.
    first_param = next(backbone.parameters(), None)
    if first_param is not None:
        run_device = first_param.device
    else:
        run_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Seed with an empty tensor so an empty loader still returns a tensor;
    # batches are concatenated once at the end rather than after every step.
    chunks = [torch.tensor([], device=run_device)]

    with torch.no_grad():
        for inputs, labels in unlabeled_loader:
            inputs = inputs.to(run_device)
            labels = labels.to(run_device)

            _, features, _ = backbone(inputs, labels, criterion)
            pred_loss = loss_module(features)
            # Flatten to one value per sample.
            pred_loss = pred_loss.view(pred_loss.size(0))
            chunks.append(pred_loss)

    return torch.cat(chunks, 0).cpu()

In [9]:
def predict(outputs, max_len=5):
    """Greedily decode per-timestep class ids into fixed-length sequences.

    For every row, consecutive repeats are collapsed and ids that are not
    greater than 0 are dropped. At most ``max_len`` ids are kept and shorter
    results are right-padded with 0.

    Args:
        outputs: 2-D tensor (or nested sequence) of integer class ids, one row
            per sample.
        max_len: Length of every decoded sequence; defaults to 5.

    Returns:
        ``torch.long`` tensor with one row of ``max_len`` ids per input row.
        It lives on the device of ``outputs`` when that is a tensor; otherwise
        on CUDA if available, else on the CPU.
    """
    if isinstance(outputs, torch.Tensor):
        out_device = outputs.device
        # A single host transfer instead of one sync per element.
        sequences = outputs.tolist()
    else:
        out_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        sequences = [[int(v) for v in seq] for seq in outputs]

    result = []
    for seq in sequences:
        pred = []
        then = 0  # id seen at the previous timestep
        for x in seq:
            if len(pred) == max_len:
                break
            # Keep an id only when it differs from the previous timestep and
            # is a real character id (> 0).
            if then != x and x > 0:
                pred.append(x)
            then = x
        pred.extend([0] * (max_len - len(pred)))
        result.append(pred)
    return torch.tensor(result, dtype=torch.long, device=out_device)

In [10]:
# Train Utils
# Running count of optimisation steps taken by train_epoch() across all calls.
iters = 0


def train_epoch(models, criterion, optimizers, dataloaders, num_epochs, epoch, epoch_loss):
    """Train the backbone for one pass over ``dataloaders['train']``.

    Args:
        models: Dict whose ``'backbone'`` module is called as
            ``backbone(inputs, labels, criterion)`` and returns a 3-tuple with
            the training loss as its last element.
        criterion: Passed through to the backbone unchanged.
        optimizers: Dict whose ``'backbone'`` entry optimises the backbone.
        dataloaders: Dict whose ``'train'`` entry yields ``(inputs, labels)``.
        num_epochs: Total number of epochs; used only in the progress text.
        epoch: Index of the current epoch; used only in the progress text.
        epoch_loss: Unused; kept for interface compatibility.

    Side effects:
        Increments the module-level ``iters`` counter once per batch.
    """
    global iters
    backbone = models['backbone']
    backbone.train()

    # Move batches to the device the backbone lives on instead of assuming
    # CUDA, so training also runs on CPU-only machines.
    first_param = next(backbone.parameters(), None)
    if first_param is not None:
        run_device = first_param.device
    else:
        run_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    pbar = tqdm(dataloaders['train'], leave=False, total=len(dataloaders['train']))
    for data in pbar:
        inputs = data[0].to(run_device)
        labels = data[1].to(run_device)
        iters += 1

        optimizers['backbone'].zero_grad()

        _, _, target_loss = backbone(inputs, labels, criterion)

        pbar.set_description(f'({epoch}/{num_epochs}) target_loss : {target_loss.item():.3f}')

        target_loss.backward()
        optimizers['backbone'].step()

In [11]:
def train(models, criterion, optimizers, dataloaders, num_epochs, epoch_loss, checkpoint_every=None):
    """Train the backbone for ``num_epochs`` epochs.

    Args:
        models, criterion, optimizers, dataloaders, epoch_loss: Forwarded to
            ``train_epoch`` unchanged.
        num_epochs: Number of epochs to run.
        checkpoint_every: When set to a positive integer N, the model is
            evaluated on the ``'test'`` loader and a checkpoint is written on
            every epoch whose index is a multiple of N. The default ``None``
            disables checkpointing, matching the previous behavior where this
            branch was switched off with ``if False``.

    Side effects:
        Creates ``./checkpoints/train/weights`` if needed and prints progress
        markers.
    """
    print('>> Train a Model.')
    checkpoint_dir = os.path.join('./checkpoints', 'train', 'weights')
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(checkpoint_dir, exist_ok=True)

    for epoch in range(num_epochs):
        train_epoch(models, criterion, optimizers, dataloaders, num_epochs, epoch, epoch_loss)

        # Save a checkpoint
        if checkpoint_every and epoch % checkpoint_every == 0:
            acc = test(models, criterion, dataloaders, mode='test')
            print(f'acc : {acc}')
            torch.save({
                'epoch': epoch + 1,
                'state_dict_backbone': models['backbone'].state_dict(),
            },
            f'{checkpoint_dir}/crnn_lloss_{int(time.time())}.pth')
    print('>> Finished.')

In [12]:
def test(models, criterion, dataloaders, mode='val', max_batches=1000):
    """Compute exact-match sequence accuracy of the backbone, in percent.

    Args:
        models: Dict whose ``'backbone'`` module is called as
            ``backbone(inputs, labels, criterion)`` and returns the score
            tensor as the first element of a 3-tuple.
        criterion: Passed through to the backbone unchanged.
        dataloaders: Dict with a loader stored under ``mode``.
        mode: ``'val'`` or ``'test'``; selects the loader.
        max_batches: Stop after this many batches (default 1000, the cap that
            was previously hard-coded). ``None`` evaluates the whole loader.

    Returns:
        ``100 * correct / total`` where a sample counts as correct only when
        its whole decoded sequence equals its label.

    Raises:
        AssertionError: If ``mode`` is neither ``'val'`` nor ``'test'``.
        ZeroDivisionError: If the loader yields no samples.
    """
    print('>> Test a Model.')
    assert mode == 'val' or mode == 'test'
    backbone = models['backbone']
    backbone.eval()

    # Evaluate on the device the backbone lives on instead of assuming CUDA.
    first_param = next(backbone.parameters(), None)
    if first_param is not None:
        run_device = first_param.device
    else:
        run_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    total = 0
    correct = 0
    with torch.no_grad():
        for idx, (inputs, labels) in enumerate(dataloaders[mode]):
            inputs = inputs.to(run_device)
            labels = labels.to(run_device)

            scores, _, _ = backbone(inputs, labels, criterion)
            # NOTE(review): assumes scores are (time, batch, classes) so the
            # permute yields (batch, time, classes) -- confirm against CRNN.
            scores = scores.permute(1, 0, 2)
            preds = predict(scores.argmax(dim=2))

            # predict() returns int64 while the dataset builds int32 labels;
            # align dtype and device so the comparison does not depend on how
            # torch.equal treats mismatched tensors.
            targets = labels.to(device=preds.device, dtype=preds.dtype)
            total += labels.size(0)
            for pred_row, target_row in zip(preds, targets):
                correct += int(torch.equal(pred_row, target_row))

            if max_batches is not None and idx + 1 >= max_batches:
                break

    return 100 * correct / total

In [13]:
def main_task():
    """Run the active-learning experiment with random sample acquisition.

    For each of ``TRIALS`` trials the function starts from ``ADDENDUM``
    randomly chosen labeled indices out of ``NUM_TRAIN`` and then runs
    ``CYCLES`` cycles. Every cycle re-initialises the backbone, trains it for
    ``EPOCH`` epochs, evaluates it on the test loader, reports the accuracy,
    and moves ``ADDENDUM`` randomly chosen indices from the unlabeled pool
    into the labeled set.

    Side effects:
        Appends one line per cycle to ``result.txt`` and writes one checkpoint
        per trial under ``./checkpoints/trial/weights``.

    NOTE(review): no loss-prediction module is built (``models['module']`` is
    None) and get_uncertainty() is never called, so acquisition is purely
    random -- confirm this is the intended random baseline.
    """
    for trial in range(TRIALS):
        # Initialise the labeled set with ADDENDUM randomly drawn indices;
        # everything else starts out unlabeled.
        indices = list(range(NUM_TRAIN))
        random.shuffle(indices)
        labeled_set = indices[:ADDENDUM]
        unlabeled_set = indices[ADDENDUM:]

        train_loader = DataLoader(trainset, batch_size=BATCH,
                                  sampler=SubsetRandomSampler(labeled_set),
                                  pin_memory=True)
        test_loader = DataLoader(testset, batch_size=BATCH, shuffle=True)
        dataloaders = {'train': train_loader, 'test': test_loader}

        # Model. Use the notebook-level `device` rather than a hard-coded
        # .cuda() so the CPU fallback chosen there is honored.
        crnn_model = crnn.CRNN(in_channels=1, output=num_class).to(device)
        loss_module = None
        models = {'backbone': crnn_model, 'module': loss_module}
        torch.backends.cudnn.benchmark = False

        # Active learning cycles
        for cycle in range(CYCLES):
            # Every cycle trains the backbone from freshly reset weights.
            models['backbone'].reset_parameters()

            # Loss and optimizer (re)initialization
            criterion = nn.CTCLoss(reduction='mean')
            optim_backbone = optim.Adam(models['backbone'].parameters(), lr=LR,
                                        weight_decay=WDECAY)
            optim_module = None

            optimizers = {'backbone': optim_backbone, 'module': optim_module}

            # Training and test
            train(models, criterion, optimizers, dataloaders, EPOCH, EPOCHL)
            acc = test(models, criterion, dataloaders, mode='test')
            cur_time = time.strftime('%Y-%m-%d %I:%M:%S %p', time.localtime())
            # Format once; the same line goes to stdout and to the results file.
            report = '{} : Trial {}/{} || Cycle {}/{} || Label set size {}: Test acc {}'.format(
                cur_time, trial+1, TRIALS, cycle+1, CYCLES, len(labeled_set), acc)
            print(report)
            with open('result.txt', 'a+') as f:
                f.write(report + '\n')

            # Update the labeled dataset by random acquisition.

            # Draw a random candidate subset of SUBSET unlabeled indices.
            random.shuffle(unlabeled_set)
            subset = unlabeled_set[:SUBSET]

            # Random permutation of positions within the candidate subset.
            arg = np.arange(len(subset))
            random.shuffle(arg)
            shuffled_subset = torch.tensor(subset)[arg]

            # The last ADDENDUM permuted candidates become labeled; the rest
            # of the subset rejoins the remaining unlabeled pool.
            labeled_set += list(shuffled_subset[-ADDENDUM:].numpy())
            unlabeled_set = list(shuffled_subset[:-ADDENDUM].numpy()) + unlabeled_set[SUBSET:]

            # Create a new dataloader for the updated labeled dataset
            dataloaders['train'] = DataLoader(trainset, batch_size=BATCH,
                                              sampler=SubsetRandomSampler(labeled_set),
                                              pin_memory=True)

        checkpoint_dir = os.path.join('./checkpoints', 'trial', 'weights')
        os.makedirs(checkpoint_dir, exist_ok=True)
        # Save a checkpoint
        torch.save({
                    'trial': trial + 1,
                    'state_dict_backbone': models['backbone'].state_dict(),
                },
                f'{checkpoint_dir}/crnn_lloss_{int(time.time())}.pth')

In [14]:
# Entry point: launch the full experiment when this code is executed as the
# main module.
if __name__ == '__main__':
    main_task()

>> Train a Model.


                                                                             

KeyboardInterrupt: 