In [None]:
# Note: running this will require IMU data synchronized to the video

# Note: while step 2 (using HMR) requires Tensorflow 1.14 this step
# requires Tensorflow 2.0

In [None]:
%pylab notebook
import tensorflow as tf
import numpy as np
import pandas as pd
import pickle
import cv2

tfk = tf.keras
tfkl = tfk.layers

## Load a calibrated camera model

In [None]:
# Use a real calibration for the camera
cal = pickle.load(open('calibration.pkl', 'rb'))

# build the camera observation viewpoints (from a prior calibration)
global_translation = 1 * np.array([[0], [0], [0]])
camera1_intrinsic = cal['cameraMatrix1']

# "hacking" camera position from origin until we learn translation of pose
camera1_extrinsic = np.concatenate([np.eye(3), global_translation], axis=1)

camera2_intrinsic = cal['cameraMatrix2']
camera2_extrinsic = np.concatenate([cal['R'], global_translation + cal['T']], axis=1)

In [None]:
# Code from https://github.com/akanazawa/hmr/tree/master/src/tf_smpl
# Run through tf_upgrade_v2 and then with a few more tweaks to get
# running in eager mode
#
# Additional modifications to make it behave like a Tensorflow Layer

def segment_names():
    return ['hips', 'leftUpLeg', 'rightUpLeg', 'spine', 'leftLeg', 'rightLeg',
            'spine1', 'leftFoot', 'rightFoot', 'spine2', 'leftToeBase', 'rightToeBase',
            'neck', 'leftShoulder', 'rightShoulder', 'head', 'leftArm', 'rightArm',
            'leftForeArm', 'rightForeArm', 'leftHand', 'rightHand', 'leftHandIndex1',
            'rightHandIndex1']
segment_idx = dict(zip(segment_names(), range(24)))

def joint_idx(name):
    idx = segment_idx[name]
    return np.arange(3*idx, 3*idx+3)

def batch_rodrigues(theta, name=None):
    """
    Theta is N x 3
    
    Using code from tensorflow graphics to compute the transform
    matrix.
    """
    batch_size = tf.shape(theta)[0]

    # angle = tf.norm(theta, axis=1)
    # r = tf.expand_dims(tf.div(theta, tf.expand_dims(angle + 1e-8, -1)), -1)
    # angle = tf.expand_dims(tf.norm(theta, axis=1) + 1e-8, -1)
    angle = tf.norm(theta + 1e-6, axis=1, keepdims=True)
    axis = tf.divide(theta, angle)

    sin_axis = tf.sin(angle) * axis
    cos_angle = tf.cos(angle)
    cos1_axis = (1.0 - cos_angle) * axis
    _, axis_y, axis_z = tf.unstack(axis, axis=-1)
    cos1_axis_x, cos1_axis_y, _ = tf.unstack(cos1_axis, axis=-1)
    sin_axis_x, sin_axis_y, sin_axis_z = tf.unstack(sin_axis, axis=-1)
    tmp = cos1_axis_x * axis_y
    m01 = tmp - sin_axis_z
    m10 = tmp + sin_axis_z
    tmp = cos1_axis_x * axis_z
    m02 = tmp + sin_axis_y
    m20 = tmp - sin_axis_y
    tmp = cos1_axis_y * axis_z
    m12 = tmp - sin_axis_x
    m21 = tmp + sin_axis_x
    diag = cos1_axis * axis + cos_angle
    diag_x, diag_y, diag_z = tf.unstack(diag, axis=-1)
    matrix = tf.stack((diag_x, m01, m02,
                       m10, diag_y, m12,
                       m20, m21, diag_z),
                      axis=-1)  # pyformat: disable
    output_shape = tf.concat((tf.shape(input=axis)[:-1], (3, 3)), axis=-1)
    return tf.reshape(matrix, shape=output_shape)

def batch_global_rigid_transformation(Rs, Js, parent, rotate_base=False, dtype=tf.float32):
    """
    Computes absolute joint locations given pose.

    rotate_base: if True, rotates the global rotation by 90 deg in x axis.
    if False, this is the original SMPL coordinate.

    Args:
      Rs: N x 24 x 3 x 3 rotation vector of K joints
      Js: N x 24 x 3, joint locations before posing
      parent: 24 holding the parent id for each index

    Returns
      new_J : `Tensor`: N x 24 x 3 location of absolute joints
      A     : `Tensor`: N x 24 4 x 4 relative joint transformations for LBS.
    """
    
    N = tf.shape(Rs)[0]

    if rotate_base:
        print('Flipping the SMPL coordinate frame!!!!')
        rot_x = tf.constant([[1, 0, 0], [0, -1, 0], [0, 0, -1]], dtype=Rs.dtype)
        rot_x = tf.reshape(tf.tile(rot_x, [N, 1]), [N, 3, 3])
        root_rotation = tf.matmul(Rs[:, 0, :, :], rot_x)
    else:
        root_rotation = Rs[:, 0, :, :]

    # Now Js is N x 24 x 3 x 1
    Js = tf.expand_dims(Js, axis=-1)

    def make_A(R, t, name=None):
        # Rs is N x 3 x 3, ts is N x 3 x 1
        R_homo = tf.pad(tensor=R, paddings=[[0, 0], [0, 1], [0, 0]])
        t_homo = tf.concat([t, tf.ones([N, 1, 1], dtype=dtype)], axis=1)
        return tf.concat([R_homo, t_homo], axis=2)

    A0 = make_A(root_rotation, Js[:, 0])
    results = [A0]
    for i in range(1, parent.shape[0]):
        j_here = Js[:, i] - Js[:, parent[i]]
        A_here = make_A(Rs[:, i], j_here)
        res_here = results[parent[i]] @ A_here
        results.append(res_here)

    # N x 24 x 4 x 4
    results = tf.stack(results, axis=1)

    new_J = results[:, :, :3, 3]

    # --- Compute relative A: Skinning is based on
    # how much the bone moved (not the final location of the bone)
    # but (final_bone - init_bone)
    # ---
    Js_w0 = tf.concat([Js, tf.zeros([N, 24, 1, 1], dtype=dtype)], 2)
    init_bone = results @ Js_w0
    # Append empty 4 x 3:
    init_bone = tf.pad(tensor=init_bone, paddings=[[0, 0], [0, 0], [0, 0], [3, 0]])
    A = results - init_bone

    return new_J, A

