In [1]:
import torch

print(f"CUDA available: {torch.cuda.is_available()}")
print(f"CUDA device: {torch.cuda.get_device_name(0)}")

CUDA available: True
CUDA device: NVIDIA GeForce RTX 3050 Ti Laptop GPU


In [2]:
import torch
import torch.nn as nn
import numpy as np

# x = torch.randn(1, 5, 2)
x = torch.from_numpy(np.array([[[1,2], [3,4], [5,6], [7, 8], [9, 10]]])).float()
print(x.shape)
print(x)

conv1d = nn.Conv1d(in_channels=5, out_channels=3, kernel_size=2)

output = conv1d(x)
print(output.shape)
print(output)

torch.Size([1, 5, 2])
tensor([[[ 1.,  2.],
         [ 3.,  4.],
         [ 5.,  6.],
         [ 7.,  8.],
         [ 9., 10.]]])
torch.Size([1, 3, 1])
tensor([[[4.9430],
         [3.5290],
         [5.6935]]], grad_fn=<ConvolutionBackward0>)


In [3]:
print(x)

tensor([[[ 1.,  2.],
         [ 3.,  4.],
         [ 5.,  6.],
         [ 7.,  8.],
         [ 9., 10.]]])


In [4]:
print(output)

tensor([[[4.9430],
         [3.5290],
         [5.6935]]], grad_fn=<ConvolutionBackward0>)


In [5]:
from scripts.parsers import parse_sequences as parse_sequence_info

file_path = 'gait3d\\ListOfSequences.txt'
sequences = parse_sequence_info(file_path)

mocap_keys = []
par_cam_keys = []
par_cam_person = set()
par_after_cloth_change_keys = []
par_after_cloth_change_person = set()

for key, params in sequences.items():
    if params['MoCap_data']:
        mocap_keys.append(key)
        if key[-1] in ["1", "3", "5", "7"]:
            par_cam_keys.append(key)
            par_cam_person.add(key[:-2])
        if key[-1] in ["5", "7"]:
            par_after_cloth_change_keys.append(key)
            par_after_cloth_change_person.add(key[:-2])

print(f"Number of sequences with mocap data: {len(mocap_keys)}")
print(f"Number of sequences with mocap data and parallel cameras: {len(par_cam_keys)}")
print(f"Number of sequences with mocap data, parallel cameras and after clothing change: {len(par_after_cloth_change_keys)}")
print(f"Number of unique participants with mocap data and parallel cameras: {len(par_cam_person)}")
print(f"Number of unique participants with mocap data, parallel cameras and after clothing change: {len(par_after_cloth_change_person)}")
par_after_cloth_change_person

Number of sequences with mocap data: 146
Number of sequences with mocap data and parallel cameras: 72
Number of sequences with mocap data, parallel cameras and after clothing change: 12
Number of unique participants with mocap data and parallel cameras: 30
Number of unique participants with mocap data, parallel cameras and after clothing change: 6


{'p26', 'p27', 'p28', 'p29', 'p30', 'p31'}

In [6]:
import random

without_clothing_change = []
while len(without_clothing_change) < 6:
    random_person = random.choice(list(par_cam_person))
    if random_person not in par_after_cloth_change_person:
        without_clothing_change.append(random_person)
        par_cam_person.remove(random_person)

with_clothing_change = []
while len(with_clothing_change) < 4:
    random_person = random.choice(list(par_after_cloth_change_person))
    with_clothing_change.append(random_person)
    par_after_cloth_change_person.remove(random_person)


test_seq_set = ([f'{p_seq}s{seq_idx}' for p_seq in without_clothing_change[:3] for seq_idx in [1, 3]] +
                [f'{p_seq}s{seq_idx}' for p_seq in with_clothing_change[:2] for seq_idx in [5, 7]])

valid_seq_set = ([f'{p_seq}s{seq_idx}' for p_seq in without_clothing_change[3:] for seq_idx in [1, 3]] +
                [f'{p_seq}s{seq_idx}' for p_seq in with_clothing_change[2:] for seq_idx in [5, 7]])

