# One-shot Learning

This notebook follows the ideas of this post: https://towardsdatascience.com/one-shot-learning-with-siamese-networks-using-keras-17f34e75bb3d

# Download dataset

In [0]:
!wget https://dft-datasets.s3.us-east-2.amazonaws.com/catalog_images.zip
!mkdir data
!unzip -q catalog_images.zip -d data

# Prepare dataset

In [0]:
import sys
import numpy as np
from skimage import transform,io

from imageio import imread
import pickle
from pathlib import Path
import matplotlib.pyplot as plt
import argparse


# Root of the extracted dataset and its two split folders.
data_path = Path("data")
train_folder, eval_folder = (data_path.joinpath(split) for split in ('train', 'eval'))

# Directory the pickled splits are written to.
save_path = Path(".")

lang_dict = dict()


def loadimgs(path,a=90, n=0):
    """Load a folder-per-class image tree into one dense array.

    Parameters
    ----------
    path : pathlib.Path
        Directory whose sub-directories each hold the images of one class.
        Entries that are not directories are ignored.
    a : int or None
        Maximum number of images kept per class (None keeps all of them).
    n : int
        Integer id assigned to the first loaded class; later classes count
        up from it.

    Returns
    -------
    X : ndarray, shape (n_classes, images_per_class, 105, 105, 3)
        Resized images grouped by class, classes in sorted folder order.
    y : ndarray, shape (n_classes * images_per_class, 1)
        Class id of every image kept in X, in the same order.
    lang_dict : dict
        Maps each loaded folder name to its class id.

    Raises
    ------
    ValueError
        If no class yields a usable image, or if the classes do not all
        contribute the same number of images (X could not be stacked).
    """
    X = []
    y = []
    lang_dict = {}
    curr_y = n
    # Load every class separately so the classes can be isolated later.
    for label_path in sorted(path.iterdir()):
        if not label_path.is_dir():
            continue
        label = label_path.name
        print("loading label: " + label)
        category_images = []
        # Sorted so the images kept under the cap are the same on every run.
        for image_path in sorted(label_path.iterdir()):
            if a is not None and len(category_images) >= a:
                break
            image = io.imread(image_path)
            if image.shape == (224, 224, 3): # due to https://github.com/scikit-image/scikit-image/issues/2406
                small_image = transform.resize(image, (105, 105, 3), mode='symmetric', preserve_range=True)
                category_images.append(small_image)

        if not category_images:
            # Skip the class entirely so ids keep matching the rows of X.
            print("error - no usable images for label: " + label)
            continue

        X.append(np.stack(category_images))
        # Label exactly the images that were kept, so X and y stay aligned.
        y.extend([curr_y] * len(category_images))
        lang_dict[label] = curr_y
        curr_y += 1

    if not X:
        raise ValueError("no usable image classes found in {}".format(path))
    counts = sorted({len(images) for images in X})
    if len(counts) > 1:
        raise ValueError(
            "classes contribute different numbers of images {}; "
            "lower `a` to at most {}".format(counts, counts[0]))
    y = np.vstack(y)
    X = np.stack(X)
    return X,y,lang_dict

  
# Build the training split with the default per-class cap and pickle it under save_path.
X, y, c = loadimgs(train_folder)
with (save_path / "train.pickle").open("wb") as f:
    pickle.dump((X, y, c), f)


# Same for the evaluation split, keeping at most 9 images per class.
X, y, c = loadimgs(eval_folder, a=9)
with (save_path / "val.pickle").open("wb") as f:
    pickle.dump((X, y, c), f)

In [0]:
import numpy.random as rng
import numpy as np
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.utils import shuffle

