### load all necessary libraries

In [1]:
from keras.layers import Input, Conv2D, Lambda, merge, Dense, Flatten,MaxPooling2D,Activation, Dropout
from keras.models import Model, Sequential
from keras.regularizers import l2
from keras import backend as K
from keras.optimizers import Adam
from keras import optimizers
#from skimage.io import imshow
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
from keras import backend as k 
from tensorflow.python.keras.backend import set_session
#from tf.compat.v1.keras.backend import set_session
#from keras.backend.tensorflow_backend import set_session
from keras.applications import resnet50, vgg16, vgg19, xception, densenet, inception_v3, mobilenet, nasnet, inception_resnet_v2
from tensorflow.compat.v1.keras.applications import MobileNetV2
import tensorflow as tf
from keras.callbacks import ModelCheckpoint, TensorBoard, CSVLogger, EarlyStopping
from keras.applications.resnet50 import preprocess_input
#from keras.applications.xception import preprocess_input
import os
import datetime
import json
from keras.preprocessing.image import ImageDataGenerator

### Load and pre_process all Images

In [4]:
import glob
import cv2
from random import shuffle
dog_path = '/home/sina/Desktop/Dataset/Electronics,Accessories&Supplies,Audio&VideoAccessories,Cables&Interconnects,VideoCables/Audio&VideoAccessories/DVICables/*'
cat_path = '/home/sina/Desktop/Dataset/Electronics,Accessories&Supplies,Audio&VideoAccessories,Cables&Interconnects,VideoCables/Audio&VideoAccessories/CompositeVideo/*'
addrsd = glob.glob(dog_path)
addrsc = glob.glob(cat_path)
    
labelsd = [1 for addr in addrsd]  # 1 = dog, 0 =  cat
labelsc = [0 for addr in addrsc]
# loop over the input images
datad = []
for imagePath in addrsd:
# load the image, pre-process it, and store it in the data list
    img = cv2.imread(imagePath)
    img = cv2.resize(img, (224, 224), interpolation=cv2.INTER_CUBIC)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    datad.append(img)
datac = []
for imagePath in addrsc:
# load the image, pre-process it, and store it in the data list
    img = cv2.imread(imagePath)
    img = cv2.resize(img, (224, 224), interpolation=cv2.INTER_CUBIC)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    datac.append(img)
# to shuffle data
shuffle_data = True
if shuffle_data:
    d = list(zip(datad, labelsd))
    c = list(zip(datac, labelsc))
    e = d + c
    shuffle(e)
    data, labels = zip(*e)
del datad
del datac
del addrsd
del addrsc
    
Y_train = np.array(labels)
X_train = np.array(data, dtype="int8")
#preprocess for Resnet- 50
X_train =  preprocess_input(X_train)

## Define the architecture of the model:

In [5]:
# Two inputs one each - left and right image
left_input = Input((224,224,3))
right_input = Input((224,224,3))
#Import Resnetarchitecture from keras application and initializing each layer with pretrained imagenet weights.
'’'
Please note that it’s usually better to intialize the layers with imagenet initializations than random. While training I will be updating the weights for each layer in each epoch. we don’t want to confuse this activity with transfer learning as I am not freezing any layer but initilializing each layer with imagenet weights
'’'
convnet = resnet50.ResNet50(weights='imagenet', include_top=False, input_shape=(224,224,3))
# Add the final fully connected layers
x = convnet.output
x = Flatten()(x)
x = Dense(1024, activation="relu")(x)
preds = Dense(18, activation='sigmoid')(x) # Apply sigmoid
convnet = Model(inputs=convnet.input, outputs=preds)
#Applying above model for both the left and right images
encoded_l = convnet(left_input)
encoded_r = convnet(right_input)
# Euclidian Distance between the two images or encodings through the Resnet-50 architecture
Euc_layer = Lambda(lambda tensor:K.abs(tensor[0] - tensor[1]))
# use and add the distance function
Euc_distance = Euc_layer([encoded_l, encoded_r])
#identify the prediction
prediction = Dense(1,activation='sigmoid')(Euc_distance)
#Define the network with the left and right inputs and the ouput prediction
siamese_net = Model(inputs=[left_input,right_input],outputs=prediction)
#define the optimizer. Here I have used SGD with nesterov momentum

optim = optimizers.SGD(lr=0.001, decay=.01, momentum=0.9, nesterov=True)
#compile the network using binary cross entropy loss and the above optimizer

siamese_net.compile(loss="binary_crossentropy",optimizer=optim,metrics=["accuracy"])


## Image pairs are created with 2 labels as 0 & 1

## Create test & train Dataset

In [6]:
image_list = X_train[:180]
label_list = Y_train[:180]
left_input = []
right_input = []
targets = []
#Number of pairs per image
pairs = 8
#create the dataset to train on
for i in range(len(label_list)):
    for j in range(pairs):
# we need to make sure that we are not comparing with the same image
        compare_to = i
        while compare_to == i: 
            compare_to = random.randint(0,179)
        left_input.append(image_list[i])
        right_input.append(image_list[compare_to])
        if label_list[i] == label_list[compare_to]:
            # if the images are same then label - 1
            targets.append(1.)
        else:
            # if the images are different then label - 0
            targets.append(0.)
            
#remove single-dimensional entries from the shape of the arrays and making them ready to create the train & datasets 
 
#the train data - left right images arrays and target label
left_input = np.squeeze(np.array(left_input))
right_input = np.squeeze(np.array(right_input))
targets = np.squeeze(np.array(targets))
# Creating test datasets - left, right images and target label
dog_image = X_train[4] #dog_image = 1, cat_image = 0
test_left = []
test_right = []
test_targets = []
for i in range(len(Y_train)-180):
    test_left.append(dog_image)
    test_right.append(X_train[i+180])
    test_targets.append(Y_train[i+180])
