In [None]:
import pandas as pd
from string import digits
import numpy as np
import torch
import torch.nn as nn

In [None]:
def convert_keypoint_data_2D(path, window=45, step=5, start=60):
    """Cut per-video sliding windows of 2D keypoints out of a long-format CSV.

    The CSV must provide the columns ``video_name``, ``frame_number``,
    ``landmark``, ``x`` and ``y``. It is pivoted to one row per
    (video, frame) with one column per (coordinate, landmark) pair.

    Parameters
    ----------
    path : str
        Location of the keypoint CSV.
    window : int, default 45
        Number of consecutive frames in each sample.
    step : int, default 5
        Stride, in frames, between the starts of consecutive windows.
    start : int, default 60
        Row offset of the first window inside each video (the leading
        ``start`` frames are skipped).

    Returns
    -------
    x_input : numpy.ndarray of float
        Stacked windows, ``(n_windows, window, n_features)``; at most the
        first 30 feature columns are used.
    y_input : numpy.ndarray of int
        One-hot class row for the last frame of each window. The class of a
        video is its name with trailing digits removed.

    Raises
    ------
    ValueError
        If ``window`` or ``step`` is smaller than 1, or ``start`` is negative.
    """
    # A non-positive step would never advance the window start.
    if window < 1 or step < 1 or start < 0:
        raise ValueError("window and step must be >= 1 and start must be >= 0")

    df = pd.read_csv(path)
    pivoted_df = df.pivot_table(
        index=['video_name', 'frame_number'],
        columns='landmark',
        values=['x', 'y'],
    ).reset_index()
    df_label = pd.get_dummies(pivoted_df['video_name'].str.rstrip(digits), sparse=True)

    X_list_final = []
    Y_list_final = []
    # Iterate in CSV order of first appearance so the sample order is stable.
    for videoname in df['video_name'].unique():
        row_idx = pivoted_df['video_name'] == videoname
        # Columns 0 and 1 are video_name / frame_number; features follow.
        features = pivoted_df[row_idx].iloc[:, 2:32].to_numpy(dtype=float)
        labels = df_label[row_idx].values
        # NOTE(review): the exclusive bound `len - window` means the final
        # frame of a video never lands in a window; kept as in the original.
        for begin in range(start, len(features) - window, step):
            X_list_final.append(features[begin:begin + window])
            Y_list_final.append(np.array(labels[begin + window - 1]))

    x_input = np.array(X_list_final).astype('float')
    y_input = np.array(Y_list_final).astype('int')
    return x_input, y_input

In [None]:
def convert_keypoint_data_3D(path, window=45, step=5, start=60):
    """Cut per-video sliding windows of 3D keypoints out of a long-format CSV.

    The CSV must provide the columns ``video_name``, ``frame_number``,
    ``landmark``, ``x``, ``y`` and ``z``. It is pivoted to one row per
    (video, frame) with one column per (coordinate, landmark) pair.

    Parameters
    ----------
    path : str
        Location of the keypoint CSV.
    window : int, default 45
        Number of consecutive frames in each sample.
    step : int, default 5
        Stride, in frames, between the starts of consecutive windows.
    start : int, default 60
        Row offset of the first window inside each video (the leading
        ``start`` frames are skipped).

    Returns
    -------
    x_input : numpy.ndarray of float
        Stacked windows, ``(n_windows, window, n_features)``; at most the
        first 30 feature columns are used.
    y_input : numpy.ndarray of int
        One-hot class row for the last frame of each window. The class of a
        video is its name with trailing digits removed.

    Raises
    ------
    ValueError
        If ``window`` or ``step`` is smaller than 1, or ``start`` is negative.
    """
    # A non-positive step would never advance the window start.
    if window < 1 or step < 1 or start < 0:
        raise ValueError("window and step must be >= 1 and start must be >= 0")

    df = pd.read_csv(path)
    pivoted_df = df.pivot_table(
        index=['video_name', 'frame_number'],
        columns='landmark',
        values=['x', 'y', 'z'],
    ).reset_index()
    df_label = pd.get_dummies(pivoted_df['video_name'].str.rstrip(digits), sparse=True)

    X_list_final = []
    Y_list_final = []
    # Iterate in CSV order of first appearance so the sample order is stable.
    for videoname in df['video_name'].unique():
        row_idx = pivoted_df['video_name'] == videoname
        # Columns 0 and 1 are video_name / frame_number; features follow.
        # NOTE(review): the 2:32 cap keeps at most 30 columns, so with more
        # than 10 landmarks part of the x/y/z block is cut off — confirm.
        features = pivoted_df[row_idx].iloc[:, 2:32].to_numpy(dtype=float)
        labels = df_label[row_idx].values
        # NOTE(review): the exclusive bound `len - window` means the final
        # frame of a video never lands in a window; kept as in the original.
        for begin in range(start, len(features) - window, step):
            X_list_final.append(features[begin:begin + window])
            Y_list_final.append(np.array(labels[begin + window - 1]))

    x_input = np.array(X_list_final).astype('float')
    y_input = np.array(Y_list_final).astype('int')
    return x_input, y_input