# There are chumpy variables so convert them to numpy.
def undo_chumpy(x):
    return x if isinstance(x, np.ndarray) else x.r

class SMPL(tfkl.Layer):
    """ See paper for full description. Internal joint indices are
    
        0 MidHip
        1 LHip
        2 RHip
        4 LKnee
        5 RKnee
        7 LAnkle
        8 RAnkle
        12 Neck
        15 Nose
        16 LShoulder
        17 RShoulder
        18 LElbow
        19 RElbow
        20 LWrist
        21 RWrist
    """
    def __init__(self, pkl_path='neutral_smpl_with_cocoplus_reg.pkl', 
                 joint_type='lsp', **kwargs):
        """
        pkl_path is the path to a SMPL model
        """
        super(SMPL, self).__init__(**kwargs)
        
        # -- Load SMPL params --
        self.pkl_path = pkl_path
        with open(pkl_path, 'rb') as f:
            dd = pickle.load(f, encoding="latin-1")
        
        # Mean template vertices
        self.v_template = tf.constant(undo_chumpy(dd['v_template']), name='v_template', dtype=self.dtype)
        
        # Size of mesh [Number of vertices, 3]
        self.size = tf.TensorShape([tf.shape(self.v_template)[0], 3])
        self.num_betas = dd['shapedirs'].shape[-1]
        # Shape blend shape basis: 6980 x 3 x 10
        # reshaped to 6980*30 x 10, transposed to 10x6980*3
        shapedir = np.reshape(undo_chumpy(dd['shapedirs']), [-1, self.num_betas]).T
        self.shapedirs = tf.constant(shapedir, name='shapedirs', dtype=self.dtype)

        # Regressor for joint locations given shape - 6890 x 24
        self.J_regressor = tf.constant(dd['J_regressor'].T.todense(), name="J_regressor", dtype=self.dtype)

        # Pose blend shape basis: 6890 x 3 x 207, reshaped to 6890*30 x 207
        num_pose_basis = dd['posedirs'].shape[-1]
        # 207 x 20670
        posedirs = np.reshape(undo_chumpy(dd['posedirs']), [-1, num_pose_basis]).T
        self.posedirs = tf.constant(posedirs, name='posedirs', dtype=self.dtype)

        # indices of parents for each joints
        self.parents = dd['kintree_table'][0].astype(np.int32)

        # LBS weights
        self.lbs_weights = tf.constant(undo_chumpy(dd['weights']), name='lbs_weights', dtype=self.dtype)
        
        self.joint_type = joint_type
        self.raw_joint_indices = tf.constant([8, 5, 2, 1, 4, 7, 21, 19, 17, 16, 18, 20, 12, 15], dtype=tf.int32)

        # This returns 19 keypoints: 6890 x 19
        self.joint_regressor = tf.constant(dd['cocoplus_regressor'].T.todense(), name="cocoplus_regressor", dtype=self.dtype)
        if joint_type == 'lsp':  # 14 LSP joints!
            self.joint_regressor = self.joint_regressor[:, :14]

    def call(self, beta, theta):
        """
        Obtain SMPL with shape (beta) & pose (theta) inputs.
        Theta includes the global rotation.
        Args:
          beta: N x 10
          theta: N x 72 (with 3-D axis-angle rep)

        Updates:
          self.J_transformed: N x 24 x 3 joint location after shaping & posing with beta and theta
          self.verts - N x 6980 x 3 vertices of body shell
        Returns:
          Joints: N x 19 or 14 x 3 joint locations depending on joint_type
          Rs: N x 24 x 3 x 3 rotation matrices for each of the body segment reference frames. Note 
              that these are rotations relative to the neutral posture and not absolute segment
              orientations.
        """

        beta = tf.cast(tf.convert_to_tensor(beta), dtype=self.dtype)
        theta = tf.cast(tf.convert_to_tensor(theta), dtype=self.dtype)
        
        num_batch = tf.shape(theta)[0]
        num_joints = 24
        
        # 1. Add shape blend shapes
        # (N x 10) x (10 x 6890*3) = N x 6890 x 3
        v_shaped = tf.reshape(tf.reshape(beta, [1, 10]) @ self.shapedirs, [-1, self.size[0], self.size[1]]) + self.v_template

        # 2. Infer shape-dependent joint locations.
        Jx = v_shaped[:, :, 0] @ self.J_regressor
        Jy = v_shaped[:, :, 1] @ self.J_regressor
        Jz = v_shaped[:, :, 2] @ self.J_regressor
        J = tf.tile(tf.stack([Jx, Jy, Jz], axis=2), [num_batch, 1, 1])
        
        # 3. Add pose blend shapes
        # N x 24 x 3 x 3
        Rs = tf.reshape(batch_rodrigues(tf.reshape(theta, [-1, 3])), [-1, num_joints, 3, 3])

        #4. Get the global joint location
        self.J_transformed, A = batch_global_rigid_transformation(Rs, J, self.parents, dtype=self.dtype)
        if self.joint_type == 'raw':
            return tf.gather(self.J_transformed, self.raw_joint_indices, axis=1)

        # 5. Do skinning:
        # Ignore global rotation.
        pose_feature = tf.reshape(Rs[:, 1:, :, :] - tf.eye(3, dtype=self.dtype), [-1, 207])
        
        # overwrite Rs with what we actually want to return -- the absolutely (change) of reference
        # frame for each segment
        Rs = A[:,:,:3,:3]

        # (N x 207) x (207, 20670) -> N x 6890 x 3
        v_posed = tf.reshape(pose_feature @ self.posedirs, [-1, self.size[0], self.size[1]]) + v_shaped

        # W is N x 6890 x 24
        W = tf.reshape(tf.tile(self.lbs_weights, [num_batch, 1]), [num_batch, -1, num_joints])
        # (N x 6890 x 24) x (N x 24 x 16)
        A = tf.reshape(A, [num_batch, num_joints, 16])
        T = tf.reshape(W @ A, [num_batch, -1, 4, 4])
        v_posed_homo = tf.pad(v_posed, [[0,0], [0,0], [0,1]], 'CONSTANT', tf.constant(1.0, dtype=self.dtype))
        
        v_homo = T @ tf.expand_dims(v_posed_homo, axis=-1)
    
        verts = v_homo[:, :, :3, 0]

        # Get cocoplus or lsp joints:
        joint_x = tf.matmul(verts[:, :, 0], self.joint_regressor)
        joint_y = tf.matmul(verts[:, :, 1], self.joint_regressor)
        joint_z = tf.matmul(verts[:, :, 2], self.joint_regressor)
        joints = tf.stack([joint_x, joint_y, joint_z], axis=2)
        self.verts = verts
        
        # restore shape of A
        A = tf.reshape(A, [num_batch, num_joints, 4, 4])
        return joints, Rs

    def get_config(self):
        base_config = super(SMPL, self).get_config()
        base_config.update({'pkl_path': self.pkl_path, 'joint_type': self.joint_type})
        return base_config
    