print(f"test sequences: {test_seq_set}")
print(f"valid sequences: {valid_seq_set}")
# without_clothing_change + [only_after_clothing_change] + [only_before_clothing_change]

test sequences: ['p14s1', 'p14s3', 'p12s1', 'p12s3', 'p17s1', 'p17s3', 'p29s5', 'p29s7', 'p28s5', 'p28s7']
valid sequences: ['p3s1', 'p3s3', 'p15s1', 'p15s3', 'p19s1', 'p19s3', 'p26s5', 'p26s7', 'p31s5', 'p31s7']


In [7]:
train_seq_set = ([f'{p_seq}s{seq_idx}' for p_seq in list(par_cam_person) for seq_idx in [1, 3]] +
                 [f'{p_seq}s{seq_idx}' for p_seq in list(par_after_cloth_change_person) for seq_idx in [5, 7]])

print(f"train sequences: {train_seq_set}")

train sequences: ['p22s1', 'p22s3', 'p8s1', 'p8s3', 'p5s1', 'p5s3', 'p21s1', 'p21s3', 'p26s1', 'p26s3', 'p9s1', 'p9s3', 'p10s1', 'p10s3', 'p11s1', 'p11s3', 'p2s1', 'p2s3', 'p28s1', 'p28s3', 'p32s1', 'p32s3', 'p23s1', 'p23s3', 'p24s1', 'p24s3', 'p25s1', 'p25s3', 'p13s1', 'p13s3', 'p29s1', 'p29s3', 'p16s1', 'p16s3', 'p31s1', 'p31s3', 'p4s1', 'p4s3', 'p7s1', 'p7s3', 'p20s1', 'p20s3', 'p1s1', 'p1s3', 'p30s1', 'p30s3', 'p27s1', 'p27s3', 'p30s5', 'p30s7', 'p27s5', 'p27s7']


In [8]:
for key, params in sequences.items():
    if params['MoCap_data']:
        if key[-1] in ["1", "3", "5", "7"]:
            print(f"{key} | {'train' if key in train_seq_set else '     '} | {'valid' if key in valid_seq_set else '     '} | {'test' if key in test_seq_set else '    '} |")

p1s1 | train |       |      |
p1s3 | train |       |      |
p2s1 | train |       |      |
p2s3 | train |       |      |
p3s1 |       | valid |      |
p3s3 |       | valid |      |
p4s1 | train |       |      |
p4s3 | train |       |      |
p5s1 | train |       |      |
p5s3 | train |       |      |
p7s1 | train |       |      |
p7s3 | train |       |      |
p8s1 | train |       |      |
p8s3 | train |       |      |
p9s1 | train |       |      |
p9s3 | train |       |      |
p10s1 | train |       |      |
p10s3 | train |       |      |
p11s1 | train |       |      |
p11s3 | train |       |      |
p12s1 |       |       | test |
p12s3 |       |       | test |
p13s1 | train |       |      |
p13s3 | train |       |      |
p14s1 |       |       | test |
p14s3 |       |       | test |
p15s1 |       | valid |      |
p15s3 |       | valid |      |
p16s1 | train |       |      |
p16s3 | train |       |      |
p17s1 |       |       | test |
p17s3 |       |       | test |
p19s1 |       | valid | 

In [9]:
print(f"Train size: {len(train_seq_set)} | {100*len(train_seq_set)/72:.2f}%")
print(f"Test size: {len(test_seq_set)} | {100*len(test_seq_set)/72:.2f}%")
print(f"Valid size: {len(valid_seq_set)} | {100*len(valid_seq_set)/72:.2f}%")

Train size: 52 | 72.22%
Test size: 10 | 13.89%
Valid size: 10 | 13.89%


In [10]:
import json

selected_names_file = "./datasets/mediapipe/selected_joint_names.json"
input_data_file = "./datasets/mediapipe/dataset.json"
output_data_file = "./datasets/mocap/dataset_v2.json"

with open(input_data_file, 'r') as file:
    raw_input = json.load(file)

with open(output_data_file, 'r') as file:
    raw_output = json.load(file)

with open(selected_names_file, 'r') as file:
    selected_names = json.load(file)

