In [1]:
import opensim as osim
import os
from simFunctions import *
from stan_utils import *
import pandas as pd


# Export C3D markers, ground reaction forces and EMG

## Folder Iterator Class

In [2]:
""""

This class is built on the basis of Dimitar Stansv's https://github.com/mitkof6/opensim_automated_pipeline .
It only works together with Diitar's utility functions, included as 'stan_utils' in this repository.
The class requires that a list of folders that contain C3D files is provided, along with a working directory and session directory.
If there is only one file, please arrange it in a folder and supply as a list with one element, as given in the example below.
The class handles static and dynamic trials equally.

"""

class C3DFolderIterator:
    def __init__(self, list_of_folders, working_dir, session_dir, stance_threshold=50, plates=3, debug=False, import_emg=False, plot=False):
        self.folders = list_of_folders
        self.input_dir = os.path.join(working_dir, session_dir)
        self.output_dir = os.path.join(working_dir, session_dir)
        self.stance_threshold = stance_threshold
        self.plates = plates
        self.debug = debug
        self.import_emg = import_emg
        self.plot = plot

    def _prepare_labels(self):
        """Prepare force labels based on the number of plates."""
        labels_wrench = [
            'ground_force_vx', 'ground_force_vy', 'ground_force_vz',
            'ground_force_px', 'ground_force_py', 'ground_force_pz',
            'ground_torque_x', 'ground_torque_y', 'ground_torque_z'
        ]
        labels = [
            f'plate{i}_{label}' 
            for i in range(1, self.plates + 1) 
            for label in labels_wrench
        ]
        return labels

    def _process_markers(self, markers_task, output_path):
        """Process and save marker data."""
        rotate_data_table(markers_task, [1, 0, 0], -90)
        rotate_data_table(markers_task, [0, 1, 0], -90)
        osim.TRCFileAdapter().write(markers_task, output_path)

    def _process_emg(self, analog_task, output_dir):
        """Process and export EMG data."""
        time = analog_task.getIndependentColumn()
        columns = analog_task.getColumnLabels()
        emg_columns = [col[len('EMG Channels.'):] for col in columns if 'EMG' in col]
        emg_col_nums = [i for i, col in enumerate(columns) if 'EMG' in col]
        emg_data = analog_task.getMatrixBlock(0, emg_col_nums[0], len(time), len(emg_col_nums))
        emg_sto = create_opensim_storage(time, emg_data, emg_columns)
        emg_sto.setName('EMG')
        emg_sto.printResult(emg_sto, 'task_emg', output_dir, 0.01, '.mot')

    def _process_forces(self, forces_task):
        """Process and refine force data."""
        rotate_data_table(forces_task, [1, 0, 0], -90)
        rotate_data_table(forces_task, [0, 1, 0], -90)
        for i in range(1, self.plates + 1):
            mm_to_m(forces_task, f'p{i}')
            mm_to_m(forces_task, f'm{i}')
        return forces_task

    def _refine_ground_reaction_forces(self, forces_task):
        """Refine ground reaction forces and collect step data."""
        t0_list, tf_list, p_l_list = [], [], []
        for i in range(1, self.plates + 1):
            t0, tf, p_l = refine_ground_reaction_wrench_ks(
                forces_task, [f'f{i}', f'p{i}', f'm{i}'],
                stance_threshold=self.stance_threshold, tau=0.001, debug=self.debug
            )
            t0_list.append(t0)
            tf_list.append(tf)
            p_l_list.append(p_l)
        return t0_list, tf_list, p_l_list

    def _export_data(self, forces_task, output_dir, labels_force):
        """Export force data."""
        time = forces_task.getIndependentColumn()
        forces_task = forces_task.flatten(['x', 'y', 'z'])
        force_sto = create_opensim_storage(time, forces_task.getMatrix(), labels_force)
        force_sto.setName('GRF')
        force_sto.printResult(force_sto, 'task_grf', output_dir, 0.01, '.mot')

    def _plot_results(self, output_dir):
        """Plot task results."""
        plot_sto_file(os.path.join(output_dir, 'task.trc'), os.path.join(output_dir, 'task.pdf'), 3)
        plot_sto_file(os.path.join(output_dir, 'task_grf.mot'), os.path.join(output_dir, 'task_grf.pdf'), 3)
        if self.import_emg:
            plot_sto_file(os.path.join(output_dir, 'task_emg.mot'), os.path.join(output_dir, 'task_emg.pdf'), 3)

    def apply(self):
        """Process each folder."""
        labels_force = self._prepare_labels()
        adapter = osim.C3DFileAdapter()
        adapter.setLocationForForceExpression(osim.C3DFileAdapter.ForceLocation_CenterOfPressure)

        for folder in self.folders:
            task_file = f'{folder}.c3d'
            input_path = os.path.join(self.input_dir, folder, task_file)
            output_dir = os.path.join(self.output_dir, folder)

            # Read data
            task_data = adapter.read(input_path)
            markers_task = adapter.getMarkersTable(task_data)
            forces_task = adapter.getForcesTable(task_data)
            analog_task = adapter.getAnalogDataTable(task_data)

            # Process markers
            self._process_markers(markers_task, os.path.join(output_dir, 'task.trc'))

            # Process EMG
            if self.import_emg:
                self._process_emg(analog_task, output_dir)

            # Process forces
            forces_task = self._process_forces(forces_task)

            # Refine forces and save steps
            t0_list, tf_list, p_l_list = self._refine_ground_reaction_forces(forces_task)
            pd.DataFrame({'t0': t0_list, 'tf': tf_list, 'p_l': p_l_list}).to_csv(os.path.join(output_dir, 'df_steps.csv'))

            # Export forces
            self._export_data(forces_task, output_dir, labels_force)

            # Plot results
            if self.plot:
                self._plot_results(output_dir)


## Parent directory
Check the parent directory: working directory and session directory (if present) will have to be relative to it.

In [3]:
os.getcwd()

'c:\\Users\\User\\OneDrive\\Documents\\WORK\\JRF_GaitAnalysis\\modelling_paper\\repo\\folder\\model_update'

### Static

In [4]:
working_dir = '../motion_lab'
session_dir = 'static' 
trial_names = ['Static03'] # static may have only one file or may have more

In [6]:
list_of_folders = trial_names
walking_iterator = C3DFolderIterator(list_of_folders, working_dir, session_dir, stance_threshold=30, plates=3, debug=False, import_emg = False, plot=False)
walking_iterator.apply()