In [None]:
import numpy as np
import glob
import os
from math import sqrt
import matplotlib.pyplot as plt
from sklearn import metrics
from keras.callbacks import ModelCheckpoint, Callback, EarlyStopping
import scipy.io as sio
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import regularizers
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn import preprocessing
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Dropout
from keras.models import Model
from keras.layers import Input, Dense, concatenate,multiply, LayerNormalization, Add
from keras.layers import Lambda
from tensorflow.keras import layers
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

In [None]:

class PositionalEmbedding(layers.Layer):
    """Add a learned per-position embedding to a sequence of feature vectors.

    Args:
        sequence_length: Number of rows in the position-embedding table. Position
            indices run from 0 to the input's time-axis length minus one, so the
            input should not have more time steps than this.
        output_dim: Width of each position embedding. The embedding is added to
            the inputs, so it must broadcast against their last axis.
        **kwargs: Forwarded to `layers.Layer` (for example `name`).
    """

    def __init__(self, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim
        )
        self.sequence_length = sequence_length
        self.output_dim = output_dim

    def call(self, inputs):
        """Return `inputs` with the embedding of each time index added."""
        # The inputs are of shape: `(batch_size, frames, num_features)`
        length = tf.shape(inputs)[1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_positions = self.position_embeddings(positions)
        return inputs + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Mark a time step as valid when any of its features is non-zero.

        The incoming `mask` is ignored; the result is derived from `inputs` only.
        """
        mask = tf.reduce_any(tf.cast(inputs, "bool"), axis=-1)
        return mask

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created on load."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "output_dim": self.output_dim,
            }
        )
        return config


In [None]:

class TransformerEncoder(layers.Layer):
    """Single post-norm transformer encoder block (self-attention + feed-forward).

    Args:
        embed_dim: Size of the last axis of the inputs; also used as the
            per-head `key_dim` and as the width of the feed-forward output.
        dense_dim: Hidden width of the feed-forward projection.
        num_heads: Number of attention heads.
        dropout: Dropout rate applied inside the attention layer. Defaults to
            0.5, the value that used to be hard-coded.
        **kwargs: Forwarded to `layers.Layer` (for example `name`).
    """

    def __init__(self, embed_dim, dense_dim, num_heads, dropout=0.5, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.dropout_rate = dropout
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=dropout
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation=tf.nn.gelu), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply self-attention and the feed-forward block, each with a residual.

        Args:
            inputs: Sequence tensor whose last axis has size `embed_dim`.
            mask: Optional per-time-step mask; when given it is expanded with a
                broadcast axis and used as the attention mask.
        """
        if mask is not None:
            # (batch, steps) -> (batch, 1, steps) so it broadcasts over queries.
            mask = mask[:, tf.newaxis, :]

        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created on load."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
                "dropout": self.dropout_rate,
            }
        )
        return config


## Utility functions for training

In [None]:

def _rgb_branch(view_index, output_name=None):
    """Build one 1024-d vector input followed by a 128-unit ReLU projection."""
    # `shape` must be a tuple: `(1024)` is just the integer 1024.
    rgb_input = keras.Input(shape=(1024,), name='input_rgb_{}'.format(view_index))
    rgb_output = layers.Dense(128, activation='relu', name=output_name)(rgb_input)
    return rgb_input, rgb_output


def _transformer_branch(max_frames, feature_dim, dense_dim, num_heads, suffix):
    """Build one sequence input -> positional embedding -> encoder -> pooled 64-d vector."""
    # Both axes are left unspecified here; the feature axis is expected to be
    # `feature_dim` wide so that the position embedding can be added to it.
    seq_input = keras.Input(shape=(None, None))
    x = PositionalEmbedding(
        max_frames, feature_dim, name="frame_position_embedding" + suffix
    )(seq_input)
    x = TransformerEncoder(
        feature_dim, dense_dim, num_heads, name="transformer_layer" + suffix
    )(x)
    x = layers.GlobalMaxPooling1D()(x)
    x = layers.Dropout(0.4)(x)
    x = layers.Dense(64)(x)
    return seq_input, x