In [11]:
sequences['p1s1']

{'start_frame': 195,
 'number_of_frames': 135,
 'frame_offset': 0,
 'MoCap_data': True}

In [12]:
input_frames_data = {f"c{c_idx}": [] for c_idx in range(1, 5)}
output_frames_data = []
img_width = 960
img_height = 540

not_found = 0
seq_keys_list = train_seq_set + test_seq_set + valid_seq_set

for seq_key in seq_keys_list:
    for f_idx in range(sequences[seq_key]['number_of_frames']):
    # for f_idx in range(2):
        curr_output_array = []
        output_frame_dict = raw_output[seq_key][f_idx]
        for point_idx, joint_name in selected_names.items():
            curr_output_array.append(output_frame_dict[joint_name])

        curr_output_array_np = np.array(curr_output_array)
        # print(curr_output_array_np)
        
        curr_input_arrays = {f"c{c_idx}": [] for c_idx in range(1, 5)}

        all_found = True
        
        for c_idx in range(1, 5):
            input_frame_list = raw_input[seq_key][f"c{c_idx}"][str(f_idx)]
            if [None, None] in input_frame_list:
                all_found = False
                break
                
            for point_idx, joint_name in selected_names.items(): 
                pixel_coords = input_frame_list[int(point_idx)]
                curr_input_arrays[f"c{c_idx}"].append([pixel_coords[0]/img_width, pixel_coords[1]/img_height])
                # conversion from pixels to propotions

        # print(curr_input_arrays)

        if all_found:
            for c_idx in range(1, 5):
                input_frames_data[f"c{c_idx}"].append(np.array(curr_input_arrays[f"c{c_idx}"]))
            #     print(np.array(curr_input_arrays[f"c{c_idx}"]).shape)

            # print(curr_output_array_np.shape)    
            output_frames_data.append(curr_output_array_np)
        else:
            not_found += 1

print(f"Frames with all found mocaps: {len(output_frames_data)}")
print(f"Frames with at least one not found mocap: {not_found}")
print(f"Proportion: {100*len(output_frames_data)/(len(output_frames_data) + not_found):.2f}%")
# print(input_frames_data['c4'][0])

Frames with all found mocaps: 5735
Frames with at least one not found mocap: 3345
Proportion: 63.16%


In [13]:
import torch
from torch.utils.data import Dataset

class MoCapInputDataset(Dataset):
    def __init__(self, seq_keys_list, sequences, selected_names, raw_input, raw_output):
        self.img_width = 960
        self.img_height = 540
        self.input_frames_data = {f"c{c_idx}": [] for c_idx in range(1, 5)}
        self.output_frames_data = []
        self.not_found = 0
              
        for seq_key in seq_keys_list:
            for f_idx in range(sequences[seq_key]['number_of_frames']):
                curr_output_array = []
                output_frame_dict = raw_output[seq_key][f_idx]
                for point_idx, joint_name in selected_names.items():
                    curr_output_array.append(output_frame_dict[joint_name])
        
                curr_output_array_np = np.array(curr_output_array)                
                curr_input_arrays = {f"c{c_idx}": [] for c_idx in range(1, 5)}
        
                all_found = True
                
                for c_idx in range(1, 5):
                    input_frame_list = raw_input[seq_key][f"c{c_idx}"][str(f_idx)]
                    if [None, None] in input_frame_list:
                        all_found = False
                        break
                        
                    for point_idx, joint_name in selected_names.items(): 
                        pixel_coords = input_frame_list[int(point_idx)]
                        curr_input_arrays[f"c{c_idx}"].append(
                            [pixel_coords[0]/self.img_width, 
                             pixel_coords[1]/self.img_height])
        
                if all_found:
                    for c_idx in range(1, 5):
                        self.input_frames_data[f"c{c_idx}"].append(np.array(curr_input_arrays[f"c{c_idx}"]))
 
                    self.output_frames_data.append(curr_output_array_np)
                else:
                    self.not_found += 1

        self.length = len(self.output_frames_data)

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        inputs = [torch.from_numpy(self.input_frames_data[f"c{c_idx}"][idx]).float() for c_idx in range(1, 5)]  # each: (12, 2)
        target = torch.from_numpy(self.output_frames_data[idx]).float()  # (12, 3)
        return inputs, target

