# Comparison of MAML, CNP, and MAN

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%cd drive/MyDrive/'Colab Notebooks/MetaLearning'
!ls

[Errno 2] No such file or directory: 'drive/MyDrive/Colab Notebooks/MetaLearning'
/content/drive/MyDrive/Colab Notebooks/MetaLearning
HW2.ipynb	models.ipynb  nb2-CNP.ipynb  utils.ipynb
l2lutils.ipynb	nb1.ipynb     nb3.ipynb


In [None]:
!pip install import_ipynb --quiet
!pip install learn2learn --quiet

In [None]:
import import_ipynb
import utils
import models
utils.hide_toggle('Imports 1')

In [None]:
from IPython import display
import torch
import torch.nn as nn
from sklearn.manifold import TSNE
from matplotlib import pyplot as plt
# from l2lutils import KShotLoader
from IPython import display
utils.hide_toggle('Imports 2')

l2lutils

In [None]:
import learn2learn as l2l
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torch
import numpy as np
import learn2learn as l2l
from learn2learn.data import *
import import_ipynb
import utils
class KShotLoader():
    def __init__(self,myds,num_tasks=1000,shots=2,ways=2,classes=None):
        self.shots = shots
        self.ways = ways
        self.myMds = l2l.data.MetaDataset(myds)
        if classes == None:
            n_classes = len(set(myds.labels))
            classes = [i for i in range(n_classes)]
        self.my_tasks = l2l.data.TaskDataset(self.myMds, task_transforms=[
                                l2l.data.transforms.FilterLabels(self.myMds,classes),
                                l2l.data.transforms.NWays(self.myMds,ways),
                                l2l.data.transforms.KShots(self.myMds,2*shots),
                                l2l.data.transforms.LoadData(self.myMds),
                                l2l.data.transforms.RemapLabels(self.myMds),
                                l2l.data.transforms.ConsecutiveLabels(self.myMds)
                                ],num_tasks=num_tasks)
    def get_task(self):
        data,labels = self.my_tasks.sample()
        adaptation_indices = np.zeros(data.size(0), dtype=bool)
        adaptation_indices[np.arange(self.shots*self.ways) * 2] = True
        evaluation_indices = torch.from_numpy(~adaptation_indices)
        adaptation_indices = torch.from_numpy(adaptation_indices)
        adaptation_data, adaptation_labels = data[adaptation_indices], labels[adaptation_indices]
        evaluation_data, evaluation_labels = data[evaluation_indices], labels[evaluation_indices]
        d_train = (adaptation_data,adaptation_labels)
        d_test = (evaluation_data,evaluation_labels)
        return d_train, d_test

# Meta Training

In [None]:
#Generate data - euclidean
meta_train_ds, meta_test_ds, full_loader = utils.euclideanDataset(n_samples=10000,n_features=20,n_classes=10,batch_size=32)
import learn2learn as l2l
import torch.optim as optim
classes_train = [i for i in range(5)]
classes_test = [i+5 for i in range(5)]
classes_train, classes_test

([0, 1, 2, 3, 4], [5, 6, 7, 8, 9])

In [None]:
shots,ways = 2,3
n_epochs=50
meta_train_task_count=50
meta_train_kloader=KShotLoader(meta_train_ds,shots=shots,ways=ways,num_tasks=1000) #,classes=classes_train
meta_train_tasks = []
for i in range(meta_train_task_count):
  meta_train_tasks.append(meta_train_kloader.get_task())

# Meta Testing

In [None]:
meta_test_task_count = 25
meta_test_kloader=KShotLoader(meta_test_ds,shots=shots,ways=ways) #,classes=classes_test
meta_test_tasks = []
for i in range(meta_test_task_count):
  meta_test_tasks.append(meta_test_kloader.get_task())

# MAML

In [None]:
fas = 5
net = models.MLP(dims=[20,64,32,ways])
maml = l2l.algorithms.MAML(net, lr=1e-2)
optimizer = optim.Adam(maml.parameters(),lr=5e-3)
lossfn = torch.nn.NLLLoss()

In [None]:
import time
s=time.time()
epoch=0
n_epochs=50
while epoch<n_epochs:
    adapt_loss = 0.0
    test_acc = 0.0
    # Sample and train on a task
    for task in meta_train_tasks:
        d_train,d_test=task
        learner = maml.clone()
        for fas_step in range(fas):
            train_preds = learner(d_train[0])
            train_loss = lossfn(train_preds,d_train[1])
            learner.adapt(train_loss)
        test_preds = learner(d_test[0])
        adapt_loss += lossfn(test_preds,d_test[1])
        learner.eval()
        test_acc += models.accuracy(learner,d_test[0],d_test[1],verbose=False)
        learner.train()
        # Done with a task
    # Update main network
    print('Epoch  % 2d Loss: %2.5e Avg Acc: %2.5f'%(epoch,adapt_loss/meta_train_task_count,test_acc/meta_train_task_count))
    display.clear_output(wait=True)
    optimizer.zero_grad()
    total_loss = adapt_loss
    total_loss.backward()
    optimizer.step()
    epoch+=1
