In [None]:
import re
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from glob import glob

In [None]:
# Low pass filter to filter out unwanted noise from the force plate data.
def low_pass_filter(x, fc = 400, fs = 3500):
    w = fc / (fs / 2)  # Normalize the frequency
    b, a = signal.butter(3, w, "low")

    return signal.filtfilt(b, a, x, axis=0)


def remove_dc_offset(x, num_samples: int = 100):
    offset = np.mean(x[:num_samples], axis=0)
    return x - offset


def rmse(predictions, targets):
    differences = predictions - targets
    differences_squared = differences**2
    mean_of_differences_squared = np.nanmean(differences_squared.flatten())
    rmse_val = np.sqrt(mean_of_differences_squared)
    return rmse_val

In [None]:
# Read kinetic data from text file and manipulate it into each force plate etc.
#
# az0 = - 0.062;
# a = 0.210
# b = 0.350
#
# The Kistler force plate dimensions are 600 x 900 mm.
#
# Mx = b * (fz1 + fz2 - fz3 - fz4)
# My = a * (-fz1 + fz2 + fz3 - fz4)
# Mz = b * (-fx12 + fx34) + a * (fy14 - fy23)
# Mx' = Mx + Fy*az0
# My' = My - Fx*az0
# cop_x = -My' / Fz
# cop_y = Mx' / Fz
# Tz = Mz - Fy * ax + Fx * ay
# Declare constants used to convert the raw stream of data to metrics.
root_dir = "/Users/zico/OneDrive - University of Cape Town/msc/data/grf_kinetic_data/ForcePlateData"
txt_data = sorted(glob(os.path.join(root_dir, "**/*.txt"), recursive=True))
num_force_plates = 8
num_data_columns = 8
az0 = -0.062
a = 0.210
b = 0.350
for txt_fname in txt_data:
    out_fname = txt_fname.split("/")[-1][:-4]
    with open(txt_fname, "r") as f:
        data_str = f.read()
        data = re.split("[\t\n]", data_str)
        data = np.array(data[:-1], dtype=np.float32)
        # TODO: There are meant to be 8 force plates but we only have data for 7 of them?
        num_force_plates, fs = (7, 1000) if "0609" in out_fname else (8, 3500)
        try:
            force_plate_data = data.reshape((-1, num_force_plates, num_data_columns))
        except:
            f.close()
            print(f"Not correct number of force plates (skip): {out_fname}")
            continue
        # Run through each force plate and determine values
        df_list = []
        for i in range(num_force_plates):
            force_plate_data_filtered = low_pass_filter(force_plate_data[:, i], fc=400, fs=fs)
            fx12 = force_plate_data_filtered[:, 0]
            fx34 = force_plate_data_filtered[:, 1]
            fy14 = force_plate_data_filtered[:, 2]
            fy23 = force_plate_data_filtered[:, 3]
            fz1 = force_plate_data_filtered[:, 4]
            fz2 = force_plate_data_filtered[:, 5]
            fz3 = force_plate_data_filtered[:, 6]
            fz4 = force_plate_data_filtered[:, 7]
            Fx = fx12 + fx34
            Fy = fy14 + fy23
            Fz = fz1 + fz2 + fz3 + fz4
            Mx = b * (fz1 + fz2 - fz3 - fz4)
            My = a * (-fz1 + fz2 + fz3 - fz4)
            Mz = b * (-fx12 + fx34) + a * (fy14 - fy23)
            Mx_ = Mx + Fy*az0
            My_ = My - Fx*az0
            cop_x = -My_ / Fz
            cop_y = Mx_ / Fz
            Tz = Mz - Fy * cop_x + Fx * cop_y
            df_list.append(pd.DataFrame(np.array([Fx, Fy, Fz, cop_x, cop_y, Tz]).T,
                                columns=["Fx", "Fy", "Fz", "cop_x", "cop_y", "Tz"]))
        # Convert data to a single coordinate frame.
        ref_plate = df_list[0].copy()
        Fx = ref_plate["Fx"].values
        Fy = ref_plate["Fy"].values
        Fz = ref_plate["Fz"].values
        dax = 0.6 * np.array([float(i) for i in range(num_force_plates)])
        day = [0.0] * num_force_plates
        Mx = (day[0] + ref_plate["cop_y"].values) * Fz
        My = -(dax[0] + ref_plate["cop_x"].values) * Fz
        Mz = ((dax[0] + ref_plate["cop_x"].values) * Fy - (day[0] + ref_plate["cop_y"].values) * Fx)
        for i in range(1, num_force_plates):
            fp_data = df_list[i]
            Fx += fp_data["Fx"].values
            Fy += fp_data["Fy"].values
            Fz += fp_data["Fz"].values
            Mx += (day[i] + fp_data["cop_y"].values) * fp_data["Fz"].values
            My -= (dax[i] + fp_data["cop_x"].values) * fp_data["Fz"].values
            Mz += ((dax[i] + fp_data["cop_x"].values) * fp_data["Fy"].values -
                (day[i] + fp_data["cop_y"].values) * fp_data["Fx"].values)
        Mx_ = Mx
        My_ = My
        for i in range(num_force_plates):
            fp_data = df_list[i]
            Mx_ += az0 * fp_data["Fy"].values
            My_ -= az0 * fp_data["Fx"].values
        cop_x = -My_ / Fz
        cop_y = Mx_ / Fz
        Tz = Mz - Fy*cop_x + Fx*cop_y
        df_list.append(pd.DataFrame(np.array([Fx, Fy, Fz, cop_x, cop_y, Tz]).T,
                            columns=["Fx", "Fy", "Fz", "cop_x", "cop_y", "Tz"]))
        # Concatenate results and write to file.
        df = pd.concat(df_list, keys=[i for i in range(num_force_plates + 1)], axis=0)
        df.index.set_names(["force_plate", "frame"], inplace=True)
        try:
            df.to_csv(
                os.path.join(
                    root_dir,
                    f"{out_fname}.csv"))
            df.to_hdf(os.path.join(
                root_dir,
                f"{out_fname}.h5"),
                    "force_plate_data_df",
                    format="table",
                    mode="w")
        except:
            print(f"Failed to save results: {out_fname}")
            continue


