## Google Drive Connection

In [None]:
# Mount Google Drive inside the Colab runtime at /content/gdrive; the paths used by the
# training cell further down all live under this mount point.
from google.colab import drive
drive.mount('/content/gdrive')

## Data preparation

In [None]:
from sklearn.model_selection import train_test_split
import torch
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from imblearn.under_sampling import ClusterCentroids
from imblearn.under_sampling import RandomUnderSampler
from imblearn.under_sampling import NearMiss
from imblearn.under_sampling import RepeatedEditedNearestNeighbours
from imblearn.under_sampling import AllKNN
from imblearn.under_sampling import NeighbourhoodCleaningRule
from imblearn.under_sampling import OneSidedSelection
from imblearn.under_sampling import TomekLinks

def create_all_labels(y):
    """
    Collect the distinct labels of ``y`` in order of first appearance.

    :param y: DataFrame exposing a ``label`` column
    :return: List with every distinct label exactly once, ordered by first occurrence
    """
    # Read from the argument itself rather than from a module-level dataframe,
    # so the function works on whatever frame the caller passes in.
    all_labels = []
    for label in y["label"]:
        if label not in all_labels:
            all_labels.append(label)
    return all_labels


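# Split a labelled dataframe into train/test features and labels (optionally undersampled, scaled and PCA-reduced).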
def split_train_test_data(df, all_labels, df_test_noaug = False, test_split_size = 0.3, scale = False, pca_value = None, undersampling_method = 'OneSide', tsne = None):
    """
    Split a labelled dataframe into train and test features/labels.

    Optional steps, applied in this order: append t-SNE components as extra feature columns, undersample the
    majority class, split, standard-scale, PCA-reduce.

    :param df: DataFrame with a "label" column; every other column is treated as a feature. It is not modified.
    :param all_labels: Unused; kept so existing callers keep working
    :param df_test_noaug: If truthy, test rows are drawn at random from the even row positions only
        (presumably the original, non-augmented samples -- TODO confirm the row layout with the data producer)
    :param test_split_size: Fraction of rows used for testing (also honoured by the ``df_test_noaug`` split)
    :param scale: If truthy, standardize features with statistics fitted on the training rows only
    :param pca_value: ``None`` to skip PCA, otherwise the value handed to ``PCA(...)``
    :param undersampling_method: ``None`` to skip, or one of 'OneSide', 'Tomek', 'Random', 'NearMiss',
        'RepeatedEdited', 'AllKNN', 'NCleaning'
    :param tsne: ``None`` to skip, otherwise the number of t-SNE components appended as feature columns
    :return: Tuple ``(x_train_array, x_test_array, y_train_array, y_test_array, X_train, X_test, y_train, y_test,
        pca)`` where ``pca`` is the fitted PCA object or ``None``
    :raises ValueError: On an unknown ``undersampling_method`` or when the even-position test split asks for more
        rows than there are even positions
    """
    import math
    import random

    # Features/labels used to fit the undersampler (taken before any t-SNE column is appended).
    X = df.drop("label", axis=1)
    y = df[["label"]]

    if tsne is not None:
        from sklearn.manifold import TSNE

        X_embedded = TSNE(n_components=tsne, init="pca", learning_rate="auto").fit_transform(X.to_numpy())
        # Copy first so the caller's dataframe does not silently grow new columns.
        df = df.copy()
        for i in range(tsne):
            df["tsne_component_" + str(i)] = X_embedded[:, i]

    if undersampling_method is not None:
        # Lazily constructed so only the requested sampler is instantiated.
        undersamplers = {
            'OneSide': lambda: OneSidedSelection(sampling_strategy='majority'),
            'Tomek': lambda: TomekLinks(sampling_strategy='majority'),
            'Random': lambda: RandomUnderSampler(sampling_strategy='majority'),
            'NearMiss': lambda: NearMiss(version=1, sampling_strategy='majority'),
            'RepeatedEdited': lambda: RepeatedEditedNearestNeighbours(sampling_strategy='majority'),
            'AllKNN': lambda: AllKNN(sampling_strategy='majority'),
            'NCleaning': lambda: NeighbourhoodCleaningRule(sampling_strategy='majority'),
        }
        if undersampling_method not in undersamplers:
            raise ValueError(
                "Unknown undersampling_method " + repr(undersampling_method)
                + "; expected None or one of " + str(sorted(undersamplers))
            )
        us_method = undersamplers[undersampling_method]()
        us_method.fit_resample(X, y)
        # Only the kept row positions are needed: they filter the full dataframe.
        df = df.iloc[us_method.sample_indices_.tolist(), :]

    if df_test_noaug:
        n_rows = df.shape[0]
        test_total = math.floor(n_rows * test_split_size)
        even_positions = range(0, n_rows, 2)
        if test_total > len(even_positions):
            raise ValueError(
                "Requested " + str(test_total) + " test rows but only " + str(len(even_positions))
                + " even row positions are available"
            )
        # Sampling without replacement replaces the rejection loop, which could spin forever.
        test_positions = random.sample(even_positions, test_total)
        held_out = set(test_positions)
        train_positions = [i for i in range(n_rows) if i not in held_out]
        test_df = df.iloc[test_positions]
        train_df = df.iloc[train_positions]
    else:
        train_df, test_df = train_test_split(df, test_size=test_split_size, random_state=42)

    X_train = train_df.drop("label", axis=1)
    X_test = test_df.drop("label", axis=1)
    y_train = train_df[["label"]]
    y_test = test_df[["label"]]

    if scale:
        ss = StandardScaler()
        X_train = ss.fit_transform(X_train)
        # Reuse the training statistics; re-fitting on the test rows would leak test information.
        X_test = ss.transform(X_test)

    pca = None
    if pca_value is not None:
        pca = PCA(pca_value)
        pca.fit(X_train)
        X_train = pca.transform(X_train)
        X_test = pca.transform(X_test)

    # X_train/X_test may be DataFrames or ndarrays at this point (scaling and PCA return arrays),
    # so convert without assuming a pandas object.
    data_xtrain_numpy = np.asarray(X_train)
    data_xtest_numpy = np.asarray(X_test)
    data_ytrain_numpy = y_train.to_numpy()
    data_ytest_numpy = y_test.to_numpy()

    return data_xtrain_numpy, data_xtest_numpy, data_ytrain_numpy, data_ytest_numpy, X_train, X_test, y_train, y_test, pca
  
def generate_features_label_tensors(features, label, all_labels, dtype = torch.double):
    """
    Build the model-ready tensors for one sample.

    :param features: Sized iterable of scalar feature values
    :param label: Label of the sample; must be present in ``all_labels``
    :param all_labels: List of known labels; the label's position in it is the class index
    :param dtype: dtype of the returned feature tensor
    :return: ``(features_tensor, label_tensor)``: a ``(1, len(features))`` tensor of ``dtype`` and a one-element
        ``torch.long`` tensor holding the class index
    """
    class_index = all_labels.index(label)
    label_tensor = torch.tensor([class_index], dtype=torch.long)

    feature_row = torch.zeros(1, len(features), dtype=dtype)
    for position, value in enumerate(features):
        feature_row[0, position] = value

    return feature_row, label_tensor

def generate_labels_encoding(labels, all_labels):
    """
    Encode each label as its position in ``all_labels``.

    :param labels: Iterable of labels to encode
    :param all_labels: List of known labels defining the encoding
    :return: List of integer indices, one per input label
    :raises ValueError: If a label is missing from ``all_labels``
    """
    return [all_labels.index(item) for item in labels]

# Utility: map a model output vector to the label (word) with the highest score
def words_from_output(output, all_labels):
    """
    Return the label whose position matches the largest entry of ``output``.

    :param output: Tensor of scores; the arg-max is taken over the flattened tensor
    :param all_labels: Sequence of labels indexed by class position
    :return: The label at the arg-max position
    """
    best_position = output.argmax().item()
    return all_labels[best_position]

def get_label_index(label, all_labels):
    """
    Look up the position of ``label`` in ``all_labels``.

    :param label: Label to search for
    :param all_labels: List of known labels
    :return: The index of the label, or ``None`` when it is not present
    """
    try:
        position = all_labels.index(label)
    except ValueError:
        position = None
    return position
      

## Transformers Classes

In [None]:
import copy
import torch

import torch.nn as nn
from typing import Optional

class _SelfAttentionPlaceholder(nn.Module):
    """Parameter-free stand-in for the removed self-attention module (only carries ``batch_first``)."""

    def __init__(self, batch_first: bool = False):
        super().__init__()
        self.batch_first = batch_first


class SPOTERTransformerDecoderLayer(nn.TransformerDecoderLayer):
    """
    Edited TransformerDecoderLayer implementation omitting the redundant self-attention operation as opposed to the
    standard implementation.

    The self-attention weights are dropped, so they neither train nor appear in the state dict. A parameter-free
    placeholder stays under ``self_attn`` because newer PyTorch versions of ``nn.TransformerDecoder`` read
    ``layers[0].self_attn.batch_first`` before running the layers.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout, activation):
        super(SPOTERTransformerDecoderLayer, self).__init__(d_model, nhead, dim_feedforward, dropout, activation)

        # Swap the real self-attention for the placeholder: no parameters, same attribute name.
        self.self_attn = _SelfAttentionPlaceholder(getattr(self.multihead_attn, "batch_first", False))

    def forward(self, tgt: torch.Tensor, memory: torch.Tensor, tgt_mask: Optional[torch.Tensor] = None,
                memory_mask: Optional[torch.Tensor] = None, tgt_key_padding_mask: Optional[torch.Tensor] = None,
                memory_key_padding_mask: Optional[torch.Tensor] = None, tgt_is_causal: bool = False,
                memory_is_causal: bool = False) -> torch.Tensor:
        """
        Apply cross-attention over ``memory`` followed by the feed-forward block.

        :param tgt: Target (query) sequence
        :param memory: Encoder output attended to by the cross-attention
        :param tgt_mask: Accepted for interface compatibility; unused because there is no self-attention
        :param memory_mask: Attention mask forwarded to the cross-attention
        :param tgt_key_padding_mask: Accepted for interface compatibility; unused
        :param memory_key_padding_mask: Key padding mask forwarded to the cross-attention
        :param tgt_is_causal: Accepted because newer ``nn.TransformerDecoder`` versions pass it; unused
        :param memory_is_causal: Accepted because newer ``nn.TransformerDecoder`` versions pass it; unused
        :return: Tensor with the same shape as ``tgt``
        """

        # In place of the self-attention block: residual of the dropped-out target onto itself, then norm.
        tgt = tgt + self.dropout1(tgt)
        tgt = self.norm1(tgt)

        # Cross-attention over the encoder memory.
        tgt2 = self.multihead_attn(tgt, memory, memory, attn_mask=memory_mask,
                                   key_padding_mask=memory_key_padding_mask)[0]
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)

        # Position-wise feed-forward block.
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)

        return tgt

In [None]:
def _get_clones(mod, n):
    """
    Create ``n`` independent deep copies of a module.

    :param mod: Module to replicate
    :param n: Number of copies
    :return: ``nn.ModuleList`` holding the ``n`` deep copies
    """
    clones = nn.ModuleList()
    for _ in range(n):
        clones.append(copy.deepcopy(mod))
    return clones

In [None]:
class SPOTER(nn.Module):
    """
    SPOTER (Sign POse-based TransformER): a transformer classifier for sign language recognition from sequences of
    skeletal data. A learned class query is decoded against the encoded input sequence and projected to class scores.
    """

    def __init__(self, num_classes, num_heads, hidden_dim=55):
        """
        :param num_classes: Size of the output layer
        :param num_heads: Number of attention heads of the transformer
        :param hidden_dim: Transformer model dimension; also the flattened per-frame feature size the input must have
        """
        super().__init__()

        self.row_embed = nn.Parameter(torch.rand(50, hidden_dim))
        # The positional term is built from the first row of row_embed only and ends up shaped (1, 1, hidden_dim).
        first_row = self.row_embed[0].unsqueeze(0).repeat(1, 1, 1)
        positional = torch.cat([first_row], dim=-1).flatten(0, 1).unsqueeze(0)
        self.pos = nn.Parameter(positional)
        self.class_query = nn.Parameter(torch.rand(1, hidden_dim))
        self.transformer = nn.Transformer(hidden_dim, num_heads, 6, 6)
        self.linear_class = nn.Linear(hidden_dim, num_classes)

        # Replace the stock decoder layers with the variant that has no self-attention.
        decoder = self.transformer.decoder
        template_layer = SPOTERTransformerDecoderLayer(self.transformer.d_model, self.transformer.nhead, 2048,
                                                       0.1, "relu")
        decoder.layers = _get_clones(template_layer, decoder.num_layers)

    def forward(self, inputs):
        """
        :param inputs: Tensor whose first dimension indexes the sequence steps; the remaining dimensions are
            flattened into one feature vector per step
        :return: Output of the classification layer applied to the decoded class query
        """
        per_step = inputs.flatten(start_dim=1)
        h = per_step.unsqueeze(1).float()
        decoded = self.transformer(self.pos + h, self.class_query.unsqueeze(0))
        return self.linear_class(decoded.transpose(0, 1))

In [None]:
import torch


class GaussianNoise(object):
    """
    Callable transform that adds Gaussian noise to a tensor: ``tensor + randn * std + mean``.

    :param mean: Offset added to every element
    :param std: Scale applied to the standard-normal samples
    """

    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        """Return a new tensor equal to ``tensor`` plus freshly sampled noise."""
        # Sample on the tensor's own device so the addition also works for non-CPU tensors.
        noise = torch.randn(tensor.size(), device=tensor.device)
        return tensor + noise * self.std + self.mean

    def __repr__(self):
        return f"{self.__class__.__name__}(mean={self.mean}, std={self.std})"

In [None]:
import ast
import torch

import pandas as pd
import torch.utils.data as torch_data
import numpy as np

from random import randrange
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import matplotlib.pyplot as plt


def create_all_labels(df):
    """
    Collect the distinct values of the ``label`` column in order of first appearance.

    :param df: DataFrame with a ``label`` column
    :return: List containing each distinct label once
    """
    seen_labels = []
    for _, record in df.iterrows():
        current = record['label']
        if current in seen_labels:
            continue
        seen_labels.append(current)
    return seen_labels

def generate_labels_encoding(labels, all_labels):
    """
    Translate labels into their integer positions within ``all_labels``.

    :param labels: Iterable of labels to encode
    :param all_labels: List of known labels defining the encoding
    :return: List of indices, one per input label
    :raises ValueError: If a label does not occur in ``all_labels``
    """
    return list(map(all_labels.index, labels))

def load_dataset(file_location: str):
    """
    Load the sequence dataset from an Excel sheet.

    Every non-label cell is expected to hold the textual form of a Python list (parsed with
    ``ast.literal_eval``); the lists of one row become the columns of that sample's array.

    :param file_location: Path of the Excel file; its first column is used as the index
    :return: ``(data, labels)`` where ``data`` is a list with one ``(sequence_length, num_features)`` array per
        row and ``labels`` holds the 1-based class index of each row (order of first appearance of the label)
    """
    df = pd.read_excel(file_location, index_col=0)

    all_labels = create_all_labels(df)
    labels_encoded = generate_labels_encoding(df["label"].to_list(), all_labels)

    # Labels are stored 1-based; the dataset class subtracts 1 again when an item is fetched.
    labels = [label + 1 for label in labels_encoded]
    data = []

    X = df.drop("label", axis = 1)
    num_features = X.shape[1]
    for row in range(0, X.shape[0]):
        # Parse every cell exactly once (the per-row debug print of the parsed data was dropped).
        sequences = [ast.literal_eval(cell) for cell in X.iloc[row].values]
        # The first feature's sequence length defines the number of time steps of the sample.
        current_row = np.empty(shape=(len(sequences[0]), num_features))
        for column, sequence in enumerate(sequences):
            current_row[:, column] = sequence
        data.append(current_row)

    return data, labels


class CzechSLRDataset(torch_data.Dataset):
    """Torch ``Dataset`` exposing the samples and labels produced by ``load_dataset`` as tensors."""

    data: [np.ndarray]
    labels: [np.ndarray]

    def __init__(self, dataset_filename: str, num_labels=5, transform=None, augmentations=False,
                 augmentations_prob=0.5, normalize=True):
        """
        Load the whole dataset into memory and store the configuration.

        :param dataset_filename: Path handed to ``load_dataset``
        :param num_labels: Stored on the instance as ``num_labels``
        :param transform: Optional callable applied to every sample tensor on access (default: None)
        :param augmentations: Stored on the instance as ``augmentations``
        :param augmentations_prob: Stored on the instance as ``augmentations_prob``
        :param normalize: Stored on the instance as ``normalize``
        """
        loaded = load_dataset(dataset_filename)

        self.data = loaded[0]
        self.labels = loaded[1]
        # Plain-list copy of the labels, used by the stratified split helpers.
        self.targets = list(loaded[1])

        self.num_labels = num_labels
        self.transform = transform
        self.augmentations = augmentations
        self.augmentations_prob = augmentations_prob
        self.normalize = normalize

    def __getitem__(self, idx):
        """
        Fetch one sample, applying the transform when one is configured.

        :param idx: Index of the item
        :return: Tuple ``(sample_tensor, label_tensor)``; the label is the stored label minus one
        """
        sample = torch.from_numpy(np.copy(self.data[idx]))
        label = torch.Tensor([self.labels[idx] - 1])

        if not self.transform:
            return sample, label

        return self.transform(sample), label

    def __len__(self):
        """Number of samples (one per stored label)."""
        return len(self.labels)


# Script entry point of this cell: intentionally does nothing.
if __name__ == "__main__":
    pass

## Utils

In [None]:
import logging
import torch


def train_epoch(model, dataloader, criterion, optimizer, device, scheduler=None):
    """
    Train the model for one pass over ``dataloader``, one sample per step.

    :param model: Module called as ``model(inputs)``; its output must be expandable to ``(1, -1, -1)``
    :param dataloader: Sized iterable of ``(inputs, labels)`` batches holding a single sample each
    :param criterion: Loss called as ``criterion(outputs[0], labels[0])``
    :param optimizer: Optimizer stepped once per sample
    :param device: Device the inputs and labels are moved to
    :param scheduler: Optional scheduler stepped once with the mean loss of the epoch
    :return: ``(summed_loss_tensor, correct, total, accuracy)``; accuracy is 0.0 when nothing was iterated
    """

    pred_correct, pred_all = 0, 0
    running_loss = 0.0

    for inputs, labels in dataloader:
        inputs = inputs.squeeze(0).to(device)
        labels = labels.to(device, dtype=torch.long)

        optimizer.zero_grad()
        outputs = model(inputs).expand(1, -1, -1)

        loss = criterion(outputs[0], labels[0])
        loss.backward()
        optimizer.step()
        # Detach before accumulating so the running sum does not carry autograd history.
        running_loss += loss.detach()

        # Statistics (soft-max is monotonic, so the arg-max of the raw scores is the predicted class)
        if int(torch.argmax(outputs)) == int(labels[0][0]):
            pred_correct += 1
        pred_all += 1

    # Callers use `.item()` on the returned loss, so hand back a tensor even if the loader was empty.
    if not torch.is_tensor(running_loss):
        running_loss = torch.tensor(running_loss, device=device)

    if scheduler is not None and pred_all > 0:
        scheduler.step(running_loss.item() / len(dataloader))

    accuracy = pred_correct / pred_all if pred_all else 0.0

    return running_loss, pred_correct, pred_all, accuracy


def evaluate(model, num_classes, dataloader, device, print_stats=False):
    """
    Compute the top-1 accuracy of ``model`` over ``dataloader``, one sample per step.

    :param model: Callable invoked as ``model(inputs)``; its output must be expandable to ``(1, -1, -1)``
    :param num_classes: Number of classes; per-class counters are kept for labels ``0 .. num_classes - 1``
    :param dataloader: Iterable of ``(inputs, labels)`` batches holding a single sample each
    :param device: Device the inputs and labels are moved to
    :param print_stats: If true, print and log the per-class accuracies of the classes that occurred
    :return: ``(correct, total, accuracy)``; accuracy is 0.0 when nothing was iterated
    """

    pred_correct, pred_all = 0, 0
    # class index -> [correct predictions, occurrences]
    stats = {i: [0, 0] for i in range(num_classes)}

    # Pure inference: no gradients are needed, so do not build autograd graphs.
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.squeeze(0).to(device)
            labels = labels.to(device, dtype=torch.long)

            outputs = model(inputs).expand(1, -1, -1)
            target = int(labels[0][0])

            # Statistics (soft-max is monotonic, so the arg-max of the raw scores is the predicted class)
            if int(torch.argmax(outputs)) == target:
                stats[target][0] += 1
                pred_correct += 1

            stats[target][1] += 1
            pred_all += 1

    if print_stats:
        stats = {key: value[0] / value[1] for key, value in stats.items() if value[1] != 0}
        print("Label accuracies statistics:")
        print(str(stats) + "\n")
        logging.info("Label accuracies statistics:")
        logging.info(str(stats) + "\n")

    accuracy = pred_correct / pred_all if pred_all else 0.0

    return pred_correct, pred_all, accuracy


def evaluate_top_k(model, dataloader, device, k=5):
    """
    Compute the top-k accuracy of ``model`` over ``dataloader``, one sample per step.

    :param model: Callable invoked as ``model(inputs)``; its output must be expandable to ``(1, -1, -1)``
    :param dataloader: Iterable of ``(inputs, labels)`` batches holding a single sample each
    :param device: Device the inputs and labels are moved to
    :param k: Number of highest-scoring classes that count as a hit; capped at the number of classes
    :return: ``(correct, total, accuracy)``; accuracy is 0.0 when nothing was iterated
    """

    pred_correct, pred_all = 0, 0

    # Pure inference: no gradients are needed.
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.squeeze(0).to(device)
            labels = labels.to(device, dtype=torch.long)

            outputs = model(inputs).expand(1, -1, -1)

            # topk fails when k exceeds the size of the class dimension, so cap it.
            top_k = min(k, outputs.shape[-1])
            # Flatten before the membership test: the indices come back nested like `outputs`,
            # and an int is never "in" a list of lists.
            top_classes = torch.topk(outputs, top_k).indices.flatten().tolist()

            if int(labels[0][0]) in top_classes:
                pred_correct += 1

            pred_all += 1

    accuracy = pred_correct / pred_all if pred_all else 0.0

    return pred_correct, pred_all, accuracy


In [None]:
import numpy as np

from collections import Counter
from torch.utils.data import Subset
from sklearn.model_selection import train_test_split


def __balance_val_split(dataset, val_split=0.):
    """
    Split a dataset into train and validation subsets, stratified by the dataset's targets.

    :param dataset: Dataset exposing a ``targets`` sequence with one entry per sample
    :param val_split: Value passed to ``train_test_split`` as ``test_size``
    :return: ``(train_subset, val_subset)`` as ``torch.utils.data.Subset`` objects over ``dataset``
    """
    targets = np.array(dataset.targets)
    sample_positions = np.arange(targets.shape[0])

    parts = train_test_split(sample_positions, test_size=val_split, stratify=targets)
    train_positions, val_positions = parts

    return Subset(dataset, indices=train_positions), Subset(dataset, indices=val_positions)


def __split_of_train_sequence(subset: Subset, train_split=1.0):
    """
    Keep a stratified fraction of an existing subset.

    :param subset: Subset whose underlying dataset exposes ``targets``
    :param train_split: Fraction of the subset to keep; a value equal to 1 returns ``subset`` unchanged
    :return: The same subset, or a new ``Subset`` over the same dataset with the retained indices
    """
    if train_split == 1:
        return subset

    parent = subset.dataset
    targets = np.array([parent.targets[i] for i in subset.indices])
    local_positions = np.arange(targets.shape[0])

    # Only the retained part of the split is used; the remainder is discarded.
    kept_positions = train_test_split(local_positions, test_size=1 - train_split, stratify=targets)[0]

    # Translate positions inside the subset back to indices of the underlying dataset.
    kept_indices = [subset.indices[i] for i in kept_positions]
    return Subset(parent, indices=kept_indices)


def __log_class_statistics(subset: Subset):
    """
    Print how often each target occurs among the samples of ``subset``.

    :param subset: Subset whose underlying dataset exposes ``targets``
    """
    occurrences = Counter(subset.dataset.targets[i] for i in subset.indices)
    print(dict(occurrences))


## Train

In [None]:
import os
import random
import torch

import numpy as np
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from torchvision import transforms
from torch.utils.data import DataLoader
from pathlib import Path
import time


def train():
    """
    Run one complete SPOTER experiment on the LIBRAS spreadsheet.

    Steps: seed all random generators, load the dataset, build model/optimizer/scheduler, train for a fixed number
    of epochs while checkpointing the best training accuracy of each checkpoint slot, evaluate every stored
    checkpoint on the held-out split (top-1 and top-k), and finally save the loss/accuracy and learning-rate plots
    plus a CSV with the plotted series.

    Every input and output path is derived from one base directory, so the directories that get created are the
    ones the checkpoints, plots and CSV are actually written to.
    """

    # MARK: TRAINING PREPARATION AND MODULES

    # Single source of truth for all locations used below.
    data_root = Path("/content/gdrive/My Drive/your_directory/LIBRAS Data")
    checkpoints_root = data_root / "Experiments" / "out-checkpoints"
    images_dir = data_root / "Experiments" / "out-img"

    # Initialize all the random seeds
    random.seed(379)
    np.random.seed(379)
    os.environ["PYTHONHASHSEED"] = str(379)
    os.environ["CUDA_LAUNCH_BLOCKING"] = str(1)
    torch.manual_seed(379)
    torch.cuda.manual_seed(379)
    torch.cuda.manual_seed_all(379)
    torch.backends.cudnn.deterministic = True
    g = torch.Generator()
    g.manual_seed(379)

    # Set device to CUDA only if applicable
    device = torch.device("cpu")
    if torch.cuda.is_available():
        device = torch.device("cuda")

    # Training set
    transform = transforms.Compose([GaussianNoise(0, 0.001)])
    train_set = CzechSLRDataset(str(data_root / "All_Vocab_Transformer_17102022.xlsx"), transform=transform)

    # Construct the model
    num_classes = len(np.unique(np.array(train_set.labels)))
    hidden_dim = 22
    lr = 0.001 #original 0.001
    scheduler_factor = 0.1
    scheduler_patience = 5
    num_heads = 11
    slrt_model = SPOTER(num_classes=num_classes, num_heads=num_heads, hidden_dim=hidden_dim)
    slrt_model.train(True)
    slrt_model.to(device)

    # Construct the other modules
    cel_criterion = nn.CrossEntropyLoss()
    sgd_optimizer = optim.SGD(slrt_model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(sgd_optimizer, factor=scheduler_factor, patience=scheduler_patience, verbose=True)

    # Ensure that the path for checkpointing and for images both exist
    experiment_name = 'LIBRAS__' + str(time.time())
    checkpoints_dir = checkpoints_root / experiment_name
    checkpoints_dir.mkdir(parents=True, exist_ok=True)
    images_dir.mkdir(parents=True, exist_ok=True)

    # MARK: DATA

    train_set, eval_set = __balance_val_split(train_set, 0.3)
    train_loader = DataLoader(train_set, shuffle=True, generator=g)
    eval_loader = DataLoader(eval_set, shuffle=True, generator=g)

    # MARK: TRAINING
    losses, train_accs = [], []
    lr_progress = []
    top_train_acc = 0
    checkpoint_index = 0

    print("Starting " + experiment_name + "...\n\n")
    epochs = 150
    save_checkpoints = True
    log_freq = 1
    for epoch in range(epochs):
        train_loss, _, _, train_acc = train_epoch(slrt_model, train_loader, cel_criterion, sgd_optimizer, device, scheduler)
        mean_loss = train_loss.item() / len(train_loader)
        losses.append(mean_loss)
        train_accs.append(train_acc)

        # Save checkpoints if they are best in the current subset
        if save_checkpoints and train_acc > top_train_acc:
            top_train_acc = train_acc
            torch.save(slrt_model.state_dict(), str(checkpoints_dir / ("checkpoint_t_" + str(checkpoint_index) + ".pth")))

        if epoch % log_freq == 0:
            print("[" + str(epoch + 1) + "] TRAIN  loss: " + str(mean_loss) + " acc: " + str(train_acc))

            print("")

        # Every 10 epochs start a new checkpoint slot and reset its best accuracy
        if epoch % 10 == 0:
            top_train_acc = 0
            checkpoint_index += 1

        lr_progress.append(sgd_optimizer.param_groups[0]["lr"])

    # MARK: TESTING

    print("\nTesting checkpointed models starting...\n")

    top_result, top_result_name = 0, ""
    top_k_result, top_k_result_name = 0, ""

    if eval_loader:
        for i in range(checkpoint_index + 1):
            for checkpoint_id in ["t"]:
                checkpoint_label = "checkpoint_" + checkpoint_id + "_" + str(i)
                checkpoint_path = checkpoints_dir / (checkpoint_label + ".pth")

                # A slot has no file when the training accuracy never rose above 0 while it was active.
                if not checkpoint_path.exists():
                    print(checkpoint_label + "  ->  not saved, skipping")
                    continue

                tested_model = SPOTER(num_classes=num_classes, num_heads=num_heads, hidden_dim=hidden_dim)
                tested_model.to(device)
                tested_model.load_state_dict(torch.load(str(checkpoint_path), map_location=device))
                tested_model.train(False)
                _, _, eval_acc = evaluate(tested_model, num_classes, eval_loader, device, print_stats=True)
                _, _, eval_top_acc = evaluate_top_k(tested_model, eval_loader, device)

                if eval_acc > top_result:
                    top_result = eval_acc
                    top_result_name = experiment_name + "/" + checkpoint_label

                if eval_top_acc > top_k_result:
                    top_k_result = eval_top_acc
                    top_k_result_name = experiment_name + "/" + checkpoint_label

                print(checkpoint_label + "  ->  " + str(eval_acc))

        print("\nThe top result was recorded at " + str(top_result) + " testing accuracy. The best checkpoint is " + top_result_name + ".")
        logging.info("\nThe top result was recorded at " + str(top_result) + " testing accuracy. The best checkpoint is " + top_result_name + ".")

        print("\nThe top k result was recorded at " + str(top_k_result) + " testing accuracy. The best checkpoint is " + top_k_result_name + ".")
        logging.info("\nThe top k result was recorded at " + str(top_k_result) + " testing accuracy. The best checkpoint is " + top_k_result_name + ".")

    # PLOT 0: Performance (loss, accuracies) chart plotting
    fig, ax = plt.subplots()
    ax.plot(range(1, len(losses) + 1), losses, c="#D64436", label="Training loss")
    ax.plot(range(1, len(train_accs) + 1), train_accs, c="#00B09B", label="Training accuracy")

    ax.xaxis.set_major_locator(ticker.MaxNLocator(integer=True))

    ax.set(xlabel="Epoch", ylabel="Accuracy / Loss", title="")
    plt.legend()
    ax.grid()

    fig.savefig(str(images_dir / (experiment_name + "_loss.eps")))

    # PLOT 1: Learning rate progress
    fig1, ax1 = plt.subplots()
    ax1.plot(range(1, len(lr_progress) + 1), lr_progress, label="LR")
    ax1.set(xlabel="Epoch", ylabel="LR", title="")
    ax1.grid()

    fig1.savefig(str(images_dir / (experiment_name + "_lr.eps")), format='eps')

    print("\nAny desired statistics have been plotted.\nThe experiment is finished.")
    logging.info("\nAny desired statistics have been plotted.\nThe experiment is finished.")

    # Persist the plotted series next to the figures
    df_graphs_data = pd.DataFrame.from_dict({
        'losses': losses,
        'train_accs': train_accs,
        'lr_progress': lr_progress
    })
    df_graphs_data.to_csv(str(images_dir / (experiment_name + '_data.csv')))

# Start the full training and evaluation run when this cell/script is executed directly.
if __name__ == '__main__':
    train()