def get_compiled_model(num_classes=14, dense_dim=1, num_heads=1):
    """Build and compile the six-input fusion classifier.

    Three sequence inputs each pass through a positional embedding and a
    transformer encoder and are pooled to 64 features; three 1024-d vector
    inputs are each projected to 128 features. All six branch outputs are
    concatenated and fed to a sigmoid output layer.

    Args:
        num_classes: Number of sigmoid output units (default 14).
        dense_dim: Hidden width of each encoder's feed-forward block (default 1).
        num_heads: Attention heads per encoder (default 1).

    Returns:
        A `keras.Model` compiled with SGD, binary cross-entropy and accuracy.
        Its inputs are ordered: the three sequence inputs, then the three
        vector inputs.
    """
    # Dense projections of the three 1024-d feature vectors.
    rgb_input_1, rgb_output_1 = _rgb_branch(1, output_name='output_rgb_1')
    rgb_input_2, rgb_output_2 = _rgb_branch(2)
    rgb_input_3, rgb_output_3 = _rgb_branch(3)
    merged_rgb = layers.concatenate(
        [rgb_output_1, rgb_output_2, rgb_output_3], name='RGB_Concatenate'
    )

    # Transformer branches: (position-table size, feature width) per stream.
    inputs_l, x1 = _transformer_branch(64, 58, dense_dim, num_heads, "1")
    inputs_m, x2 = _transformer_branch(63, 29, dense_dim, num_heads, "2")
    inputs_r, x3 = _transformer_branch(64, 27, dense_dim, num_heads, "3")

    mer = layers.concatenate([x1, x2, x3], name='Concatenate')
    mer = layers.Dropout(0.4)(mer)

    final_merge = layers.concatenate([mer, merged_rgb])

    # Independent sigmoid per class, paired with binary cross-entropy below.
    outputs = layers.Dense(num_classes, activation="sigmoid")(final_merge)
    model = keras.Model(
        inputs=[inputs_l, inputs_m, inputs_r, rgb_input_1, rgb_input_2, rgb_input_3],
        outputs=outputs,
        name='Final_output',
    )

    model.compile(optimizer="SGD", loss="binary_crossentropy", metrics=["accuracy"])
    return model





In [None]:
# Build the compiled network and print its layer-by-layer summary.
model = get_compiled_model()
model.summary()

In [None]:
def _standardize_splits(train, val, test):
    """Standardize three splits per feature using statistics of `train` only.

    Each array is flattened to `(-1, n_features)` over its last axis, scaled
    with a `StandardScaler` fitted on the training split, and reshaped back to
    its original shape.

    Returns:
        Tuple `(train_scaled, val_scaled, test_scaled)`.
    """
    scaler = StandardScaler()
    scaler.fit(train.reshape(-1, train.shape[-1]))

    def _scale(split):
        flat = split.reshape(-1, split.shape[-1])
        return scaler.transform(flat).reshape(split.shape)

    return _scale(train), _scale(val), _scale(test)


# Labels: every column from the sixth onward. Converting through pandas with an
# explicit float dtype avoids an object-dtype array when the leading columns
# hold non-numeric values (object arrays may be rejected by `model.fit`).
train_labels = pd.read_csv("train_samples_updated.csv").iloc[:, 5:].to_numpy(dtype="float32")
val_labels = pd.read_csv("val_samples_updated.csv").iloc[:, 5:].to_numpy(dtype="float32")

# RGB feature arrays, one file per view and split.
rgb_1_train = np.load("path_to_final_swin_rgb_train_view1.npy") #update file path
rgb_0_train = np.load("path_to_final_swin_rgb_train_view0.npy")  #update file path
rgb_2_train = np.load("path_to_final_swin_rgb_train_view2.npy")  #update file path
print(rgb_1_train.shape, rgb_0_train.shape, rgb_2_train.shape)

rgb_1_val = np.load("path_to_final_swin_rgb_val_view1.npy")  #update file path
rgb_0_val = np.load("path_to_final_swin_rgb_val_view0.npy")  #update file path
rgb_2_val = np.load("path_to_final_swin_rgb_val_view2.npy")  #update file path
print(rgb_1_val.shape, rgb_0_val.shape, rgb_2_val.shape)