e=time.time()
print(e-s)

19.30070161819458


In [None]:
# meta_test_kloader=KShotLoader(meta_test_ds,shots=shots,ways=ways)
test_acc = 0.0
# task_count = 20
adapt_steps = 5
maml.eval()
# Sample and train on a task
for task in meta_test_tasks:
    d_train,d_test=task
    learner = maml.clone()
    learner.eval()
    for adapt_step in range(adapt_steps):
        train_preds = learner(d_train[0])
        train_loss = lossfn(train_preds,d_train[1])
        learner.adapt(train_loss)
    test_preds = learner(d_test[0])
    test_acc += models.accuracy(learner,d_test[0],d_test[1],verbose=False)
    # Done with a task
learner.train()
print('Avg Acc: %2.5f'%(test_acc/meta_test_task_count))

Avg Acc: 0.66000


# CNP

In [None]:
class CNP(nn.Module):
    def __init__(self,n_features=1,dims=[32,32],n_classes=2,lr=1e-4):
        super(CNP,self).__init__()
        self.n_features = n_features
        self.n_classes = n_classes
        dimL1 = [n_features]+dims
        dimL2=[n_features+n_classes*dims[-1]]+dims+[n_classes]
        self.mlp1 = models.MLP(dims=dimL1,task='embedding')
        self.mlp2 = models.MLP(dims=dimL2)
        self.optimizer=torch.optim.Adam(self.parameters(),lr=lr)
    def adapt(self,X,y):
        R = self.mlp1(X)
        m = torch.eye(self.n_classes)[y].transpose(0,1)/self.n_classes
        r = (m@R).flatten().unsqueeze(0)
        #r = (R.sum(dim=0)/X.shape[0]).unsqueeze(0)
        return r
    def forward(self,Y,r):
        rr = r.repeat(Y.shape[0],1)
        p = self.mlp2(torch.cat((Y,rr),dim=1))
        return p
utils.hide_toggle('Class CNP')

In [None]:
# Redifning accuracy function so that it takes h - dataset context - as input since net requires it.
def accuracy(Net,X_test,y_test,h,verbose=True):
    #Net.eval()
    m = X_test.shape[0]
    y_pred = Net(X_test,h)
    _, predicted = torch.max(y_pred, 1)
    correct = (predicted == y_test).float().sum().item()
    if verbose: print(correct,m)
    accuracy = correct/m
    #Net.train()
    return accuracy

In [None]:
net = CNP(n_features=20,n_classes=ways,dims=[32,64,32],lr=1e-4)
lossfn = torch.nn.NLLLoss()

In [None]:
s=time.time()
epoch=0
n_epochs=100
# task_count=100
while epoch<n_epochs:
    # OUTER LOOP - GRADIENT DESCENT OVER SUM OF LOSSES ON DTEST
    test_loss = 0.0
    test_acc = 0.0
    # Sample and train on a task
    for task in meta_train_tasks:
        d_train,d_test=task
        # INNER LOOP - NO GRADIENT DESCENT
        rp = torch.randperm(d_train[1].shape[0])
        d_train0=d_train[0][rp]
        d_train1=d_train[1][rp]
        x_tr = d_train0
        d_tr = x_tr 
        h = net.adapt(d_tr,d_train1)
        rp1 = torch.randperm(d_test[1].shape[0])
        d_test0=d_test[0][rp1]
        d_test1=d_test[1][rp1]
        x_ts = d_test0
        # y_ts_sh = torch.zeros(x_ts.shape[0],ways)
        d_ts = x_ts 
        test_preds = net(d_ts,h)
        train_preds = net(d_tr,h)
        # Accumulate losses over tasks - note train and test loss both included
        test_loss += lossfn(test_preds,d_test1)+lossfn(train_preds,d_train1)
        net.eval()
        test_acc += accuracy(net,d_ts,d_test1,h,verbose=False)
        net.train()
    #Update the network weights
    print('Epoch  % 2d Loss: %2.5e Avg Acc: %2.5f'%(epoch,test_loss/meta_train_task_count,test_acc/meta_train_task_count))
    display.clear_output(wait=True)
    net.optimizer.zero_grad()
    test_loss.backward()
    net.optimizer.step()
    epoch+=1    
e=time.time()
print(e-s)

10.044007301330566


In [None]:
# meta_test_kloader=KShotLoader(meta_test_ds,shots=shots,ways=ways)
test_acc = 0.0
# task_count = 50
adapt_steps = 1
# Sample and train on a task
for task in meta_test_tasks:
    d_train,d_test=task
    x_tr = d_train[0]
    y_tr_sh = torch.cat((torch.zeros(1,ways),torch.eye(ways)[d_train[1][1:]]))
    d_tr = x_tr #torch.cat((x_tr,y_tr_sh),1)
    h=net.adapt(d_tr,d_train[1])
    x_ts = d_test[0]
    y_ts_sh = torch.zeros(x_ts.shape[0],ways)
    d_ts = x_ts #torch.cat((x_ts,y_ts_sh),1)
    test_preds = net(d_ts,h)
    test_acc += accuracy(net,d_ts,d_test[1],h,verbose=False)
    # Done with a task