In [None]:
# Plotting results for global coordinate frame.
# The Kistler force plate dimensions are 600 x 900 mm.
root_dir = "/Users/zico/OneDrive - University of Cape Town/msc/data/grf_kinetic_data/ForcePlateData"
df_data = sorted(glob(os.path.join(root_dir, "**/*.h5"), recursive=True))
for df_file in df_data:
    df = pd.read_hdf(df_file)
    out_fname = df_file.split("/")[-1][:-3]
    num_force_plates = 7 if "0609" in out_fname else 8
    x_axis_m = num_force_plates * 0.6
    y_axis_m = 0.9
    # Plot each force plate separately.
    for i in range(num_force_plates):
        fp_data = df.query(f"force_plate == {i}")
        fig = plt.figure(figsize=(16, 12), dpi=120)
        ax = fig.add_subplot(111)
        ax.plot(fp_data["Fx"].values, color="b", label="Fx")
        ax.plot(fp_data["Fy"].values, color="g", label="Fy")
        ax.plot(fp_data["Fz"].values, color="r", label="Fz")
        ax.set_title("GRF")
        ax.set_xlabel("Frames")
        ax.set_ylabel("Force (N)")
        ax.legend()
        plt.savefig(os.path.join(root_dir, f"{out_fname}_fp{i+1}.png"))
        plt.close()
    # Plot data referenced to a single world location.
    values = df.query(f"force_plate == {num_force_plates}")
    fig = plt.figure(figsize=(16, 12), dpi=120)
    # axs = fig.subplots(1, 2).flatten()
    axs = [fig.subplots(1, 1)]
    axs[0].plot(values["Fx"].values, color="b", label="Fx")
    axs[0].plot(values["Fy"].values, color="g", label="Fy")
    axs[0].plot(values["Fz"].values, color="r", label="Fz")
    axs[0].set_title("GRF")
    axs[0].set_xlabel("Frames")
    axs[0].set_ylabel("Force (N)")
    axs[0].legend()
    # axs[1].scatter(values["cop_x"].values, values["cop_y"].values)
    # axs[1].set_xlim((-0.6, x_axis_m))
    # axs[1].set_ylim((-y_axis_m/2, y_axis_m/2))
    # axs[1].set_aspect('equal', 'box')
    # axs[1].set_title("COP")
    # axs[1].set_xlabel("X")
    # axs[1].set_ylabel("Y")
    plt.savefig(os.path.join(root_dir, f"{out_fname}.png"))
    plt.close()
    fig = plt.figure(figsize=(16, 12), dpi=120)
    axs = fig.subplots(2, 1).flatten()
    axs[0].plot(values["cop_x"].values)
    axs[1].plot(values["cop_y"].values)
    axs[0].set_title("COP X")
    axs[1].set_title("COP Y")
    axs[0].set_xlabel("Frames")
    axs[1].set_xlabel("Frames")
    plt.savefig(os.path.join(root_dir, f"{out_fname}_cop.png"))
    plt.close()

