# Prepare datasets

## Paths and labels

In [None]:
%load_ext autoreload
%autoreload 2

import scipy
import numpy as np
import matplotlib.pyplot as plt
from vic_controllers.plotting import multi_format_savefig, init_plt
init_plt(full_screen = False, scale = 2, use_latex=True)

import os
import sys

sub_dataset = '28-11-2023_elastic/'  # '19-12-2023_mass'

datasets_path_root = '/home/tpoignonec/dev/ws_test_vic_ros/rosbags/' + sub_dataset + '/'
export_figs_dir = 'export_figures' + '/' + sub_dataset

if not os.path.exists(export_figs_dir):
    # Create a new directory because it does not exist
    os.makedirs(export_figs_dir)
    print(f"The directory {export_figs_dir} was created!")

# Define topics
cartesian_state_topic_name = '/fd_cartesian_state'
compliant_frame_topic_name = '/fd_compliance_frame_reference'
desired_compliant_frame_topic_name = '/fd_desired_compliance_frame_GT'
ppf_node_diagnostic_topic_name = '/ppf_node_diagnostic'
simulation_time_topic_name = '/simulation_time'

topic_list = [
    cartesian_state_topic_name,
    compliant_frame_topic_name,
    desired_compliant_frame_topic_name,
    ppf_node_diagnostic_topic_name,
    simulation_time_topic_name
]

# Define datasets
label_SIPF_W4 = 'SIPF+'
label_SDPF_QP = r'SDPF, $w(t) \geq 0$'
label_SDPF_PPF = r'SDPF, $z(t) \geq 0$'
label_SDPF_PPF_adaptive = r'SDPF, $z(t) \geq z_{min}(t)$'

color_list = plt.rcParams['axes.prop_cycle'].by_key()['color']
dataset_info_list = [
    {
        'tag': 'SIPF+',
        'path_to_bag': datasets_path_root + 'SIPF+/SIPF+_0.mcap',
        'label': label_SIPF_W4,
        'color': color_list[0],
        'linestyle': '-'
    },
    {
        'tag': 'SDPF_QP',
        'path_to_bag': datasets_path_root + 'SDPF-QP/SDPF-QP_0.mcap',
        'label': label_SDPF_QP,
        'color': color_list[1],
        'linestyle': '-'
    },
    {
        'tag': 'SDPF_PPF',
        'path_to_bag': datasets_path_root + 'SDPF-PPF/SDPF-PPF_0.mcap',
        'label': label_SDPF_PPF,
        'color': color_list[2],
        'linestyle': '-'
    },
    {
        'tag': 'SDPF_PPF_adaptive',
        'path_to_bag': datasets_path_root + 'SDPF-PPF-adaptive/SDPF-PPF-adaptive_0.mcap',
        'label': label_SDPF_PPF_adaptive,
        'color': color_list[3],
        'linestyle': '--'
    }
]

## Define utils

In [None]:
# Standard msg extraction utils

def unflatten_multiarray(flattened_multiarray, dims=None):
    if (dims is None):
        rows = np.sqrt(flattened_multiarray.shape[0])
        assert (rows == int(rows))
        dims = (rows, rows)
    # TODO(tpoignonec): make sure the data is indeed row-major...
    return flattened_multiarray.reshape(dims[0], dims[1], order='C')

def extract_matrix(msg, dims=None):
    return unflatten_multiarray(np.array(msg['data'], dims))

def extract_2D_matrix(msg):
    return unflatten_multiarray(np.array(msg['data']), (2, 2))

def extract_3D_matrix(msg):
    return unflatten_multiarray(np.array(msg['data']), (3, 3))

def extract_pose_from_msg(msg):
    return np.array([
        msg['pose']['position']['x'],
        msg['pose']['position']['y'],
        msg['pose']['position']['z']
    ])

def extract_velocity_from_msg(msg):
    return np.array([
        msg['twist']['linear']['x'],
        msg['twist']['linear']['y'],
        msg['twist']['linear']['z']
    ])

def extract_acc_from_msg(msg):
    # Using twist msg under the hood
    return extract_velocity_from_msg(msg)

def extract_wrench_from_msg(msg):
    return np.array([
        msg['wrench']['force']['x'],
        msg['wrench']['force']['y'],
        msg['wrench']['force']['z']
    ])

In [None]:
def get_simulation_time(rosbag_raw_data, topic_name=None):
    data_dict = {
        'ros_time': [],
        'time': []
    }
    for msg_data in rosbag_raw_data:
        if (msg_data['topic'] == topic_name):
            # Retrieve ros timestamp
            data_dict['ros_time'] += [msg_data['time_ns']]
            data_dict['time'] += [msg_data['data']]

    data_dict['ros_time'] = np.array(data_dict['ros_time'])
    data_dict['time'] = np.array(data_dict['time'])
    return data_dict

