<a href="https://colab.research.google.com/github/tkarab/diplomaThesis/blob/master/training_procedure.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive so later cells can read files stored there.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# @title Requirements
import numpy as np
from keras import utils
import tensorflow as tf
import keras
import random
from keras import layers
from keras.layers import Lambda
from keras import callbacks as cb


In [None]:
# @title Custom models

def improvedAtzoriNet(input_shape=(12,40,1),name='ImprovedAtzoriNet'):
  """Build the convolutional backbone used to embed a single input window.

  The network is a plain ``keras.Sequential`` stack: three
  Conv2D -> Dropout -> MaxPooling2D stages followed by a fourth convolution,
  a Flatten layer and a final Dropout. The first convolution uses a kernel
  that is one row high and ``input_shape[1]`` columns wide.

  Args:
    input_shape: shape of one input sample (without the batch axis).
    name: name given to the returned Sequential model.

  Returns:
    An uncompiled ``keras.Sequential`` model whose output is the flattened
    feature vector.
  """
  # Kernel of the first convolution spans the whole second input axis.
  first_kernel = (1,input_shape[1])

  # Layers are instantiated in the same order in which they are stacked.
  stack = [
      keras.Input(shape=input_shape),

      # Stage 1
      layers.Conv2D(filters=32, kernel_size=first_kernel, padding='same', activation='relu', name='conv1'),
      layers.Dropout(0.15),
      layers.MaxPooling2D(pool_size=(1,3)),

      # Stage 2
      layers.Conv2D(filters=32, kernel_size=(3,3), padding='same', activation='relu', name='conv2'),
      layers.Dropout(0.15),
      layers.MaxPooling2D(pool_size=(3,3)),

      # Stage 3
      layers.Conv2D(filters=64, kernel_size=(5,5), padding='same', activation='relu', name='conv3'),
      layers.Dropout(0.15),
      layers.MaxPooling2D(pool_size=(3,3)),

      # Head: last convolution, flatten to a vector, regularise.
      layers.Conv2D(filters=64, kernel_size=(5,1), padding='same', activation='relu', name='conv4'),
      layers.Flatten(name='flatten'),
      layers.Dropout(0.15),
  ]
  return keras.Sequential(layers=stack, name=name)

In [None]:
# @title Prototypical functions

def produce_prototype(x):
    """Return the mean of ``x`` taken along axis 1 (that axis is removed)."""
    prototypes = tf.reduce_mean(x, axis=1)
    return prototypes

def reshape_query(x):
    """Collapse every axis of ``x`` except the last into a single leading axis."""
    last_dim = tf.shape(x)[-1]
    target_shape = [-1, last_dim]
    return tf.reshape(x, target_shape)

def keep_first_tensor(x):
    """Keep only the first entry along axis 0, retaining a leading axis of size 1."""
    first = x[0]
    return tf.expand_dims(first, axis=0)

def get_sum_of_squares(x):
    """Sum the squared entries of ``x`` along axis 1, keeping that axis with size 1."""
    squared = x**2
    return tf.reduce_sum(squared, axis=1, keepdims=True)

def euc_dist(args):
    """Euclidean distance between prototypes and a query embedding.

    Args:
        args: a pair ``(prototypes, query_feat)``. The two tensors are
            subtracted directly, so they must be broadcast-compatible.

    Returns:
        The square root of the squared differences summed over axis 1.
    """
    prototypes, query_feat = args

    squared_diff = tf.square(prototypes - query_feat)
    squared_dist = tf.reduce_sum(squared_diff, axis=1)
    return tf.sqrt(squared_dist)

def l2_dist(args):
    """Pairwise Euclidean distances between every query and every prototype.

    Args:
        args: a pair ``(prototypes, query_feat_reshaped)``.

    Returns:
        A tensor whose entry ``[q, p]`` is the distance between query ``q``
        and prototype ``p``; the reduction runs over the last axis.
    """
    prototypes, query_feat_reshaped = args

    # Insert singleton axes so that the subtraction broadcasts to all pairs.
    queries = tf.expand_dims(query_feat_reshaped, axis=1)
    protos = tf.expand_dims(prototypes, axis=0)

    squared_dist = tf.reduce_sum(tf.square(queries - protos), axis=-1)
    return tf.sqrt(squared_dist)