frame_id = 90
frames = 100

# for some reason this model isn't stable with large batches when using
# float32. This comes when computing v_homo. For some reason even having
# identical imputs for each batch element massively blows up in a way that
# isn't clearly from a precision issue
smpl = SMPL('neutral_smpl_with_cocoplus_reg.pkl', joint_type='lsp', dtype=tf.float32)
coordinates_3d, Rs = smpl(np.zeros((10,), dtype=np.float32), np.zeros((frames,72), dtype=np.float32)) #pose(x)[1]) #bs(x)[1]*0
display(np.around(coordinates_3d.numpy()[frame_id], decimals=3))

#smpl = SMPL('neutral_smpl_with_cocoplus_reg.pkl', joint_type='raw', dtype=tf.float64)
# raw: MidHip, LHip, RHip, LKnee, RKnee, LAnkle, RAnkle, Neck, Nose, LShoulder, RShoulder, LElbow, RElbow, LWrist, RWrist
# smpl.raw_joint_indices = [0, 1, 2, 4, 5, 7, 8, 12, 15, 16, 17, 18, 19, 20, 21]
#coordinates_raw = smpl(np.zeros((10,), dtype=np.float32), np.zeros((frames,72), dtype=np.float32)) #pose(x)[1]) #bs(x)[1]*0

smpl.get_config()

In [None]:
# simple fuctional to translate and scale pose

def origin_wrapper(smpl, scale, beta, origin, theta):
    """
    Args:
     origin: N x 3
     scale:  1
     beta:   10
     theta:  N x 72

    Returns: N x (19 or 14) x 3 joint positions
    """

    joints_centered, Rs = smpl(beta, theta)
    origin = tf.expand_dims(origin, axis=1)
    return scale * joints_centered + origin, Rs
    
origin_smpl = lambda scale, beta, center, theta: origin_wrapper(smpl, scale, beta, center, theta)

frames = 100
beta = np.zeros((10,), dtype=np.float32)
theta = np.zeros((frames,72), dtype=np.float32)
center = np.zeros((frames,3), dtype=np.float32)
scale = 100.0
joints, Rs = origin_smpl(scale, beta, center, theta)

# Measurement and Pose model

The above code establishes the forward kinematics for the transformations. This develops the layers that stores the pose state (essentially a mapping from frame number to a pose that can be backpropagated to) and the layers that project the 
$$\mathtt{pose} \rightarrow \mathtt{coordinate frames} \rightarrow \mathtt{measurements}$$.

In [None]:
def circular(var):
    return tf.clip_by_value(var, -np.pi/2, np.pi/2)

class SmoothL2Regularizer(tf.keras.regularizers.L1L2):

    def __init__(self, smooth_fraction=0.99, **kwargs):
        super(SmoothL2Regularizer, self).__init__(**kwargs)
        self.smooth_fraction = smooth_fraction

    def __call__(self, x):
        dx = x[1:] - x[:-1]
        
        l_dx = super(SmoothL2Regularizer, self).__call__(dx)
        l_x = super(SmoothL2Regularizer, self).__call__(x)
        return l_dx * self.smooth_fraction + l_x * (1 - self.smooth_fraction)
    
    def get_config(self):
        base_config = super(SmoothL2Regularizer, self).get_config()
        base_config.update({'smooth_fraction': self.smooth_fraction})
        return base_config

class Pose(tfkl.Layer):
    """ A learnable "Pose State" layer
    
        As input takes a vector of time steps and returns the pose at each time
        based on the learned model. In this simple implementation it is just a 
        lookup table and anticipates integer time values
    """
    
    def __init__(self, N=100, M=72, smoothness=1, pose_l2=1, **kwargs):
        """ """
        super(Pose, self).__init__(**kwargs)

        self.smoothness = smoothness
        self.pose_l2 = pose_l2
        self.pose = self.add_weight(shape=(N,M), trainable=True, dtype=tf.float32,
                                    regularizer=SmoothL2Regularizer(smooth_fraction=smoothness, l2=pose_l2),
                                    initializer=tf.initializers.zeros(), name='pose_estimate')
        self.location = self.add_weight(shape=(N,3), trainable=True, dtype=tf.float32, 
                                        regularizer=SmoothL2Regularizer(smooth_fraction=1.0, l2=pose_l2),
                                        initializer=tf.initializers.zeros(), name='location_estimate')

    def call(self, time):
         # elbow has one axis and should only go one way (note the sign for left arm versus right)
        bad_anatomy = tf.reduce_sum(tf.square(self.pose[:, 3 * segment_idx['rightForeArm'] + 2])) + \
                      tf.reduce_sum(tf.square(self.pose[:, 3 * segment_idx['leftForeArm'] + 2])) +  \
                      tf.reduce_sum(tf.nn.relu(-self.pose[:, 3 * segment_idx['rightForeArm'] + 1])) + \
                      tf.reduce_sum(tf.nn.relu(self.pose[:, 3 * segment_idx['leftForeArm'] + 1]))
        self.add_loss(1e4 * bad_anatomy)
        return tf.gather(self.location, time), tf.gather(self.pose, time)

    def get_config(self):
        base_config = super(Pose, self).get_config()
        base_config.update({'N': self.pose.shape[0], 'M': self.pose.shape[1],
                            'smoothness': self.smoothness, 'pose_l2': self.pose_l2})
        return base_config
    