class Siamese_Loader:
    """Serve training pair batches and one-shot test tasks to a siamese net.

    Every split is stored in ``self.data[name]`` as an array indexed
    ``(class, example, height, width, channels)``, loaded from
    ``<path>/<name>.pickle``.
    """
    def __init__(self, path, data_subsets=("train", "val")):
        """Load the pickled ``(X, y, c)`` tuple of every requested split.

        path: directory containing the ``<name>.pickle`` files.
        data_subsets: iterable of split names to load.
        """
        self.data = {}
        self.categories = {}
        self.info = {}

        for name in data_subsets:
            file_path = Path(path) / (name + ".pickle")
            print("loading data from {}".format(file_path))
            # NOTE: pickle.load can execute arbitrary code; only point this
            # at pickles produced by this notebook or another trusted source.
            with open(file_path,"rb") as f:
                (X,y,c) = pickle.load(f)
            self.data[name] = X
            self.categories[name] = c

    def get_batch(self,batch_size,s="train"):
        """Create a batch of pairs: first half different class, second half same class.

        Returns ``(pairs, targets)`` where ``pairs`` is a list of two arrays
        of shape ``(batch_size, height, width, channels)`` and ``targets`` is
        a ``(batch_size,)`` vector holding 1 for same-class pairs, else 0.
        """
        X=self.data[s]

        n_classes, n_examples, height, width, channels = X.shape

        # Randomly sample the anchor class of every pair. Classes are distinct
        # whenever the batch fits; a batch larger than the number of classes
        # falls back to sampling with replacement instead of raising.
        categories = rng.choice(n_classes,size=(batch_size,),replace=batch_size > n_classes)
        # Two empty arrays for the input image batch, one per siamese leg.
        pairs=[np.zeros((batch_size, height, width, channels)) for _ in range(2)]
        # Targets: the second half of the batch is marked 1 (same class).
        targets=np.zeros((batch_size,))
        targets[batch_size//2:] = 1
        for i in range(batch_size):
            category = categories[i]
            idx_1 = rng.randint(0, n_examples)
            pairs[0][i] = X[category, idx_1]
            idx_2 = rng.randint(0, n_examples)
            # Different class for the first half, same class for the second.
            if i >= batch_size // 2:
                category_2 = category
            else:
                # Adding a non-zero offset modulo n_classes guarantees the
                # second image comes from a different class.
                category_2 = (category + rng.randint(1,n_classes)) % n_classes
            pairs[1][i] = X[category_2,idx_2]
        return pairs, targets

    def generate(self, batch_size, s="train"):
        """Endless generator of ``get_batch`` results, for Keras generator training."""
        while True:
            pairs, targets = self.get_batch(batch_size,s)
            yield (pairs, targets)

    def make_oneshot_task(self,N,s="val"):
        """Create pairs of test image, support set for testing N way one-shot learning.

        Returns ``(pairs, targets)``: ``pairs`` is ``[test_image, support_set]``,
        both of shape ``(N, height, width, channels)``, where ``test_image``
        repeats one image N times and ``support_set`` holds one image from
        each of N distinct classes. ``targets`` is a ``(N,)`` vector with a
        single 1 at the support image sharing the test image's class.
        """
        X=self.data[s]

        n_classes, n_examples, height, width, channels = X.shape
        indices = rng.randint(0,n_examples,size=(N,))

        categories = rng.choice(range(n_classes),size=(N,),replace=False)
        true_category = categories[0]

        # Two distinct examples of the true class: one to query, one to match.
        ex1, ex2 = rng.choice(n_examples,replace=False,size=(2,))
        test_image = np.asarray([X[true_category,ex1]]*N)

        # Advanced indexing returns a copy, so the write below leaves X intact.
        support_set = X[categories,indices]
        support_set[0] = X[true_category,ex2]

        targets = np.zeros((N,))
        targets[0] = 1
        targets, test_image, support_set = shuffle(targets, test_image, support_set)
        pairs = [test_image,support_set]

        return pairs, targets

    def test_oneshot(self,model,N,k,s="val",verbose=0):
        """Test average N way oneshot learning accuracy of a siamese neural net over k one-shot tasks.

        A task counts as correct when the highest ``model.predict`` score
        falls on the support image marked 1 in the targets. Returns the
        percentage of correct tasks.
        """
        n_correct = 0
        if verbose:
            print("Evaluating model on {} random {} way one-shot learning tasks ...".format(k,N))
        for i in range(k):
            inputs, targets = self.make_oneshot_task(N,s)
            probs = model.predict(inputs)
            if np.argmax(probs) == np.argmax(targets):
                n_correct+=1
        percent_correct = (100.0*n_correct / k)
        if verbose:
            print("Got an average of {}% {} way one-shot learning accuracy".format(percent_correct,N))
        return percent_correct

    def train(self, model, epochs, verbosity, batch_size=32, steps_per_epoch=100, s="train"):
        """Fit ``model`` on batches drawn endlessly from split ``s``.

        The generator never terminates, so ``steps_per_epoch`` defines how
        many batches make up one epoch. Returns whatever ``model.fit``
        returns.
        """
        return model.fit(self.generate(batch_size, s),
                         steps_per_epoch=steps_per_epoch,
                         epochs=epochs,
                         verbose=verbosity)
    
    
# Instantiate the loader on the directory the pickled splits were written to
loader = Siamese_Loader(save_path)

# Understanding batches

In [0]:
from PIL import Image
import math

def concat_images(X, n_images_column=4):
    """Concatenates a bunch of images into one grid image for plotting purposes.

    Parameters
    ----------
    X : ndarray, shape (n_images, height, width, channels)
        Images to tile; values are cast to uint8 before being pasted onto
        an RGB canvas.
    n_images_column : int
        Number of images placed on each row of the grid.

    Returns
    -------
    ndarray
        The canvas converted back to an array.
    """
    nc, h, w, _ = X.shape

    # Canvas is as wide as one (possibly partial) row and tall enough for all rows.
    total_width = w * min(nc, n_images_column)
    max_height = h * max(math.ceil(nc/n_images_column), 1)

    new_im = Image.new('RGB', (total_width, max_height))

    for i in range(nc):
        # Fill the grid left-to-right, top-to-bottom.
        row, col = divmod(i, n_images_column)
        new_im.paste(Image.fromarray(np.uint8(X[i])), (col*w, row*h))

    return np.array(new_im)

  

def plot_oneshot_task(pairs):
    """Plot a one-shot task: the test image on top, the support set grid below.

    `pairs` is the [test_images, support_set] list of a one-shot task; only
    the first test image (`pairs[0][0]`) is drawn.
    """
    figure_size = (11.7, 8.27)
    fig, (query_ax, support_ax) = plt.subplots(2, figsize=figure_size)

    # Top panel: the test image, without axis decorations.
    query_image = Image.fromarray(np.uint8(pairs[0][0]))
    query_ax.imshow(query_image)
    for axis in (query_ax.get_yaxis(), query_ax.get_xaxis()):
        axis.set_visible(False)

    # Bottom panel: every support image tiled into one grid.
    support_grid = concat_images(pairs[1])
    support_ax.imshow(support_grid)

    plt.xticks([])
    plt.yticks([])
    plt.show()
    
    
# Draw one 4-way one-shot task from the training split and display it
pairs, targets = loader.make_oneshot_task(N=4, s="train")

plot_oneshot_task(pairs=pairs)

# Experiment


## Defining our architecture

In [0]:
from tensorflow.keras.layers import Input, Conv2D, Lambda, Dense, Flatten,MaxPooling2D
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import SGD,Adam
from tensorflow.keras.losses import binary_crossentropy
import numpy.random as rng
import numpy as np
import os
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.utils import shuffle
%matplotlib inline
def W_init(shape,name=None,dtype=None):
    """Initialize weights as in paper: normal with mean 0 and std 1e-2.

    Accepts the `dtype` keyword so it can be passed to a layer as
    `kernel_initializer=W_init`; Keras calls initializers with
    `(shape, dtype=...)`.
    """
    values = rng.normal(loc=0,scale=1e-2,size=shape)
    return K.variable(values,dtype=dtype,name=name)
# Pass as `bias_initializer=b_init` to a layer to apply it to that layer's biases.
def b_init(shape,name=None,dtype=None):
    """Initialize bias as in paper: normal with mean 0.5 and std 1e-2.

    Accepts the `dtype` keyword because Keras calls initializers with
    `(shape, dtype=...)`.
    """
    values=rng.normal(loc=0.5,scale=1e-2,size=shape)
    return K.variable(values,dtype=dtype,name=name)

input_shape = (105, 105, 3)

# One input tensor per siamese 'leg'.
left_input = Input(input_shape)
right_input = Input(input_shape)

# Convolutional encoder shared by both legs.
convnet = Sequential([
    Conv2D(64, (10, 10), activation='relu', input_shape=input_shape,
           kernel_regularizer=l2(2e-4)),
    MaxPooling2D(),
    Conv2D(128, (7, 7), activation='relu', kernel_regularizer=l2(2e-4)),
    MaxPooling2D(),
    Conv2D(128, (4, 4), activation='relu', kernel_regularizer=l2(2e-4)),
    MaxPooling2D(),
    Conv2D(256, (4, 4), activation='relu', kernel_regularizer=l2(2e-4)),
    Flatten(),
    Dense(4096, activation="sigmoid", kernel_regularizer=l2(1e-3)),
])

# Calling the same Sequential model on both inputs shares its parameters.
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)

