In [11]:
import nimblephysics as nimble
import numpy as np
import torch
from cvxpylayers.torch import CvxpyLayer
import cvxpy as cp

import time

from IPython.core.display_functions import clear_output
from open3d.examples.visualization.draw import get_icp_transform
from quaternion import angular_velocity
from einops import einsum, repeat
from scipy.ndimage import median_filter
from tqdm import tqdm
import os
import pickle

In [12]:
trial_path = "/home/meribejayson/Desktop/Projects/realistic-imu/data/final_dataset/DIP/S1.b3d"
GEOMETRY_PATH = "/home/meribejayson/Desktop/Projects/realistic-imu/data/final_dataset/Geometry/"
GRAVITY = 9.80665

In [13]:
from typing import List, Tuple

subject = nimble.biomechanics.SubjectOnDisk(trial_path)
skeleton = nimble.dynamics.Skeleton = subject.readSkel(processingPass=1,
                                                       geometryFolder=GEOMETRY_PATH)

skel_joints = [skeleton.getJoint(i).getName() for i in range(skeleton.getNumJoints())]
skel_body_nodes = [skeleton.getBodyNode(i).getName() for i in range(skeleton.getNumBodyNodes())]

print(len(skel_body_nodes))
skel_body_nodes

Setting len to 0.106608
Setting neutral pos to   0
0.7
  0
Setting len to 0.342794
Setting neutral pos to    0
-0.6
   0
Setting len to 0.119716
Setting neutral pos to   0
0.2
  0
24


['pelvis',
 'femur_r',
 'tibia_r',
 'talus_r',
 'calcn_r',
 'toes_r',
 'femur_l',
 'tibia_l',
 'talus_l',
 'calcn_l',
 'toes_l',
 'lumbar_body',
 'thorax',
 'head',
 'scapula_r',
 'humerus_r',
 'ulna_r',
 'radius_r',
 'hand_r',
 'scapula_l',
 'humerus_l',
 'ulna_l',
 'radius_l',
 'hand_l']

In [14]:
print(len(skel_joints))
skel_joints

24


['ground_pelvis',
 'hip_r',
 'walker_knee_r',
 'ankle_r',
 'subtalar_r',
 'mtp_r',
 'hip_l',
 'walker_knee_l',
 'ankle_l',
 'subtalar_l',
 'mtp_l',
 'lumbar',
 'thoracic_spine',
 'neck',
 'scapulothoracic_r',
 'GlenoHumeral_r',
 'elbow_r',
 'radioulnar_r',
 'wrist_r',
 'scapulothoracic_l',
 'GlenoHumeral_l',
 'elbow_l',
 'radioulnar_l',
 'wrist_l']

In [15]:
skeleton.getNumDofs()

49

In [16]:
right_wrist: nimble.dynamics.Joint = skeleton.getBodyNode("hand_r")
translation: np.ndarray = np.array([0.0, 0.05, 0.0])
rotation: np.ndarray = np.eye(3)
watch_offset: nimble.math.Isometry3 = nimble.math.Isometry3(rotation, translation)

skeleton.setGravity(np.array([0.0, -GRAVITY, 0.0]))

print(skeleton.getBodyNode("hand_r").getWorldTransform())

[[ 0.8503932  -0.28173527  0.44436094  0.07124618]
 [ 0.26341593  0.95906446  0.10395866  0.88399438]
 [-0.4554596   0.02864602  0.88979546  0.18382312]
 [ 0.          0.          0.          1.        ]]


In [17]:
import matplotlib.pyplot as plt

def minimize(sequence):
    smooth_weight = 0.36
    regularization_weight = 0.1
    lr = 2
    iterations = 10000
    device = "cuda" if torch.cuda.is_available() else "cpu"

    orig_sequence = torch.from_numpy(sequence).to(device=device)
    output_sequence = orig_sequence.detach().clone().requires_grad_(True)

    optimizer = torch.optim.SGD([output_sequence], lr=lr)

    def loss():
        return smooth_weight * torch.square(output_sequence - orig_sequence).sum() + regularization_weight * torch.square(output_sequence).sum()

    for i in range(iterations):
        optimizer.zero_grad()
        loss_val = loss()
        loss_val.backward()
        optimizer.step()

    return output_sequence.detach().cpu().numpy()

