<a href="https://colab.research.google.com/github/pyrated03/Few-Shot-Learning-IIT-Bombay/blob/main/CUB_5_Shot_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab VM; the later cells read and write files
# below /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# !pip install gevent --pre
# !pip install auto-py-to-exe
# !pip install -U setuptools
# !pip install git+https://github.com/orobix/Prototypical-Networks-for-Few-shot-Learning-PyTorch.git

In [None]:
# !cp --gdrive/My\ Drive/IITB\ Internship/src/Prototypical-Networks-for-Few-shot-Learning-PyTorch/src/prototypical_batch_sampler.py

In [None]:
import sys
# Make the Prototypical-Networks sources stored on Drive importable from this kernel.
sys.path.append('/content/gdrive/MyDrive/IITB Internship/src/Prototypical-Networks-for-Few-shot-Learning-PyTorch/src/')

In [None]:
from prototypical_batch_sampler import PrototypicalBatchSampler
from prototypical_loss import prototypical_loss as loss_fn
from omniglot_dataset import OmniglotDataset
from protonet import ProtoNet
from parser_util import get_parser
# import prototypical_batch_sampler

In [None]:
%%writefile resnet12.py
import torch.nn as nn
import torch
import torch.nn.functional as F


# This ResNet network was designed following the practice of the following papers:
# TADAM: Task dependent adaptive metric for improved few-shot learning (Oreshkin et al., in NIPS 2018) and
# A Simple Neural Attentive Meta-Learner (Mishra et al., in ICLR 2018).

def conv3x3(in_planes, out_planes, stride=1):
    """Build a bias-free 3x3 ``nn.Conv2d`` that pads one pixel on every side.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
        stride: stride of the convolution (default 1).

    Returns:
        The newly constructed ``nn.Conv2d`` layer.
    """
    conv_kwargs = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_kwargs)


class BasicBlock(nn.Module):
    """Residual block made of three 3x3 conv + batch-norm stages.

    The stages are followed by a skip connection, a LeakyReLU, a max-pool and
    (optionally) dropout. The convolutions themselves always run with stride 1;
    ``stride`` is only used as the kernel size of the max-pool layer.

    Args:
        inplanes: number of input channels.
        planes: number of output channels of every convolution in the block.
        stride: kernel size handed to ``nn.MaxPool2d``.
        downsample: optional module applied to the input to build the residual;
            when ``None`` the raw input is used as the residual.
        drop_rate: dropout probability applied to the pooled output (0 disables it).
        drop_block: stored on the instance; not used by ``forward``.
        block_size: stored on the instance; not used by ``forward``.
    """

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, drop_rate=0.0, drop_block=False, block_size=1):
        super().__init__()

        # Plain (non-module) configuration and bookkeeping attributes.
        self.stride = stride
        self.drop_rate = drop_rate
        self.drop_block = drop_block
        self.block_size = block_size
        self.num_batches_tracked = 0  # number of forward() calls so far

        # Sub-modules: names and registration order define the state_dict layout.
        self.conv1 = conv3x3(inplanes, planes)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.LeakyReLU(0.1)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = conv3x3(planes, planes)
        self.bn3 = nn.BatchNorm2d(planes)
        self.maxpool = nn.MaxPool2d(stride)
        self.downsample = downsample

    def forward(self, x):
        """Run the block on ``x`` and return the pooled (and possibly dropped-out) features."""
        self.num_batches_tracked += 1

        # Main branch: conv-bn-act, conv-bn-act, conv-bn.
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # Skip branch: use the projected input when a downsample module was given.
        residual = x if self.downsample is None else self.downsample(x)
        out += residual

        # Activation first, then spatial reduction.
        out = self.maxpool(self.relu(out))

        if self.drop_rate > 0:
            out = F.dropout(out, p=self.drop_rate, training=self.training, inplace=True)
        return out