def get_cartesian_state(rosbag_raw_data, topic_name=None):
    data_dict = {
        'ros_time': [],
        'position': [],
        'velocity': [],
        'acceleration': [],
        'wrench': [],
        'natural_inertia': []
    }
    for msg_data in rosbag_raw_data:
        if (msg_data['topic'] == topic_name):
            # Retrieve ros timestamp
            data_dict['ros_time'] += [msg_data['time_ns']]
            # Retrieve the actual data
            data_dict['position'] += [extract_pose_from_msg(msg_data['pose'])]
            data_dict['velocity'] += [extract_velocity_from_msg(msg_data['velocity'])]
            data_dict['acceleration'] += [extract_acc_from_msg(msg_data['acceleration'])]
            data_dict['wrench'] += [extract_wrench_from_msg(msg_data['wrench'])]
            data_dict['natural_inertia'] += [extract_3D_matrix(msg_data['natural_inertia'])]

    data_dict['ros_time'] = np.array(data_dict['ros_time'])
    data_dict['position'] = np.array(data_dict['position'])
    data_dict['velocity'] = np.array(data_dict['velocity'])
    data_dict['acceleration'] = np.array(data_dict['acceleration'])
    data_dict['wrench'] = np.array(data_dict['wrench'])
    data_dict['natural_inertia'] = np.array(data_dict['natural_inertia'])

    return data_dict

def get_compliance_state(rosbag_raw_data, topic_name=None):
    data_dict = {
        'ros_time': [],
        'position': [],
        'velocity': [],
        'acceleration': [],
        'stiffness': [],
        'damping': [],
        'inertia': []
    }
    for msg_data in rosbag_raw_data:
        if (msg_data['topic'] == topic_name):
            # Retrieve ros timestamp
            data_dict['ros_time'] += [msg_data['time_ns']]
            # Retrieve the actual data
            data_dict['position'] += [extract_pose_from_msg(msg_data['pose'])]
            data_dict['velocity'] += [extract_velocity_from_msg(msg_data['velocity'])]
            data_dict['acceleration'] += [extract_acc_from_msg(msg_data['acceleration'])]
            data_dict['stiffness'] += [extract_3D_matrix(msg_data['stiffness'])]
            data_dict['damping'] += [extract_3D_matrix(msg_data['damping'])]
            data_dict['inertia'] += [extract_3D_matrix(msg_data['inertia'])]

    # Consolidate data
    data_dict['ros_time'] = np.array(data_dict['ros_time'])
    data_dict['position'] = np.array(data_dict['position'])
    data_dict['velocity'] = np.array(data_dict['velocity'])
    data_dict['acceleration'] = np.array(data_dict['acceleration'])
    data_dict['stiffness'] += np.array(data_dict['stiffness'])
    data_dict['damping'] += np.array(data_dict['damping'])
    data_dict['inertia'] += np.array(data_dict['inertia'])

    return data_dict

def get_diagnostic_data(rosbag_raw_data, topic_name=None):
    data_dict = {
        'ros_time': [],
    }
    is_first_itr = True
    for msg_data in rosbag_raw_data:
        if (msg_data['topic'] == topic_name):
            # Retrieve ros timestamp
            data_dict['ros_time'] += [msg_data['time_ns']]
            # Retrieve the actual data
            if is_first_itr:
                for value_, key_ in zip(msg_data['values'], msg_data['keys']):
                    data_dict[key_] = [value_]
                is_first_itr = False
            else:
                for value_, key_ in zip(msg_data['values'], msg_data['keys']):
                    data_dict[key_] += [value_]
    # Consolidate data as numpy arrays
    for key_, data_list_ in data_dict.items():
        data_dict[key_] = np.array(data_list_)

    return data_dict

## Extract data

In [None]:
import nml_bag
from tqdm import tqdm