In [18]:
def fill_nans(acc: torch.Tensor) -> torch.Tensor:
    # Reshape (T, N, C) -> (T, N*C) for simpler 1D column-wise interpolation
    T, N, C = acc.shape
    acc_2d = acc.reshape(T, N * C)

    # Move to CPU (if on GPU) for NumPy ops
    acc_np = acc_2d.cpu().numpy()

    for col_idx in range(acc_np.shape[1]):
        col_data = acc_np[:, col_idx]
        # Find indices of valid and invalid entries
        valid_mask = ~np.isnan(col_data)
        if not np.any(valid_mask):
            # All NaNs; decide how to fill them (zero or remain NaN)
            # Here we choose to fill with zero for demonstration:
            col_data[:] = 0.0
            continue
        if np.all(valid_mask):
            # Already has no NaNs in this column
            continue

        valid_x = np.where(valid_mask)[0]
        valid_y = col_data[valid_mask]
        invalid_x = np.where(~valid_mask)[0]

        # Interpolate
        col_data[invalid_x] = np.interp(invalid_x, valid_x, valid_y)

    # Move back to original device
    acc_filled = torch.from_numpy(acc_np).reshape(T, N, C).to(acc.device)
    return acc_filled

In [19]:
with open("/home/meribejayson/Desktop/Projects/realistic-imu/data/final_dataset/DIP/DIP_orig/S1/01.pkl", "rb") as file:
    vals = pickle.load(file, encoding="latin1")["imu"]

vals[:, :, :3]