net.train()
print('Avg Acc: %2.5f'%(test_acc/meta_test_task_count))

Avg Acc: 0.62000


# MAN

In [None]:
class Cos(nn.Module):
    def __init__(self,dims=[20,32,32]):
        super(Cos,self).__init__()
    def forward(self,target,ss):
        # compute cosine distances between 
        # target (batch,embedding_dim) and support set ss (ss_size,embedding_dim)
        # return (batch,ss_size)
        target_normed = F.normalize(target,p=2,dim=1)
        # shape of target_normed will be (batch,1,embedding_dim)
        ss_normed = F.normalize(ss,p=2,dim=1).permute(1,0)
        similarities = torch.mm(target_normed,ss_normed)
        # result will be (batch,ss_size)
        return similarities

In [None]:
class MAN(nn.Module):
    def __init__(self,dims=[20,32,32],n_classes=2,lr=1e-3):
        super(MAN,self).__init__()
        self.n_classes = n_classes
        self.mlp = models.MLP(dims=dims,task='embedding')
        self.cos = Cos()
        self.attn = nn.Softmax(dim=1)
        self.optimizer = optim.Adam(self.parameters(),lr=lr)
    def forward(self,X,d_train):
        # X = (batch,n_features)
        (x_tr,y_tr) = d_train
        # x_tr = (ss_size,n_features), y_tr = (ss_size)
        ss_e = self.mlp(x_tr)
        X_e = self.mlp(X)
        sims = self.cos(X_e,ss_e)
        # size (batch,ss_size)
        attn_wts = self.attn(sims)
        y_h = torch.eye(self.n_classes)[y_tr]
        # y_h = one-hot version of y_tr = (ss_size,n_classes)
        preds = attn_wts@y_h
        return preds

In [None]:
# Redefining accuracy function so that it takes h - dataset context - as input since net requires it.
def accuracy(Net,X_test,y_test,h,verbose=True):
    #Net.eval()
    m = X_test.shape[0]
    y_pred = Net(X_test,h)
    _, predicted = torch.max(y_pred, 1)
    correct = (predicted == y_test).float().sum().item()
    if verbose: print(correct,m)
    accuracy = correct/m
    #Net.train()
    return accuracy

In [None]:
net = MAN(n_classes=ways,dims=[20,64,32],lr=1e-4)

In [None]:
s=time.time()
epoch=0
n_epochs=100
# task_count=50
while epoch<n_epochs:
    test_loss = 0.0
    test_acc = 0.0
    # Sample and train on a task
    for task in meta_train_tasks:
        d_train,d_test=task
        rp = torch.randperm(d_train[1].shape[0])
        d_train0=d_train[0][rp]
        d_train1=d_train[1][rp]
        x_tr = d_train0
        d_tr = x_tr 
        rp1 = torch.randperm(d_test[1].shape[0])
        d_test0=d_test[0][rp1]
        d_test1=d_test[1][rp1]
        x_ts = d_test0
        d_ts = x_ts 
        test_preds = net(d_ts,(x_tr,d_train1))
        #train_preds = net(d_tr,h)
        # Accumulate losses over tasks - note train and test loss both included
        test_loss += lossfn(test_preds,d_test1)
        net.eval()
        test_acc += accuracy(net,d_ts,d_test1,(x_tr,d_train1),verbose=False)
        net.train()
    #Update the network weights
    print('Epoch  % 2d Loss: %2.5e Avg Acc: %2.5f'%(epoch,test_loss/meta_train_task_count,test_acc/meta_train_task_count))
    display.clear_output(wait=True)
    net.optimizer.zero_grad()
    test_loss.backward()
    net.optimizer.step()
    epoch+=1
e=time.time()
print(e-s)

7.215369939804077


In [None]:
# meta_test_kloader=KShotLoader(meta_test_ds,shots=shots,ways=ways,classes=classes_test)
test_acc = 0.0
# task_count = 50
adapt_steps = 1
# Sample and train on a task
for task in meta_test_tasks:
    d_train,d_test=task
    x_tr = d_train[0]
    y_tr_sh = torch.cat((torch.zeros(1,ways),torch.eye(ways)[d_train[1][1:]]))
    d_tr = x_tr #torch.cat((x_tr,y_tr_sh),1)
    x_ts = d_test[0]
    y_ts_sh = torch.zeros(x_ts.shape[0],ways)
    d_ts = x_ts #torch.cat((x_ts,y_ts_sh),1)
    test_preds = net(d_ts,(d_tr,d_train[1]))
    test_acc += accuracy(net,d_ts,d_test[1],(d_tr,d_train[1]),verbose=False)
    # Done with a task
net.train()
print('Avg Acc: %2.5f'%(test_acc/meta_test_task_count))

Avg Acc: 0.82000