experimental_data = {}
for dataset_info in tqdm(dataset_info_list):
    reader = nml_bag.Reader(dataset_info['path_to_bag'], topics=topic_list)
    rosbag_data = reader.records

    # Extract actual experimental
    local_data = {}
    local_data['cartesian_state'] = get_cartesian_state(rosbag_data, topic_name=cartesian_state_topic_name)
    local_data['filtered_compliant_frame'] = get_compliance_state(rosbag_data, topic_name=compliant_frame_topic_name)
    local_data['desired_compliant_frame'] = get_compliance_state(rosbag_data, topic_name=desired_compliant_frame_topic_name)
    local_data['diagnostic_data'] = get_diagnostic_data(rosbag_data, topic_name=ppf_node_diagnostic_topic_name)

    # Retrieve simulation time
    local_data['simulation_time'] = get_simulation_time(rosbag_data, topic_name=simulation_time_topic_name)

    # Time mapping definition
    simulation_time_max = np.max(local_data['simulation_time']['time'])
    time_regression_res = scipy.stats.linregress(
        local_data['simulation_time']['ros_time'],
        local_data['simulation_time']['time']
    )
    assert(time_regression_res.rvalue > 0.99)

    def unsafe_map_to_simulation_time(ros_time_in):
        return time_regression_res.intercept + time_regression_res.slope*ros_time_in

    def map_to_simulation_time(ros_time_in):
        naive_value = unsafe_map_to_simulation_time(ros_time_in)
        if ((naive_value < 0.0) or (naive_value > simulation_time_max)):
            return -1  # np.NaN
        else:
            return naive_value

    def append_simulation_time(data_dict):
        data_dict['time'] = np.zeros(data_dict['ros_time'].shape)
        for idx in range(data_dict['time'].shape[0]):
            data_dict['time'][idx] = map_to_simulation_time(data_dict['ros_time'][idx])

    # Cropping function
    def crop_time_serie(data_dict, key, slicing = slice(None)):
        return data_dict[key][np.asarray(data_dict['time'] > -1).nonzero()][slicing]

    # Include simulation time to datasets
    append_simulation_time(local_data['cartesian_state'])
    append_simulation_time(local_data['filtered_compliant_frame'])
    append_simulation_time(local_data['desired_compliant_frame'])
    append_simulation_time(local_data['diagnostic_data'])

    # Include label, info tag, color, etc.
    local_data['tag'] = dataset_info['tag']
    local_data['label'] = dataset_info['label']
    local_data['color'] = dataset_info['color']
    local_data['linestyle'] = dataset_info['linestyle']

    # Consolidate dataset
    experimental_data[local_data['tag']] = local_data

In [None]:
for _, data in experimental_data.items():
    print(data['tag'])
    for key, data_ in data['diagnostic_data'].items():
        print(f'  - {key}')

# Plot stiffness

In [None]:
SAVE_FIGS = True

gs_kw = dict(width_ratios=[1], height_ratios=[0.5, 5])
fig_stiffness, axd = plt.subplot_mosaic([
    ['legend'],
    ['top']
    ],
    gridspec_kw=gs_kw,
    sharex=True,
    figsize=(plt.rcParams["figure.figsize"][0], plt.rcParams["figure.figsize"][1]*0.6)
)
axd['legend'].axis('off')

ax1 = axd['top']

# Stiffness
'''
ax1.plot(
    crop_time_serie(experimental_data['SDPF_QP']['desired_compliant_frame'], 'time'),
    crop_time_serie(experimental_data['SDPF_QP']['desired_compliant_frame'], 'stiffness')[:, 1, 1],
    'k--',
    label = '__NO_LABEL'
)
'''
for tag, dataset in experimental_data.items():
    ax1.plot(
        crop_time_serie(dataset['filtered_compliant_frame'], 'time'),
        crop_time_serie(dataset['filtered_compliant_frame'], 'stiffness')[:, 1, 1],
        label = dataset['label'],
        color = dataset['color'],
        linestyle = dataset['linestyle']
    )

ax1.set_ylabel(r'$K_y$' + '\n' + r'\small{(N.m$^{-1}$)}')

# extra setup
# ------------
ax1.set_xlabel(r'time (s)')
ax1.legend(
    ncol=4,
    columnspacing=0.8,
    bbox_to_anchor=(0.5, 1.26),
    loc='upper center',
)  # , framealpha=0.5)

for ax in [ax1]:
    ax.grid(which='major')
    ax.grid(which='minor', linewidth=0.1)

ax1.set_xlim((0., np.max(crop_time_serie(experimental_data['SDPF_QP']['desired_compliant_frame'], 'time'))))

if SAVE_FIGS :
    multi_format_savefig(
        figure = fig_stiffness,
        dir_name = export_figs_dir,
        fig_name = "stiffness_only"
    )

# Plot position/velocity, wrench

In [None]:
# ----------------------------------
# Position/velocity errors + force
# ----------------------------------
gs_kw = dict(width_ratios=[1], height_ratios=[0.5, 5, 5, 5])
fig_state_meas, axd = plt.subplot_mosaic([
    ['legend'],
    ['top'],
    ['center'],
    ['bottom']],
    gridspec_kw=gs_kw,
    sharex=True
    # layout='constrained'
)
axd['legend'].axis('off')
ax1 = axd['top']
ax2 = axd['center']
ax3 = axd['bottom']

