In [16]:
import keras
from keras.layers import Input, Conv2D, Lambda, merge, Dense, Flatten,MaxPooling2D
from keras.models import Model, Sequential
from keras.regularizers import l2
from keras import backend as K
from keras.optimizers import SGD,Adam
from keras.losses import binary_crossentropy
import numpy.random as rng
import numpy as np

from os import listdir
from os.path import isfile, isdir, join
from  ntpath import basename
import pandas as pd
from random import shuffle
import cv2
import os
import random

In [17]:
sample_path = 'experiment-samples-nogit/large-pabk'
#sample_path = 'experiment-samples-nogit/large-csob'

trainSampleGroupSize = 1
testSampleGroupSize = 20

IMAGE_WIDTH = 100
IMAGE_HEIGHT = 100

In [18]:
# Generate train samples

scenes_path = sample_path + '/scenes'
scene_files = [(scenes_path+'/'+f) for f in listdir(scenes_path) if isfile(join(scenes_path, f))]

scenes = []
dict_scenes = {}
for path in scene_files:
    img_data = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    img_data = cv2.resize(img_data, (IMAGE_WIDTH, IMAGE_HEIGHT))
    scenes.append(np.array(img_data))
    dict_scenes[int(os.path.splitext(basename(path))[0])] = img_data

N = len(scenes)
batch_size = 2 * N * (N-1)

lefts = []
rights = []
for scene in scenes:
    for j in range(N-1):
        lefts.append(scene)
        rights.append(scene)
for i, scene1 in enumerate(scenes):
    for j, scene2 in enumerate(scenes):
        if (i != j):
            lefts.append(scene1)
            rights.append(scene2)
            
#pairs= (np.array(twins)).reshape(-1,IMAGE_WIDTH,IMAGE_HEIGHT,1)
lefts = (np.array(lefts)).reshape(-1,IMAGE_WIDTH,IMAGE_HEIGHT,1)
rights = (np.array(lefts)).reshape(-1,IMAGE_WIDTH,IMAGE_HEIGHT,1)

targets=np.zeros((batch_size,))
targets[batch_size//2:] = 1
targets = (np.array(targets)).reshape(batch_size,1)

#print(np.shape(targets))

In [19]:
# Generate test samples

recordings_path = sample_path + '/recordings'
recordings = [d for d in listdir(recordings_path) if isdir(join(recordings_path, d))]
paths = []
for recording in recordings:
    recording_path = recordings_path + '/' + recording
    paths = paths + [(recording_path+'/'+f) for f in listdir(recording_path) if isfile(join(recording_path, f))]
    

labels = pd.read_excel(sample_path+'/labels.xlsx', sheetname='labels')
labels.scene = pd.to_numeric(labels.scene.replace('none','0'))
labels = labels[labels.frame.isin([basename(path) for path in paths])]


test_table = labels.groupby('scene', group_keys=False).apply(lambda x: x.sample(min(len(x), testSampleGroupSize)))

test_lefts = []
test_rights = []
test_targets = []
for i, row in test_table.iterrows():
    path = recordings_path+'/'+row.recording+'/'+row.frame
    img_data = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    img_data = cv2.resize(img_data, (IMAGE_WIDTH, IMAGE_HEIGHT))
    for scene_number, scene_data in dict_scenes.items():
        if row.scene == 0:
            ;
        else:
            test_lefts.append(img_data)
            test_rights.append(scene_data)
            test_lefts.append(scene_data)
            test_rights.append(img_data)
            if row.scene == scene_number:
                test_targets.append(1)
                test_targets.append(1)
            else:
                test_targets.append(0)
                test_targets.append(0)

test_lefts= (np.array(test_lefts)).reshape(-1,IMAGE_WIDTH,IMAGE_HEIGHT,1)       
test_rights= (np.array(test_rights)).reshape(-1,IMAGE_WIDTH,IMAGE_HEIGHT,1)       
test_targets = (np.array(test_targets)).reshape(len(test_targets),1)

        

In [22]:
#Model

def W_init(shape,name=None):
    """Initialize weights as in paper"""
    values = rng.normal(loc=0,scale=1e-2,size=shape)
    return K.variable(values,name=name)
#//TODO: figure out how to initialize layer biases in keras.
def b_init(shape,name=None):
    """Initialize bias as in paper"""
    values=rng.normal(loc=0.5,scale=1e-2,size=shape)
    return K.variable(values,name=name)

input_shape = (IMAGE_WIDTH, IMAGE_HEIGHT, 1)
left_input = Input(input_shape)
right_input = Input(input_shape)
#build convnet to use in each siamese 'leg'
convnet = Sequential()
convnet.add(Conv2D(64,(10,10),activation='relu',input_shape=input_shape,
                   kernel_initializer=W_init,kernel_regularizer=l2(2e-4)))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(128,(7,7),activation='relu',
                   kernel_regularizer=l2(2e-4),kernel_initializer=W_init,bias_initializer=b_init))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(128,(4,4),activation='relu',kernel_initializer=W_init,kernel_regularizer=l2(2e-4),bias_initializer=b_init))
convnet.add(MaxPooling2D())
convnet.add(Conv2D(256,(4,4),activation='relu',kernel_initializer=W_init,kernel_regularizer=l2(2e-4),bias_initializer=b_init))
convnet.add(Flatten())
convnet.add(Dense(512,activation="sigmoid",kernel_regularizer=l2(1e-3),kernel_initializer=W_init,bias_initializer=b_init))

#call the convnet Sequential model on each of the input tensors so params will be shared
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)
#layer to merge two encoded inputs with the l1 distance between them
L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
#call this layer on list of two input tensors.
L1_distance = L1_layer([encoded_l, encoded_r])
prediction = Dense(1,activation='sigmoid',bias_initializer=b_init)(L1_distance)
siamese_net = Model(inputs=[left_input,right_input],outputs=prediction)

optimizer = Adam(0.01)
#//TODO: get layerwise learning rates and momentum annealing scheme described in paperworking
siamese_net.compile(loss="binary_crossentropy",optimizer=optimizer, 
                 metrics=['accuracy'])

siamese_net.count_params()

4472641

In [23]:
tbCallBack = keras.callbacks.TensorBoard(log_dir='./logs-siamese2/pabk2', write_graph=True)

# Training
print("!")
siamese_net.fit (x=[lefts,rights], 
                 y=targets, 
                 epochs=100,
                 shuffle=True,
                 validation_data=([test_lefts,test_rights], test_targets),
                 callbacks=[tbCallBack])

!
Train on 264 samples, validate on 5760 samples
Epoch 1/100

KeyboardInterrupt: 