In [1]:
import torch 
import itertools
import numpy as np
import random
import scipy as sp
from scipy import signal
from torch import nn 
from torch.autograd import Variable 
from tqdm import trange
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
def filteremg1(time, emg, low_pass=5, sfreq=400, high_band=10, low_band=180):
    """
    time: Time data
    emg: EMG data
    high: high-pass cut off frequency
    low: low-pass cut off frequency
    sfreq: sampling frequency
    """
    
    # normalise cut-off frequencies to sampling frequency
    high_band = high_band/(sfreq/2)
    low_band = low_band/(sfreq/2)
    
    # create bandpass filter for EMG
    b1, a1 = sp.signal.butter(4, [high_band,low_band], btype='bandpass')
    
    # process EMG signal: filter EMG
    emg_filtered = sp.signal.filtfilt(b1, a1, emg)    
    
    # process EMG signal: rectify
    emg_rectified = abs(emg_filtered)
    
    # create lowpass filter and apply to rectified signal to get EMG envelope
    low_pass = low_pass/(sfreq/2)
    b2, a2 = sp.signal.butter(4, low_pass, btype='lowpass')
    emg_envelope = sp.signal.filtfilt(b2, a2, emg_rectified)
    return emg_envelope

In [3]:
full_array_data_myo = []
full_array_data_vr = []
full_array_data_ts = []
for num_file in trange(0, 58):
    if num_file < 10:
        num_file = '0' + str(num_file)
    else:
        num_file = str(num_file)
    b = np.load(f'/kaggle/input/alvi-hack-2023/data_train/{num_file}.npz')
    for num in b['data_myo']:
        full_array_data_myo.append(num)
    for num in b['data_vr']:
        num = list(itertools.chain.from_iterable(num))
        full_array_data_vr.append(num)
    for num in b['myo_ts'][600:]:
        full_array_data_ts.append(num)

100%|██████████| 58/58 [00:20<00:00,  2.85it/s]


In [4]:
time = full_array_data_ts
filtred_myo1 = []
for i in range(8):
    emg = [m[i] for m in full_array_data_myo]
    emg_correctmean = emg - np.mean(emg)
    filtred_myo1.append(filteremg1(time, emg_correctmean, low_pass=2))
filtred_myo1 = np.array(filtred_myo1).T

filtred_myo2 = []
for i in range(8):
    emg = [m[i] for m in full_array_data_myo]
    emg_correctmean = emg - np.mean(emg)
    filtred_myo2.append(filteremg1(time, emg_correctmean, low_pass=3))
filtred_myo2 = np.array(filtred_myo2).T

filtred_myo3 = []
for i in range(8):
    emg = [m[i] for m in full_array_data_myo]
    emg_correctmean = emg - np.mean(emg)
    filtred_myo3.append(filteremg1(time, emg_correctmean, low_pass=5))
filtred_myo3 = np.array(filtred_myo3).T

filtred_myo4 = []
for i in range(8):
    emg = [m[i] for m in full_array_data_myo]
    emg_correctmean = emg - np.mean(emg)
    filtred_myo4.append(filteremg1(time, emg_correctmean, low_pass=10))
filtred_myo4 = np.array(filtred_myo4).T


In [5]:
myo_full = np.concatenate((filtred_myo1[:], filtred_myo2[:], filtred_myo3[:], filtred_myo4[:]))

In [6]:
vr_full = np.concatenate((finger_first_vr[:], finger_first_vr[:], finger_first_vr[:], finger_first_vr[:]))

NameError: name 'finger_first_vr' is not defined

In [None]:
len(a)

In [None]:
len(vr_full)

In [None]:
finger_first_vr = []
for i in trange(len(full_array_data_vr)):
    curr = full_array_data_vr[i][4:64]
    finger_first_vr.append(curr)

