# Load and Import

In [None]:
import json
import os
import torch
import torch.nn as nn
from torch.nn import CrossEntropyLoss
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader, Subset
from torch.nn.utils.rnn import pad_sequence

In [None]:
id_joints_dict = {0: 'nose',
                1: 'left_eye',
                2: 'right_eye',
                3: 'left_ear',
                4: 'right_ear',
                5: 'left_shoulder',
                6: 'right_shoulder',
                7: 'left_elbow',
                8: 'right_elbow',
                9: 'left_wrist',
                10: 'right_wrist',
                11: 'left_hip',
                12: 'right_hip',
                13: 'left_knee',
                14: 'right_knee',
                15: 'left_ankle',
                16: 'right_ankle'}
joints_id_dict = {v: k for k, v in id_joints_dict.items()}

# Read the Data

In [None]:
def get_df_from_preds(preds, instance_id=0):

        """
        Get a dataframe from a json file containing the poses. The dataframe contains the coordinates of the joints of the instance_id-th.

        Args:
            preds (str or list): Path of the json file containing the poses or list of poses.

        Returns:
            df (pd.DataFrame): A dataframe of poses.
        """

        if type(preds) == list:
            pose_sequence = preds
        else:
            with open(preds) as json_file:
                pose_sequence = json.load(json_file)
        
        dic_list = []

        for i in range(len(pose_sequence)):
            keypoints_dict = {}

            try:
                keypoints_list = pose_sequence[i]['instances'][instance_id]['keypoints']
            except:
                keypoints_list = [[np.nan, np.nan] for i in range(17)]

            for number, keypoint in enumerate(keypoints_list):
                keypoints_dict["X_" + id_joints_dict[number]] = keypoint[0]
                keypoints_dict["Y_" + id_joints_dict[number]] = keypoint[1]
            
            dic_list.append(keypoints_dict)
            
        df = pd.DataFrame.from_dict(dic_list)
        return df

In [None]:
df = get_df_from_preds('../../../outputs/tennis/backhand/predictions/p1_backhand_s1.json')
print('len(df):', len(df))
df.head()

# Model

In [None]:
device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")

In [None]:
class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Initial hidden state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        # Forward pass through RNN layer
        out, _ = self.rnn(x, h0)
        
        # Select the last time step's output
        out = out[:, -1, :]

        # Forward pass through fully connected layer
        out = self.fc(out)

        return out

In [None]:
import torch
import torch.nn as nn

class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Initial hidden state and cell state
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        # Forward pass through LSTM
        out, _ = self.lstm(x, (h0, c0))

        # Only take the output from the last time step
        out = self.fc(out[:, -1, :])

        return out


## Test d'une pass-forward

In [None]:
# Paramètres du modèle
input_size =  df.shape[1]  # Nombre de features en entrée du RNN
hidden_size = 128  # Taille de la couche cachée du RNN
num_layers = 2  # Nombre de couches RNN empilées
num_classes =  12

In [None]:
# Création de l'instance du modèle
model_test = RNNModel(input_size, hidden_size, num_layers, num_classes)

# Vous pouvez également spécifier un dispositif (GPU ou CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_test.to(device)

In [None]:
tensor = torch.from_numpy(df.values).float().to(device)
preds = model_test(tensor.unsqueeze(0))
softmax = nn.Softmax(dim=1)
softmax(preds)

In [None]:
criterion = CrossEntropyLoss()
loss = criterion(preds, torch.tensor([5]).to(device))
print('loss:', loss.item())

In [None]:
# Example usage:
input_size = df.shape[1]  # Dimension of input poses (X, Y for each joint)
hidden_size = 64  # Number of LSTM units
num_layers = 2  # Number of LSTM layers
num_classes = 6  # Number of output classes

model = LSTMClassifier(input_size, hidden_size, num_layers, num_classes)
model.to(device)
# You can then define your loss function and optimizer, and train the model using your data.

In [None]:
preds = model(tensor.unsqueeze(0))
softmax = nn.Softmax(dim=1)
softmax(preds)

# Création du dataset