rgb_1_test = np.load("path_to_final_swin_rgb_test_view1.npy")  #update file path
rgb_0_test = np.load("path_to_final_swin_rgb_test_view0.npy")  #update file path
rgb_2_test = np.load("path_to_final_swin_rgb_test_view2.npy")  #update file path
print(rgb_1_test.shape, rgb_0_test.shape, rgb_2_test.shape)

# DCT feature arrays, one file per view and split.
dct_1_train = np.load("path_to_swin_mul_dct_train_view1.npy")  #update file path
dct_0_train = np.load("swin_mul_dct_train_view0.npy")  #update file path
dct_2_train = np.load("swin_mul_dct_train_view2.npy")  #update file path
print(dct_1_train.shape, dct_0_train.shape, dct_2_train.shape)

dct_1_val = np.load("path_to_swin_mul_dct_val_view1.npy")  #update file path
dct_0_val = np.load("path_to_swin_mul_dct_val_view0.npy")  #update file path
dct_2_val = np.load("path_to_swin_mul_dct_val_view2.npy")  #update file path
print(dct_1_val.shape, dct_0_val.shape, dct_2_val.shape)

dct_1_test = np.load("path_to_swin_mul_dct_test_view1.npy")  #update file path
dct_0_test = np.load("path_to_swin_mul_dct_test_view0.npy")  #update file path
dct_2_test = np.load("path_to_swin_mul_dct_test_view2.npy")  #update file path
print(dct_1_test.shape, dct_0_test.shape, dct_2_test.shape)

# Sequence streams C, T and S: each is scaled with its own scaler, fitted on
# the training split and then applied unchanged to validation and test.
train_C = np.load("path_to_train_C.npy")  #update file path
val_C = np.load("path_to_val_C.npy")  #update file path
test_C = np.load("path_to_test_C.npy")  #update file path
train_C, val_C, test_C = _standardize_splits(train_C, val_C, test_C)

train_T = np.load("path_to_train_T.npy")  #update file path
val_T = np.load("path_to_val_T.npy")  #update file path
test_T = np.load("path_to_test_T.npy")  #update file path
train_T, val_T, test_T = _standardize_splits(train_T, val_T, test_T)

train_S = np.load("path_to_train_S.npy")  #update file path
val_S = np.load("path_to_val_S.npy")  #update file path
test_S = np.load("path_to_test_S.npy")  #update file path
train_S, val_S, test_S = _standardize_splits(train_S, val_S, test_S)

print(train_C.shape, val_C.shape, test_C.shape, train_T.shape, val_T.shape, test_T.shape, train_S.shape, val_S.shape, test_S.shape)

In [None]:
# Number of passes over the training data; also reported with the scores later.
epochs = 50
# Not read by the training call below.
filepath = "/tmp/video_classifier"
# Inputs are listed in the order the model declares them: the three sequence
# streams first, then the three RGB feature arrays.
history = model.fit(
    [train_C, train_T, train_S, rgb_1_train, rgb_0_train, rgb_2_train],
    train_labels,
    batch_size=64,
    epochs=epochs,  # use the variable so the reported epoch count cannot drift
    validation_data=(
        [val_C, val_T, val_S, rgb_1_val, rgb_0_val, rgb_2_val],
        val_labels,
    ),
)


In [None]:
# Run the trained model on the validation inputs, passed in the same order as
# in the `model.fit` call (three sequence streams, then the three RGB arrays).
predict = model.predict([val_C, val_T, val_S, rgb_1_val, rgb_0_val, rgb_2_val])

In [None]:
# Annotation table for the split that `predict` was computed on.
train_csv_file = pd.read_csv("path_val_samples_updated.csv") #Update path

# Keep only `sample_id` and the behaviour columns.
col_names = ['rec_no', 'subject_pos', 'start_time', 'end_time']
new_train_csv = train_csv_file.drop(col_names, axis=1)