class ResNet(nn.Module):
    """Four-stage residual embedding network (one ``block`` per stage).

    The stages widen the feature maps to 64, 160, 320 and 640 channels (times
    ``block.expansion``). ``forward`` returns one flattened feature vector per
    input sample.

    Args:
        block: block class used for every stage; called as
            ``block(inplanes, planes, stride, downsample, drop_rate, drop_block, block_size)``.
        keep_prob: ``self.dropout`` is built with ``p = 1 - keep_prob``. Note that
            ``forward`` does not apply ``self.dropout``.
        avg_pool: when true, a ``5x5`` average pool (stride 1) is applied to the
            last stage's output before flattening.
        drop_rate: forwarded to every block.
        dropblock_size: forwarded as ``block_size`` to the last two stages
            (together with ``drop_block=True``).
    """

    def __init__(self, block=BasicBlock, keep_prob=1.0, avg_pool=False, drop_rate=0.0, dropblock_size=5):
        # Initialise nn.Module before assigning any attribute on the instance.
        super(ResNet, self).__init__()
        # Channel count fed into the next stage; the network expects 3-channel input.
        self.inplanes = 3

        self.layer1 = self._make_layer(block, 64, stride=2, drop_rate=drop_rate)
        self.layer2 = self._make_layer(block, 160, stride=2, drop_rate=drop_rate)
        self.layer3 = self._make_layer(block, 320, stride=2, drop_rate=drop_rate, drop_block=True,
                                       block_size=dropblock_size)
        self.layer4 = self._make_layer(block, 640, stride=2, drop_rate=drop_rate, drop_block=True,
                                       block_size=dropblock_size)
        if avg_pool:
            self.avgpool = nn.AvgPool2d(5, stride=1)
        self.keep_prob = keep_prob
        self.keep_avg_pool = avg_pool
        self.dropout = nn.Dropout(p=1 - self.keep_prob, inplace=False)
        self.drop_rate = drop_rate

        # Re-initialise all convolutions and batch-norm layers created above.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='leaky_relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, planes, stride=1, drop_rate=0.0, drop_block=False, block_size=1):
        """Build one stage consisting of a single ``block`` and update ``self.inplanes``.

        A 1x1 conv + batch-norm projection is created for the residual branch
        whenever ``stride != 1`` or the channel count changes. The projection
        itself uses stride 1, so it only changes the number of channels.
        """
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes * block.expansion,
                          kernel_size=1, stride=1, bias=False),
                nn.BatchNorm2d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, drop_rate, drop_block, block_size))
        self.inplanes = planes * block.expansion

        return nn.Sequential(*layers)

    def forward(self, x):
        """Embed a batch ``x`` and return a 2-D tensor with one row per sample."""
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Honour the ``avg_pool`` constructor option; previously the pooling
        # layer was created but never applied.
        if self.keep_avg_pool:
            x = self.avgpool(x)

        return x.view(x.size(0), -1)

Writing resnet12.py


In [None]:
# Move the generated resnet12.py into the Drive src folder, where train.py is placed later and imports it.
!mv -f resnet12.py ./gdrive/My\ Drive/IITB\ Internship/src

In [None]:
%%writefile cub_ds.py

import gc
import numpy as np
import pandas as pd
import cv2
import os
import time
import os.path as osp
import pickle
from PIL import Image

import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms



class CUB2002011_ds(Dataset):
    """Map-style dataset over the images of a CUB-200-2011 directory.

    The directory ``args.data_dir`` must contain ``train_test_split.txt``,
    ``images.txt`` and ``image_class_labels.txt`` (space separated, keyed by
    image id) plus an ``images/`` folder holding the files listed in
    ``images.txt``.

    Args:
        args: object with a ``data_dir`` attribute (path of the dataset root).
        mode: ``"train"`` or ``"test"``; selects rows whose ``is_train`` flag is
            1 or 0 respectively.

    Attributes:
        img_path: array of image paths (relative to ``images/``) for the split.
        label: array of class labels as stored in ``image_class_labels.txt``.
        y: alias of ``label`` (read by the sampler set-up in train.py).
        transform: resize to 84x84 and convert to tensor; the training split
            additionally applies random horizontal/vertical flips and
            ``RandomRotation(30)``.
    """

    def __init__(self, args, mode="train"):
        assert mode in ["train", "test"]  # only these two splits exist
        super(CUB2002011_ds, self).__init__()
        self.data_dir = osp.join(args.data_dir, "images")
        # Value of the ``is_train`` column that identifies the requested split.
        self.mode = 1 if mode == "train" else 0
        self.img_path, self.label = self._load_index(args.data_dir, self.mode)

        img_sz = 84
        steps = [transforms.Resize((img_sz, img_sz))]
        if mode != 'test':
            # Augmentation is applied to the training split only.
            steps += [
                transforms.RandomHorizontalFlip(),
                transforms.RandomVerticalFlip(),
                transforms.RandomRotation(30),
            ]
        steps.append(transforms.ToTensor())
        self.transform = transforms.Compose(steps)
        self.y = self.label

    @staticmethod
    def _load_index(data_dir, is_train):
        """Return ``(img_paths, labels)`` arrays for the rows whose flag equals ``is_train``.

        The three metadata files are joined on ``img_id`` so that every path is
        paired with its own label and split flag even if the files do not list
        the images in the same order. Row order follows ``train_test_split.txt``.
        """
        def read(filename, value_col):
            return pd.read_csv(
                f"{data_dir}/{filename}", sep=" ", names=["img_id", value_col]
            )

        index = (
            read("train_test_split.txt", "is_train")
            .merge(read("images.txt", "img_path"), on="img_id")
            .merge(read("image_class_labels.txt", "img_label"), on="img_id")
        )
        chosen = index[index["is_train"] == is_train]
        return chosen["img_path"].values, chosen["img_label"].values

    def __len__(self):
        """Number of images in the selected split."""
        return self.label.shape[0]

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB, apply the transform and return ``(img, label)``."""
        # The context manager releases the file handle as soon as the RGB copy exists.
        with Image.open(f"{self.data_dir}/{self.img_path[idx]}") as raw:
            img = self.transform(raw.convert("RGB"))
        label = self.label[idx]
        return img, label
    

Writing cub_ds.py


In [None]:
# Move the generated cub_ds.py into the Drive src folder, where train.py is placed later and imports it.
!mv -f cub_ds.py ./gdrive/My\ Drive/IITB\ Internship/src

In [None]:
%%writefile train.py 
# coding=utf-8
import sys
sys.path.append('/content/gdrive/MyDrive/IITB Internship/src/Prototypical-Networks-for-Few-shot-Learning-PyTorch/src/')

from prototypical_batch_sampler import PrototypicalBatchSampler
from prototypical_loss import prototypical_loss as loss_fn
from omniglot_dataset import OmniglotDataset
from cub_ds import *
from protonet import ProtoNet
from resnet12 import *
from parser_util import get_parser

from tqdm import tqdm
import numpy as np
import torch
import os


def init_seed(opt):
    '''
    Disable cudnn to maximize reproducibility and seed every random generator
    used by the script.

    Args:
        opt: options object; only ``opt.manual_seed`` is read.
    '''
    # ``torch.cuda.cudnn_enabled`` is not a PyTorch setting (assigning to it has
    # no effect); the switch that actually disables cuDNN lives in torch.backends.
    torch.backends.cudnn.enabled = False
    np.random.seed(opt.manual_seed)
    torch.manual_seed(opt.manual_seed)
    # Seed every visible GPU, not only the current one; a no-op without CUDA.
    torch.cuda.manual_seed_all(opt.manual_seed)


def init_dataset(opt, mode):
    '''
    Build the CUB dataset for ``mode`` and check that it has enough classes.

    Args:
        opt: options object; ``classes_per_it_tr`` and ``classes_per_it_val``
            are read for the sanity check.
        mode: ``'test'`` selects the test split, any other value the train split.

    Returns:
        The ``CUB2002011_ds`` instance.

    Raises:
        Exception: if the split contains fewer distinct labels than either
            ``classes_per_it`` option requires.
    '''
    mode = 'test' if mode == 'test' else 'train'
    # An empty data_dir made the dataset look for its metadata files in the
    # filesystem root; use the same CUB directory as init_dataloader().
    args = argparse.Namespace(data_dir='gdrive/My Drive/IITB Internship/CUB_200_2011')

    dataset = CUB2002011_ds(args, mode)
    n_classes = len(np.unique(dataset.y))
    if n_classes < opt.classes_per_it_tr or n_classes < opt.classes_per_it_val:
        raise(Exception('There are not enough classes in the dataset in order ' +
                        'to satisfy the chosen classes_per_it. Decrease the ' +
                        'classes_per_it_{tr/val} option and try again.'))
    return dataset


def init_sampler(opt, labels, mode):
    '''
    Create the episodic batch sampler for one split.

    Args:
        opt: options object. When ``mode`` contains ``'train'`` the ``*_tr``
            options are used, otherwise the ``*_val`` ones; ``iterations`` is
            always read.
        labels: label of every dataset item, handed to the sampler unchanged.
        mode: name of the split the sampler is built for.

    Returns:
        A ``PrototypicalBatchSampler`` configured with the number of classes per
        iteration and ``num_support + num_query`` samples.
    '''
    if 'train' in mode:
        n_classes, n_support, n_query = (
            opt.classes_per_it_tr, opt.num_support_tr, opt.num_query_tr)
    else:
        n_classes, n_support, n_query = (
            opt.classes_per_it_val, opt.num_support_val, opt.num_query_val)

    sampler_kwargs = dict(
        labels=labels,
        classes_per_it=n_classes,
        num_samples=n_support + n_query,
        iterations=opt.iterations,
    )
    return PrototypicalBatchSampler(**sampler_kwargs)


def init_dataloader(opt, mode):
    '''
    Build the episodic ``DataLoader`` for one split of the CUB dataset.

    Args:
        opt: options object, passed on to ``init_sampler``.
        mode: requested split. ``'test'`` reads the test images; every other
            value (e.g. ``'train'``, ``'val'``) reads the training images.

    Returns:
        A ``torch.utils.data.DataLoader`` driven by the batch sampler returned
        by ``init_sampler``.
    '''
    # Only two image splits exist on disk, so map the requested mode onto them...
    split = 'test' if mode == 'test' else 'train'
    args = argparse.Namespace(data_dir='gdrive/My Drive/IITB Internship/CUB_200_2011')

    dataset = CUB2002011_ds(args, split)
    # ...but configure the sampler with the mode the caller asked for, so that a
    # 'val' loader uses the *_val episode settings instead of the training ones.
    sampler = init_sampler(opt, dataset.y, mode)
    dataloader = torch.utils.data.DataLoader(dataset, batch_sampler=sampler)
    return dataloader


def init_protonet(opt, mode='res12'):
    '''
    Create the embedding network and move it to the selected device.

    Args:
        opt: options object; ``opt.cuda`` is read when CUDA is available.
        mode: ``'4conv'`` builds ``ProtoNet``; any other value builds ``ResNet``
            (the ResNet-12 backbone).

    Returns:
        The model, placed on ``cuda:0`` if CUDA is available and requested,
        otherwise on the CPU.
    '''
    use_gpu = torch.cuda.is_available() and opt.cuda
    device = 'cuda:0' if use_gpu else 'cpu'
    backbone = ProtoNet if mode == '4conv' else ResNet
    return backbone().to(device)


def init_optim(opt, model):
    '''
    Create the Adam optimizer for ``model``.

    Args:
        opt: options object; ``opt.learning_rate`` is used as the learning rate.
        model: module whose ``parameters()`` are optimized.

    Returns:
        A ``torch.optim.Adam`` instance.
    '''
    trainable = model.parameters()
    return torch.optim.Adam(params=trainable, lr=opt.learning_rate)


def init_lr_scheduler(opt, optim, mode='onecycle'):
    '''
    Create the learning-rate scheduler attached to ``optim``.

    Args:
        opt: options object. ``'step'`` mode reads ``lr_scheduler_gamma`` and
            ``lr_scheduler_step``; every other mode reads ``epochs`` and
            ``learning_rate``.
        optim: optimizer to schedule.
        mode: ``'step'`` for ``StepLR``; anything else yields ``OneCycleLR``
            with ``total_steps = opt.epochs`` and ``max_lr = opt.learning_rate``.

    Returns:
        The scheduler instance.
    '''
    schedulers = torch.optim.lr_scheduler
    if mode != 'step':
        return schedulers.OneCycleLR(optim,
                                     total_steps=opt.epochs,
                                     max_lr=opt.learning_rate)
    return schedulers.StepLR(optimizer=optim,
                             gamma=opt.lr_scheduler_gamma,
                             step_size=opt.lr_scheduler_step)


def save_list_to_file(path, thelist):
    '''
    Write every item of ``thelist`` to ``path``, one ``str(item)`` per line.

    The file is overwritten if it already exists.

    Args:
        path: destination file path.
        thelist: iterable of items to write.
    '''
    with open(path, 'w') as f:
        # str.format renders tuples as-is; "%s" % item would try to unpack a
        # tuple item as formatting arguments and fail or strip the parentheses.
        f.writelines("{}\n".format(item) for item in thelist)


def train(opt, tr_dataloader, model, optim, lr_scheduler, val_dataloader=None, validate=False):
    '''
    Train the model with the prototypical learning algorithm

    Args:
        opt: options object; reads ``cuda``, ``experiment_root``, ``epochs``,
            ``iterations``, ``num_support_tr`` and (when validating)
            ``num_support_val``.
        tr_dataloader: iterable of ``(x, y)`` training batches (episodes).
        model: network to train; updated in place.
        optim: optimizer stepped once per batch.
        lr_scheduler: scheduler stepped once per epoch.
        val_dataloader: iterable of ``(x, y)`` validation batches; only used
            when ``validate`` is true.
        validate: opt-in switch for the per-epoch validation pass. It defaults
            to ``False`` because validation was disabled in this notebook; with
            the default, ``val_loss``/``val_acc`` stay empty, ``best_acc``
            stays 0 and no ``best_model.pth`` is written.

    Returns:
        ``(best_acc, train_loss, train_acc, val_loss, val_acc)`` where the four
        lists hold one value per processed batch.

    Side effects:
        Writes ``last_model.pth`` and the four ``<name>.txt`` history files into
        ``opt.experiment_root``; with ``validate=True`` also ``best_model.pth``
        whenever the epoch's validation accuracy is at least the best so far.
    '''

    device = 'cuda:0' if torch.cuda.is_available() and opt.cuda else 'cpu'

    train_loss = []
    train_acc = []
    val_loss = []
    val_acc = []
    best_acc = 0

    best_model_path = os.path.join(opt.experiment_root, 'best_model.pth')
    last_model_path = os.path.join(opt.experiment_root, 'last_model.pth')

    for epoch in range(opt.epochs):
        print('=== Epoch: {} ==='.format(epoch))
        model.train()
        for batch in tqdm(iter(tr_dataloader)):
            optim.zero_grad()
            x, y = batch
            x, y = x.to(device), y.to(device)
            model_output = model(x)
            loss, acc = loss_fn(model_output, target=y,
                                n_support=opt.num_support_tr)
            loss.backward()
            optim.step()
            train_loss.append(loss.item())
            train_acc.append(acc.item())
        # Average over the most recent ``opt.iterations`` batches
        # (presumably one epoch's worth of episodes - confirm against the sampler).
        avg_loss = np.mean(train_loss[-opt.iterations:])
        avg_acc = np.mean(train_acc[-opt.iterations:])
        print('Avg Train Loss: {}, Avg Train Acc: {}'.format(avg_loss, avg_acc))
        lr_scheduler.step()

        if not validate or val_dataloader is None:
            continue

        model.eval()
        # No gradients are needed while scoring validation episodes.
        with torch.no_grad():
            for x, y in val_dataloader:
                x, y = x.to(device), y.to(device)
                model_output = model(x)
                loss, acc = loss_fn(model_output, target=y,
                                    n_support=opt.num_support_val)
                val_loss.append(loss.item())
                val_acc.append(acc.item())
        avg_loss = np.mean(val_loss[-opt.iterations:])
        avg_acc = np.mean(val_acc[-opt.iterations:])
        postfix = ' (Best)' if avg_acc >= best_acc else ' (Best: {})'.format(
            best_acc)
        print('Avg Val Loss: {}, Avg Val Acc: {}{}'.format(
            avg_loss, avg_acc, postfix))
        if avg_acc >= best_acc:
            torch.save(model.state_dict(), best_model_path)
            best_acc = avg_acc

    torch.save(model.state_dict(), last_model_path)

    # Explicit mapping instead of looking the lists up through locals().
    histories = {
        'train_loss': train_loss,
        'train_acc': train_acc,
        'val_loss': val_loss,
        'val_acc': val_acc,
    }
    for name, values in histories.items():
        save_list_to_file(os.path.join(opt.experiment_root,
                                       name + '.txt'), values)

    return best_acc, train_loss, train_acc, val_loss, val_acc


def test(opt, test_dataloader, model):
    '''
    Evaluate a trained model on episodes drawn from ``test_dataloader``.

    The dataloader is iterated ten times; the accuracy reported by the
    prototypical loss is collected for every batch and averaged.

    Args:
        opt: options object; reads ``cuda`` and ``num_support_val``.
        test_dataloader: iterable of ``(x, y)`` batches.
        model: network to evaluate; switched to eval mode.

    Returns:
        Mean accuracy over all evaluated batches (also printed).
    '''
    device = 'cuda:0' if torch.cuda.is_available() and opt.cuda else 'cpu'
    model.eval()

    episode_acc = []
    for _ in range(10):
        for x, y in iter(test_dataloader):
            x, y = x.to(device), y.to(device)
            # Only the forward pass of the network runs without gradients.
            with torch.no_grad():
                model_output = model(x)
            _, acc = loss_fn(model_output, target=y,
                             n_support=opt.num_support_val)
            episode_acc.append(acc.item())

    avg_acc = np.mean(episode_acc)
    print('Test Acc: {}'.format(avg_acc))
    return avg_acc


def eval(opt):
    '''
    Load a saved checkpoint and evaluate it on the test split.

    Note: this function shadows the built-in ``eval`` inside this module.

    Args:
        opt: options object; only ``opt.experiment_root`` (directory holding the
            checkpoints) is read. All other settings come from parsing the
            command line with ``get_parser()``.

    Returns:
        The value returned by ``test`` for the loaded model.
    '''
    options = get_parser().parse_args()

    if torch.cuda.is_available() and not options.cuda:
        print("WARNING: You have a CUDA device, so you should probably run with --cuda")

    #init_seed(options)
    # init_dataset() needs a mode and returns a dataset, not a loader; the test
    # episodes have to come from init_dataloader().
    test_dataloader = init_dataloader(options, 'test')
    model = init_protonet(options)

    # Prefer the best checkpoint; fall back to last_model.pth, which train()
    # always writes, when no best checkpoint exists.
    model_path = os.path.join(opt.experiment_root, 'best_model.pth')
    if not os.path.exists(model_path):
        model_path = os.path.join(opt.experiment_root, 'last_model.pth')

    # map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
    device = 'cuda:0' if torch.cuda.is_available() and options.cuda else 'cpu'
    model.load_state_dict(torch.load(model_path, map_location=device))

    return test(opt=options,
                test_dataloader=test_dataloader,
                model=model)


def main():
    '''
    Script entry point: parse the options, train a ResNet-12 prototypical
    network on CUB and report the test accuracy of the final weights.
    '''
    options = get_parser().parse_args()

    # Experiment settings forced on top of the parsed options:
    # 2 epochs of 100 episodes, 5 classes with 5 support + 5 query samples each
    # for both the training and the validation/test episodes.
    # (The learning rate is left at the parser's value.)
    overrides = {
        'epochs': 2,
        'iterations': 100,
        'classes_per_it_tr': 5,
        'num_support_tr': 5,
        'num_query_tr': 5,
        'classes_per_it_val': 5,
        'num_support_val': 5,
        'num_query_val': 5,
    }
    for key, value in overrides.items():
        setattr(options, key, value)

    if not os.path.exists(options.experiment_root):
        os.makedirs(options.experiment_root)

    if torch.cuda.is_available() and not options.cuda:
        print("WARNING: You have a CUDA device, so you should probably run with --cuda")

    # Seeding is currently switched off:
    #init_seed(options)

    loaders = {split: init_dataloader(options, split)
               for split in ('train', 'val', 'test')}

    model = init_protonet(options, mode='res12')
    optim = init_optim(options, model)
    lr_scheduler = init_lr_scheduler(options, optim, mode='onecycle')

    best_acc, train_loss, train_acc, val_loss, val_acc = train(
        opt=options,
        tr_dataloader=loaders['train'],
        val_dataloader=loaders['val'],
        model=model,
        optim=optim,
        lr_scheduler=lr_scheduler)

    print('Testing with last model..')
    test(opt=options,
         test_dataloader=loaders['test'],
         model=model)

    # Not run here: re-testing with the best validation checkpoint and the
    # optional retraining on train+val followed by a final test.


# Run the pipeline only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()

Writing train.py


In [None]:
# Move the generated train.py into the Drive src folder, next to resnet12.py and cub_ds.py.
!mv -f train.py ./gdrive/My\ Drive/IITB\ Internship/src

In [None]:
ls


[0m[01;34mgdrive[0m/  [01;34msample_data[0m/


In [None]:
# Launch the training script from Drive with the --cuda flag.
!python3 "./gdrive/My Drive/IITB Internship/src/train.py" --cuda

=== Epoch: 0 ===
  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
100% 100/100 [06:58<00:00,  4.19s/it]
Avg Train Loss: 462.3797738456726, Avg Train Acc: 0.37520000018179417
=== Epoch: 1 ===
100% 100/100 [03:18<00:00,  1.99s/it]
Avg Train Loss: 74.78570013046264, Avg Train Acc: 0.39519999966025354
Testing with last model..
Test Acc: 0.430159999370575