def softmax_classification(args, verbose=False):
    """Turn prototype distances into class probabilities.

    The probabilities are the softmax of the negated Euclidean distances
    returned by ``euc_dist``, so a smaller distance yields a larger
    probability.

    Args:
        args: the ``(prototypes, query_feat)`` pair forwarded to ``euc_dist``.
        verbose: when True, print the predicted probabilities on every call.
            Off by default because this function runs on every forward pass
            and the output otherwise floods the training log.

    Returns:
        The softmax probabilities.
    """
    pred = tf.nn.softmax(-euc_dist(args))
    if verbose:
        print()
        print_tensor(pred)
    return pred

def get_fsl_set_rand(N,k,dim1=28,dim2=28):
    """Create a random tensor of shape ``(N, k, dim1, dim2, 1)``.

    Values are drawn with ``tf.random.uniform`` using ``minval=0`` and
    ``maxval=1``.
    """
    set_shape = (N, k, dim1, dim2, 1)
    return tf.random.uniform(shape=set_shape, minval=0, maxval=1)

def get_fsl_set(x,N,k):
    """Build a deterministic N x k set derived from ``x``.

    For class ``i`` (1..N) and shot ``j`` (0..k-1) the sample is ``i*x + j``.
    The ``k`` samples of a class are concatenated along axis 0 and the ``N``
    class tensors are then stacked along a new leading axis.
    """
    per_class = []
    for i in range(1, N + 1):
        shots = [i*x+j for j in range(k)]
        per_class.append(tf.concat(shots, axis=0))
    return tf.stack(per_class, axis=0)


def print_tensor(tensor, precision=4):
    """Print ``tensor`` via ``tf.print`` after formatting it as strings.

    Args:
        tensor: the tensor to display.
        precision: forwarded to ``tf.strings.as_string``.
    """
    formatted = tf.strings.as_string(tensor, precision=precision)
    tf.print(formatted)