In [None]:
# Post processing - synthesis of GRF from peak data.
from scipy import interpolate
# Data location.
data_fname = "/Users/zico/Library/CloudStorage/OneDrive-UniversityofCapeTown/msc/data/grf_kinetic_data/ForcePlateData/Shiraz_110909_01.h5"
df = pd.read_hdf(data_fname)
# Note the direction of the cheetah to determine the tangential GRF. This needs to come from the COM velocity direction.
direction = -1
# Note, the body mass of the cheetah.
body_weight = 35.0 * 9.81 if "Shiraz" in data_fname else 30.0 * 9.81
# Note, which force plate corresponds to which limb, and whether it is leading or non-leading.
contacts = {"LFL": 3, "NLFL": 4, "LHL": 0, "NLHL": 1}
# Find peaks: max Fz (approx. half cycle sinuisoid), max and min Fx (approx. by a 3rd spline between peaks).
# Note, we ignore the Fy direction between it is a very small contribution when running in the +x or -x direction.
df_results = {}
for limb, fp  in contacts.items():
    data = df.query(f"force_plate == {fp-1}")
    Fz = (remove_dc_offset(data["Fz"].values, num_samples=200) / body_weight)
    Fx = direction * (remove_dc_offset(data["Fx"].values, num_samples=200) / body_weight)
    Fz_max, Fz_max_idx = np.max(Fz), np.argmax(Fz)
    Fx_max, Fx_min = np.max(Fx), np.min(Fx)
    # Determine the start and end locations of the stance to get an approx. stance time for the generation of the GRF data.
    # We can assume that the contact time starts more or less after 10% detection threshold of the peak Fz. This might need to be tuned for different cases.
    threshold = 0.1 * Fz_max
    thresh_padding = 5
    indices = np.argwhere(Fz > threshold)
    start_idx = indices[0][0] - thresh_padding
    remaining_indices = np.argwhere(Fz[indices[0][0]:] < threshold)
    end_idx = indices[0][0] + remaining_indices[0][0] + thresh_padding
    stance_time = end_idx - start_idx
    stance_start = 0
    stance_end = stance_time
    peak_idx = Fz_max_idx - start_idx
    print(f"Data for limb: {limb}")
    print(f"Fz max: ({Fz_max}, {Fz_max_idx})")
    print(f"Fx max/min: ({Fx_max}, {Fx_min})")
    print(f"Stance time: {stance_time}")
    orig_idx = np.arange(start_idx, end_idx)
    t = np.linspace(stance_start, stance_end, stance_time)
    synth_Fz = Fz_max * np.sin(np.pi * (t / stance_time))
    Fx_control_pts = np.array([[stance_start, 0.0], [peak_idx // 2, Fx_min],
                               [peak_idx, 0.0], [peak_idx + (stance_end - peak_idx) // 2, Fx_max], [stance_end, 0.0]])
    x = interpolate.InterpolatedUnivariateSpline(Fx_control_pts[:, 0], Fx_control_pts[:, 1], k=2)
    synth_Fx = x(t)
    # Plot the original waveform with the synthesised.
    padding = 50
    fig = plt.figure(figsize=(16, 9), dpi=120);
    plt.plot(t + padding, synth_Fz, "r", label="Synth Fz");
    plt.plot(t + padding, synth_Fx, "b", label="Synth Fx");
    orig_t = np.arange(start_idx - padding, end_idx + padding)
    plt.plot(Fz[orig_t], "r", label="Fz", alpha=0.35);
    plt.plot(Fx[orig_t], "b", label="Fx", alpha=0.35);
    plt.xlabel("Sample Time");
    plt.ylabel("Force (N)");
    plt.legend();
    # Calculate the RMSE.
    Fz_error = rmse(synth_Fz, Fz[orig_idx])
    Fx_error = rmse(synth_Fx, Fx[orig_idx])
    plt.title(f"{limb} - Stance time: {stance_time * (2/7):.2f}ms, RMSE (Fz, Fx): ({Fz_error:.2f}, {Fx_error:.2f}).")
    plt.show(block=False);
    plt.close();
    Fz_synth = np.zeros_like(Fz)
    Fx_synth = np.zeros_like(Fx)
    Fy_synth = np.zeros_like(Fx)
    Fz_synth[orig_idx] = synth_Fz
    Fx_synth[orig_idx] = synth_Fx
    df_results[fp] = pd.DataFrame(np.array([Fx_synth, Fy_synth, Fz_synth]).T, columns=["Fx", "Fy", "Fz"])

df_synth = pd.concat(df_results.values(), keys=df_results.keys(), axis=0)
df_synth.index.set_names(["force_plate", "frame"], inplace=True)
out_fname = "/Users/zico/Library/CloudStorage/OneDrive-UniversityofCapeTown/msc/data/grf_kinetic_data/ForcePlateData/Shiraz_110909_01_synth"
df_synth.to_csv(f"{out_fname}.csv")
df_synth.to_hdf(f"{out_fname}.h5", "force_plate_data_df", format="table", mode="w")