test_left = np.squeeze(np.array(test_left))
test_right = np.squeeze(np.array(test_right))
test_targets = np.squeeze(np.array(test_targets))

## Train the code using GPU

In [9]:
import tensorflow as tf
import os
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
session = tf.compat.v1.Session(config=config)
#from keras_input_pipeline import *
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
siamese_net.summary()
with tf.device('/gpu:1'):
    siamese_net.fit([left_input,right_input], targets,
          batch_size=16,
          epochs=30,
          verbose=1,
          validation_data=([test_left,test_right],test_targets))


Model: "model_3"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
input_4 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
model_2 (Model)                 (None, 18)           126367634   input_3[0][0]                    
                                                                 input_4[0][0]                    
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 18)           0           model_2[1][0]              

In [None]:
import numpy as np
import os
import random
from sys import platform as sys_pf
import matplotlib
if sys_pf == 'darwin':
	matplotlib.use("TkAgg")
from matplotlib import pyplot as plt
# ---
# Demo for how to load image and stroke data for a character
# ---

# Plot the motor trajectory over an image
#
# Input
#  I [105 x 105 nump] grayscale image
#  drawings: [ns list] of strokes (numpy arrays) in motor space
#  lw : line width
def plot_motor_to_image(I,drawing,lw=2):
	drawing = [d[:,0:2] for d in drawing] # strip off the timing data (third column)
	drawing = [space_motor_to_img(d) for d in drawing] # convert to image space
	plt.imshow(I,cmap='gray')
	ns = len(drawing)
	for sid in range(ns): # for each stroke
		plot_traj(drawing[sid],get_color(sid),lw)
	plt.xticks([])
	plt.yticks([])

# Plot individual stroke
#
# Input
#  stk: [n x 2] individual stroke
#  color: stroke color
#  lw: line width
def plot_traj(stk,color,lw):
	n = stk.shape[0]
	if n > 1:
		plt.plot(stk[:,0],stk[:,1],color=color,linewidth=lw)
	else:
		plt.plot(stk[0,0],stk[0,1],color=color,linewidth=lw,marker='.')

# Color map for the stroke of index k
def get_color(k):	
    scol = ['r','g','b','m','c']
    ncol = len(scol)
    if k < ncol:
       out = scol[k]
    else:
       out = scol[-1]
    return out

# convert to str and add leading zero to single digit numbers
def num2str(idx):
	if idx < 10:
		return '0'+str(idx)
	return str(idx)

# Load binary image for a character
#
# fn : filename
def load_img(fn):
	I = plt.imread(fn)
	I = np.array(I,dtype=bool)
	return I

# Load stroke data for a character from text file
#
# Input
#   fn : filename
#
# Output
#   motor : list of strokes (each is a [n x 3] numpy array)
#      first two columns are coordinates
#	   the last column is the timing data (in milliseconds)
def load_motor(fn):
	motor = []
	with open(fn,'r') as fid:
		lines = fid.readlines()
	lines = [l.strip() for l in lines]
	for myline in lines:
		if myline =='START': # beginning of character
			stk = []
		elif myline =='BREAK': # break between strokes
			stk = np.array(stk)
			motor.append(stk) # add to list of strokes
			stk = [] 
		else:
			arr = np.fromstring(myline,dtype=float,sep=',')
			stk.append(arr)
	return motor

#
# Map from motor space to image space (or vice versa)
#
# Input
#   pt: [n x 2] points (rows) in motor coordinates
#
# Output
#  new_pt: [n x 2] points (rows) in image coordinates
def space_motor_to_img(pt):
	pt[:,1] = -pt[:,1]
	return pt
def space_img_to_motor(pt):
	pt[:,1] = -pt[:,1]
	return

if __name__ == "__main__":
	img_dir = 'images_background'
	stroke_dir = 'strokes_background'
	nreps = 20 # number of renditions for each character
	nalpha = 5 # number of alphabets to show

	alphabet_names = [a for a in os.listdir(img_dir) if a[0] != '.'] # get folder names
	alphabet_names = random.sample(alphabet_names,nalpha) # choose random alphabets

	for a in range(nalpha): # for each alphabet
		print('generating figure ' + str(a+1) + ' of ' + str(nalpha))
		alpha_name = alphabet_names[a]
		
		# choose a random character from the alphabet
		character_id = random.randint(1,len(os.listdir(os.path.join(img_dir,alpha_name))))

		# get image and stroke directories for this character
		img_char_dir = os.path.join(img_dir,alpha_name,'character'+num2str(character_id))
		stroke_char_dir = os.path.join(stroke_dir,alpha_name,'character'+num2str(character_id))

		# get base file name for this character
		fn_example = os.listdir(img_char_dir)[0]
		fn_base = fn_example[:fn_example.find('_')] 

		plt.figure(a,figsize=(10,8))
		plt.clf()
		for r in range(1,nreps+1): # for each rendition
			plt.subplot(4,5,r)
			fn_stk = stroke_char_dir + '/' + fn_base + '_' + num2str(r) + '.txt'
			fn_img = img_char_dir + '/' + fn_base + '_' + num2str(r) + '.png'			
			motor = load_motor(fn_stk)
			I = load_img(fn_img)
			plot_motor_to_image(I,motor)
			if r==1:
				plt.title(alpha_name[:15] + '\n character ' + str(character_id))
		plt.tight_layout()
	plt.show()