In [14]:
from torch.utils.data import DataLoader

train_ds = MoCapInputDataset(train_seq_set, sequences, selected_names, raw_input, raw_output)
valid_ds = MoCapInputDataset(valid_seq_set, sequences, selected_names, raw_input, raw_output)
test_ds = MoCapInputDataset(test_seq_set, sequences, selected_names, raw_input, raw_output)

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader = DataLoader(valid_ds, batch_size=32, shuffle=False)
test_loader = DataLoader(test_ds, batch_size=32, shuffle=False)

In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CustomNet(nn.Module):
    def __init__(self):
        super(CustomNet, self).__init__()
        # shape (12, 2) -> reshape to (2, 12) 
        self.conv1d = nn.Conv1d(in_channels=2, out_channels=1, kernel_size=3)  # (2, 12) -> (1, 10)
        # flatten 4 x 10 -> 40 x 1
        self.fc1 = nn.Linear(40, 64)
        self.dropout1 = nn.Dropout(p=0.3)
        self.fc2 = nn.Linear(64, 72)
        self.dropout2 = nn.Dropout(p=0.3)
        self.fc3 = nn.Linear(72, 36)

    def forward(self, x):
        # x: 4 tensors of shape (batch, 12, 2)
        conv_outs = []
        for xi in x:
            xi = xi.permute(0, 2, 1)  # reshape to (batch, 2, 12) 
            conv = self.conv1d(xi)     # (batch, 1, 10)
            conv = conv.squeeze(1)     # (batch, 10)
            conv_outs.append(conv)

        concat = torch.cat(conv_outs, dim=1)  # (batch, 40)

        out = F.relu(self.fc1(concat))
        out = self.dropout1(out)
        out = F.relu(self.fc2(out))
        out = self.dropout2(out)
        out = self.fc3(out)  # (batch, 36)
        out = out.view(-1, 12, 3)  # reshape to (batch, 12, 3)
        return out


In [42]:
model = CustomNet()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = torch.nn.MSELoss()

num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for inputs, targets in train_loader:
        inputs = [inp.float() for inp in inputs]
        targets = targets.float()

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * targets.size(0)

    avg_train_loss = train_loss / len(train_loader.dataset)

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = [inp.float() for inp in inputs]
            targets = targets.float()

            outputs = model(inputs)
            loss = criterion(outputs, targets)
            val_loss += loss.item() * targets.size(0)

    avg_val_loss = val_loss / len(val_loader.dataset)

    print(f"Epoch {epoch+1}: Train MSE = {avg_train_loss:.4f}, Val MSE = {avg_val_loss:.4f}")


Epoch 1: Train MSE = 16.0531, Val MSE = 15.0439
Epoch 2: Train MSE = 15.0646, Val MSE = 12.6524
Epoch 3: Train MSE = 12.4248, Val MSE = 10.5646
Epoch 4: Train MSE = 11.9016, Val MSE = 10.5103
Epoch 5: Train MSE = 11.7848, Val MSE = 10.4494
Epoch 6: Train MSE = 11.7179, Val MSE = 10.3682
Epoch 7: Train MSE = 11.5440, Val MSE = 10.1415
Epoch 8: Train MSE = 11.1025, Val MSE = 9.5402
Epoch 9: Train MSE = 9.9309, Val MSE = 7.7670
Epoch 10: Train MSE = 7.2402, Val MSE = 4.2998
Epoch 11: Train MSE = 3.6670, Val MSE = 1.2477
Epoch 12: Train MSE = 1.8192, Val MSE = 0.5175
Epoch 13: Train MSE = 1.5302, Val MSE = 0.4610
Epoch 14: Train MSE = 1.4694, Val MSE = 0.4670
Epoch 15: Train MSE = 1.4520, Val MSE = 0.4686
Epoch 16: Train MSE = 1.4424, Val MSE = 0.4554
Epoch 17: Train MSE = 1.4199, Val MSE = 0.4372
Epoch 18: Train MSE = 1.3933, Val MSE = 0.4511
Epoch 19: Train MSE = 1.3757, Val MSE = 0.4571
Epoch 20: Train MSE = 1.3629, Val MSE = 0.4393
Epoch 21: Train MSE = 1.3396, Val MSE = 0.4431
Epoch 2