In [None]:
# @title class TaskGenerator
class TaskGenerator(utils.Sequence):
    """Keras ``Sequence`` that produces one randomly sampled few-shot task per item.

    Each item is ``([support_set, query], label)``:

    * ``support_set`` has shape ``(way, shot, channels, window_size, 1)``.
    * ``query`` holds a single window drawn from one of the ``way`` gestures
      (see ``generate_task_1`` / ``generate_task_2a`` for its exact shape).
    * ``label`` is the one-hot position of the query gesture among the
      sampled gestures.

    Data is looked up by keys of the form ``'s{subject}g{gesture}r{rep}'``.
    The ``experiment`` id decides which subjects, gestures and repetitions may
    be drawn for the given ``mode``:

    * ``'1'``  - subjects 1-40, gestures 1-49; repetitions are split by mode
      (``train``: 1, 3, 4, 6 / ``test``: 2, 5).
    * ``'2a'`` / ``'2b'`` - subjects are split by mode (``train`` 1-27,
      ``val`` 28-32, ``test`` 33-40); all gestures and repetitions 1-6.
      ``'2a'`` draws a whole task from a single subject, ``'2b'`` mixes
      (subject, repetition) pairs.
    * ``'3'``  - gestures are split by mode (``train`` 1-34, ``val`` 35-40,
      ``test`` 41-49); all subjects and repetitions 1-6.

    Args:
        experiment: one of ``'1'``, ``'2a'``, ``'2b'``, ``'3'``.
        way: number of gestures (classes) per task.
        shot: number of support windows per gesture.
        mode: data split; must be a key valid for the chosen experiment.
        channels: size of the channel axis of one window.
        window_size: size of the time axis of one window.
        batches: value reported by ``__len__`` (tasks per epoch).
        verbose: when True, print the label of every generated task.

    Raises:
        ValueError: if ``experiment`` is not one of the supported ids.
        KeyError: if ``mode`` is not defined for the chosen experiment.
    """

    # Archive location; formatted with (first character of experiment, mode).
    DATA_PATH_TEMPLATE = r'/content/drive/MyDrive/Data/Preprocessed/DB2/Preprocessing 2/data_split/db2_ex{}_{}.npz'

    def __init__(self, experiment:str, way:int, shot:int, mode:str,channels:int=12, window_size:int=40, batches: int = 1000, verbose: bool = False):
        # Let the Keras base class initialise its own state (a no-op on
        # versions whose Sequence defines no __init__).
        super().__init__()
        self.experiment = experiment
        self.way = way
        self.shot = shot
        self.mode = mode
        self.channels = channels
        self.window_size = window_size
        self.verbose = verbose
        self.getData()
        self.batches = batches

        if self.experiment == '1':
            self.s_domain = list(range(1,41))
            self.g_domain = list(range(1,50))
            self.r_domain = {'train':[1,3,4,6], 'test':[2,5]}[self.mode]
            self.task_generator = self.generate_task_1
            self.s_r_pairs = self.get_s_r_pairs()

        elif self.experiment in ['2a', '2b']:
            self.s_domain = {'train': list(range(1,28)), 'val': list(range(28,33)),'test': list(range(33,41))}[self.mode]
            self.g_domain = list(range(1,50))
            self.r_domain = list(range(1,7))

            if experiment == '2a':
                self.task_generator = self.generate_task_2a
            else:
                self.task_generator = self.generate_task_1
                self.s_r_pairs = self.get_s_r_pairs()

        elif self.experiment == '3':
            self.s_domain = list(range(1, 41))
            self.g_domain = {'train': list(range(1,35)), 'val': list(range(35,41)), 'test': list(range(41,50))}[self.mode]
            self.r_domain = list(range(1, 7))
            self.task_generator = self.generate_task_1
            self.s_r_pairs = self.get_s_r_pairs()

        else:
            # Without this the object would be built without a task generator
            # and only fail later, inside __getitem__.
            raise ValueError("Unknown experiment {!r}; expected '1', '2a', '2b' or '3'".format(experiment))

    def getData(self):
        """Open the ``.npz`` archive for this experiment/mode and keep it in ``self.data``."""
        # Experiments '2a' and '2b' share one archive, hence experiment[0].
        path = self.DATA_PATH_TEMPLATE.format(self.experiment[0],self.mode)
        # np.load returns a lazy archive for .npz files: arrays are read on key access.
        # NOTE(review): every self.data[key] lookup therefore hits the file again;
        # consider caching if task generation dominates step time.
        self.data = np.load(path)

    def getKeys(self,*entries:tuple) -> list:
        """Build data keys from ``(subject, repetition, gesture)`` tuples."""
        return ['s{}g{}r{}'.format(s,g,r) for s,r,g in entries]

    def getKeys_in_order(self,*entries:tuple) -> list:
        """Build data keys from ``(subject, gesture, repetition)`` tuples."""
        return ['s{}g{}r{}'.format(s,g,r) for s,g,r in entries]

    def get_s_r_pairs(self) -> list:
        """Return every ``(subject, repetition)`` combination of the current domains."""
        return [(s,r) for s in self.s_domain for r in self.r_domain]

    def __getitem__(self, item):
        """Return a freshly sampled task; ``item`` is ignored."""
        # task_generator is generate_task_1 or generate_task_2a, chosen in __init__.
        support, query, label = self.task_generator()
        if self.verbose:
            print(label)
        return [support, query] , label

    def __len__(self):
        """Number of tasks reported per epoch."""
        return self.batches

    def _shots_per_gesture(self, query_gesture_index):
        """Return the number of windows to draw per gesture (one extra for the query gesture)."""
        shot_list = [self.shot]*self.way
        shot_list[query_gesture_index] += 1
        return shot_list

    def _sample_window(self, key):
        """Return one randomly chosen window from the array stored under ``key``."""
        return random.choice(self.data[key])

    def _build_episode(self, key_list, query_gesture_index):
        """Assemble support set, single query window and one-hot label from per-gesture keys.

        ``key_list[query_gesture_index]`` holds one key more than the others;
        that last key is removed and used for the query.
        """
        query_key = key_list[query_gesture_index].pop()

        support_set = np.empty((self.way, self.shot, self.channels, self.window_size, 1))
        for i,gest_key_list in enumerate(key_list):
            support_set[i] = np.array([self._sample_window(key) for key in gest_key_list])

        query_image = np.array([self._sample_window(query_key)])
        label = utils.to_categorical(query_gesture_index, num_classes=self.way)
        return support_set, query_image, label

    def generate_task_1(self):
        """Sample a task whose windows come from random (subject, repetition) pairs.

        Returns:
            ``(support_set, query_image, label)``; ``query_image`` has a
            leading axis of size 1.
        """
        task_gestures = random.sample(self.g_domain, self.way)
        query_gesture_index = random.randrange(self.way)
        shot_list = self._shots_per_gesture(query_gesture_index)

        # Pairs are distinct within a gesture, so the query never reuses a support pair.
        support_pairs = [random.sample(self.s_r_pairs, shot_number) for shot_number in shot_list]
        key_list = [
            self.getKeys(*[pair + (g,) for pair in pairs])
            for g, pairs in zip(task_gestures, support_pairs)
        ]

        return self._build_episode(key_list, query_gesture_index)

    def generate_task_2a(self):
        """Sample a task in which every window comes from one randomly chosen subject.

        Repetitions are drawn without replacement per gesture, so ``shot + 1``
        must not exceed the number of available repetitions.

        Returns:
            ``(support_set, query_image, label)``; unlike ``generate_task_1``
            the query window is repeated ``way`` times along the leading axis.
        """
        task_gestures = random.sample(self.g_domain, self.way)
        query_gesture_index = random.randrange(self.way)
        chosen_subject = random.choice(self.s_domain)
        shot_list = self._shots_per_gesture(query_gesture_index)

        reps = [random.sample(self.r_domain, shot_number) for shot_number in shot_list]
        key_list = [
            self.getKeys(*[(chosen_subject,rep,g) for rep in gesture_reps])
            for g, gesture_reps in zip(task_gestures, reps)
        ]

        support_set, query_image, label = self._build_episode(key_list, query_gesture_index)

        # Broadcast the single query window to `way` identical copies.
        query_image = query_image*np.ones(self.way)[:, np.newaxis, np.newaxis, np.newaxis]

        return support_set, query_image, label