# Merge the two encodings with their element-wise absolute difference,
# then score the pair with a single sigmoid unit.
L1_layer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
L1_distance = L1_layer([encoded_l, encoded_r])
prediction = Dense(1, activation='sigmoid')(L1_distance)
siamese_net = Model(inputs=[left_input, right_input], outputs=prediction)

#//TODO: get layerwise learning rates and momentum annealing scheme described in paperworking
optimizer = Adam(0.00006)
siamese_net.compile(loss="binary_crossentropy", optimizer=optimizer)

siamese_net.count_params()

## Running the experiment

In [0]:
from tqdm import tnrange, tqdm_notebook
import math

#Training loop
evaluate_every = 1 # interval (in iterations) for evaluating on one-shot tasks

batch_size = 4
n_iter = 1400
N_way = 2 # how many classes for testing one-shot tasks
n_val = 10 # how many one-shot tasks to validate on
best = -1
weights_path = Path(".") / Path("weights")

print("training")
val_accs = []
losses = []
i = 0
loss = math.inf

# The initial description shows the (not yet computed) loss, not the iteration index.
t = tnrange(n_iter, desc="loss: {:.2f},".format(loss), leave=True)

for i in t:
    (inputs,targets)=loader.get_batch(batch_size)
    loss=siamese_net.train_on_batch(inputs,targets)
    t.set_description("loss: {:.2f},".format(loss))
    t.refresh() # to show immediately the update
    losses.append(loss)

    if i % evaluate_every == 0:
        val_acc = loader.test_oneshot(siamese_net,N_way,n_val,verbose=False)
        val_accs.append(val_acc)
        # `>=` keeps the most recent model among equally accurate ones.
        if val_acc >= best:
            # Pass a plain string: not every Keras release accepts pathlib.Path here.
            # NOTE(review): newer Keras releases may require an explicit file
            # extension on this path - confirm against the installed version.
            siamese_net.save(str(weights_path))
            best=val_acc

## Evaluating our results

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

sns.set_style("darkgrid")

# Training loss recorded after every batch.
fig, ax = plt.subplots()
ax.plot(losses)
ax.set(title="Training loss", xlabel="iteration", ylabel="loss")
plt.show()

# One-shot validation accuracy, smoothed with a rolling mean because
# each evaluation only uses a handful of tasks and is therefore noisy.
ROLLING_WINDOW = 20
fig, ax = plt.subplots()
ax.plot(pd.Series(val_accs).rolling(ROLLING_WINDOW).mean())
ax.set(title="One-shot validation accuracy (rolling mean over {} evaluations)".format(ROLLING_WINDOW),
       xlabel="evaluation", ylabel="accuracy (%)")
plt.show()