class BodyShape(tfkl.Layer):
    """ A learnable body shape representation
    """
    
    def __init__(self, initial_shape=None, **kwargs):
        super(BodyShape, self).__init__(**kwargs)
        
        if initial_shape is None:
            initializer = tf.initializers.Constant(np.zeros((10,), dtype=np.float32))
        else:
            initializer = tf.initializers.Constant(initial_shape)
        self.body_shape = self.add_weight(name='body_shape', shape=[10], dtype=tf.float32, initializer=initializer, trainable=True)
        self.scale = self.add_weight(name='scale', shape=[], dtype=tf.float32, initializer=tf.initializers.Constant(1.0))
        
    def call(self, time):        
        batch_size = tf.shape(time)[0]
        return tf.nn.elu(self.scale), self.body_shape
    
    def get_config(self):
        return super(BodyShape, self).get_config()
    
def get_rotation_matrix_from_quaternion(quaternion):
    # from Tensorflow Graphics but able to map to trainable variables
    x, y, z, w = tf.unstack(quaternion, axis=-1)
    tx = 2.0 * x
    ty = 2.0 * y
    tz = 2.0 * z
    twx = tx * w
    twy = ty * w
    twz = tz * w
    txx = tx * x
    txy = ty * x
    txz = tz * x
    tyy = ty * y
    tyz = tz * y
    tzz = tz * z
    matrix = tf.stack((1.0 - (tyy + tzz), txy - twz, txz + twy,
                       txy + twz, 1.0 - (txx + tzz), tyz - twx,
                       txz - twy, tyz + twx, 1.0 - (txx + tyy)),
                      axis=-1)  # pyformat: disable
    output_shape = tf.concat((tf.shape(input=quaternion)[:-1], (3, 3)), axis=-1)
    return tf.reshape(matrix, shape=output_shape)

class QuaternionNormalization(tfkl.Layer):
    """ Layer to renormalize a renormalize a unit quaternion
    
        Excepts to operate on data of size [..., 4] for any batch
        size, simply normalizes on the last dimension. Does not
        actually assert the last dimension is 4 long.
    """
    
    def __init__(self, norm_err_penalty=1e-3, **kwargs):
        super(QuaternionNormalization, self).__init__(**kwargs)
        self.norm_err_penalty = norm_err_penalty
        self.epsilon = 1e-5
        
    def call(self, inputs):
        
        # weakly penalize deviations from norm length prior to renomalization. Per
        # Pavlo et al., 2019 this improves training
        norm = tf.reduce_sum(tf.square(inputs), axis=-1, keepdims=True)
        norm_err = self.norm_err_penalty * tf.reduce_sum(tf.square(1 - norm))
        self.add_loss(norm_err)
        
        return inputs / (tf.sqrt(norm) + self.epsilon)
    
    def get_config(self):
        base_config = super(QuaternionNormalization, self).get_config()
        base_config.update({'norm_err_penalty': self.norm_err_penalty})
        return base_config
    
class TrainableRotation(tfkl.Layer):
    def __init__(self, N, **kwargs):
        super(TrainableRotation, self).__init__(**kwargs)
        self.quaternion = tf.Variable(tf.tile(tf.constant([[1.0, 0.0, 0.0, 0.0]], dtype=tf.float32) , [N, 1]),
                                      trainable=True, name='quaternion_angles')
        self.quaternion_normalization = QuaternionNormalization()
        
    def call(self, inputs):
        normalized_quaternion = self.quaternion_normalization(self.quaternion)
        return get_rotation_matrix_from_quaternion(normalized_quaternion)
    
    def get_config(self):
        base_config = super(TrainableRotation, self).get_config()
        base_config.update({'N': self.quaternion.shape[0]})
        return base_config    
    
class PredictedImuMeasurements(tfkl.Layer):
    """ Compute expected IMU rotations based on pose """
    def __init__(self, imu_frames, **kwargs):
        super(PredictedImuMeasurements, self).__init__(**kwargs)
        
        self.imu_frames = tf.constant(imu_frames, dtype=tf.bool)
        
        # Initialize with a identity matrix (no rotation). Need to constrain
        # degrees of freedom while training. Attempted to use the TFG rotation
        # layers but could not build up a trainable rotation parameterization.
        N = np.sum(imu_frames)
        self.Rib = TrainableRotation(N, name='Rig')
        self.Rig = TrainableRotation(N, name='Rig')
    
    def call(self, predicted_rotations):        
        # Only keep the rotation frames we are measuring
        predicted_rotations = tf.boolean_mask(predicted_rotations, self.imu_frames, axis=1)

        # And apply our current observation transformation estimate
        return self.Rig(None) @ predicted_rotations @ self.Rib(None)
    
    def get_config(self):
        base_config = super(PredictedImuMeasurements, self).get_config()
        base_config.update({'imu_frames': self.imu_frames})
        return base_config
    
class PredictedImagePositions(tfkl.Layer):
    """ Compute expected image positions rotations based on pose """
    def __init__(self, intrinsic_camera, extrinsic_camera, **kwargs):
        super(PredictedImagePositions, self).__init__(**kwargs)
        
        self.intrinsic = tf.convert_to_tensor(intrinsic_camera, dtype=tf.float32)
        self.extrinsic = tf.convert_to_tensor(extrinsic_camera, dtype=tf.float32)
        self.camera = self.intrinsic @ self.extrinsic
        #self.joint_frames = tf.constant(joint_frames, dtype=tf.bool)
        
    def call(self, predicted_endpoints):
        endpoints_homogeneous = tf.pad(predicted_endpoints, tf.constant([[0, 0], [0, 0], [0, 1]]), 'CONSTANT', tf.constant(1.0))
        
        # Only keep the rotation frames we are measuring
        #endpoints_homogeneous = tf.boolean_mask(endpoints_homogeneous, self.joint_frames, axis=1)
        predicted_image_homogeneous = tf.tensordot(endpoints_homogeneous, self.camera, (-1, -1))
        
        predicted_uv = predicted_image_homogeneous[..., :-1] / predicted_image_homogeneous[..., -1, None]
        return predicted_uv

    def get_config(self):
        base_config = super(PredictedImagePositions, self).get_config()
        base_config.update({'intrinsic': self.intrinsic, 'extrinsic': self.extrinsic})
        return base_config
    