array([[[ 0.89995843, -0.15217061, -0.40855714],
        [ 0.98648638, -0.01189452,  0.16341096],
        [        nan,         nan,         nan],
        ...,
        [-0.14920152, -0.95913027,  0.240433  ],
        [ 0.97157289,  0.08987311,  0.21901815],
        [        nan,         nan,         nan]],

       [[ 0.90291665, -0.15042667, -0.402633  ],
        [ 0.98649444, -0.01166408,  0.16337893],
        [ 0.98324461,  0.00484152,  0.18222677],
        ...,
        [-0.14931078, -0.95916191,  0.2402389 ],
        [ 0.97160456,  0.08989438,  0.21886885],
        [ 0.94187588,  0.02695563,  0.33487791]],

       [[ 0.90710633, -0.14777461, -0.39410756],
        [ 0.98643782, -0.01152577,  0.1637302 ],
        [ 0.98329519,  0.00478607,  0.18195509],
        ...,
        [-0.14962804, -0.95967677,  0.23797469],
        [ 0.97161546,  0.09008362,  0.21874263],
        [ 0.94187571,  0.02695153,  0.33487874]],

       ...,

       [[ 0.91603171,  0.03686403,  0.39940824],
        [ 0

In [20]:
from scipy import signal
from scipy.ndimage import uniform_filter1d

import torch

class DIP_Subject:
    def __init__(self, b3d_path):
        self.subject = nimble.biomechanics.SubjectOnDisk(b3d_path)
        self.skeleton = nimble.dynamics.Skeleton = self.subject.readSkel(processingPass=1,
                                                                         geometryFolder=GEOMETRY_PATH)
        self.subject_num = int(b3d_path.split("/")[-1].split(".")[0][1:])
        self.synthetic_component_names = [
            "head",
            "sternum",
            "pelvis",
            "lshoulder",
            "rshoulder",
            "lupperarm",
            "rupperarm",
            "llowerarm",
            "rlowerarm",
            "lupperleg",
            "rupperleg",
            "llowerleg",
            "rlowerleg",
            "lhand",
            "rhand",
            "lfoot",
            "rfoot",
        ]

        # Corresponding bones to the sensors
        self.skel_names = [
            "head",
            "thorax",
            "pelvis",
            "scapula_l",
            "scapula_r",
            "humerus_l",
            "humerus_r",
            "ulna_l",
            "ulna_r",
            "femur_l",
            "femur_r",
            "tibia_l",
            "tibia_r",
            "hand_l",
            "hand_r",
            "calcn_l",
            "calcn_r"
        ]

        self.N_MEAS = len(self.skel_names) * 3

        self.imus = [(self.skeleton.getBodyNode(name), nimble.math.Isometry3.Identity()) for name in self.skel_names]

        self.corresponding_imu_path = os.path.join(os.path.dirname(b3d_path), f"DIP_orig/S{self.subject_num}")
        self.trial_map = {}
        self.joint_data_map = {}
        self.trial_imu_map = {}
        self.skeleton.setGravity([0, -GRAVITY, 0])
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        print(f"Processing DIP Subject {self.subject_num}")


        # Creating a map between a trial for some subject and the corresponding frame list
        for i in range(self.subject.getNumTrials()):
            key = self.subject.getTrialOriginalName(i).split("_")[0]
            curr_trial = self.subject.readFrames(trial=i,
                                                 startFrame=0,
                                                 numFramesToRead=self.subject.getTrialLength(i),
                                                 includeProcessingPasses=True)

            if key in self.trial_map:
                self.trial_map[key].extend(curr_trial)
            else:
                self.trial_map[key] = curr_trial

        # Define Butterworth filter parameters
        self.fs = 60  # Sampling frequency
        self.cutoff =  8  # Cutoff frequency
        self.b, self.a = signal.butter(10, self.cutoff / (self.fs / 2), btype='low')

        # Creating a map for joint state
        for key, frames in self.trial_map.items():
            joint_angles_raw = np.vstack([frame.processingPasses[0].pos for frame in frames]).T
            joint_vel_raw = np.vstack([frame.processingPasses[0].vel for frame in frames]).T
            joint_acc_raw = np.vstack([frame.processingPasses[0].acc for frame in frames]).T

            """
            self.joint_data_map[key] = {
                "joint_angles": signal.filtfilt(self.b, self.a, joint_angles_raw, axis=1),
                "joint_vel": signal.filtfilt(self.b, self.a, joint_vel_raw, axis=1),
                "joint_acc": signal.filtfilt(self.b, self.a, joint_acc_raw, axis=1),
            }
            """


            print(joint_acc_raw.shape)
            self.filter = (1, 8)
            self.joint_data_map[key] = {
                "joint_angles": joint_angles_raw,
                "joint_vel": uniform_filter1d(median_filter(joint_vel_raw, size=self.filter), size=20, axis=1),
                "joint_acc": uniform_filter1d(median_filter(joint_acc_raw, size=self.filter), size=20, axis=1),
            }


            with open(os.path.join(self.corresponding_imu_path, f"{key}.pkl"), "rb") as file:
                real_data = pickle.load(file, encoding="latin1")["imu"]
                acc = real_data[:, :, 9:12]
                acc = fill_nans(torch.from_numpy(acc))

            self.num_trials = len(self.joint_data_map.keys())

            self.synthetic_transformations = {}
            self.gravity_translation = {}
            self.gravity_transformation = {}

            self.trial_imu_map[key] = {
                "acc": acc.transpose(0, 1),
            }

        self.syn_imu = {}
        self.get_syn_imu_to_real_transformations()

        self.opt_trans = {}
        self.find_optimal_syn_transformations()
        """
        gui = nimble.NimbleGUI()
        gui.serve(8080)

        transform_dict = self.opt_trans["03"]


        for t in tqdm(range(self.joint_data_map["03"]["joint_angles"].shape[1])):
            skeleton.setPositions(self.joint_data_map["03"]["joint_angles"][:, t])

            body_node_pos = skeleton.getBodyNode(self.skel_names[0]).getWorldTransform().matrix()
            imu_in_world_frame =


            gui.nativeAPI().renderBasis(pos=None, euler=)
            #gui.nativeAPI().renderArrow(start_pos, (gravity_direction / 10), 0.01, 0.04)

            # print(gravity_direction)
            gui.nativeAPI().renderSkeleton(skeleton)
            time.sleep(1/60)
        """


    # Get estimated homogenous transformations from synthetix to real imu
    def get_syn_imu_to_real_transformations(self):
        for trial_name, trial in self.joint_data_map.items():
            joint_angles = trial["joint_angles"]
            joint_vel = trial["joint_vel"]
            joint_acc = trial["joint_acc"]

            curr_syn_acc = np.empty((self.N_MEAS,  joint_angles.shape[1]))
            curr_syn_angular_vel = np.empty((self.N_MEAS,  joint_angles.shape[1]))

            for t in range(joint_angles.shape[1]):
                self.skeleton.setPositions(joint_angles[:, t])
                self.skeleton.setVelocities(joint_vel[:, t])
                self.skeleton.setAccelerations(joint_acc[:, t])

                curr_syn_acc[:, t] = self.skeleton.getAccelerometerReadings(self.imus)
                curr_syn_angular_vel[:, t] = self.skeleton.getGyroReadings(self.imus)


            curr_syn_acc = curr_syn_acc.T.reshape(-1, 17, 3)
            curr_syn_angular_vel = curr_syn_angular_vel.T.reshape(-1, 17, 3)

            self.syn_imu[trial_name] = {"acc": curr_syn_acc, "angular_vel": curr_syn_angular_vel}


    def rotation_matrix(self, phi, theta, psi):
        """
        Generate a 3D rotation matrix given Euler angles (phi, theta, psi).
        """
        R_x_cos = np.cos(phi)
        R_x_sin = np.sin(phi)

        R_x = np.tile(np.eye(3)[np.newaxis, : , :], (17, 1, 1))
        R_x[:, 1, 1] = R_x_cos
        R_x[:, 2, 2] = R_x_cos
        R_x[:, 1, 2] = - R_x_sin
        R_x[:, 2, 1] = R_x_sin

        R_y_cos = np.cos(theta)
        R_y_sin = np.sin(theta)

        R_y = np.tile(np.eye(3)[np.newaxis, : , :], (17, 1, 1))
        R_y[:, 0, 0] = R_y_cos
        R_y[:, 2, 2] = R_y_cos
        R_y[:, 2, 0] = - R_y_sin
        R_y[:, 0, 2] = R_y_sin

        R_z_cos = np.cos(psi)
        R_z_sin = np.sin(psi)

        R_z = np.tile(np.eye(3)[np.newaxis, : , :], (17, 1, 1))
        R_z[:, 0, 0] = R_z_cos
        R_z[:, 1, 1] = R_z_cos
        R_z[:, 0, 1] = - R_z_sin
        R_x[:, 1, 0] = R_z_sin

        return R_z @ R_y @ R_x

    def get_transform(self, translation, rotation):
        ret = np.zeros((rotation.shape[0], 4, 4))

        ret[:, :3, :3] = rotation
        ret[:, 3, :3] = translation
        ret[:, 3, 3] = 1

        return ret

    def get_world_transforms(self, trial_name):
        num_time_steps = self.joint_data_map[trial_name]["joint_angles"].shape[1]
        world_transform = np.empty((num_time_steps, 17, 4, 4))


        for t in range(num_time_steps):
            self.skeleton.setPositions(self.joint_data_map[trial_name]["joint_angles"][:, t])

            for idx, body_node_name in enumerate(self.skel_names):
                world_transform[t, idx, :, :] = skeleton.getBodyNode(body_node_name).getWorldTransform().matrix()

        return world_transform



    def get_grads(self, loss, nimble_to_imu_transform, world_transforms, accs):
        # This is a^T @ (homogenous world transform) for each time step for each imu
        rotated_accs = einsum(np.sign(loss), accs, world_transforms, "seq, seq imu r, seq imu r c -> seq imu c")
        d_loss_d_imu_transform = repeat(rotated_accs, "seq imu c -> seq imu c repeat", repeat=4)

        d_loss_d_translation  = d_loss_d_imu_transform[:, :, :-1, -1]
        d_loss_d_rotation =


    def find_optimal_syn_transformations(self):
        for trial_name, syn_imu in self.syn_imu.items():
            num_t_steps = syn_imu["acc"].shape[1]

            syn_imu_data = syn_imu["acc"].transpose(2, 1)
            real_imu_data = self.trial_imu_map[trial_name]["acc"].transpose(2, 1)

            homogenous_coord = np.ones((17, 1, num_t_steps))
            syn_imu_data = np.concatenate((syn_imu_data, homogenous_coord), axis=-1)

            phi_accel = np.random.rand(17)
            theta_accel = np.random.rand(17)
            psi_accel = np.random.rand(17)

            accel_translation_mat = np.random.rand(17, 1, 3)
            nimble_to_real_imu_transform = self.get_transform(accel_translation_mat, self.rotation_matrix(phi_accel, theta_accel, psi_accel))

            trial_body_node_world_transforms = self.get_world_transforms(trial_name)

            begin_loss = None
            end_loss = None

            for iteration in range(1250):

                transformed_syn = 0

                l1_loss = torch.norm(real_imu_data - transformed_syn, p=1)

                # Combine into a single loss
                loss = l1_loss

                if iteration == 0:
                    begin_loss = loss.item()
                elif iteration == 1249:
                    end_loss = loss.item()

                # Backpropagation
                loss.backward()
                #optimizer.step()

            nimble_to_real_imu_transform = self.rotation_matrix(phi_accel, theta_accel, psi_accel)

            self.opt_trans[trial_name] = {
                "begin_loss": begin_loss,
                "end_loss": end_loss,
                "accel_rotation_mat": nimble_to_real_imu_transform,
            }


In [21]:
dip_subject = DIP_Subject(trial_path)

Setting len to 0.106608
Processing DIP Subject 1
Setting neutral pos to   0
0.7
  0
Setting len to 0.342794
Setting neutral pos to    0
-0.6
   0
Setting len to 0.119716
Setting neutral pos to   0
0.2
  0
(49, 13778)
(49, 5942)
(49, 7938)
(49, 1075)
(49, 5540)
Synthetic IMU data shape torch.Size([17, 3, 13778])


ValueError: could not broadcast input array from shape (17,1,3) into shape (17,3)

In [None]:
import pickle
with open("/home/meribejayson/Desktop/Projects/realistic-imu/data/final_dataset/DIP/DIP_orig/S1/01.pkl", "rb") as file:
    loaded_dict = pickle.load(file, encoding="latin1")

loaded_dict.keys()

In [None]:
loaded_dict['gt'].shape

In [None]:
loaded_dict["imu"].shape

In [None]:
loaded_dict["imu_acc"].shape

In [None]:
subject = nimble.biomechanics.SubjectOnDisk("/home/meribejayson/Desktop/Projects/realistic-imu/data/final_dataset/DIP/S1.b3d")
skeleton = nimble.dynamics.Skeleton = subject.readSkel(processingPass=1,
                                                       geometryFolder=GEOMETRY_PATH)

[subject.getTrialName(i) for i in range(subject.getNumTrials())]

In [None]:
"""gui = nimble.NimbleGUI()
gui.serve(8080)
trial_frames: nimble.biomechanics.FrameList = subject.readFrames(trial=0,
                                                                 startFrame=0,
                                                                 numFramesToRead=subject.getTrialLength(0),
                                                                 includeProcessingPasses=True)

joint_angles = np.vstack([frame.processingPasses[-1].pos for frame in trial_frames]).T

for t in tqdm(range(joint_angles.shape[1])):
    skeleton.setPositions(joint_angles[:, t])

    gui.nativeAPI().renderSkeleton(skeleton)
    time.sleep(1/60)"""