In [None]:
def convert_keypoint_data_angle(angle_path, window=45, step=5, start=60):
    """Cut per-video sliding windows of joint-angle features out of a CSV.

    The CSV must contain a ``video_name`` column. Feature values are read by
    position from columns 2 to 11 (at most ten columns).
    NOTE(review): this presumes columns 0 and 1 are the video name and the
    frame number — confirm against the file.

    Parameters
    ----------
    angle_path : str
        Location of the angle CSV (one row per video frame).
    window : int, default 45
        Number of consecutive frames in each sample.
    step : int, default 5
        Stride, in frames, between the starts of consecutive windows.
    start : int, default 60
        Row offset of the first window inside each video (the leading
        ``start`` frames are skipped).

    Returns
    -------
    x_input : numpy.ndarray of float
        Stacked windows, ``(n_windows, window, n_features)``.
    y_input : numpy.ndarray of int
        One-hot class row for the last frame of each window. The class of a
        video is its name with trailing digits removed.

    Raises
    ------
    ValueError
        If ``window`` or ``step`` is smaller than 1, or ``start`` is negative.
    """
    # A non-positive step would never advance the window start.
    if window < 1 or step < 1 or start < 0:
        raise ValueError("window and step must be >= 1 and start must be >= 0")

    df = pd.read_csv(angle_path)
    df_label = pd.get_dummies(df['video_name'].str.rstrip(digits), sparse=True)

    X_list_final = []
    Y_list_final = []
    for videoname in df['video_name'].unique():
        row_idx = df['video_name'] == videoname
        features = df[row_idx].iloc[:, 2:12].to_numpy(dtype=float)
        labels = df_label[row_idx].values
        # NOTE(review): the exclusive bound `len - window` means the final
        # frame of a video never lands in a window; kept as in the original.
        for begin in range(start, len(features) - window, step):
            X_list_final.append(features[begin:begin + window])
            Y_list_final.append(np.array(labels[begin + window - 1]))

    x_input = np.array(X_list_final).astype('float')
    y_input = np.array(Y_list_final).astype('int')
    return x_input, y_input

