In [1]:
import keras
from keras.applications.vgg16 import VGG16
from keras.layers import Input, Conv2D, Lambda, merge, Dense, Flatten,MaxPooling2D,GlobalAveragePooling2D
from keras.models import Model, Sequential
from keras.regularizers import l2
from keras import backend as K
from keras.optimizers import SGD,Adam
from keras.losses import binary_crossentropy
import numpy.random as rng
import numpy as np
import os
import cv2
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.utils import shuffle
%matplotlib inline

Using TensorFlow backend.


In [37]:
input_shape = (224, 224, 3)
left_input = Input(input_shape)
right_input = Input(input_shape)

In [38]:
convnet = VGG16(weights = 'imagenet', include_top = False)

In [39]:
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)

In [40]:
relation_pair=keras.layers.concatenate([encoded_l,encoded_r], axis=-1)

In [43]:
relation_pair.shape

TensorShape([Dimension(None), Dimension(7), Dimension(7), Dimension(1024)])

In [44]:
RN = Sequential()
RN.add(Conv2D(128,(3,3),activation='relu',input_shape = input_shape))
RN.add(Conv2D(64,(3,3),activation='relu'))
RN.add(MaxPooling2D())
RN.add(Flatten())
RN.add(Dense(128,activation='relu',name='fc1'))
RN.add(Dense(1,activation='sigmoid',name='fc2'))

In [45]:
prediction=RN(relation_pair)

In [46]:
siamese_RN = Model(inputs=[left_input,right_input],outputs=prediction)

In [50]:
optimizer = Adam(0.00006)
#//TODO: get layerwise learning rates and momentum annealing scheme described in paperworking
siamese_RN.compile(loss='mean_squared_error',optimizer=optimizer)

In [52]:
# freeze all layers of the pre-trained model
for layer in convnet.layers:
    layer.trainable = False

In [53]:
PATH = "./Bongard/BP_61" #CHANGE THIS - path where the pickled data is stored

with open(os.path.join(PATH, "train.pickle"), "rb") as f:
    (X,c) = pickle.load(f)

with open(os.path.join(PATH, "val.pickle"), "rb") as f:
    (Xval,cval) = pickle.load(f)
    
print("training alphabets")
print(c.keys())
print("validation alphabets:")
print(cval.keys())

training alphabets
dict_keys(['left', 'right'])
validation alphabets:
dict_keys(['left', 'right'])


In [None]:
class Siamese_Loader:
    """For loading batches and testing tasks to a siamese net"""
    def __init__(self, path, data_subsets = ["train", "val"]):
        self.data = {}
        self.categories = {}
        self.info = {}
        
        for name in data_subsets:
            file_path = os.path.join(path, name + ".pickle")
            print("loading data from {}".format(file_path))
            with open(file_path,"rb") as f:
                (X,c) = pickle.load(f)
                self.data[name] = X
                self.categories[name] = c

    def get_batch(self,batch_size,s="train"):
        """Create batch of n pairs, half same class, half different class"""
        X=self.data[s]
        n_classes, n_examples, w, h = X.shape

        #randomly sample several classes to use in the batch
        categories = rng.choice(n_classes,size=(batch_size,),replace=False)
        #initialize 2 empty arrays for the input image batch
        #pairs=[np.zeros((batch_size, h, w,1)) for i in range(2)]
        pairs=[np.zeros((batch_size, h, w,3)) for i in range(2)]
        #initialize vector for the targets, and make one half of it '1's, so 2nd half of batch has same class
        targets=np.zeros((batch_size,))
        targets[batch_size//2:] = 1
        for i in range(batch_size):
            category = categories[i]
            idx_1 = rng.randint(0, n_examples)
            #pairs[0][i,:,:,:] = X[category, idx_1].reshape(w, h, 1)
            
            pairs[0][i,:,:,:] = cv2.cvtColor(cv2.resize(X[category, idx_1], (w, h)), cv2.COLOR_GRAY2RGB)
            idx_2 = rng.randint(0, n_examples)
            #pick images of same class for 1st half, different for 2nd
            if i >= batch_size // 2:
                category_2 = category  
            else: 
                #add a random number to the category modulo n classes to ensure 2nd image has
                # ..different category
                category_2 = (category + rng.randint(1,n_classes)) % n_classes
            #pairs[1][i,:,:,:] = X[category_2,idx_2].reshape(w, h,1)
            pairs[1][i,:,:,:]=cv2.cvtColor(cv2.resize(X[category_2,idx_2], (w, h)), cv2.COLOR_GRAY2RGB)
        return pairs, targets
    
    def generate(self, batch_size, s="train"):
        """a generator for batches, so model.fit_generator can be used. """
        while True:
            pairs, targets = self.get_batch(batch_size,s)
            yield (pairs, targets)   
            
    def test_bongard(self,model,N,k,s='val',s1='train',verbose=0):
        
        X_train=self.data[s1]
        X_val=self.data[s]
        
        n_classes, n_examples, w, h = X_train.shape
        m = n_classes*n_examples
        X_train=X_train.reshape(m, w, h,1)
        X_train = [cv2.cvtColor(cv2.resize(i, (w, h)), cv2.COLOR_GRAY2RGB) for i in X_train]
        X_train=np.array(X_train)

        
        n_classes_val, n_examples_val, w, h = X_val.shape
        m_val = n_classes_val * n_examples_val
        X_val=X_val.reshape(m_val, w, h,1)
        X_val = [cv2.cvtColor(cv2.resize(i, (w, h)), cv2.COLOR_GRAY2RGB) for i in X_val]
        X_val=np.array(X_val)
        
        
        n_correct = 0
        
        for i in range(k):
            
            test_image = np.asarray([X_val[i,:,:,:]]*m).reshape(m, w, h,3)
            support_set =  X_train
            if i < k/2:
                targets=0
            else:
                targets=1 
            
            pairs = [test_image,support_set]
            
            probs = model.predict(pairs)
            if np.argmax(probs)<6:
                test_result=0
            else:
                test_result=1
            if test_result== targets:
                n_correct+=1
                
        percent_correct = (100.0*n_correct / k)
        if verbose:
            print("Got an average of {}% {} way one-shot learning accuracy".format(percent_correct,N))
        return percent_correct
            
    def train(self, model, epochs, verbosity):
        model.fit_generator(self.generate(batch_size),
                            
                             )
    
    
#Instantiate the class
loader = Siamese_Loader(PATH)

In [None]:
#Training loop
print("!")
evaluate_every = 1 # interval for evaluating on one-shot tasks
loss_every=50 # interval for printing loss (iterations)
batch_size = 2
n_iter = 100
N_way = 2 # how many classes for testing one-shot tasks>
n_val = 1000 #how mahy one-shot tasks to validate on?
best = 0.9
weights_path = os.path.join(PATH, "weights")
print("training")
for i in range(1, n_iter):
    (inputs,targets)=loader.get_batch(batch_size)
    loss=siamese_net.train_on_batch(inputs,targets)
    print(loss)
    if i % evaluate_every == 0:
        print("evaluating")
        val_acc = loader.test_bongard(siamese_net,N_way,n_val,s='val',s1='train',verbose=True)
        if val_acc >= best:
            print("saving")
            siamese_net.save(weights_path)
            best=val_acc

    if i % loss_every == 0:
        print("iteration {}, training loss: {:.2f},".format(i,loss))