In [None]:
finger_first_vr = []
finger_second_vr = []
finger_third_vr = []
finger_fourth_vr = []
finger_fifth_vr = []
for i in trange(len(full_array_data_vr)):
    curr = full_array_data_vr[i][4:16]
    finger_first_vr.append(curr)
for i in trange(len(full_array_data_vr)):
    curr = full_array_data_vr[i][16:28]
    finger_second_vr.append(curr)
for i in trange(len(full_array_data_vr)):
    curr = full_array_data_vr[i][28:40]
    finger_third_vr.append(curr)
for i in trange(len(full_array_data_vr)):
    curr = full_array_data_vr[i][40:52]
    finger_fourth_vr.append(curr)
for i in trange(len(full_array_data_vr)):
    curr = full_array_data_vr[i][52:64]
    finger_fifth_vr.append(curr)

In [None]:
# class TorchDataset(torch.utils.data.Dataset):
    
#     def __init__(self, x_data, y_data):
#         self.x_data = x_data
#         self.y_data = y_data
        
#     def __len__(self):
#         return len(self.x_data)
    
#     def __getitem__(self, idx):
#         return [torch.tensor(self.x_data[idx], dtype=torch.float), 
#             torch.tensor(self.y_data[idx], dtype=torch.float)]

In [None]:
train_size = int(0.8 * len(full_array_data_myo))
# train_set = TorchDataset(full_array_data_myo[:train_size], finger_first_vr[:train_size])
# train_loader = torch.utils.data.DataLoader(train_set, batch_size = 1024, shuffle = True)

# test_set = TorchDataset(full_array_data_myo[train_size:], finger_first_vr[train_size:])
# test_loader = torch.utils.data.DataLoader(test_set, batch_size = 1024, shuffle = True)

# X_train_tensors = Variable(torch.Tensor(full_array_data_myo[:train_size]))
# y_train_tensors = Variable(torch.Tensor(finger_first_vr[:train_size]))
# y_train_tensors = y_train_tensors.to(device)

# X_test_tensors = Variable(torch.Tensor(full_array_data_myo[train_size:]))
# y_test_tensors = Variable(torch.Tensor(finger_first_vr[train_size:]))
# y_test_tensors = y_test_tensors.to(device)

In [None]:
# X_train_tensors_final = torch.reshape(X_train_tensors,   (X_train_tensors.shape[0], 1, X_train_tensors.shape[1]))
# X_train_tensors_final = X_train_tensors_final.to(device)

# X_test_tensors_final = torch.reshape(X_test_tensors,  (X_test_tensors.shape[0], 1, X_test_tensors.shape[1])) 
# X_test_tensors_final = X_test_tensors_final.to(device)

In [None]:
def set_random_seed(seed):
    torch.backends.cudnn.deterministic = True
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
set_random_seed(123456)

In [None]:
class LSTM1(nn.Module):
    def __init__(self, num_classes, input_size, hidden_size, num_layers, seq_length):
        super(LSTM1, self).__init__()
        self.num_classes = num_classes #number of classes
        self.num_layers = num_layers #number of layers
        self.input_size = input_size #input size
        self.hidden_size = hidden_size #hidden state
        self.seq_length = seq_length #sequence length

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                          num_layers=num_layers, batch_first=True) #lstm
        self.fc_1 =  nn.Linear(hidden_size, 128) #fully connected 1
        self.fc = nn.Linear(128, num_classes) #fully connected last layer

        self.relu = nn.ReLU()
    
    def forward(self,x):
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)) #hidden state
        c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)) #internal state
        
        h_0 = h_0.to(device)
        c_0 = c_0.to(device)
        
        # Propagate input through LSTM
        output, (hn, cn) = self.lstm(x, (h_0, c_0)) #lstm with input, hidden, and internal state
        hn = hn.view(-1, self.hidden_size) #reshaping the data for Dense layer next
        out = self.relu(hn)
        out = self.fc_1(out) #first Dense
        out = self.relu(out) #relu
        out = self.fc(out) #Final Output
        return out