test_csv = new_train_csv
print(len(test_csv))

# One name per model output column, in output order.
# NOTE(review): presumably this matches the label column order of the CSV that
# the training labels were sliced from -- verify against the file.
Column_names = ['Settle','Legs crossed','Groom','Hand-mouth','Fold arms','Leg movement','Scratch','Gesture','Hand-face','Adjusting clothing','Fumble','Shrug','Stretching','Smearing hands']

# Fail loudly instead of letting pandas NaN-fill ids on a row-count mismatch.
if len(test_csv) != len(predict):
    raise ValueError(
        "Got {0} predictions for {1} annotated samples.".format(len(predict), len(test_csv))
    )

extracted_col = test_csv["sample_id"]
test_pred_csv = pd.DataFrame(predict, columns=Column_names)
# Insert by position (raw values) so the ids never depend on index alignment.
test_pred_csv.insert(0, "sample_id", extracted_col.to_numpy())
test_pred_csv.to_csv("test_predicted_transformer"  + ".csv", index=False)

In [None]:
# Banner identifying the run by its epoch count.
print("For epochs: {0}**************************************************************".format(epochs))
import pandas as pd
import numpy as np
from sklearn.metrics import average_precision_score


# Behaviour columns that are scored, in reporting order.
CLASSES = [
    'Hand-face',
    'Hand-mouth',
    'Gesture',
    'Fumble',
    'Scratch',
    'Stretching',
    'Smearing hands',
    'Shrug',
    'Adjusting clothing',
    'Groom',
    'Fold arms',
    'Leg movement',
    'Settle',
    'Legs crossed',
]


def _indexed_by_sample_id(table):
    """Return `table` as a DataFrame indexed and sorted by `sample_id`.

    `table` may be a DataFrame (with `sample_id` as a column or index level)
    or anything `pd.read_csv` accepts as a path.
    """
    frame = table if isinstance(table, pd.DataFrame) else pd.read_csv(table)
    if "sample_id" not in frame.columns:
        if "sample_id" not in frame.index.names:
            raise KeyError("sample_id")
        frame = frame.reset_index("sample_id")
    # Stable sort keeps duplicate ids in the same relative order in both tables.
    return frame.set_index("sample_id").sort_index(kind="stable")


def evaluate(test_annotation_file,user_submission_file):
    """Compute per-class and macro-averaged average precision.

    Args:
        test_annotation_file: Ground-truth table (DataFrame or CSV path) with a
            `sample_id` and one column per entry of `CLASSES`.
        user_submission_file: Prediction table with the same layout.

    Returns:
        Dict with `'macro_average'` (mean of the per-class scores) and
        `'per_class_scores'` (DataFrame indexed by `behaviour` with a `score`
        column).

    Raises:
        ValueError: If the two tables do not contain the same sample ids.
        KeyError: If a table has no `sample_id` column or index level.
    """
    test = _indexed_by_sample_id(test_annotation_file)
    user = _indexed_by_sample_id(user_submission_file)

    # Compare the sample ids themselves. Comparing positional row labels would
    # accept mismatched ids and reject a merely re-ordered submission.
    if not test.index.equals(user.index):
        raise ValueError("Indexes of test and prediction files do not agree.")

    scores = [
        average_precision_score(test[behaviour].values, user[behaviour].values)
        for behaviour in CLASSES
    ]
    per_class_scores = pd.DataFrame({'behaviour':CLASSES,'score':scores}).set_index('behaviour')
    macro_average = np.mean(scores)
    return {'macro_average':macro_average,'per_class_scores':per_class_scores}



if __name__ == '__main__':
    # Score the predictions built earlier against their annotations and print
    # the macro average followed by the per-class table.
    test_annotation_file, user_submission_file = test_csv, test_pred_csv  # use your own predictions here
    results = evaluate(test_annotation_file, user_submission_file)
    print(
        '',
        '--------------- MACRO AVERAGE: -----------------',
        '',
        str(results['macro_average']),
        '',
        '--------------- PER CLASS: ---------------------',
        str(results['per_class_scores']),
        sep='\n',
    )