def angular_err(y_true, y_pred):
    from tensorflow.python.framework import ops
    from tensorflow.python.keras import backend as K
    from tensorflow.python.ops import math_ops

    y_pred = ops.convert_to_tensor(y_pred)
    y_true = math_ops.cast(y_true, y_pred.dtype)
    
    angle_difference = tf.linalg.matrix_transpose(y_true) @ y_pred
    difference_trace = tf.linalg.trace(angle_difference)
    angle_loss = tf.reduce_mean(tf.math.acos((difference_trace - 1) / 2.0 * (1-1e-8)))
    return angle_loss

def euclidean_err(y_true, y_pred):
    delta = y_true - y_pred
    dist = tf.reduce_sum(tf.square(y_true - y_pred), axis=-1)
    return tf.reduce_mean(dist)

#angular_err(R.from_euler('xyz', [0, 0, 45], degrees=True).as_dcm(), 
#            R.from_euler('xyz', [0, 40, 0], degrees=True).as_dcm())

In [None]:
# Tools for working with 3D projections

def stereo_triangulate(keypoints):
    parts = keypoints.columns.get_level_values(1).unique()
    p3d = []

    smoothed = keypoints.ewm(10).mean()

    for p in parts:
        p1 = smoothed['camL'][p][['x', 'y']].values
        p2 = smoothed['camR'][p][['x', 'y']].values

        P1 = camera1_intrinsic @ camera1_extrinsic #(cal['cameraMatrix1'] @ np.concatenate((np.eye(3), np.zeros((3,1))), axis=1))
        P2 = camera2_intrinsic @ camera2_extrinsic #(cal['cameraMatrix1'] @ np.concatenate((cal['R'], cal['T']), axis=1))

        # compute 3d homogeneous
        p3 = cv2.triangulatePoints(P1, P2, p1.transpose(), p2.transpose()).transpose()
        p3 = cv2.convertPointsFromHomogeneous(p3)[:,0,:]
        
        p3 = p3[:, [0, 1, 2]]
        p3d.append(p3)

    p3d = np.stack(p3d, axis=1)

    columns = pd.MultiIndex.from_product([parts, ['x', 'y', 'z']], names=['part', 'component'])
    return pd.DataFrame(p3d.reshape((len(stereo_keypoints), -1)),  columns=columns, index=keypoints.index)


## Perform this with torso

In [None]:
stereo_keypoints = pd.read_pickle(f'movement.pkl')

lsp_parts = ["RAnkle", "RKnee", "RHip", "LHip", "LKnee", "LAnkle", "RWrist", "RElbow", 
             "RShoulder", "LShoulder", "LElbow", "LWrist", "Neck", "Nose"]

cams = stereo_keypoints.columns.get_level_values(0).unique()
#display(stereo_keypoints[cams[0]][parts])

measured_positions = []
cals = [{'K': cal['cameraMatrix1'], 'D': cal['distCoeffs1'], 'R': cal['R1'], 'P': cal['P1']},
       {'K': cal['cameraMatrix2'], 'D': cal['distCoeffs2'], 'R': cal['R2'], 'P': cal['P2']}]
for c, cam_cal in zip(cams, cals):
    for p in lsp_parts:
        points = stereo_keypoints[c][p][['x', 'y', 'c']]
        
        # optionally can apply additional correction for camera distortion. reprojection to
        # image will (correctly) look off after this
        if False:  
            # need to look if there is a way to factor the higher level distortions into
            # the observation model. probably not worth it.
            points = points.values.reshape((-1,1,2)).astype(np.float32)
            points = cv2.undistortPoints(points, cam_cal['K'], cam_cal['D'], R=cam_cal['R'], P=cam_cal['P'])[:,0,:]
        
        measured_positions.append(points)
        
measured_positions = np.stack(measured_positions, axis=1)

# need to reorder the axes as this code expects x, y, z, w
reorder = [1, 2, 3, 0]
imu_readings_r_forearm = pd.read_pickle(f'attitude_r_forearm{run_number}.pkl')
imu_readings_r_arm = pd.read_pickle(f'attitude_r_arm{run_number}.pkl')
from scipy.spatial.transform import Rotation as R
measured_orientations_r_forearm = R.from_quat(imu_readings_r_forearm.values[:, reorder]).as_dcm()[:, None, ...]
measured_orientations_r_arm = R.from_quat(imu_readings_r_arm.values[:, reorder]).as_dcm()[:, None, ...]

measured_orientations = np.concatenate((measured_orientations_r_arm, measured_orientations_r_forearm), axis=1)

stop = 900
stereo_keypoints = stereo_keypoints[:stop]
measured_positions = measured_positions[:stop]
measured_orientations = measured_orientations[:stop]
measured_orientations_r_forearm = measured_orientations_r_forearm[:stop]
measured_orientations_r_arm = measured_orientations_r_arm[:stop]
imu_readings_r_forearm = imu_readings_r_forearm[:stop]
imu_readings_r_arm = imu_readings_r_arm[:stop]

# when things are missing I flagged as nan. replace with no confidence
measured_positions[np.isnan(measured_positions)] = 0

#measured_positions = measured_positions[:250:5]
print(measured_positions.shape)
print(measured_orientations.shape)

In [None]:
fig, ax = plt.subplots(2,1,sharex=True)