In [None]:
num_epochs = 10000 #1000 epochs
learning_rate = 0.01 #0.001 lr

input_size = 8 #number of features
hidden_size = 30 #number of features in hidden state
num_layers = 1 #number of stacked lstm layers

num_classes = 60 #number of output classes

criterion = nn.MSELoss().to(device)

In [None]:
def train_finger(model, optimizer, data_in, data_out):
    
    X_train_tensors = Variable(torch.Tensor(data_in))
    X_train_tensors_final = torch.reshape(X_train_tensors,   (X_train_tensors.shape[0], 1, X_train_tensors.shape[1]))
    X_train_tensors_final = X_train_tensors_final.to(device)
    
    y_train_tensors = Variable(torch.Tensor(data_out))
    y_train_tensors = y_train_tensors.to(device)
    
    for epoch in range(num_epochs):
        outputs = model.forward(X_train_tensors_final) #forward pass
        optimizer.zero_grad() #caluclate the gradient, manually setting to 0
        
        # obtain the loss function
        loss = criterion(outputs, y_train_tensors)
        
        loss.backward() #calculates the loss of the loss function
        
        optimizer.step() #improve from loss, i.e backprop
        if epoch % 100 == 0:
            print("Epoch: %d, loss: %1.5f" % (epoch, loss.item()))


In [None]:
finger1 = LSTM1(num_classes, input_size, hidden_size, num_layers, 1) #our lstm class 
finger1 = finger1.to(device)
optimizer = torch.optim.Adam(finger1.parameters(), lr = learning_rate)

train_finger(finger1, optimizer, myo_full, vr_full)

In [None]:
finger2 = LSTM1(num_classes, input_size, hidden_size, num_layers, 1) #our lstm class 
finger2 = finger2.to(device)
optimizer = torch.optim.Adam(finger2.parameters(), lr = learning_rate)

train_finger(finger2, optimizer, filtred_myo1, finger_second_vr)

In [None]:
finger3 = LSTM1(num_classes, input_size, hidden_size, num_layers, 1) #our lstm class 
finger3 = finger2.to(device)
optimizer = torch.optim.Adam(finger3.parameters(), lr = learning_rate)

train_finger(finger3, optimizer, filtred_myo1, finger_third_vr)

In [None]:
finger4 = LSTM1(num_classes, input_size, hidden_size, num_layers, 1) #our lstm class 
finger4 = finger4.to(device)
optimizer = torch.optim.Adam(finger4.parameters(), lr = learning_rate)

train_finger(finger4, optimizer, filtred_myo1, finger_fourth_vr)

In [None]:
finger5 = LSTM1(num_classes, input_size, hidden_size, num_layers, 1) #our lstm class 
finger5 = finger5.to(device)
optimizer = torch.optim.Adam(finger5.parameters(), lr = learning_rate)

train_finger(finger5, optimizer, filtred_myo1, finger_fifth_vr)

In [None]:
def predict_Hand(model1, data):
    X_train_tensors = Variable(torch.Tensor(data))
    X_train_tensors_final = torch.reshape(X_train_tensors,   (1, 1, X_train_tensors.shape[0]))
    X_train_tensors_final = X_train_tensors_final.to(device)
    with torch.no_grad():
        output1 = model1.forward(X_train_tensors_final)
        output1[output1 < -1] = -1
        output1[output1 > 1] = 1
    curr_hand = np.array([0, 0, 0, -1])
    pred = output1.cpu().numpy()
    # pred = pred.reshape(16, 4)
    
    return np.append(curr_hand, [pred.reshape(-1)])

In [None]:
max(predict_Hand(finger1, full_array_data_myo[0]))

In [None]:
from pathlib import Path 
import numpy as np
import pandas as pd


def prepare_predictions_for_csv(list_predictions):
    """
    [ [Time, 16, 4 ], ... ]
    return np array with N values.  
    """
    result = []
    for pred in list_predictions: 
        pred = np.reshape(pred[::10], [-1])
        result.extend(pred)
    result = np.array(result)
    return result



