In [5]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Conv2D, Input, MaxPooling2D, Lambda, Flatten, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras import backend as K
from tensorflow.keras.regularizers import l2
import os

import glob
import numpy as np
from PIL import Image

class Siamese_Loader:
    """For loading batches and testing tasks to a siamese net"""
    def __init__(self, path, data_subsets=["train", "val"]):
        print("hh")
        # gets all file paths (separate for each person)
        self.file_set = [file for file in (glob.glob(r'C:\Users\S545241\Desktop\ecg_img\ecg-id-database-filter\Person_{:02d}/*.png'.format(num)) for num in range(1, 89))]
        print(self.file_set)
        # defining train and val set
        self.val_set = [1, 6, 13, 15, 21, 25, 29, 38, 41, 44, 52, 62, 68, 74, 80]
        #self.val_set = [1]
        self.train_set = list(range(1, 85))
        for ele in sorted(self.val_set, reverse=True):  
            del self.train_set[ele-1]

    def get_batch(self, batch_size, s="train"):
        """Create batch of n pairs, half same class, half different class"""
      
        if s=='train':
          ids = self.train_set
          #print(ids)
        else:
          ids = self.val_set

        w, h = 144, 224

        #initialize 2 empty arrays for the input image batch
        pairs = [np.zeros((batch_size, w, h, 1)) for i in range(2)]
        #initialize vector for the targets, and make one half of it '1's, so 2nd half of batch has same class
        targets = np.zeros((batch_size,))
        targets[batch_size//2:] = 1

        for i in range(batch_size):
            # select random person
            idx_1, idx_2 = np.random.choice(ids, size=2).tolist()
            #print(idx_1)
            #print(idx_2)
            #print(self.file_set[idx_1-2])
            # select random ecg sample of the person
            try:
                pair_x, pair_y = np.random.choice(self.file_set[idx_1-2],size=2, replace = False).tolist()
            except ValueError:
                pair_x, pair_y = np.random.choice(self.file_set[idx_1-2],size=2, replace=True).tolist()

            #(pair_x,pair_y).tolist()
            #print(pair_x)
            #print("y: ", pair_y)
            
            # load and format the image
            
            img = Image.open(pair_x)
            img = img.resize((h, w))
            #img.show()
            pairs[0][i,:,:,:] = np.array(img)[:,:,0:1] / 255

            #pick images of same class for 1st half, different for 2nd
            if i < batch_size // 2:
                pair_y = np.random.choice(self.file_set[idx_2-2], 1)[0]
                #print(pair_y)

            img = Image.open(pair_y)
            img = img.resize((h, w))
            #img.show()
            pairs[1][i,:,:,:] = np.array(img)[:,:,0:1] / 255
            #print(pairs)
        return pairs, targets
    
    def generate(self, batch_size, s="train"):
        """a generator for batches, so model.fit_generator can be used. """
        while True:
            pairs, targets = self.get_batch(batch_size,s)
            yield (pairs, targets)    

    def make_oneshot_task(self, N, s="val"):
        """Create pairs of test image, support set for testing N way one-shot learning. """

        if s=='train':
          ids = self.train_set
        else:
          ids = self.val_set

        w, h = 144, 224

        # select random people
        try:
            idx = np.random.choice(ids, N, replace=False)
        except:
            idx = np.random.choice(ids, N, replace=True)
        true_idx = idx[0]      # test person
        false_idx = idx[1:]    # random people

        pairs = [np.zeros((N, w, h, 1)) for i in range(2)]

        # contains N-1 sample of random people, 1 sample of test person
        support_set = np.zeros((N, w, h, 1))
        # contains N sample of test person (1 person)
        test_image = np.zeros((N, w, h, 1))

        targets = np.zeros((N,))
        targets[0] = 1

        # gets file path of N+1 sample of test person
        pair_x = np.random.choice(self.file_set[true_idx-2], N+1).tolist()
        support_set[0] = np.array(Image.open(pair_x[0]).resize((h, w)))[:,:,0:1] / 255

        j = 0
        for i in pair_x[1:]:
          test_image[j] = np.array(Image.open(i).resize((h, w)))[:,:,0:1] / 255
          j += 1

        for i in range(1, N):
          pair_y = np.random.choice(self.file_set[false_idx[i-1]], 1, replace=False)[0]
          support_set[i] = np.array(Image.open(pair_y).resize((h, w)))[:,:,0:1] / 255
            
        # shuffle data
        index = list(range(N))
        np.random.shuffle(index)
        targets, test_image, support_set = targets[index], test_image[index], support_set[index]
        pairs = [test_image, support_set]

        return pairs, targets
    
    def test_oneshot(self, model, N, k, s="val", verbose=0):
        """Test average N way oneshot learning accuracy of a siamese neural net over k one-shot tasks"""
        n_correct = 0
        if verbose:
            print("\nEvaluating model on {} random {} way one-shot learning tasks ...".format(k,N))
        
        acc = []
        for i in range(k):
            inputs, targets = self.make_oneshot_task(N, s)
            probs = model.predict(inputs).reshape((N,))

            # check accuracy
            acc.append(np.argmax(probs)==np.argmax(targets))
            
        percent_correct = 100*sum(acc)/len(acc)

        if verbose:
            print("Got an average of {}% {} way one-shot learning accuracy".format(percent_correct,N))
        return percent_correct
    
    def train(self, model, epochs, verbosity):
        # train the model
        model.fit_generator(self.generate(batch_size))

def get_siamese_model(input_shape):
    """
    Model architecture based on the one provided in: http://www.cs.utoronto.ca/~gkoch/files/msc-thesis.pdf
    """

    left_input = Input(input_shape)
    right_input = Input(input_shape)

    model = Sequential()
    model.add(Conv2D(32, (3,3), activation='relu', input_shape=input_shape, kernel_regularizer=l2(2e-4)))
    model.add(Conv2D(32, (3,3), activation='relu', kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    
    model.add(Conv2D(64, (3,3), activation='relu', kernel_regularizer=l2(2e-4)))
    model.add(Conv2D(64, (3,3), activation='relu', kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    
    model.add(Conv2D(128, (3,3), activation='relu', kernel_regularizer=l2(2e-4)))
    model.add(Conv2D(128, (3,3), activation='relu', kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    
    model.add(Conv2D(256, (3,3), activation='relu', kernel_regularizer=l2(2e-4)))
    model.add(Conv2D(256, (3,3), activation='relu', kernel_regularizer=l2(2e-4)))
    model.add(MaxPooling2D())
    
    model.add(Flatten())
    model.add(Dense(512, activation='sigmoid', kernel_regularizer=l2(1e-3)))
    
    encoded_l = model(left_input)
    encoded_r = model(right_input)
    #print(tensors[0])
    L1_layer = Lambda(lambda tensors:K.abs(tensors[0] - tensors[1]))
    print(L1_layer)
    L1_distance = L1_layer([encoded_l, encoded_r])
    print(L1_distance)
    prediction = Dense(1,activation='sigmoid')(L1_distance)
    print(prediction)
    siamese_net = Model(inputs=[left_input,right_input],outputs=prediction)
    print(siamese_net)
    return siamese_net

def train_model(model, loader, weights_path="model.h5"):
    evaluate_every = 20     # interval for evaluating on one-shot tasks
    loss_every = 5          # interval for printing loss (iterations)
    batch_size = 32
    n_iter = 1000
    N_way = 5               # how many people for testing one-shot tasks?
    n_val = 50              # how many one-shot tasks to validate on?
    best = 0

    train_loss, train_acc = [], []
     
    print("Starting training process!")
    print("-------------------------------------")
    for i in range(1, n_iter):
        (inputs, targets) = loader.get_batch(batch_size)
        loss = model.train_on_batch(inputs, targets)
        train_loss.append(loss)
        #print("loss: ",loss)

        if i%loss_every == 0:
          print("iteration {}, training loss: {:.2f},".format(i,loss))

        if i%evaluate_every == 0:
          val_acc = loader.test_oneshot(model, N_way, n_val, verbose=True)
          if val_acc >= best:
              print("Current best: {0}, previous best: {1}\n".format(val_acc, best))
              print("Saving weights to: {0} \n".format(weights_path))
              print(weights_path)
              model.save_weights(weights_path)
              #model.save()
              best = val_acc

if __name__ == '__main__':
    print("jj")
    model = get_siamese_model((144, 224, 1))
    optimizer = Adam(lr = 0.00006)
    model.compile(loss="binary_crossentropy", optimizer=optimizer)

    loader = Siamese_Loader(path="path-to-data")
    train_model(model, loader, weights_path = "model_ecg.h5")

jj
<keras.layers.core.lambda_layer.Lambda object at 0x00000282AF54A350>
KerasTensor(type_spec=TensorSpec(shape=(None, 512), dtype=tf.float32, name=None), name='lambda_4/Abs:0', description="created by layer 'lambda_4'")
KerasTensor(type_spec=TensorSpec(shape=(None, 1), dtype=tf.float32, name=None), name='dense_9/Sigmoid:0', description="created by layer 'dense_9'")
<keras.engine.functional.Functional object at 0x00000282864518A0>
hh
[['C:\\Users\\S545241\\Desktop\\ecg_img\\ecg-id-database-filter\\Person_01\\rec_1.png', 'C:\\Users\\S545241\\Desktop\\ecg_img\\ecg-id-database-filter\\Person_01\\rec_10.png', 'C:\\Users\\S545241\\Desktop\\ecg_img\\ecg-id-database-filter\\Person_01\\rec_11.png', 'C:\\Users\\S545241\\Desktop\\ecg_img\\ecg-id-database-filter\\Person_01\\rec_12.png', 'C:\\Users\\S545241\\Desktop\\ecg_img\\ecg-id-database-filter\\Person_01\\rec_13.png', 'C:\\Users\\S545241\\Desktop\\ecg_img\\ecg-id-database-filter\\Person_01\\rec_14.png', 'C:\\Users\\S545241\\Desktop\\ecg_img\\e