plot_ref = True

# Position
# -------------------
for tag, dataset in experimental_data.items():
    ax1.plot(
        crop_time_serie(dataset['cartesian_state'], 'time'),
        crop_time_serie(dataset['cartesian_state'], 'position')[:, 1],
        label = dataset['label'],
        color = dataset['color'],
        linestyle = dataset['linestyle']
    )
ax1.set_ylabel(r'$p_y$')

# Velocity
# ----------------
for tag, dataset in experimental_data.items():
    ax2.plot(
        crop_time_serie(dataset['cartesian_state'], 'time'),
        crop_time_serie(dataset['cartesian_state'], 'velocity')[:, 1],
        label = dataset['label'],
        color = dataset['color'],
        linestyle = dataset['linestyle']
    )
ax2.set_ylabel(r'$\dot{p}_y$')

# Force
# -----------------------
for tag, dataset in experimental_data.items():
    ax3.plot(
        crop_time_serie(dataset['cartesian_state'], 'time'),
        -crop_time_serie(dataset['cartesian_state'], 'wrench')[:, 1],
        label = dataset['label'],
        color = dataset['color'],
        linestyle = dataset['linestyle']
    )

ax3.set_ylabel(r'$f_{ext, y}$')
ax3.set_xlabel(r'time (s)')


# extra setup
for ax in [ax1, ax2, ax3]:
    ax.grid(which='major')
    ax.grid(which='minor', linewidth=0.1)

ax1.legend(
    ncol=4,
    bbox_to_anchor=(0.5, 1.4),
    loc='upper center',
)  # , framealpha=0.5)

fig_state_meas.align_ylabels([ax1, ax2, ax3])
ax1.set_xlim((0., np.max(crop_time_serie(experimental_data['SDPF_QP']['desired_compliant_frame'], 'time'))))


# -------------------------------
# EXPORT TO FILES
# -------------------------------
if SAVE_FIGS :
    multi_format_savefig(
        figure = fig_state_meas,
        dir_name = export_figs_dir,
        fig_name = "pos_vel_and_force"
    )

In [None]:
# ----------------------------------
# Position/velocity errors + force
# ----------------------------------
gs_kw = dict(width_ratios=[1], height_ratios=[0.5, 5, 5, 5])
fig_stiff_and_state_meas, axd = plt.subplot_mosaic([
    ['legend'],
    ['top'],
    ['center'],
    ['bottom']],
    gridspec_kw=gs_kw,
    sharex=True
    # layout='constrained'
)
axd['legend'].axis('off')
ax1 = axd['top']
ax2 = axd['center']
ax3 = axd['bottom']

plot_ref = True

# Stiffness
# -------------------
for tag, dataset in experimental_data.items():
    ax1.plot(
        crop_time_serie(dataset['filtered_compliant_frame'], 'time'),
        crop_time_serie(dataset['filtered_compliant_frame'], 'stiffness')[:, 1, 1],
        label = dataset['label'],
        color = dataset['color'],
        linestyle = dataset['linestyle']
    )

ax1.set_ylabel(r'$K_y$')  # + '\n' + r'\small{(N.m$^{-1}$)}')

# Position
# -------------------
for tag, dataset in experimental_data.items():
    ax2.plot(
        crop_time_serie(dataset['cartesian_state'], 'time'),
        crop_time_serie(dataset['cartesian_state'], 'position')[:, 1],
        label = dataset['label'],
        color = dataset['color'],
        linestyle = dataset['linestyle']
    )
ax2.set_ylabel(r'$p_y$')

# Force
# -----------------------
for tag, dataset in experimental_data.items():
    ax3.plot(
        crop_time_serie(dataset['cartesian_state'], 'time'),
        -crop_time_serie(dataset['cartesian_state'], 'wrench')[:, 1],
        label = dataset['label'],
        color = dataset['color'],
        linestyle = dataset['linestyle']
    )

ax3.set_ylabel(r'$f_{ext, y}$')
ax3.set_xlabel(r'time (s)')


# extra setup
for ax in [ax1, ax2, ax3]:
    ax.grid(which='major')
    ax.grid(which='minor', linewidth=0.1)

ax1.legend(
    ncol=4,
    bbox_to_anchor=(0.5, 1.4),
    columnspacing=0.8,
    loc='upper center',
)  # , framealpha=0.5)