# There are 00.npz, 01.npz, ...
DATA_VAL_FOLDER = Path('/kaggle/input/alvi-hack-2023/data_submission/')

# For each file with specific movement predict quaternions based on myo_data then stack it to list
list_preds = []
for p in sorted(DATA_VAL_FOLDER.glob('*.npz')):
    file_data = np.load(p)
    myo_data = file_data['data_myo']
    
    filtred_myo_test =[]

    for i in range(8):
        emg = [m[i] for m in myo_data]
        emg_correctmean = emg - np.mean(emg)
        filtred_myo_test.append(filteremg1(time, emg_correctmean, low_pass=3))
    filtred_myo_test = np.array(filtred_myo_test).T
    
    preds = []
    for curr in filtred_myo_test:
        predict = predict_Hand(finger1, curr)
        preds.append(predict)
    
    # Put it to common list
    list_preds.append(preds)
    
np.savez('predict_lstm_f3_minus.npz', list_preds)
dummy_pred = prepare_predictions_for_csv(list_preds)

# There are must be two columns with header Id,Expected
df = pd.DataFrame({'Predicted': dummy_pred})


df.to_csv('submission_lstm_f3_minus.csv', index_label = 'Id')

In [None]:
import numpy as np 
import pandas as pd 
from pathlib import Path
import json
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from scipy.spatial.transform import Rotation as R
from matplotlib.animation import PillowWriter
from matplotlib import animation
from IPython import display
import wandb
from pathlib import Path

In [None]:
json_path = 'big_global_data.json'
name_to_color = {'thumb': 'red',
                 'index': 'blue',
                 'middle': 'green',
                 'ring': 'orange',
                 'pinky': 'pink'}

# with open(json_path, 'r') as f:
#     json_dict = json.load(f)

def multiply_quant(q1, q2):
    r1 = R.from_quat(q1)
    r2 = R.from_quat(q2)
    return (r1 * r2).as_quat()
def apply_quat(q, v):
    return R.from_quat(q).apply(v)

def get_finger_vecs_custom(hand_data, finger_start_pose, name):
    e = np.array([1, 0, 0])
    
    
    quat1 = hand_dict['b_r_wrist']
    
    quat2 = hand_dict[f'b_r_{name}1']
    quat3 = hand_dict[f'b_r_{name}2']
    quat4  = hand_dict[f'b_r_{name}3']
    
    quat1_inv = R.from_quat(quat1).inv().as_quat()
    
    
    quats = [quat1, quat2, quat3, quat4]
    quats = [multiply_quant(quat1_inv, q) for q in quats]
    
    t_vecs = [apply_quat(q, e) for q in quats] 

    prev = np.array(finger_start_pose[name])
    
    prev = apply_quat(quat1_inv, prev)

    
    t_vecs = [v * np.linalg.norm(prev)/2 for v in t_vecs]

    
    # get sequential summation of the vectors.
    t_vecs_new =[]

    t_vecs_new.append(prev)
    for v in t_vecs[1:]:
        prev = prev + v
        t_vecs_new.append(prev)
    
    t_vecs_new = np.stack(t_vecs_new)
    return t_vecs_new





    
def inverse_rotations(sample):


    ### normalisation if needed.
    quat_base = sample[0]
    quat_base_inv = R.from_quat(quat_base).inv().as_quat()

    quats_new = [multiply_quant(quat_base_inv, q) for q in sample]
    quats_new = np.stack(quats_new)

    return quats_new