In [43]:
model.eval()
test_loss = 0.0
with torch.no_grad():
    for inputs, targets in test_loader:
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        test_loss += loss.item() * targets.size(0)
test_loss /= len(test_loader.dataset)

print(f"Test loss = {test_loss:.4f}")

Test loss = 0.3516


In [44]:
def predict_single(model, inputs_list, device="cpu"):
    model.eval()
    inputs = [torch.from_numpy(inp).float().unsqueeze(0).to(device) for inp in inputs_list]
    
    with torch.no_grad():
        output = model(inputs) 
    
    return output.squeeze(0).cpu().numpy()

In [45]:
list(raw_output[test_seq_set[0]][0].values())

[[9.69188404083252, 4.028162002563477, 0.688791036605835],
 [9.986611366271973, 2.2449145317077637, 0.7812122702598572],
 [10.262113571166992, 0.5781108140945435, 0.867490291595459],
 [9.822742462158203, 4.075028896331787, -0.42783796787261963],
 [10.097484588623047, 2.12271785736084, -0.47968125343322754],
 [10.343533515930176, 0.37429726123809814, -0.5261550545692444],
 [10.102184295654297, 5.991581916809082, 0.7971398830413818],
 [9.823925971984863, 5.420259475708008, 1.9626073837280273],
 [9.6143217086792, 5.653244972229004, 2.547516107559204],
 [10.186217308044434, 5.998184680938721, -0.45327651500701904],
 [10.122572898864746, 5.491803169250488, -1.695825219154358],
 [9.901778221130371, 5.764641761779785, -2.3004868030548096]]

In [33]:
triang_data_file = "./datasets/mediapipe/triangulation.json"

with open(triang_data_file, 'r') as file:
    triangulation_data = json.load(file)


In [46]:
print(test_seq_set[1])
frame = 100

bvh_sample_data = list(raw_output[test_seq_set[0]][frame].values())
triangulation_sample_all_data = triangulation_data[test_seq_set[0]][frame]
triangulation_sample_data = [triangulation_sample_all_data[int(j_idx)] for j_idx in selected_names.keys()]
print(bvh_sample_data)
print()
print(triangulation_sample_data)

p14s3
[[-6.687641143798828, 3.8349709510803223, 1.019135594367981], [-6.973823070526123, 2.048238515853882, 1.0517510175704956], [-5.579444885253906, 1.135089635848999, 0.7628755569458008], [-6.719081401824951, 4.0061445236206055, -0.09257128834724426], [-7.096922397613525, 2.0806357860565186, 0.10577520728111267], [-6.847524642944336, 0.34556829929351807, 0.32260560989379883], [-6.532391548156738, 5.791557312011719, 1.118679165840149], [-6.347134113311768, 4.492308139801025, 1.3181345462799072], [-6.641845703125, 3.9046268463134766, 1.4081884622573853], [-6.535183429718018, 5.779489040374756, -0.26494431495666504], [-6.38330602645874, 4.473793983459473, -0.5415172576904297], [-6.676773548126221, 3.845623016357422, -0.6314293742179871]]