confidence_mean = np.mean(np.stack([stereo_keypoints['camL'][p]['c'].values for p in lsp_parts], axis=1), axis=1)
confidence_good = np.logical_and(confidence_mean > 0.4, confidence_mean < 5)

valid_positions = np.all(np.all(~np.isnan(measured_positions), axis=1), axis=1)

keep = np.logical_and(confidence_good, valid_positions)

ax[0].plot(np.where(keep)[0], np.stack([stereo_keypoints['camL'][p]['c'].values for p in lsp_parts], axis=1)[keep], '.')
ax[0].plot(np.where(keep)[0], np.mean(np.stack([stereo_keypoints['camL'][p]['c'].values for p in lsp_parts], axis=1), axis=1)[keep], 'k')

ax[1].plot(np.where(keep)[0], measured_positions[keep, 0, :])

In [None]:
def build_model(N, x):
    smpl = SMPL('neutral_smpl_with_cocoplus_reg.pkl', joint_type='lsp')
    origin_smpl = lambda scale, beta, center, theta: origin_wrapper(smpl, scale, beta, center, theta)

    bs = BodyShape(np.zeros((10,)))
    pose = Pose(N, 72, pose_l2=2e-3, smoothness=0.99)

    pred_image1 = PredictedImagePositions(camera1_intrinsic, camera1_extrinsic)
    pred_image2 = PredictedImagePositions(camera2_intrinsic, camera2_extrinsic)
    pred_images = [pred_image1, pred_image2]
    
    selected_frames = np.array([False]*24)
    selected_frames[[17, 19]] = True

    pred_imu = PredictedImuMeasurements(tf.constant(selected_frames))
    
    joints, Rs = origin_smpl(*bs(x), *pose(x))
    
    # use N cameras
    im = tf.concat([pred(joints) for pred in pred_images], axis=1)
    imu = pred_imu(Rs)
    
    return im, imu, pose, bs, pred_images, pred_imu

N = len(measured_positions)
x = np.arange(N)
y = [measured_positions, measured_orientations]

#mirrored_strategy = tf.distribute.MirroredStrategy()
#with mirrored_strategy.scope():
huber = tf.keras.losses.Huber()
def position_loss(y_true, y_pred):
    weight = y_true[..., 2]
    y_true = y_true[..., :2]
    return huber(y_true, y_pred, sample_weight=weight)

if True:
    # build estimator
    model_x = tfk.Input([], dtype=tf.int32)
    im, imu, pose, bs, pred_images, pred_imu = build_model(N, model_x)
    obs_model = tfk.Model(inputs=model_x, outputs=[im, imu])
    obs_model.output_names = ['Image', 'IMU']
    learning_rate = 0.001
    obs_model.compile(tf.optimizers.Adam(learning_rate), 
                      loss=[position_loss, lambda a, b : 0.5 * angular_err(a, b)])

In [None]:
# Initialize from ballpark starting position

triangulated_coordinates = stereo_triangulate(stereo_keypoints)
torso_base = triangulated_coordinates['MidHip']
mean_position = np.median(torso_base[keep], axis=0)
print(torso_base.shape)
print(pose.location.shape)
pose.location.assign(torso_base.values)

#for idx in np.where(~keep)[0]:
#    pose.location[idx,:].assign(mean_position.astype(np.float32))
#tf.tile(tf.reshape(mean_position.astype(np.float32), [1, -1]), [sum(~keep), 1])
bs.scale.assign(100);

In [None]:
offset = 100

results = pickle.load(open(f'results{run_number}.pkl', 'rb'))
theta = np.stack([r['theta'] for r in results[0]], axis=0)[offset:offset+stop]
#theta[:,0] = theta[:,0] + np.pi/2
beta = np.stack([r['beta'] for r in results[0]], axis=0)[offset:offset+stop]

mean_beta = np.mean(beta, axis=0)
bs.body_shape.assign(mean_beta)
pose.pose.assign(theta);

In [None]:
resulting_mid_hip = np.mean(origin_smpl(*bs(x), *pose(x))[0][:,2:4,:], axis=1)

plt.figure()
plt.plot(resulting_mid_hip)
plt.plot(torso_base.values)

#offset = np.mean(torso_base.values[:900]-resulting_mid_hip[:900], axis=0, keepdims=True)
offset = np.mean(torso_base.values - resulting_mid_hip, axis=0, keepdims=True)
pose.location.assign(pose.location + offset)

print(f'Corrected offset: {offset}')

In [None]:
fast_epochs = 5
fast_speed = 0.05
lr_decay_cb = tf.keras.callbacks.LearningRateScheduler(
   lambda epoch: fast_speed if epoch < fast_epochs else learning_rate + (fast_speed - learning_rate) * (0.99 ** (epoch-fast_epochs)),
   verbose=False)

es_cb = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=50)

batch_size = len(x) #int(len(x) / 10)
history = obs_model.fit(x, y, epochs=10000, verbose=1, batch_size=batch_size, callbacks=[es_cb]) #, lr_decay_cb]) #,  lr_decay_cb

In [None]:
# Compute residual error
y_out = obs_model(x)

t_offset = stereo_keypoints.index.copy()
t_offset = t_offset - t_offset[0]
t_offset = t_offset.total_seconds()

In [None]:
fig, ax = plt.subplots(2,1,figsize=(7,7))
sca(ax[0])
reprojection_error = np.linalg.norm(y[0][:, :, :2] - y_out[0].numpy(), axis=2)
plt.plot(t_offset, reprojection_error, '.', MarkerSize=1)
plt.plot(t_offset, np.mean(reprojection_error, axis=1), 'k.')
plt.ylabel('Reprojection error (2D pixels)')
plt.ylim(0,50)

sca(ax[1])

#angular_err(y[1], y_out[1])
median_angular_error = []
for imu_idx in range(2):
    rotated_imu = R.from_dcm(y[1][:, imu_idx, ...])
    body_segment = R.from_dcm(y_out[1][:, imu_idx, ...])
    rotation_err = rotated_imu * body_segment.inv()
    rotation_err = np.linalg.norm(rotation_err.as_rotvec(), axis=1)

    median_angular_error.append(np.median(rotation_err) * 180 / np.pi)
    plt.plot(t_offset, rotation_err * 180 / np.pi)