class Hand:
    """
    Allows to get slice of Hand parameters.
    
    """
    name_to_idx = {"b_r_wrist": 0, "b_r_index1": 1,
                   "b_r_index2": 2, "b_r_index3": 3,
                   "b_r_middle1": 4, "b_r_middle2": 5,
                   "b_r_middle3": 6, "b_r_pinky1": 7,
                   "b_r_pinky2" : 8, "b_r_pinky3": 9,
                   "b_r_ring1": 10, "b_r_ring2": 11,
                   "b_r_ring3": 12, "b_r_thumb1": 13,
                   "b_r_thumb2": 14, "b_r_thumb3": 15}

    idx_to_name = {0: "b_r_wrist", 1: "b_r_index1",
               2: "b_r_index2", 3: "b_r_index3",
               4: "b_r_middle1", 5: "b_r_middle2",
               6: "b_r_middle3", 7: "b_r_pinky1",
               8: "b_r_pinky2", 9: "b_r_pinky3",
               10: "b_r_ring1", 11: "b_r_ring2",
               12: "b_r_ring3", 13: "b_r_thumb1",
               14: "b_r_thumb2", 15: "b_r_thumb3"}
    
    
    # normalize start point of hand
    
    finger_start_pose = {'thumb': np.array([ 1.79773151e-04, -9.56848596e-05,  1.40103373e-04]),
                         'index': np.array([ 4.79965970e-04, -3.64983564e-05,  1.17722357e-04]),
                         'middle': np.array([ 4.78224132e-04, -1.26683160e-05,  8.61489718e-06]),
                         'ring': np.array([ 4.43463102e-04, -3.26179546e-05, -8.73337173e-05]),
                         'pinky': np.array([ 3.89465811e-04, -6.83795585e-05, -1.75300101e-04])}
    
    
    finger_names = ['thumb', 'index', 'middle', 'ring', 'pinky']
    name_to_color = {'thumb': 'red',
                 'index': 'blue',
                 'middle': 'green',
                 'ring': 'orange',
                 'pinky': 'pink'}
    # init method or constructor 
    def __init__(self, data, hand_type='right'):
        """
        
        data - [frames, n_bones, 4] rotations
        """
        self.hand_type = hand_type
        
        self.azim, self.elev = -270, -220
        self.data = data
        self.all_points  = [] # N, 5, 4, 3 
#         self.points = for
    
    def get_rotations(self, i):
        
        sample = self.data[i]
        
        
        ### normalisation if needed.
        quat_base = sample[self.name_to_idx['b_r_wrist']]
        quat_base_inv = R.from_quat(quat_base).inv().as_quat()
        
        quats_new = [multiply_quant(quat_base_inv, q) for q in sample]
        quats_new = np.stack(quats_new)
        
        return quats_new
        
    def get_frame_points(self, idx):
        points = []
        for name in self.finger_names:
            tmp = self.get_one_finger_points(idx, name)
            points.append(tmp)
        return points
        
    
    def get_one_finger_points(self,idx, name):
    
        sample = self.get_rotations(idx)
        
        quat1 = sample[self.name_to_idx['b_r_wrist']]
        quat2 = sample[self.name_to_idx[f'b_r_{name}1']]
        quat3 = sample[self.name_to_idx[f'b_r_{name}2']]
        quat4  = sample[self.name_to_idx[f'b_r_{name}3']]

        quats = [quat1, quat2, quat3, quat4]
        
        e = np.array([1, 0, 0])
        
        t_vecs = [apply_quat(q, e) for q in quats]
        prev = np.array(self.finger_start_pose[name])
        t_vecs = [v * np.linalg.norm(prev)/2 for v in t_vecs]

        # get sequential summation of the vectors.
        t_vecs_new =[]

        t_vecs_new.append(prev)
        for v in t_vecs[1:]:
            prev = prev + v
            t_vecs_new.append(prev)

        t_vecs_new = np.stack(t_vecs_new)
        return t_vecs_new
        
    def convert_all_frames_to_points(self):
        self.all_points = [self.get_frame_points(n) for n in range(len(self))]
    
    
    