[[215.5354303187731, -1418.4355213577085, 279.85629064136907], [85.44735387564796, -1737.1196495425609, 120.47440807360421], [244.96879803264895, -1729.0923372547386, 523.0504111254594], [73.85670306535584, -1749.2132575215505, 508.9235034010974], [223

In [47]:
img_width = 960
img_height = 540

mp_input_sample = []

for c_idx in range(1, 5):
    all_frames_for_camera = raw_input[test_seq_set[0]][f"c{c_idx}"][str(frame)]
    camera_mp_input_sample = []
    
    for point_idx, joint_name in selected_names.items(): 
        pixel_coords = all_frames_for_camera[int(point_idx)]
        camera_mp_input_sample.append([pixel_coords[0]/img_width, pixel_coords[1]/img_height])

    mp_input_sample.append(np.array(camera_mp_input_sample))
    
mp_input_sample

[array([[0.26770833, 0.67407407],
        [0.22604167, 0.70740741],
        [0.21041667, 0.61111111],
        [0.22083333, 0.60185185],
        [0.21458333, 0.50185185],
        [0.23541667, 0.5       ],
        [0.2125    , 0.33703704],
        [0.24166667, 0.33518519],
        [0.21041667, 0.42777778],
        [0.24270833, 0.41851852],
        [0.196875  , 0.5       ],
        [0.22083333, 0.47592593]]),
 array([[0.490625  , 0.51481481],
        [0.503125  , 0.52962963],
        [0.490625  , 0.46666667],
        [0.50416667, 0.47037037],
        [0.49270833, 0.41296296],
        [0.50833333, 0.41296296],
        [0.4875    , 0.33148148],
        [0.51458333, 0.33518519],
        [0.48229167, 0.37407407],
        [0.52083333, 0.37962963],
        [0.478125  , 0.41111111],
        [0.52395833, 0.41481481]]),
 array([[0.709375  , 0.61481481],
        [0.76354167, 0.67037037],
        [0.75833333, 0.55185185],
        [0.77291667, 0.56296296],
        [0.75625   , 0.45555556],
        [0

In [48]:
predicted = predict_single(model, mp_input_sample, 'cpu')
predicted

array([[-6.469159  ,  0.48426074,  0.07704934],
       [-6.417074  ,  0.46577597, -0.24570024],
       [-6.6621814 ,  1.839711  ,  0.16127095],
       [-6.617307  ,  1.8023454 , -0.33486965],
       [-6.5229716 ,  3.334403  ,  0.24297516],
       [-6.4780607 ,  3.3174381 , -0.4377083 ],
       [-6.4456573 ,  5.004775  ,  0.3567548 ],
       [-6.4291835 ,  4.9952536 , -0.48991212],
       [-6.385685  ,  3.9259408 ,  0.4254493 ],
       [-6.3185225 ,  3.928538  , -0.5840397 ],
       [-6.591518  ,  3.4022703 ,  0.45031327],
       [-6.483366  ,  3.3883078 , -0.6298578 ]], dtype=float32)

In [49]:
import plotly.graph_objects as go
import plotly.io as pio
pio.renderers.default = 'iframe'

x = [vec[2] for vec in bvh_sample_data]
y = [vec[0] for vec in bvh_sample_data]
z = [vec[1] for vec in bvh_sample_data]

SCALE_FACTOR = 254

x_t = [vec[0]/SCALE_FACTOR for vec in triangulation_sample_data]
y_t = [vec[1]/SCALE_FACTOR for vec in triangulation_sample_data]
z_t = [vec[2]/SCALE_FACTOR for vec in triangulation_sample_data]

x_p = [vec[2] for vec in predicted]
y_p = [vec[0] for vec in predicted]
z_p = [vec[1] for vec in predicted]
    
fig = go.Figure(
    data=[
        go.Scatter3d(
            x=x, y=y, z=z,
            mode='markers',
            marker=dict(size=5, color='blue'),
            hoverinfo='text',
            name='Joints BVH'),
        go.Scatter3d(
            x=x_t, y=y_t, z=z_t,
            mode='markers',
            marker=dict(size=5, color='red'),
            hoverinfo='text',
            name='Joints triangulation mediapipe'),
        go.Scatter3d(
            x=x_p, y=y_p, z=z_p,
            mode='markers',
            marker=dict(size=5, color='green'),
            hoverinfo='text',
            name='Predicted by NN'),
        ]
)

fig.update_layout(scene=dict(
    xaxis_title='X',
    yaxis_title='Y',
    zaxis_title='Z',
    xaxis=dict(range=[-15, 15]),
    yaxis=dict(range=[-15, 15]),
    zaxis=dict(range=[-15, 15]),
    aspectmode='cube', 
),
title='3D joints plot from bvh file',
width=800,
height=800
)

fig.show()