In [None]:
def convert_keypoint_data_2D_angle(path, angle_path, window=45, step=5, start=60):
    """Cut sliding windows of 2D keypoints joined with joint angles.

    The keypoint CSV (columns ``video_name``, ``frame_number``, ``landmark``,
    ``x``, ``y``) is pivoted to one row per (video, frame). It is then merged
    with the angle CSV on the columns the two tables share.
    NOTE(review): the shared columns are presumably ``video_name`` and
    ``frame_number`` — confirm against the angle file.

    Parameters
    ----------
    path : str
        Location of the keypoint CSV.
    angle_path : str
        Location of the angle CSV.
    window : int, default 45
        Number of consecutive frames in each sample.
    step : int, default 5
        Stride, in frames, between the starts of consecutive windows.
    start : int, default 60
        Row offset of the first window inside each video (the leading
        ``start`` rows are skipped).

    Returns
    -------
    x_input : numpy.ndarray of float
        Stacked windows, ``(n_windows, window, n_features)``; at most the
        first 30 feature columns of the merged table are used.
    y_input : numpy.ndarray of int
        One-hot class row for the last frame of each window. The class of a
        video is its name with trailing digits removed.

    Raises
    ------
    ValueError
        If ``window`` or ``step`` is smaller than 1, or ``start`` is negative.
    """
    # A non-positive step would never advance the window start.
    if window < 1 or step < 1 or start < 0:
        raise ValueError("window and step must be >= 1 and start must be >= 0")

    df = pd.read_csv(path)
    angle_df = pd.read_csv(angle_path)
    pivoted_df = df.pivot_table(
        index=['video_name', 'frame_number'],
        columns='landmark',
        values=['x', 'y'],
    ).reset_index()
    # Flatten the (coordinate, landmark) header so the merge sees plain names.
    pivoted_df.columns = pivoted_df.columns.get_level_values(0)
    merge_df = pd.merge(pivoted_df, angle_df)

    # Labels must come from the merged table: the inner merge can drop (or
    # duplicate) rows, and a label frame built from `pivoted_df` would then no
    # longer line up with the boolean masks computed on `merge_df`.
    df_label = pd.get_dummies(merge_df['video_name'].str.rstrip(digits), sparse=True)

    X_list_final = []
    Y_list_final = []
    # Iterate in CSV order of first appearance so the sample order is stable.
    for videoname in df['video_name'].unique():
        row_idx = merge_df['video_name'] == videoname
        # Columns 0 and 1 are video_name / frame_number; features follow.
        features = merge_df[row_idx].iloc[:, 2:32].to_numpy(dtype=float)
        labels = df_label[row_idx].values
        # NOTE(review): the exclusive bound `len - window` means the final
        # row of a video never lands in a window; kept as in the original.
        for begin in range(start, len(features) - window, step):
            X_list_final.append(features[begin:begin + window])
            Y_list_final.append(np.array(labels[begin + window - 1]))

    x_input = np.array(X_list_final).astype('float')
    y_input = np.array(Y_list_final).astype('int')
    return x_input, y_input

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Bare expression so the notebook displays which device was selected.
device

In [None]:
import torch
import torch.nn as nn

class ResidualBlock_1(nn.Module):
    """Input projection followed by two bidirectional GRU stages with a skip connection.

    Pipeline: Linear+ReLU on the input features, then (BiGRU -> Linear+ReLU)
    twice. The output of the first stage is added to the output of the second
    stage, and the sum is batch-normalized.

    Parameters
    ----------
    input_size : int
        Feature width of the input; the block returns the same width.
    hidden_size : int
        Hidden width of each GRU direction.
    seq_len : int, default 45
        Length of dimension 1 of the input. ``BatchNorm1d`` treats dimension 1
        as its channel axis, so it must be built with this size; the default
        matches the previously hard-coded 45-frame window.
    """

    def __init__(self, input_size, hidden_size, seq_len=45):
        super(ResidualBlock_1, self).__init__()
        self.relu1 = nn.Sequential(nn.Linear(input_size, input_size), nn.ReLU())
        self.gru1 = nn.GRU(input_size, hidden_size, batch_first=True, bidirectional=True)
        # A bidirectional GRU emits 2 * hidden_size features per step.
        self.relu2 = nn.Sequential(nn.Linear(hidden_size * 2, input_size), nn.ReLU())
        self.gru2 = nn.GRU(input_size, hidden_size, batch_first=True, bidirectional=True)
        self.relu3 = nn.Sequential(nn.Linear(hidden_size * 2, input_size), nn.ReLU())
        self.BN = nn.BatchNorm1d(seq_len, affine=False)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_size) to a tensor of the same shape."""
        out = self.relu1(x)
        out, _ = self.gru1(out)
        out_1 = self.relu2(out)
        out, _ = self.gru2(out_1)
        out = self.relu3(out)
        # Residual connection around the second GRU stage.
        out = torch.add(out, out_1)
        out = self.BN(out)
        return out

class ResidualBlock_2(nn.Module):
    """Two bidirectional GRU stages with a skip connection and batch normalization.

    Pipeline: (BiGRU -> Linear+ReLU) twice. The output of the first stage is
    added to the output of the second stage, and the sum is batch-normalized.

    Parameters
    ----------
    input_size : int
        Feature width of the input; the block returns the same width.
    hidden_size : int
        Hidden width of each GRU direction.
    seq_len : int, default 45
        Length of dimension 1 of the input. ``BatchNorm1d`` treats dimension 1
        as its channel axis, so it must be built with this size; the default
        matches the previously hard-coded 45-frame window.
    """

    def __init__(self, input_size, hidden_size, seq_len=45):
        super(ResidualBlock_2, self).__init__()
        self.gru1 = nn.GRU(input_size, hidden_size, batch_first=True, bidirectional=True)
        # A bidirectional GRU emits 2 * hidden_size features per step.
        self.relu2 = nn.Sequential(nn.Linear(hidden_size * 2, input_size), nn.ReLU())
        self.gru2 = nn.GRU(input_size, hidden_size, batch_first=True, bidirectional=True)
        self.relu3 = nn.Sequential(nn.Linear(hidden_size * 2, input_size), nn.ReLU())
        self.BN1 = nn.BatchNorm1d(seq_len, affine=False)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_size) to a tensor of the same shape."""
        out, _ = self.gru1(x)
        out_1 = self.relu2(out)
        out, _ = self.gru2(out_1)
        out = self.relu3(out)
        # Residual connection around the second GRU stage.
        out = torch.add(out, out_1)
        out = self.BN1(out)
        return out