In [None]:
# @title Create model
# Build the two-input model: a shared CNN backbone embeds the support set and the
# query, and the output is a softmax over negated prototype-to-query distances.
inp_shape = (12,40,1)
cnn_backbone = improvedAtzoriNet(inp_shape)#simplest_conv_net_1_layer(input_shape = inp_shape, feature_vector_size=12)


# Applies the same backbone to every entry along axis 1 of its input.
model_timeDist = layers.TimeDistributed(cnn_backbone)

#input shape tuple
# Single-sample shape preceded by a variable-length axis (5-D once the batch axis is added).
inp_shape_5d = (None,) + inp_shape

#Layers
support_set_inp_shape_layer = layers.Input(inp_shape_5d)
query_set_inp_shape_layer = layers.Input(inp_shape)
#query_set_keep_first_tensor_layer = Lambda(function=keep_first_tensor)(query_set_inp_shape_layer)

# Both branches share the weights of cnn_backbone.
support_set_embeddings_layer = model_timeDist(support_set_inp_shape_layer)
#query_set_embedding_layer = cnn_backbone(query_set_keep_first_tensor_layer)   #= model_timeDist(query_set_inp_shape_layer)
query_set_embedding_layer = cnn_backbone(query_set_inp_shape_layer)   #= model_timeDist(query_set_inp_shape_layer)


