In [None]:
!pip install tensorflow==2.15.0

In [None]:
''' Importing libraries '''

import numpy as np
import tensorflow as tf
import pandas as pd
import seaborn as sns
import random
import matplotlib.pyplot as plt
import json
import csv
from tensorflow import keras
from pylab import rcParams
from matplotlib import rc
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model, Sequential, load_model, clone_model
from keras.layers import Dense
from google.colab import files
from sklearn.preprocessing import RobustScaler
from sklearn import preprocessing
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from sklearn.utils import shuffle
from collections import defaultdict
from sklearn.metrics.pairwise import cosine_similarity
from scipy import spatial

%matplotlib notebook

RANDOM_SEED = 42

# Seed every RNG the notebook draws from. leave_subject() picks the held-out
# subjects with the stdlib `random` module, so it has to be seeded as well;
# otherwise the train/validation split changes on every kernel restart.
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

In [None]:
class PositionalEncoding(tf.keras.layers.Layer):
    """Adds a fixed sinusoidal position signal to its input.

    The encoding table is computed once in the constructor with shape
    (1, position, d_model) and sliced to the input's length (axis 1) on every
    call, so inputs may be shorter than `position` steps.

    Args:
        position: number of positions (time steps) to pre-compute.
        d_model: width of the encoding; must match the input's last axis for
            the addition in `call` to broadcast.
    """

    def __init__(self, position, d_model):
        super().__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, position, i, d_model):
        """Return position / 10000^(2 * (i // 2) / d_model), broadcast over both inputs."""
        exponent = (2 * (i // 2)) / tf.cast(d_model, tf.float32)
        inverse_frequency = 1 / tf.pow(10000, exponent)
        return position * inverse_frequency

    def positional_encoding(self, position, d_model):
        """Build the (1, position, d_model) float32 encoding table."""
        position_column = tf.range(position, dtype=tf.float32)[:, tf.newaxis]
        channel_row = tf.range(d_model, dtype=tf.float32)[tf.newaxis, :]
        angle_rads = self.get_angles(position=position_column,
                                     i=channel_row,
                                     d_model=d_model)

        # Sine of the even-indexed columns and cosine of the odd-indexed ones.
        # The two halves are concatenated (all sines first), not interleaved.
        sin_part = tf.math.sin(angle_rads[:, 0::2])
        cos_part = tf.math.cos(angle_rads[:, 1::2])
        table = tf.concat([sin_part, cos_part], axis=-1)

        # Leading axis of size 1 lets the table broadcast over the batch.
        return tf.cast(table[tf.newaxis, ...], tf.float32)

    def call(self, inputs):
        sequence_length = tf.shape(inputs)[1]
        return inputs + self.pos_encoding[:, :sequence_length, :]

In [None]:
def scaled_dot_product_attention(q, k, v, mask=None):
    """Calculate the attention weights.
    q, k, v must have matching leading dimensions.
    k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.
    The mask has different shapes depending on its type (padding or look ahead)
    but it must be broadcastable for addition.

    Args:
      q: query shape == (..., seq_len_q, depth)
      k: key shape == (..., seq_len_k, depth)
      v: value shape == (..., seq_len_v, depth_v)
      mask: Optional tensor with shape broadcastable to
            (..., seq_len_q, seq_len_k); entries equal to 1 are suppressed
            by adding -1e9 to their logits. Defaults to None (no masking).

    Returns:
      output, attention_weights
    """

    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    # Scale by sqrt(depth). The cast follows the logits' dtype so inputs that
    # are not float32 do not fail on a dtype mismatch in the division.
    dk = tf.cast(tf.shape(k)[-1], matmul_qk.dtype)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # Push masked positions towards -inf so softmax gives them ~0 weight.
    if mask is not None:
        scaled_attention_logits += (tf.cast(mask, scaled_attention_logits.dtype) * -1e9)

    # softmax is normalized on the last axis (seq_len_k) so that the scores
    # add up to 1.
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # (..., seq_len_q, seq_len_k)

    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

    return output, attention_weights


class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention with learned q/k/v projections.

    Args:
        d_model: width of the projections; must be divisible by `num_heads`.
        num_heads: number of attention heads.

    `call` has two modes. By default the projections are split into heads,
    attended, merged and passed through the output Dense layer. With
    `return_attention=True` the un-split projections are attended directly and
    the raw attention output and weights are returned, skipping both the head
    split and the output projection.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        # Every head receives an equal slice of the model width.
        assert d_model % self.num_heads == 0
        self.depth = d_model // self.num_heads

        # Note: the query projection is bias-free, key/value projections are not.
        self.wq = tf.keras.layers.Dense(d_model, use_bias=False)
        self.wk = tf.keras.layers.Dense(d_model, use_bias=True)
        self.wv = tf.keras.layers.Dense(d_model, use_bias=True)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch_size, seq_len, d_model) into (batch_size, num_heads, seq_len, depth)."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask, return_attention=False):
        batch_size = tf.shape(q)[0]

        q_proj = self.wq(q)  # (batch_size, seq_len, d_model)
        k_proj = self.wk(k)  # (batch_size, seq_len, d_model)
        v_proj = self.wv(v)  # (batch_size, seq_len, d_model)

        if return_attention:
            # Single-head attention over the full projections; no output Dense.
            return scaled_dot_product_attention(q_proj, k_proj, v_proj, mask)

        q_heads = self.split_heads(q_proj, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k_heads = self.split_heads(k_proj, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v_heads = self.split_heads(v_proj, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        # head_output: (batch_size, num_heads, seq_len_q, depth)
        # attention_weights: (batch_size, num_heads, seq_len_q, seq_len_k)
        head_output, attention_weights = scaled_dot_product_attention(
            q_heads, k_heads, v_heads, mask)

        # Bring the head axis next to depth, then fold both back into d_model.
        merged = tf.transpose(head_output, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)
        merged = tf.reshape(merged, (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        return self.dense(merged), attention_weights

In [None]:
def point_wise_feed_forward_network(d_model, dff):
    """Return a two-layer feed-forward block: Dense(dff, relu) followed by Dense(d_model)."""
    expand = tf.keras.layers.Dense(dff, activation='relu')  # (batch_size, seq_len, dff)
    project = tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([expand, project])


class EncoderLayer(tf.keras.layers.Layer):
    """One Transformer encoder block: self-attention and a feed-forward network.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalization (post-norm arrangement).

    Args:
        d_model: feature width of the block's input and output.
        num_heads: number of attention heads.
        dff: hidden width of the feed-forward network.
        rate: dropout rate applied after each sub-layer.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask=None):
        # Self-attention: the same tensor serves as value, key and query.
        attended, _ = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
        attended = self.dropout1(attended, training=training)
        attention_block = self.layernorm1(x + attended)  # (batch_size, input_seq_len, d_model)

        transformed = self.ffn(attention_block)  # (batch_size, input_seq_len, d_model)
        transformed = self.dropout2(transformed, training=training)

        return self.layernorm2(attention_block + transformed)  # (batch_size, input_seq_len, d_model)

In [None]:
class AttentionWithContext(tf.keras.layers.Layer):
    """
        Attention operation, with a context/query vector, for temporal data.
        Supports Masking.
        Follows the work of Yang et al. [https://www.cs.cmu.edu/~diyiy/docs/naacl16.pdf]
        "Hierarchical Attention Networks for Document Classification"
        by using a context vector to assist the attention
        # Input shape
            3D tensor with shape: `(samples, steps, features)`.
        # Output shape
            2D tensor with shape: `(samples, features)`. When
            `return_attention=True`, a tuple of that tensor and the attention
            weights with shape `(samples, steps, 1)`.
        # Arguments
            W_regularizer, u_regularizer, b_regularizer: optional regularizers
                for the projection matrix, context vector and bias.
            W_constraint, u_constraint, b_constraint: optional constraints for
                the same weights.
            bias: whether to add a bias before the tanh.
            return_attention: also return the attention weights from `call`.
        Put it on top of any layer that outputs a sequence, e.g. an RNN layer
        (GRU/LSTM/SimpleRNN) with return_sequences=True.
        The weight dimensions are inferred from the input's last axis.
        Example:
            model.add(LSTM(64, return_sequences=True))
            model.add(AttentionWithContext())
        """

    def __init__(self,
                 W_regularizer=None, u_regularizer=None, b_regularizer=None,
                 W_constraint=None, u_constraint=None, b_constraint=None,
                 bias=True,
                 return_attention=False):
        super(AttentionWithContext, self).__init__()

        self.supports_masking = True
        self.return_attention = return_attention
        self.init = tf.keras.initializers.get('glorot_uniform')

        self.W_regularizer = tf.keras.regularizers.get(W_regularizer)
        self.u_regularizer = tf.keras.regularizers.get(u_regularizer)
        self.b_regularizer = tf.keras.regularizers.get(b_regularizer)

        self.W_constraint = tf.keras.constraints.get(W_constraint)
        self.u_constraint = tf.keras.constraints.get(u_constraint)
        self.b_constraint = tf.keras.constraints.get(b_constraint)

        self.bias = bias

    def build(self, input_shape):
        assert len(input_shape) == 3

        # Weight creation order (W, b, u) is kept as-is; changing it could
        # break loading of previously saved weights.
        self.W = self.add_weight(shape=(input_shape[-1], input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[-1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)

        self.u = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_u'.format(self.name),
                                 regularizer=self.u_regularizer,
                                 constraint=self.u_constraint)

    def compute_mask(self, input, input_mask=None):
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        # u_it = tanh(x W + b): hidden representation of every time step.
        uit = tf.tensordot(x, self.W, axes=1)

        if self.bias:
            uit += self.b

        uit = tf.keras.activations.tanh(uit)
        # Similarity of each step with the learned context vector u.
        ait = tf.tensordot(uit, self.u, axes=1)

        a = tf.math.exp(ait)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the (boolean) mask to the backend float type before multiplying.
            a *= tf.cast(mask, tf.keras.backend.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        a /= tf.cast(tf.keras.backend.sum(a, axis=1, keepdims=True) + tf.keras.backend.epsilon(),
                     tf.keras.backend.floatx())

        # (samples, steps) -> (samples, steps, 1) so it broadcasts over features.
        a = tf.keras.backend.expand_dims(a)
        weighted_input = x * a
        result = tf.keras.backend.sum(weighted_input, axis=1)

        if self.return_attention:
            return result, a
        return result

    def compute_output_shape(self, input_shape):
        # TensorShape entries are plain ints/None in TF2 (no `.value`), and
        # TensorShape takes a single dims argument, so two outputs are
        # reported as a tuple of shapes mirroring what `call` returns.
        batch, steps, features = tf.TensorShape(input_shape).as_list()
        pooled_shape = tf.TensorShape([batch, features])
        if self.return_attention:
            # `call` returns the weights after expand_dims: (samples, steps, 1).
            return pooled_shape, tf.TensorShape([batch, steps, 1])
        return pooled_shape


In [None]:
def create_model(n_timesteps, n_features, n_outputs, _dff=512, d_model=128, nh=4, dropout_rate=0.2, use_pe=True):
    """Build the Transformer-encoder gesture classifier.

    Pipeline: 1x1 Conv1D projection to `d_model` -> optional scaling by
    sqrt(d_model), positional encoding and dropout -> two EncoderLayer blocks
    -> AttentionWithContext pooling -> Dense(n_outputs * 4, relu) ->
    Dropout -> Dense(n_outputs, softmax).

    Args:
        n_timesteps: length of each input window.
        n_features: number of channels per time step.
        n_outputs: number of classes.
        _dff: hidden width of the encoder feed-forward networks.
        d_model: encoder feature width.
        nh: number of attention heads.
        dropout_rate: dropout rate after the positional encoding and inside
            the encoder layers.
        use_pe: whether to apply the positional-encoding stage.

    Returns:
        An uncompiled `tf.keras.Model`.
    """
    inputs = tf.keras.layers.Input(shape=(n_timesteps, n_features,))

    # Kernel size 1: a per-time-step linear projection to d_model plus ReLU.
    features = tf.keras.layers.Conv1D(d_model, 1, activation='relu')(inputs)

    if use_pe:
        features = features * tf.math.sqrt(tf.cast(d_model, tf.float32))
        features = PositionalEncoding(n_timesteps, d_model)(features)
        features = tf.keras.layers.Dropout(rate=dropout_rate)(features)

    # Two stacked encoder blocks with identical hyper-parameters.
    for _ in range(2):
        features = EncoderLayer(d_model=d_model, num_heads=nh, dff=_dff, rate=dropout_rate)(features)

    # Collapse the time axis with learned attention pooling.
    pooled = AttentionWithContext()(features)
    hidden = tf.keras.layers.Dense(n_outputs * 4, activation='relu')(pooled)
    # This dropout rate is fixed at 0.2 and does not follow `dropout_rate`.
    hidden = tf.keras.layers.Dropout(0.2)(hidden)

    predictions = tf.keras.layers.Dense(n_outputs, activation='softmax')(hidden)
    return tf.keras.Model(inputs=inputs, outputs=predictions)


In [None]:
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/gesture_recognition/Research code/Transfer learning/AAAI submission

Mounted at /content/drive
/content/drive/.shortcut-targets-by-id/1Sza0LndmZIjGwAh8IAWI16qskTGATgZ9/gesture_recognition/Research code/Transfer learning/AAAI submission


In [None]:
''' Function to read csv_file '''

def read_data(csv_file, column_no, after_column_no):
  """Read a CSV and keep the columns on one side of position `column_no`.

  Args:
    csv_file: path or file-like object accepted by `pd.read_csv`.
    column_no: positional column index at which to cut.
    after_column_no: if equal to True keep columns from `column_no` onwards,
      otherwise keep the columns before it.

  Returns:
    The selected columns as a DataFrame.
  """
  frame = pd.read_csv(csv_file)
  keep_tail = after_column_no == True
  column_slice = slice(column_no, None) if keep_tail else slice(None, column_no)
  return frame.iloc[:, column_slice]

In [None]:
'''Function for random subject-wise splitting'''

def leave_subject(curr_df,n=1):
  """Randomly hold out `n` distinct subjects.

  Subjects are drawn one at a time, without replacement, from the unique
  values of the 'userId' column using the stdlib `random` module.

  Returns:
    (train, test, ids): rows of the remaining subjects, rows of the held-out
    subjects, and the list of held-out user ids.
  """
  remaining_users = np.unique(curr_df['userId'])
  held_out = []
  for _ in range(n):
    picked = random.choice(remaining_users)
    held_out.append(picked)
    # Drop the chosen subject so it cannot be drawn again.
    remaining_users = remaining_users[remaining_users != picked]
  is_held_out = curr_df['userId'].isin(held_out)
  return curr_df[~is_held_out], curr_df[is_held_out], held_out

In [None]:
''' Function to prepare data '''

import statistics

def create_dataset(X, y, time_steps=1, step=1):
    """Cut a recording into fixed-length windows with one label per window.

    Args:
        X: DataFrame of features, one row per sample in time order.
        y: Series of per-sample labels aligned with `X`.
        time_steps: number of consecutive rows in each window.
        step: offset between window starts; `step < time_steps` gives
            overlapping windows.

    Returns:
        (Xs, ys): `Xs` stacks the windows (n_windows, time_steps, n_features
        when at least one window exists) and `ys` has shape (n_windows, 1)
        holding each window's most frequent label.
    """
    Xs, ys = [], []
    # Stop at the last start that still leaves a full window. Running to
    # len(X) yields shorter trailing windows whenever len(X) is not a multiple
    # of `step` or windows overlap, and those make np.array(Xs) ragged.
    for i in range(0, len(X) - time_steps + 1, step):
        v = X.iloc[i:(i + time_steps)].values
        labels = y.iloc[i: (i + time_steps)]
        Xs.append(v)
        ys.append(statistics.mode(labels))

    return np.array(Xs), np.array(ys).reshape(-1, 1)

In [None]:
''' Function to scale data '''

def scaling_dataframe(df_train, df_val, scale_columns):
  """Robust-scale `scale_columns` using statistics from the training frame only.

  The scaler is fitted on `df_train` and applied to both frames, so no
  validation statistics leak into the scaling.

  Args:
    df_train: training DataFrame.
    df_val: validation DataFrame.
    scale_columns: list of column names to scale.

  Returns:
    (scaled_train, scaled_val): scaled copies; the inputs are left untouched.
  """
  scaler = RobustScaler()
  scaler = scaler.fit(df_train[scale_columns].values)

  # Work on copies: the frames passed in are typically boolean-indexed slices
  # of a larger frame, and writing into them triggers SettingWithCopyWarning
  # and mutates the caller's data.
  scaled_train = df_train.copy()
  scaled_val = df_val.copy()

  scaled_train.loc[:, scale_columns] = scaler.transform(df_train[scale_columns].to_numpy())
  scaled_val.loc[:, scale_columns] = scaler.transform(df_val[scale_columns].to_numpy())

  return scaled_train, scaled_val

In [None]:
'''Custom label encoding function'''

def custom_label_encode(data, mapping):
    """Map the first element of every row in `data` through `mapping`.

    Args:
        data: iterable of rows (e.g. an (n, 1) label array); only row[0] is read.
        mapping: dict from label to encoded value; unknown labels raise KeyError.

    Returns:
        A list with one encoded value per row.
    """
    encoded_labels = []
    for row in data:
        encoded_labels.append(mapping[row[0]])
    return encoded_labels

In [None]:
# Read control subject's data from csv
df = read_data('control-gesture-data-source.csv', 2, True)

In [None]:
train_ids = [1, 2, 3, 4, 5, 6, 7, 8]
gestures_source = ['right', 'left', 'square_C', 'square_AC', 'upRight', 'upLeft', 'rightDown', 'leftDown',
                   'v', 'v_Mirror', 'v_Reverse', 'v_ReverseM', 's_Top', 's_TopM', 's_Down', 's_DownM']

In [None]:
# df_source = pd.DataFrame(columns=['x_axis', 'y_axis', 'z_axis', 'gesture', 'userId'])
# df_source = pd.concat([df_source, df[df['userId'].isin(train_ids) & df['gesture'].isin(gestures_source)]])

# Keep only the source subjects and source gestures. Filtering and re-indexing
# gives the same frame as concatenating onto an empty, dtype-matched frame,
# without relying on pandas' handling of empty frames in concat.
to_add = df[df['userId'].isin(train_ids) & df['gesture'].isin(gestures_source)]
df_source = to_add.reset_index(drop=True)

In [None]:
df_train, df_val, ids_val = leave_subject(df_source,n=1)

In [None]:
print(df_train.userId.unique())
print(df_val.userId.unique())

[1 2 3 5 6 7 8]
[4]


In [None]:
# Scale data

scale_columns = ['x_axis', 'y_axis', 'z_axis']
df_train, df_val = scaling_dataframe(df_train, df_val, scale_columns)

In [None]:
TIME_STEPS = 50 # Block of data to consider a gesture
STEP = 50 # Determines overlapping or not

# STEP == TIME_STEPS, so consecutive windows do not overlap. Each window is
# labelled with the most frequent gesture among its rows (see create_dataset).
X_train, y_train = create_dataset(
      df_train[['x_axis', 'y_axis', 'z_axis']],
      df_train.gesture,
      TIME_STEPS,
      STEP
  )

In [None]:
X_train.shape

(2240, 50, 3)

In [None]:
# Load the pre-trained source-domain model; the custom layer classes are passed
# so Keras can rebuild them from the saved config.
model = keras.models.load_model('pre-trained_Transformer.keras', custom_objects={"PositionalEncoding": PositionalEncoding,
                                                                                 "EncoderLayer": EncoderLayer,
                                                                                 "AttentionWithContext": AttentionWithContext})
# Expose the third-from-last layer's output as the embedding.
# NOTE(review): presumably the saved model was built with create_model(), in
# which case layers[-3] is the Dense(n_outputs * 4) layer — confirm.
intermediate_model = Model(inputs=model.input, outputs=model.layers[-3].output)
# Embeddings of all source (control) windows, computed in one forward call.
embeddings_control = intermediate_model(X_train)

In [None]:
print(embeddings_control)

In [None]:
test_ids = [101] # Impaired subject

order1_label_mapping = {'circle':0, 'double tap':1, 'rotate_f_s':2, 'rotate_s_f':3, 'shake':4, 'tap':5}

# order2_label_mapping = {'rotate_s_f':0, 'tap':1, 'rotate_f_s':2, 'shake':3, 'circle':4, 'double tap':5}

# order3_label_mapping = {'double tap':0, 'shake':1, 'rotate_f_s':2, 'circle':3, 'tap':4, 'rotate_s_f':5}

# order4_label_mapping = {'rotate_f_s':0, 'tap':1, 'circle':2, 'rotate_s_f':3, 'double tap':4, 'shake':5}

# order5_label_mapping = {'shake':0, 'double tap':1, 'rotate_s_f':2, 'tap':3, 'circle':4, 'rotate_f_s':5}

N_TRAIN_PER_GESTURE = 5 # Windows per gesture used for few-shot training

df = read_data('impaired-gesture-data-target.csv', 5, False)

# Keep only the impaired subject's recordings of the target gestures. The
# gesture list comes from the mapping keys so the two cannot drift apart, and
# a filtered, re-indexed frame replaces the concat-onto-empty-frame workaround.
to_add = df[df['userId'].isin(test_ids) & df['gesture'].isin(list(order1_label_mapping))]
df_target = to_add.reset_index(drop=True)

TIME_STEPS = 50
STEP = 50

X_target, y_target = create_dataset(
      df_target[['x_axis', 'y_axis', 'z_axis']],
      df_target.gesture,
      TIME_STEPS,
      STEP
  )

y_target = custom_label_encode(y_target, order1_label_mapping)

#### For each gesture: first N_TRAIN_PER_GESTURE windows -> train, all remaining windows -> test ####
labels = np.unique(y_target)
X_train_target, y_train_target = [], []
X_test_target, y_test_target = [], []

for label in labels:
  # Positions of this gesture's windows, in recording order.
  label_positions = [ind for ind, encoded in enumerate(y_target) if encoded == label]
  for ind in label_positions[:N_TRAIN_PER_GESTURE]:
    X_train_target.append(X_target[ind])
    y_train_target.append(y_target[ind])
  for ind in label_positions[N_TRAIN_PER_GESTURE:]:
    X_test_target.append(X_target[ind])
    y_test_target.append(y_target[ind])

###################################################################################

# Both splits end up grouped by label in ascending label order.
X_train_target = np.array(X_train_target)
y_train_target = np.array(y_train_target)
X_test_target = np.array(X_test_target)
y_test_target = np.array(y_test_target)

In [None]:
print(X_train_target.shape)
print(y_train_target.shape)

(30, 50, 3)
(30,)


In [None]:
''' Function to add gestures one by one for few shot learning '''

def add_gestures(X_local, y_local, number_of_gestures):
  """Keep only the samples whose label is among the first few gesture labels.

  Args:
    X_local: sequence of samples aligned with `y_local`.
    y_local: sequence of encoded labels.
    number_of_gestures: how many of the smallest distinct labels to keep.

  Returns:
    (X_n, y_n): lists with the retained samples and labels, in input order.
  """
  # Computed once: np.unique sorts y_local, so evaluating it for every sample
  # (as a per-iteration condition) made the selection needlessly quadratic.
  kept_labels = np.unique(y_local)[:number_of_gestures]

  X_n, y_n = [], []
  for sample, label in zip(X_local, y_local):
    if label in kept_labels:
      X_n.append(sample)
      y_n.append(label)

  return X_n, y_n

In [None]:
# Cross-attention weights heat map
# def plot_attention_weights(attention_weights, iter, epoch, num_class):
#   plt.figure(figsize=(16, 12))
#   plt.rcParams['font.size'] = 22
#   ax = plt.axes()
#   sns.heatmap(attention_weights, cmap='viridis')
#   plt.xlabel('Source examples')
#   plt.ylabel('Target examples')
#   # plt.title('Cross-relation Heatmap')
#   plt.setp(ax.get_xticklabels(), rotation=90)
#   plt.show()
#   plt.savefig('Attention-weights-heatmap-class-'+str(num_class)+'-iter-'+str(iter)+'-epoch-'+str(epoch)+'.png')
#   plt.close()

In [None]:
# Attention scores heat map
# def plot_attention_scores(attention_scores, iter, epoch, num_class):
#   plt.figure(figsize=(16, 12))
#   plt.rcParams['font.size'] = 22
#   ax = plt.axes()
#   sns.heatmap(attention_scores, cmap='viridis')
#   plt.xlabel('Feature embeddings')
#   plt.ylabel('Training examples')
#   # plt.title('Self-example Heatmap')
#   # fig.autofmt_xdate()
#   plt.setp(ax.get_xticklabels(), rotation=90)
#   plt.show()
#   plt.savefig('Output-heatmap-class-'+str(num_class)+'-iter-'+str(iter)+'-epoch-'+str(epoch)+'.png')
#   plt.close()

In [None]:
def _min_max_normalize(values):
    """Rescale `values` to [0, 1]; a constant input maps to zeros instead of NaN."""
    lowest = np.min(values)
    span = np.max(values) - lowest
    if span == 0:
        return values - lowest
    return (values - lowest) / span


def cosine_similarity_loss(embd_one, embd_two, maximize, iter, epoch, num_class, get_embeddings):
    """Similarity term between two sets of embeddings.

    Args:
        embd_one: first embedding set; used as attention key (and, min-max
            normalized, as value) when `maximize` is true.
        embd_two: second embedding set; used as the attention query when
            `maximize` is true.
        maximize: if true, use the cross-attention branch; otherwise return the
            plain cosine similarity of the two flattened sets.
        iter, epoch, num_class: only consumed by the plotting calls that are
            currently commented out.
        get_embeddings: with `maximize` true, return the raw
            (attention_scores, attention_weights) instead of a loss.

    Returns:
        A scalar loss, or the attention tuple described above.

    Notes:
        * A new MultiHeadAttention layer is created on every call, so its
          projection weights are freshly initialised each time and never trained.
        * d_model=64 is hard-coded and must equal the embeddings' last axis.
        * The loss is computed with NumPy, so it is a constant as far as
          tf.GradientTape is concerned.
    """
    if maximize:
      normalized_embd_one = _min_max_normalize(embd_one)
      mha = MultiHeadAttention(d_model=64, num_heads=4)
      attention_scores, attention_weights = mha(normalized_embd_one, embd_one, embd_two, mask=None, return_attention=True)
      if get_embeddings:
        return attention_scores, attention_weights
      # plot_attention_scores(attention_scores, iter, epoch, num_class)
      # plot_attention_weights(attention_weights, iter, epoch, num_class)
      positive_scores = np.array(attention_scores)
      positive_scores = positive_scores[positive_scores > 0]
      if positive_scores.size == 0:
        # Nothing to average: report the maximum loss instead of letting
        # np.min raise on an empty array.
        return 1.0
      return 1.0 - np.mean(_min_max_normalize(positive_scores))

    # Flatten each set to one long vector; higher similarity means higher loss.
    embd_one_flat = np.hstack(embd_one)
    embd_two_flat = np.hstack(embd_two)
    return 1.0 - spatial.distance.cosine(embd_one_flat, embd_two_flat)

In [None]:
def classification_loss(y_true, y_pred):
    """Sparse categorical cross-entropy between integer labels and predicted probabilities."""
    sparse_cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy()
    return sparse_cross_entropy(y_true, y_pred)

In [None]:
def overall_loss(model, inputs, targets, embdc, embdif, embdi, it, ep, num_class, get_embeddings):
  """Weighted sum of the two similarity terms and the classification loss.

  total = 0.25 * sim(embdc, embdi; maximize=True)
        + 0.25 * sim(embdif, embdi; maximize=False)
        + 0.50 * cross_entropy(targets, model(inputs))

  NOTE(review): cosine_similarity_loss returns NumPy values, so only the
  classification term appears to contribute gradients — confirm this is intended.
  """
  weight_control_impaired = 0.25
  weight_impaired_impaired = 0.25
  weight_classification = 0.5

  # Evaluation order is kept: the first term builds a randomly initialised
  # attention layer and the last runs the model with dropout active.
  control_impaired_term = cosine_similarity_loss(embdc, embdi, True, it, ep, num_class, get_embeddings)
  impaired_impaired_term = cosine_similarity_loss(embdif, embdi, False, it, ep, num_class, get_embeddings)
  predictions = model(inputs, training=True)
  classification_term = classification_loss(targets, predictions)

  return ((weight_control_impaired*control_impaired_term)
          + (weight_impaired_impaired*impaired_impaired_term)
          + (weight_classification*classification_term))

In [None]:
def grad(model, inputs, targets, embdc, embdif, embdi, it, ep, num_class, get_embeddings):
  """Return the gradients of overall_loss with respect to the model's trainable variables."""
  with tf.GradientTape() as tape:
    total_loss = overall_loss(model, inputs, targets, embdc, embdif, embdi, it, ep, num_class, get_embeddings)
  trainable_variables = model.trainable_variables
  return tape.gradient(total_loss, trainable_variables)

In [None]:
# One legacy Adam optimizer (learning rate 1e-3), used for every gradient step
# in the fine-tuning loop.
# NOTE(review): the same instance is applied to each newly created model, so
# its step counter and moment estimates carry over between runs — confirm
# that is intended.
optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=0.001)

In [None]:
cm = defaultdict(list)

number_of_gestures = 6 # Original - 6 gestures
start = 2 # Initialize with two gestures
num_epochs = 15

# `with` closes both CSV files even if a run fails part-way; newline='' is
# what the csv module expects so rows are not double-terminated on Windows.
with open('avg_metric_score_gesture.csv', 'w', encoding='utf-8', newline='') as csvfile_prf, \
     open('avg_acc_gesture.csv', 'w', encoding='utf-8', newline='') as csvfile_acc:
  csvfile_writer_prf = csv.writer(csvfile_prf)
  csvfile_writer_prf.writerow(["number of samples", "gesture", "avg_precision", "avg_recall", "avg_f1_score"])

  csvfile_writer_acc = csv.writer(csvfile_acc)
  csvfile_writer_acc.writerow(["number of samples", "number of gestures", "avg_accuracy"])

  for cur in range(start, number_of_gestures+1): # Grow the task one gesture at a time: labels 0..cur-1.

    copy_model = keras.models.load_model(
        'pre-trained_Transformer.keras',
        custom_objects={"PositionalEncoding": PositionalEncoding,
                        "EncoderLayer": EncoderLayer,
                        "AttentionWithContext": AttentionWithContext})
    intermediate_copy_model = Model(inputs=copy_model.input, outputs=copy_model.layers[-3].output)

    gesture_labels = [i for i in range(cur)] # gesture labels in order

    cls_ges_pre = defaultdict(list)
    cls_ges_rec = defaultdict(list)
    cls_ges_f1 = defaultdict(list)

    X_train_n, y_train_n = add_gestures(X_train_target, y_train_target, cur)
    X_test_n, y_test_n = add_gestures(X_test_target, y_test_target, cur)

    target_names = list(order1_label_mapping.keys())[:cur]

    for count in (1, 3, 5): # Adding 1 example, then 3 examples and then 5 examples.
      # Take the first `count` training windows of every gesture.
      X_train_f = []
      y_train_f = []
      taken_per_gesture = dict.fromkeys(gesture_labels, 0)
      for sample, label in zip(X_train_n, y_train_n):
        if taken_per_gesture[label] != count:
          X_train_f.append(sample)
          y_train_f.append(label)
          taken_per_gesture[label] += 1

      X_train_n_arr = np.array(X_train_f) # Converting list to array (train set)
      y_train_n_arr = np.array(y_train_f)

      X_test_n_arr = np.array(X_test_n) # Converting list to array (test set)
      y_test_n_arr = np.array(y_test_n)

      # Fit the scaler on the few-shot training windows only; rows are
      # flattened to (samples * steps, features) for scaling and restored after.
      scaler = RobustScaler()
      X_train_n_arr = scaler.fit_transform(X_train_n_arr.reshape(-1, X_train_n_arr.shape[-1])).reshape(X_train_n_arr.shape)
      X_test_n_arr = scaler.transform(X_test_n_arr.reshape(-1, X_test_n_arr.shape[-1])).reshape(X_test_n_arr.shape)

      # Fixed embeddings representation for impaired samples
      embeddings_impaired_fixed = intermediate_copy_model(X_train_n_arr)

      # Persist the fixed embeddings. `embeddings_impaired` is not assigned
      # until the training loop below, so the value computed just above is
      # what is written here.
      results_fixed_embedding = {'embedding-fixed': np.array(embeddings_impaired_fixed).tolist()}
      file_name = f"embedding_fixed-{cur}-classes-{count}-examples.json"
      with open(file_name, "w") as f:
          json.dump(results_fixed_embedding, f)

      #######################################################
      # Fine-tuning with KARL
      #######################################################

      cls_ges_sum_pre = defaultdict(list)
      cls_ges_sum_rec = defaultdict(list)
      cls_ges_sum_f1 = defaultdict(list)
      cls_acc = []

      output_file = "all_embeddings-"+str(cur)+"-classes-"+str(count)+"-examples"+".json"
      results = {}
      for iteration in range(10):
        # Rebuild the source architecture, load the pre-trained weights and
        # swap the 16-way softmax for one sized to the current gesture set.
        train_model = create_model(50, 3, 16, d_model=128, nh=4, dropout_rate=0.2)
        train_model.load_weights('pre-trained_Transformer.keras')
        # Local name chosen so the imported `layers` module is not overwritten.
        base_layers = train_model.layers
        new_output = base_layers[-2].output
        new_layer = Dense(len(gesture_labels), activation='softmax')(new_output)
        train_model = Model(inputs=train_model.input, outputs=new_layer)
        # Freeze everything except the last three layers.
        for layer in train_model.layers[:-3]:
          layer.trainable = False
        # train_model.summary()

        # Built once per run: it shares layers with train_model, so its output
        # follows the weight updates without being recreated every epoch.
        intermediate_train_model = Model(inputs=train_model.input, outputs=train_model.layers[-3].output)

        iteration_key = f"iteration_{iteration+1}"
        results[iteration_key] = {}

        for epoch in range(num_epochs):
          embeddings_impaired = intermediate_train_model(X_train_n_arr)

          # Storing attention scores and weights in json during training
          attention_scores, attention_weights = cosine_similarity_loss(embeddings_control, embeddings_impaired, True, iteration, epoch, cur, True)
          results[iteration_key][f"epoch_{epoch+1}"] = {
              "attention_scores": np.array(attention_scores).tolist(),
              "attention_weights": np.array(attention_weights).tolist()
          }

          # NOTE(review): `optimizer` is the notebook-level instance shared by
          # every run, so its Adam state carries over between models.
          grads = grad(train_model, X_train_n_arr, y_train_n_arr, embeddings_control, embeddings_impaired_fixed, embeddings_impaired, iteration, epoch, cur, False)
          optimizer.apply_gradients(zip(grads, train_model.trainable_variables))

        y_pred = np.argmax(train_model.predict(X_test_n_arr), axis=-1)
        cm[cur].append(confusion_matrix(y_test_n_arr, y_pred))

        # Read the metrics from the dict form of the report. Splitting the
        # text report on the gesture name picks the wrong row when one name is
        # a substring of another ('tap' vs 'double tap'); the dict values are
        # also not rounded to two decimals.
        report = classification_report(y_test_n_arr, y_pred, target_names=target_names, output_dict=True)
        # Store all iterations' precision, recall and F1-score for each gesture
        for item in target_names:
          cls_ges_sum_pre[item].append(report[item]['precision'])
          cls_ges_sum_rec[item].append(report[item]['recall'])
          cls_ges_sum_f1[item].append(report[item]['f1-score'])

        cls_acc.append(report['accuracy'])

      with open(output_file, "w") as f:
        json.dump(results, f, indent=2)

      # Store the average precision, recall and F1-score for each gesture
      for n in target_names:
        avg_pre = np.mean(cls_ges_sum_pre[n])
        avg_rec = np.mean(cls_ges_sum_rec[n])
        avg_f1 = np.mean(cls_ges_sum_f1[n])

        cls_ges_pre[n].append(avg_pre)
        cls_ges_rec[n].append(avg_rec)
        cls_ges_f1[n].append(avg_f1)

        csv_line_prf = [count, n, avg_pre, avg_rec, avg_f1]
        csvfile_writer_prf.writerow(csv_line_prf)

      csv_line_acc = [count, len(gesture_labels), np.mean(cls_acc)]
      csvfile_writer_acc.writerow(csv_line_acc)

# attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)

In [None]:
# Confusion matrices are ndarrays, which json cannot encode; convert them to
# nested lists before dumping. Only the failures that conversion or file
# writing can raise are caught, and the reason is reported instead of hidden.
try:
  for key, value in cm.items():
    cm[key] = [np.array(arr).tolist() for arr in value]
  with open('cm_file_order5_karl_114.json', 'w') as f:
    json.dump(cm, f)

except (OSError, TypeError, ValueError) as err:
  print("Not able to save!!!", err)

# Representation analysis

In [None]:
# Reload the attention scores/weights that the fine-tuning loop wrote for the
# 3-gesture, 3-examples-per-gesture setting (file name follows `output_file`).
with open("all_embeddings-3-classes-3-examples.json", "r") as f:
  results = json.load(f)

In [None]:
'''
  t-SNE implementation
'''

from sklearn.manifold import TSNE
from collections import Counter

number_of_classes = 3
number_of_examples = 3

attention_weights = results['iteration_1']['epoch_15']['attention_weights']
attention_scores = results['iteration_1']['epoch_15']['attention_scores']

gestures = ['circle', 'double tap', 'fast outward, slow inward']

k = 224 # 10% of source dataset

# For every target sample: indices of the k source windows with the largest
# attention weight, strongest first.
top_indices = [np.argsort(row)[::-1][:k] for row in attention_weights]

# For each target sample, keep the attended source windows that belong to the
# two most frequent source classes, together with their embeddings.
all_sample_embeddings = []
all_sample_labels = []

for top_k_indices in top_indices:
    top_k_labels = y_train[top_k_indices].flatten()

    # Count occurrences of each class among the attended windows
    class_counts = Counter(top_k_labels)
    print(class_counts)

    # Get top 2 classes by count
    top_2_classes = [cls for cls, _ in class_counts.most_common(2)]
    print(top_2_classes)

    # One forward pass per target sample; rows align with top_k_indices, so a
    # boolean mask replaces the per-index np.where lookup.
    source_embeddings = np.asarray(intermediate_model(X_train[top_k_indices]))
    in_top_2 = np.isin(top_k_labels, top_2_classes)
    all_sample_embeddings.append(source_embeddings[in_top_2])
    all_sample_labels.extend(top_k_labels[in_top_2].tolist())

all_sample_embeddings = np.vstack(all_sample_embeddings)

print(len(all_sample_embeddings))

# Apply t-SNE to source examples (the iteration count is left at the library
# default of 1000, so no version-specific keyword is needed).
tsne1 = TSNE(n_components=2, perplexity=40, random_state=42)
components_weights = tsne1.fit_transform(all_sample_embeddings)

plt.figure(figsize=(12, 8))
plt.rcParams['font.size'] = 18

colors = ['red', 'green', 'blue', 'cyan', 'magenta', 'lime', 'fuchsia', 'lawngreen', 'indigo', 'gold', 'deepskyblue', 'teal', 'crimson']
source_label_array = np.array(all_sample_labels)
unique_classes = np.unique(source_label_array)

for class_id, class_name in enumerate(unique_classes):
    mask = source_label_array == class_name
    # Wrap around the palette: there can be more source classes than colours.
    plt.scatter(components_weights[mask, 0], components_weights[mask, 1], s=50,
                color=colors[class_id % len(colors)], label=f'{class_name}', alpha=0.7)

# Target examples are assumed to be ordered class by class with
# `number_of_examples` windows each, matching how the training set was built.
labels = np.repeat(np.arange(number_of_classes), number_of_examples)

# Apply t-SNE to target examples
# NOTE(review): this is a second, independent t-SNE fit drawn on the same
# axes; coordinates from the two fits are not directly comparable — confirm
# the overlay is intended.
tsne2 = TSNE(n_components=2, perplexity=2, random_state=42)
components_scores = tsne2.fit_transform(np.array(attention_scores))  # (9, 2)

target_colors = ['black', 'orange', 'brown']
markers = ['s', 'p', '*']
for class_id, (color, marker, gesture) in enumerate(zip(target_colors, markers, gestures)):
    mask = labels == class_id
    print(mask)
    print(components_scores[mask, 0], components_scores[mask, 1])
    plt.scatter(components_scores[mask, 0], components_scores[mask, 1], s=300,
                color=color, marker=marker, label=f'{gesture}', alpha=0.7)

plt.xlabel('t-SNE Component 1')
plt.ylabel('t-SNE Component 2')
plt.title('t-SNE plot')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('tSNE_of_Source_Target_Embeddings_final.png')
plt.show()

print("t-SNE visualization completed")