class DeepResidualBidirGRU(nn.Module):
    """Two stacked residual bidirectional-GRU blocks followed by a linear head.

    The head is an ``nn.Linear(input_size, output_size)`` applied to the
    output of the second block.
    NOTE(review): because the blocks keep the sequence dimension, the head
    yields one score vector per time step rather than one per sequence —
    confirm that callers reduce over time before computing a class loss.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Keep the constructor arguments around for later inspection.
        self.input_size, self.hidden_size, self.output_size = (
            input_size,
            hidden_size,
            output_size,
        )
        self.res_block1 = ResidualBlock_1(input_size, hidden_size)
        self.res_block2 = ResidualBlock_2(input_size, hidden_size)
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x):
        """Run ``x`` through both residual blocks and the linear head."""
        features = self.res_block2(self.res_block1(x))
        return self.linear(features)

    def compute_l2_loss(self, w):
        """Return the sum of the squared entries of tensor ``w``."""
        squared = torch.square(w)
        return squared.sum()

# Build the residual bidirectional GRU (30 input features, 3 outputs) on the
# selected device and print its layer summary.
# NOTE(review): a later cell rebinds `model` to a GRUModel, so on a top-to-bottom
# run this instance is not the one that gets trained — confirm which is intended.
model = DeepResidualBidirGRU(input_size=30, hidden_size=128, output_size=3).to(device)
print(model)

In [None]:
class GRUModel(nn.Module):
    """Single-layer, batch-first GRU followed by a linear layer.

    The linear layer is applied to the GRU output at every time step, so an
    input of shape (batch, seq, input_size) yields (batch, seq, output_size).
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # Keep the constructor arguments around for later inspection.
        self.input_size, self.hidden_size, self.output_size = (
            input_size,
            hidden_size,
            output_size,
        )

        self.gru = nn.GRU(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Return the linear projection of the GRU output sequence for ``x``."""
        hidden_states, _last_hidden = self.gru(x)
        return self.fc(hidden_states)

# Build the plain GRU model (30 input features, 3 outputs) on the selected
# device and print its layer summary.
# NOTE(review): this rebinds `model`, replacing the DeepResidualBidirGRU created
# in the previous cell — confirm which architecture should be trained and saved.
model = GRUModel(input_size=30, hidden_size=128, output_size=3).to(device)
print(model)

In [None]:
# Build the windowed train / test arrays from the keypoint CSVs joined with the
# matching joint-angle CSVs.
# NOTE(review): the absolute /content paths look like a Colab runtime — confirm
# the files are present there before running.
X_train,y_train = convert_keypoint_data_2D_angle(r'/content/output_data2_train.csv', r'/content/angle_data_train.csv')
X_test,y_test = convert_keypoint_data_2D_angle(r'/content/output_data2_test.csv', r'/content/angle_data_test.csv')

In [None]:
def convert_array(original):
    """Return, for each row of ``original``, the index of its largest entry."""
    class_indices = np.argmax(original, axis=1)
    return class_indices


# Collapse the one-hot training labels into integer class indices.
original_array = np.array(y_train)
y_train_array = convert_array(original_array)
print(y_train_array)

In [None]:
# Inverse-frequency sampling weights: every sample is weighted by
# 1 / (number of training samples in its class).
# `class_index[i]` is the position of sample i's class inside `class_values`,
# and therefore inside `class_sample_count` and `weight`. Looking the weight up
# through it stays correct for 0-based labels (the previous `weight[t-1]`
# shifted every class onto its neighbour's weight) and for label sets with gaps.
class_values, class_index, class_sample_count = np.unique(
    y_train_array, return_inverse=True, return_counts=True)
weight = 1. / class_sample_count
sample_weight = weight[class_index]

In [None]:
from torch.utils.data import WeightedRandomSampler

# Draw len(y_train) training indices per epoch, with replacement, with
# probability proportional to each sample's weight.
sampler = WeightedRandomSampler(
    replacement=True,
    num_samples=len(y_train),
    weights=sample_weight,
)

In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader

# Copy the training arrays into tensors that live on the selected device:
# float32 features and int64 (long) one-hot labels.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32, device=device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long, device=device)

# Pair every feature window with its label row.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)

# Batches of 16; the order of samples is decided by the weighted sampler.
train_loader = DataLoader(dataset=train_dataset, sampler=sampler, batch_size=16)

In [None]:
# Training configuration: number of passes over the data, loss, and optimizer.
num_epochs = 25
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
    params=model.parameters(),
    lr=1e-5,
    weight_decay=0.1,
)

In [None]:
# Train for `num_epochs` passes over `train_loader`, printing the mean loss per epoch.
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    for inputs, labels in train_loader:  # Iterate over the training dataset
        optimizer.zero_grad()  # Zero the gradients
        outputs = model(inputs)  # Forward pass

        # The models in this notebook emit one score vector per time step,
        # (batch, seq, classes), while the label belongs to the last frame of
        # the window. CrossEntropyLoss reads dimension 1 as the class axis, so
        # feeding it the 3-D output would silently treat time steps as classes.
        # Use the scores of the final time step instead.
        logits = outputs[:, -1, :] if outputs.dim() == 3 else outputs
        # Labels are stored one-hot; the loss expects class indices.
        targets = labels.argmax(dim=1) if labels.dim() > 1 else labels

        loss = criterion(logits, targets)  # Calculate the loss
        loss.backward()  # Backpropagation
        optimizer.step()  # Update the weights
        # criterion returns the batch mean; scale back to a per-sample sum.
        running_loss += loss.item() * inputs.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')

print('Training Finished')

In [None]:
# Copy the held-out arrays into tensors that live on the selected device:
# float32 features and int64 (long) one-hot labels.
X_test_tensor = torch.tensor(X_test, dtype=torch.float32, device=device)
y_test_tensor = torch.tensor(y_test, dtype=torch.long, device=device)

# Pair every test feature window with its label row.
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Batches of 16 drawn from the test set in shuffled order.
test_loader = DataLoader(dataset=test_dataset, shuffle=True, batch_size=16)

In [None]:
# Evaluate the trained model on the test set: accuracy and mean loss per sample.
model.eval()
test_loss = 0.0
correct = 0
total = 0

with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)

        # The models emit (batch, seq, classes); score the final time step so
        # the class axis is the one being compared (an argmax over dimension 1
        # of the raw output would pick a time step, not a class).
        logits = outputs[:, -1, :] if outputs.dim() == 3 else outputs
        # Labels are stored one-hot; convert to class indices.
        targets = labels.argmax(dim=1) if labels.dim() > 1 else labels

        loss = criterion(logits, targets)
        # criterion returns the batch mean; weight it by the batch size so a
        # smaller final batch does not skew the average.
        test_loss += loss.item() * targets.size(0)

        predicted = logits.argmax(dim=1)
        total += targets.size(0)
        print('predict')
        print(predicted)
        print('actual')
        print(targets)
        correct += (predicted == targets).sum().item()

# Calculate accuracy and average loss
accuracy = 100 * correct / total
average_loss = test_loss / total

print(f'Accuracy on the test set: {accuracy:.2f}%')
print(f'Average loss on the test set: {average_loss:.4f}')

In [None]:
# Serialize the whole model object (not only its state_dict) to bidigru.pt.
# NOTE(review): `model` is whichever object was bound last; on a top-to-bottom
# run that is the GRUModel, not the bidirectional network the file name
# suggests — confirm before relying on this checkpoint.
torch.save(model,"bidigru.pt")