fig_stiff_and_state_meas.align_ylabels([ax1, ax2, ax3])
ax1.set_xlim((0., np.max(crop_time_serie(experimental_data['SDPF_QP']['desired_compliant_frame'], 'time'))))


# -------------------------------
# EXPORT TO FILES
# -------------------------------
if SAVE_FIGS :
    multi_format_savefig(
        figure = fig_stiff_and_state_meas,
        dir_name = export_figs_dir,
        fig_name = "stiffness_pos_and_force"
    )

# Plot K, z_dot, z

In [None]:
import scipy

# PLOTS

gs_kw = dict(width_ratios=[1], height_ratios=[0.5, 5, 5, 5])
fig_z_z_dot_beta, axd = plt.subplot_mosaic([
    ['legend'],
    ['top'],
    ['center'],
    ['bottom']],
    gridspec_kw=gs_kw,
    sharex=True
    # layout='constrained'
)
axd['legend'].axis('off')
ax1 = axd['top']
ax2 = axd['center']
ax3 = axd['bottom']

# -------------------------
# z_dot
# -------------------------
for tag, dataset in experimental_data.items():
    if dataset['diagnostic_data'].get('z_dot') is not None:
        ax1.plot(
            crop_time_serie(dataset['diagnostic_data'], 'time'),
            crop_time_serie(dataset['diagnostic_data'], 'z_dot'),
            label = dataset['label'],
            color = dataset['color'],
            # linestyle = dataset['linestyle']
        )

ax1.set_ylabel(r'$w$' + ' ' + r'\small{(J.s${}^{-1}$)}')

# -------------------------
# Integral of z_dot
# -------------------------
for tag, dataset in experimental_data.items():
    if dataset['diagnostic_data'].get('z') is not None:
        ax2.plot(
            crop_time_serie(dataset['diagnostic_data'], 'time'),
            crop_time_serie(dataset['diagnostic_data'], 'z'),
            label = '__no-label-for_' + dataset['label'],
            color = dataset['color'],
            linestyle = dataset['linestyle']
        )
ax2.plot(
    crop_time_serie(experimental_data['SDPF_PPF_adaptive']['diagnostic_data'], 'time'),
    crop_time_serie(experimental_data['SDPF_PPF_adaptive']['diagnostic_data'], 'z_min'),
    label = r'z_{min}',
    color = 'k',
    linestyle = ':'
)

ax2.legend(loc='lower right')

ax2.set_ylabel(
    r'$z$'  # = \int_0^t w(\cdot) d\tau$'
    # r'{\setlength{\fboxrule}{0pt} \fbox{ \phantom{${\displaystyle \int_0^t}$} ${\int_0^t w\left(\beta(\tau), \tau\right) d\tau}$}}'
    # + '\n'
    + ' '
    + r'\small{(J)}'
)


# ax2.legend(loc='lower right')

# ax2.set_ylabel(r'${\displaystyle \int_0^t w\left(\beta(\tau), \tau\right) d\tau}$')  # r'\small{(J)}')
# ax2.set_ylabel(r'$\int_0^t w$' + '\n' + r'\small{(J)}')
# ax.set_xlabel(r'Time(s)')

# -------------------------
# Beta
# -------------------------
for tag, dataset in experimental_data.items():
    if dataset['diagnostic_data'].get('beta') is not None:
        ax3.plot(
            crop_time_serie(dataset['diagnostic_data'], 'time'),
            crop_time_serie(dataset['diagnostic_data'], 'beta'),
            label = dataset['label'],
            color = dataset['color'],
            # linestyle = dataset['linestyle']
        )

ax3.set_ylabel(r'$\beta$' + ' ' + r'\small{(unitless)}')
ax3.set_xlabel(r'time (s)')

# extra setup
for ax in [ax1, ax2, ax3]:
    ax.grid(which='major')
    ax.grid(which='minor', linewidth=0.1)

ax1.legend(
    ncol=4,
    bbox_to_anchor=(0.5, 1.4),
    loc='upper center',
)  # , framealpha=0.5)

fig_z_z_dot_beta.align_ylabels([ax1, ax2, ax3])
ax1.set_xlim((0., np.max(crop_time_serie(experimental_data['SDPF_QP']['desired_compliant_frame'], 'time'))))

# -------------------------------
# EXPORT TO FILES
# -------------------------------
if SAVE_FIGS :
    multi_format_savefig(
        figure = fig_z_z_dot_beta,
        dir_name = export_figs_dir,
        fig_name = "z_dot_z_and_beta"
    )

In [None]:
experimental_data['SIPF+']['diagnostic_data']

In [None]:
experimental_data['SDPF_PPF']['diagnostic_data']