#     def fast_animation(self):
        
        
        
    def __len__(self):
        return len(self.data)
    
    def init_3dplot(self):

        fig = plt.figure(figsize = (5, 5))
        ax = Axes3D(fig)
        ax.view_init(azim=self.azim, elev=self.elev)
        
        return fig, ax

    
    def visualize_one_frame(self, idx, fig=None):
        """
        Fig is not None for animation only.
        """

        animate = False if fig is None else True
        if animate:
            ax = fig.axes[0]
        else:
            fig, ax = self.init_3dplot()
        
        plt.axis('off')
        lim = 0.0008
        ax.set_xlim(-lim/4, 2*lim)
        ax.set_ylim(-lim/2,lim/2)
        ax.set_zlim(-lim/2,lim/2)
        
        
        points = self.get_frame_points(idx)
        
        ## wrist position
        ax.scatter(0, 0, 0, s = 100, c = 'black', marker = '*')
        
        for i, finger_points in enumerate(points):
            x, y, z = finger_points[:, 0], finger_points[:, 1], -finger_points[:, 2]
            
            ax.scatter(x, y, z, c = name_to_color[self.finger_names[i]], s = 150)
            ax.plot([0, *x], [0, *y], [0, *z], 
                    color =name_to_color[self.finger_names[i]], 
                    linewidth=4)

        
        if not animate:
            return fig
        
    
    
    def visualize_all_frames(self):
        fig, ax = self.init_3dplot()
        
        def animate(i):
            plt.cla()
            value_points = self.visualize_one_frame(i, fig) 
        
        anim = animation.FuncAnimation(fig, animate, interval=1, frames =len(self), repeat=True, )
        
        return anim
    
    

    
def save_animation(anim, path, fps =  30):    
    writergif = animation.PillowWriter(fps=fps) 
    anim.save(path, writer=writergif)
    plt.close()        

    
def save_animation_mp4(anim, path, fps):
    FFwriter = animation.FFMpegWriter(fps=fps)
    anim.save(path, writer = FFwriter)
        
def visualize_val_moves(model, val_exps_data, epoch, device, window_size = 512):
    old_fps = 200
    new_fps = 20
    step = old_fps//new_fps
    
    for n, raw_data in enumerate(val_exps_data):
        
        x, y = raw_data['data_myo'][:window_size], raw_data['data_vr'][:window_size]
        
        y_pred = model.inference(x, device)

        hand_gt = Hand(y[-256::step])
        hand_pred = Hand(y_pred[-256::step])
        
        gt_path = f'{wandb.run.dir}/videos/{n}_move/true_sample_{epoch}.gif'
        pred_path = f'{wandb.run.dir}/videos/{n}_move/pred_sample_{epoch}.gif'
        Path(gt_path).parent.mkdir(parents=True, exist_ok=True)

        plt.close()
        ani_gt = hand_gt.visualize_all_frames()
        save_animation(ani_gt, gt_path, fps = new_fps)
        
        plt.close()
        ani_pred = hand_pred.visualize_all_frames()
        save_animation(ani_pred, pred_path, fps = new_fps)

        
        wandb.log({f"visualization/{n}_move": [wandb.Video(gt_path, fps=new_fps), 
                                               wandb.Video(pred_path, fps=new_fps)]})
        del hand_gt
        del hand_pred

In [None]:
with np.load('/kaggle/working/predict_lstm_f3_minus.npz', allow_pickle=True) as data:
    data_vr = np.array(data['arr_0'][0])
    data_vr_viz = []
    for i in range(len(data_vr)):
        data_vr_viz.append(np.array(data_vr[i]).reshape(16, 4))
    target_hand = Hand(data_vr_viz[::1])

In [None]:
output_folder = Path(f"/kaggle/working/figures/")
output_folder.mkdir(parents=True, exist_ok=True)

prefix_name = f"one_moves_fps_{30}"

target_hand = Hand(data_vr_viz[::30])
target_hand_anim = target_hand.visualize_all_frames()
targets_path = output_folder / f"{prefix_name}.mp4"

save_animation_mp4(target_hand_anim, targets_path, fps=30)

In [None]:
target_hand.visualize_one_frame(150)