In [None]:
class TennisDataset(Dataset):

    def __init__(self, root_dir, scaler=None, transform=None):
        self.root_dir = root_dir
        self.scaler = scaler
        self.transform = transform
        self.classes = os.listdir(root_dir)
        self.list_of_files = []

        self.label_encoder = LabelEncoder()
        self.label_encoder.fit_transform(self.classes)

        for classe in self.classes:
            for file in os.listdir(os.path.join(root_dir, classe, 'predictions')):
                self.list_of_files.append(os.path.join(root_dir, classe, 'predictions', file))
        self.list_of_files.sort()
    
    def __len__(self):
        number_of_files = 0
        for classe in self.classes:
            number_of_files += len(os.listdir(os.path.join(self.root_dir, classe, 'predictions')))
        return number_of_files
    
    def __getitem__(self, idx):
        
        file_name = self.list_of_files[idx]
        df = get_df_from_preds(file_name)
        label = file_name.split('/')[-3]
        data = torch.from_numpy(df.values).float()

        if self.scaler is not None:
            data = self.scaler.transform(data)

        if self.transform is not None:
            data = self.transform(data)
        
        return data, self.label_encoder.transform([label])[0]
    


In [None]:
dataset = TennisDataset('../../../outputs/tennis_no_vis_cluster')
print('Number of files in the dataset:', len(dataset))
print('Classes:', dataset.classes)
print('List of files:', dataset.list_of_files[0:10])

In [None]:
dataset.label_encoder.inverse_transform([dataset[1900][1]])

Split en test set et train set

In [None]:
all_indexes = list(range(len(dataset)))

num_test = int(len(dataset) * 0.2)
num_train = len(dataset) - num_test

train_indexes = list(np.random.choice(all_indexes, num_train, replace=False))
test_indexes = list(set(all_indexes) - set(train_indexes))

In [None]:
train_dataset = Subset(dataset, train_indexes)
test_dataset = Subset(dataset, test_indexes)
print('Number of files in the train dataset:', len(train_dataset))
print('Number of files in the test dataset:', len(test_dataset))

Train the scaler on the training data.

In [None]:
train_data = []

for i in range(len(train_dataset)):
    train_data.append(train_dataset[i][0])

In [None]:
scaler = MinMaxScaler()
scaler.fit(torch.cat(train_data).numpy())

In [None]:
print('Before scaling:', train_dataset[0][0][0])

In [None]:
dataset = TennisDataset('../../../outputs/tennis_no_vis_cluster', scaler=scaler, transform=lambda x: torch.from_numpy(x).float())
train_dataset = Subset(dataset, train_indexes)
test_dataset = Subset(dataset, test_indexes)

In [None]:
print('After scaling:', train_dataset[0][0][0])

# Training

In [None]:
input_size =  34
hidden_size = 64  # Taille de la couche cachée du RNN
num_layers = 2  # Nombre de couches RNN empilées
num_classes =  6

model = RNNModel(input_size, hidden_size, num_layers, num_classes)
model.to(device)
epochs = 10
batch_size = 32
criterion = CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
input_size =  34
hidden_size = 124 
num_layers = 1
num_classes =  6

model = LSTMClassifier(input_size, hidden_size, num_layers, num_classes)
model.to(device)
epochs = 10
batch_size = 32
criterion = CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=2, verbose=True)

Création des dataloaders

In [None]:
def collate_fn(batch):
    # batch est une liste de tuples (data, label)
    data, labels = zip(*batch)
    
    # Inverser chaque séquence
    reversed_data = [torch.flip(seq, [0]) for seq in data]
    
    # Remplir les séquences inversées pour qu'elles aient la même longueur
    padded_data = pad_sequence(reversed_data, batch_first=True, padding_value=0)
    
    # Inverser à nouveau chaque séquence du résultat pour avoir le padding au début
    data = [torch.flip(seq, [0]) for seq in padded_data]
    data = torch.stack(data)
    
    labels = torch.tensor(labels)
    return data, labels


train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

In [None]:
next(iter(train_dataloader))[0][0]

In [None]:
for epoch in range(epochs):
    model.train()
    for batch, (sequences, targets) in enumerate(train_dataloader):
        sequences = sequences.to(device)
        targets = targets.to(device)

        # Forward pass
        outputs = model(sequences)
        loss = criterion(outputs, targets)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # scheduler.step(loss)
        
        if batch % 10 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                .format(epoch, epochs, batch, len(train_dataloader), loss.item()))
        
    # Test the model
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for data, target in test_dataloader:
            data = data.to(device)
            target = target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

        print('Test Accuracy of the model on the {} test videos: {} %'.format(len(test_dataset), 100 * correct / total))

# Evaluation

In [None]:
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for data, target in test_dataloader:
        data = data.to(device)
        target = target.to(device)
        outputs = model(data)
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

    print('Test Accuracy of the model on the {} test videos: {} %'.format(len(test_dataset), 100 * correct / total))