# One prototype per leading entry: mean of the embeddings over axis 1.
prototypes_layer = Lambda(function=produce_prototype)(support_set_embeddings_layer)
# NOTE(review): this tensor is not used by `model` below.
query_embedding_reshaped_layer = Lambda(function=keep_first_tensor)(query_set_embedding_layer)

# NOTE(review): also not part of `model`; softmax_classification computes the distances itself.
query_distances_layer = Lambda(function=euc_dist)([prototypes_layer, query_set_embedding_layer])

query_prediction_layer = Lambda(function=softmax_classification)([prototypes_layer, query_set_embedding_layer])


# Inputs are [support set, query]; the output is the class-probability vector.
model = keras.Model(inputs=[support_set_inp_shape_layer,query_set_inp_shape_layer], outputs=query_prediction_layer)
# run_eagerly=True executes the model step by step in Python instead of as a compiled graph.
model.compile(loss='categorical_crossentropy', optimizer=keras.optimizers.Adam(0.0001), metrics=['categorical_accuracy'], run_eagerly=True)






In [None]:
# Sanity check of the query branch alone: query input -> backbone embedding.
model_check = keras.Model(inputs=query_set_inp_shape_layer, outputs=query_set_embedding_layer)
train_loader = TaskGenerator(way=5, shot=3, experiment='1', mode='train', batches=10000)
# Draw one task; only its query part is fed to the model below.
[support, query], label = train_loader[0]

# Embed the sampled query with the sub-model.
y = model_check.predict(query)


[0. 0. 1. 0. 0.]


In [None]:
# @title Train
# Episode configuration: experiment id, N-way, k-shot.
ex = '1'
N = 5
k = 5
# NOTE(review): this callback is created but not passed to model.fit below.
tensorboard = cb.TensorBoard()
train_loader = TaskGenerator(way=N, shot=k, experiment=ex, mode='train', batches=10000)
if ex != '1':
  # Every experiment except '1' has a 'val' split, so validate during training.
  val_loader = TaskGenerator(way=N, shot=k, experiment=ex, mode='val')
  model.fit(train_loader, validation_data = val_loader, epochs=25,   shuffle=False)
else:
  # Experiment '1' defines no 'val' split. This branch must be exclusive:
  # without the `else` the model would be trained a second time after the
  # validated run above.
  model.fit(train_loader, epochs=25,   shuffle=False)

[0. 0. 1. 0. 0.]
Epoch 1/25
[0. 0. 0. 0. 1.]

["0.2000" "0.2000" "0.2000" "0.2000" "0.2000"]
[0. 1. 0. 0. 0.]




    1/10000 [..............................] - ETA: 35:52:42 - loss: 1.6094 - categorical_accuracy: 1.0000
["0.2000" "0.2000" "0.2000" "0.2000" "0.2000"]
    2/10000 [..............................] - ETA: 18:53 - loss: 1.6094 - categorical_accuracy: 1.0000   [0. 0. 0. 0. 1.]

["0.2000" "0.2000" "0.2000" "0.2000" "0.2000"]
    3/10000 [..............................] - ETA: 4:24:06 - loss: 1.6094 - categorical_accuracy: 0.6667[0. 0. 0. 1. 0.]

["0.2000" "0.2000" "0.2000" "0.2000" "0.2000"]
    4/10000 [..............................] - ETA: 37:35:35 - loss: 1.6094 - categorical_accuracy: 0.5000[0. 0. 0. 1. 0.]

["0.2000" "0.2000" "0.1999" "0.2000" "0.2000"]
    5/10000 [..............................] - ETA: 28:59:53 - loss: 1.6094 - categorical_accuracy: 0.4000[1. 0. 0. 0. 0.]

["0.2000" "0.2000" "0.2000" "0.2000" "0.2001"]
    6/10000 [..............................] - ETA: 24:33:00 - loss: 1.6094 - categorical_accuracy: 0.3333[0. 1. 0. 0. 0.]

["0.2000" "0.2000" "0.2000" "0.2000" "

KeyboardInterrupt: 