plt.ylim(0, 90)
plt.xlabel('Time (s)')
plt.ylabel('Rotation error (deg)')


print(f'2D reprojection error: {np.median(reprojection_error)} pixel')
print(f'IMU angular error: {median_angular_error} deg')


#y[0][:, :, :2].shape
y_out[0].shape

## Refit now only using IMUs

Use the IMU calibration from the prior run and the base pose and location (since we don't track torso current) 

In [None]:
class RestrictedPose(tfkl.Layer):
    """ A learnable "Pose State" layer
    
        As input takes a vector of time steps and returns the pose at each time
        based on the learned model. In this simple implementation it is just a 
        lookup table and anticipates integer time values
    """
    
    def __init__(self, N=100, M=72, smoothness=1, pose_l2=1, **kwargs):
        """ """
        super(RestrictedPose, self).__init__(**kwargs)

        self.smoothness = smoothness
        self.pose_l2 = pose_l2
        self.pose = self.add_weight(shape=(N,M), trainable=True, dtype=tf.float32,
                                    regularizer=SmoothL2Regularizer(smooth_fraction=smoothness, l2=pose_l2),
                                    initializer=tf.initializers.zeros(), name='pose_estimate')
        self.location = self.add_weight(shape=(N,3), trainable=True, dtype=tf.float32, 
                                        regularizer=SmoothL2Regularizer(smooth_fraction=1.0, l2=pose_l2),
                                        initializer=tf.initializers.zeros(), name='location_estimate')

    def call(self, time):
         # elbow has one axis and should only go one way (note the sign for left arm versus right)
        bad_anatomy = tf.reduce_sum(tf.square(self.pose[:, 3 * segment_idx['rightForeArm'] + 2])) + \
                      tf.reduce_sum(tf.square(self.pose[:, 3 * segment_idx['leftForeArm'] + 2])) +  \
                      tf.reduce_sum(tf.nn.relu(-self.pose[:, 3 * segment_idx['rightForeArm'] + 1])) + \
                      tf.reduce_sum(tf.nn.relu(self.pose[:, 3 * segment_idx['leftForeArm'] + 1]))
        self.add_loss(1e4 * bad_anatomy)
        
        stiff_body_joints = self.pose[:, 1:segment_idx['rightShoulder']*3]
        self.add_loss(tf.reduce_sum(tf.square(stiff_body_joints)))
        
        return tf.gather(self.location, time), tf.gather(self.pose, time)

    def get_config(self):
        base_config = super(Pose, self).get_config()
        base_config.update({'N': self.pose.shape[0], 'M': self.pose.shape[1],
                            'smoothness': self.smoothness, 'pose_l2': self.pose_l2})
        return base_config


def build_imu_pose_model(N, x, bs, pred_imu):
    smpl = SMPL('neutral_smpl_with_cocoplus_reg.pkl', joint_type='lsp')
    origin_smpl = lambda scale, beta, center, theta: origin_wrapper(smpl, scale, beta, center, theta)

    pose = RestrictedPose(N, 72, pose_l2=1e-4)
    
    joints, Rs = origin_smpl(*bs(x), *pose(x))
    imu = pred_imu(Rs)
    
    return imu, pose

pred_imu.trainable = False
bs.trainable = False

imu2, pose2 = build_imu_pose_model(N, model_x, bs, pred_imu)

pose2.pose[:,:3].assign(pose.pose[:,:3])
pose2.location.assign(pose.location)

obs_model2 = tfk.Model(inputs=model_x, outputs=imu2)
learning_rate = 0.001

fast_epochs = 0
fast_speed = 0.01
lr_decay_cb = tf.keras.callbacks.LearningRateScheduler(
   lambda epoch: fast_speed if epoch < fast_epochs else learning_rate + (fast_speed - learning_rate) * (0.9 ** (epoch-fast_epochs)),
   verbose=False)


obs_model2.compile(tf.optimizers.Adam(learning_rate), loss=angular_err)
history2 = obs_model2.fit(x, y[1], epochs=25000, verbose=1, batch_size=batch_size, callbacks=[es_cb,  lr_decay_cb])

In [None]:
print(history.history.keys())
plt.figure()
#plt.semilogy(history.history['loss'])
plt.semilogy(history.history['loss'])
plt.semilogy(history.history['Image_loss'])
plt.title(f"Final loss {history.history['loss'][-1]} and Eucllidean: {history.history['Image_loss'][-1]}")

### Visualize the quality of position reconstructions

In [None]:
selected_pose = pose
#selected_pose = pose2

In [None]:
image_positions = pred_images[0](origin_smpl(*bs(x), *selected_pose(x))[0]).numpy()

fig, ax = plt.subplots(3,1,sharex=True)
ax[0].plot(image_positions[:,0,:], 'blue', label='reconstruction')
ax[0].plot(y[0][:,0,:], 'green', label='image keypoints')
ax[0].set_title('Waist')

ax[1].plot(image_positions[:,1,:], 'blue', label='reconstruction')
ax[1].plot(y[0][:,1,:], 'green', label='image keypoints')
ax[1].set_title('Neck')

ax[2].plot(image_positions[:,4,:], 'blue', label='reconstruction')
ax[2].plot(y[0][:,4,:], 'green', label='image keypoints')
ax[2].set_title('RWrist')

ax[0].legend().set_visible(True)

In [None]:
_, rotations = origin_smpl(*bs(x), *selected_pose(x))

predicted_rotations = pred_imu(rotations)

from scipy.spatial.transform import Rotation as R
predicted_q_arm = R.from_dcm(predicted_rotations[:,0,...])

measured_arm_from_dcm = R.from_dcm(measured_orientations_r_arm[:,0,...])
measured_forearm_from_dcm = R.from_dcm(measured_orientations_r_forearm[:,0,...])

fig, ax = plt.subplots(3, 2, sharex=True, figsize=(10,8))
ax[0,0].plot(predicted_q_arm.as_quat())
ax[0,0].set_title('Predicted (kinematics)')
ax[1,0].plot(measured_arm_from_dcm.as_quat())
#ax[1,0].plot(predicted_q_arm.as_quat())
ax[1,0].set_title('Sensor')

measured_q = R.from_quat(imu_readings_r_arm.values[:, reorder])
delta_q = np.linalg.norm((predicted_q_arm.inv() * measured_q).as_rotvec(), axis=1)
ax[2,0].plot(delta_q)
ax[2,0].set_ylim(0,np.pi)
ax[2,0].set_title('Angular Difference')
ax[2,0].set_ylabel('Rad')

predicted_q_forearm = R.from_dcm(predicted_rotations[:,1,...])
ax[0,1].plot(predicted_q_forearm.as_quat())
ax[0,1].set_title('Predicted (kinematics)')
ax[1,1].plot(measured_forearm_from_dcm.as_quat())
ax[1,1].set_title('Sensor')

measured_q = R.from_quat(imu_readings_r_forearm.values[:, reorder])
delta_q = np.linalg.norm((predicted_q_forearm.inv() * measured_q).as_rotvec(), axis=1)
ax[2,1].plot(delta_q)
ax[2,1].set_ylim(0,np.pi)
ax[2,1].set_title('Angular Difference')
ax[2,1].set_ylabel('Rad')

## Convert to pertinent joint angles


In [None]:
def get_joints(named_rotations):
    def get_r_shoulder():
        """ Recover biomechanical movements from arm versus sternum orientation

            X axis is flexion
            Y axis is IR
            Z axis is abduction
        """
        r_shoulder = named_rotations['neck'].inv() * named_rotations['rightArm']

        r_shoulder_zero = R.from_euler('XYZ', [0, 0, 90], degrees=True)
        
        # this returns the plane of rotation as the first parameter, the elevation
        # as the second and finally internal rotaton (after adding to the plane of
        # rotation second). The offset is because the first rotation is really an
        # extrinsic one so we have yXY but this library won't decompose that. Also
        # zero for the plane of rotation comes out as backward extensions so adjust
        # so that zero is lateral
        r_shoulder = (r_shoulder * r_shoulder_zero.inv()).as_euler('YXY', degrees=True)
        r_shoulder = [r_shoulder[0] - 90, r_shoulder[1], r_shoulder[2] + r_shoulder[0]]
        #print(r_shoulder)
        return dict(zip(['RShoulderPlane', 'RShoulderElevation', 'RShoulderIR'], r_shoulder))
    
    def get_r_elbow_flexion():
        """ Recover elbow flexion as forearm versus arm

            X axis is flexion
            Y axis is IR but is passed on since we need to account for wrist
            Z painful
        """
        r_elbow = named_rotations['rightArm'].inv() * named_rotations['rightForeArm'] 
        signs = np.array([1, 1, 1])
        r_elbow = r_elbow.as_euler('YXZ', degrees=True) * signs
        #print(r_elbow)
        return {'RElbowFlexion': r_elbow[0]}

    def get_r_elbow_supination():
        """ Recover forearm supination as axial rotation of hand versus arm

            X axis is flexion
            Y axis is IR but is passed on since we need to account for wrist
            Z painful
        """
        r_elbow_wrist = named_rotations['rightArm'].inv() * named_rotations['rightHand'] 
        signs = np.array([1, 1, 1])
        r_elbow_wrist = r_elbow_wrist.as_euler('yzx', degrees=True) * signs
        #print(r_elbow_wrist)
        return {'RForearmSupination': r_elbow_wrist[-1]}

    joints = dict()
    joints.update(get_r_shoulder())
    joints.update(get_r_elbow_flexion())
    joints.update(get_r_elbow_supination())
    return joints

joints = [get_joints(dict(zip(segment_names(), R.from_dcm(time_Rs)))) for time_Rs in Rs]

In [None]:
df_joints = pd.DataFrame(joints, index=t_offset)
order = [4, 2, 3, 0, 1]
df_joints = df_joints[df_joints.columns[order]]
better_names = ['Shoulder Plane', 'Elevation', 'Internal Rotation', 'Elbow Flexion', 'Supination']
df_joints = pd.DataFrame(np.degrees(np.unwrap(np.radians(df_joints.values), axis=0)), columns=better_names, index=df_joints.index)
ax = df_joints.plot(subplots=True, figsize=(10,6))
for a in ax:
    a.set_ylabel('Degrees')
    a.legend().set_frame_on(False)
plt.xlabel('Time (s)')

fig = gcf()
fig.savefig('kinematic_traces.png', transparent=True, bbox_inches='tight')

### Compare video and IMU fusion

In [None]:
fig, ax = plt.subplots(3,1,figsize=(10,6), sharex=True)

Rs = smpl(np.zeros((1,10)), pose.pose)[1]
joints = [get_joints(dict(zip(segment_names(), R.from_dcm(time_Rs)))) for time_Rs in Rs]
df_joints = pd.DataFrame(joints, index=stereo_keypoints.index)

df_joints['RShoulderElevation'].plot(ax=ax[0], label='Video')
#ax[0].set_ylim(0, 180)
ax[0].set_title('R Shoulder Elevation')

df_joints['RElbowFlexion'].plot(ax=ax[1], label='Video')
#ax[0].set_ylim(0, 180)
ax[1].set_title('R Elbow Flexion')

df_joints['RForearmSupination'].plot(ax=ax[2], label='Video')
#ax[0,1].set_ylim(0, 180)
ax[2].set_title('R Forearm Supination')

Rs = smpl(np.zeros((1,10)), pose2.pose)[1]
joints = [get_joints(dict(zip(segment_names(), R.from_dcm(time_Rs)))) for time_Rs in Rs]
df_joints_imu = pd.DataFrame(joints, index=stereo_keypoints.index)

df_joints_imu['RShoulderElevation'].plot(ax=ax[0], label='IMU')
df_joints_imu['RElbowFlexion'].plot(ax=ax[1], label='IMU')
df_joints_imu['RForearmSupination'].plot(ax=ax[2], label='IMU')

ax[1].set_ylabel('Time')
ax[1].legend().set_visible(True)