In [None]:
import os
import pickle
from pathlib import Path

# import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yaml
from matplotlib.lines import Line2D
from torch.backends.cudnn import allow_tf32

# plot_folder = Path("/Users/jg/Desktop/upper_limb/paper_figures-3")
# os.makedirs(plot_folder, exist_ok=True)
# data_folder = Path("/Users/jg/Desktop/upper_limb/paper_data-2")


In [None]:
# Subset of the cohort: every participant except "P6_820".
online_participants = [
    "P_149",
    "P_238",
    "P_407",
    "P_426",
    "P_577",
    "P_668",
    "P_711",
    "P_950",
    "P7_453",
]
# Full cohort, in the order used for the rows of `df`.
participants = [*online_participants, "P6_820"]

# Output names. NOTE(review): presumably joint-angle targets — confirm.
targets = [
    "indexAng",
    "midAng",
    "ringAng",
    "pinkyAng",
    "thumbInPlaneAng",
    "thumbOutPlaneAng",
    "wristFlex",
]
# Recording folder names iterated by the analysis cells below.
recordings = [
    "thumbFlEx",
    "thumbAbAd",
    "indexFlEx",
    "mrpFlEx",
    "fingersFlEx",
    "wristFlEx",
    "handOpCl",
    "pinchOpCl",
    "pointOpCl",
]
# Additional recording names, always appended to `recordings` when looping.
test_recordings = ["keyOpCl", "wristFlHandCl", "indexFlDigitsEx"]

# Column layout of the (initially empty) results table: every condition, plus
# its "pert_"-prefixed variant, crossed with the three split names.
_base_conditions = ["init", "offline", "comp", "comp_interp"]
_conditions = _base_conditions + [f"pert_{name}" for name in _base_conditions]
_splits = ["val", "test", "total"]

df = pd.DataFrame(
    index=participants,
    columns=pd.MultiIndex.from_product([_conditions, _splits]),
)
# Mirror the row index into a regular column.
df.loc[:, "participants"] = df.index


In [None]:
# Load one recording of one participant and apply that participant's
# perturbation matrix to the selected EMG channels.
participant = "P_950"
recording = "fingersFlEx"

# Single place to repoint this cell at a different data checkout.
data_root = Path("/Users/jorisg/projects/biomech_PCP/data")
participant_dir = data_root / participant
data_dir = participant_dir / "recordings" / recording / "experiments" / "1"
perturber_path = participant_dir / "online_trials" / "perturb" / "perturber.npy"
config_path = participant_dir / "configs" / "modular.yaml"

perturber = np.load(perturber_path)
with open(config_path, "r") as f:
    config = yaml.safe_load(f)

features = config["parameters"]["features"]["value"]
print(features)

# Prefer the aligned recording and fall back to the plain crop only when the
# aligned file is missing; any other load failure (e.g. a corrupt file) is
# allowed to surface instead of being silently replaced by the fallback.
try:
    emg = np.load(data_dir / "cropped_aligned_emg.npy")
except FileNotFoundError:
    emg = np.load(data_dir / "cropped_emg.npy")

# The second element of every feature entry is used as the EMG column index.
# NOTE(review): if entries are strings, only a single-digit index is read —
# confirm the feature format in modular.yaml.
int_features = [int(feature[1]) for feature in features]
emg = emg[:, int_features]

# Apply the perturbation matrix to every row of `emg`.
perturbed_emg = (perturber @ emg.T).T

In [None]:
# Display the shape of the perturbed EMG array computed in the cell above.
perturbed_emg.shape

In [None]:
# Display the shape of the channel-selected EMG array, for comparison with
# the perturbed array's shape.
emg.shape

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import yaml

# Cohort-level EMG correlation heatmaps.
#
# For every participant and recording, the channel-by-channel correlation
# matrix of the selected EMG channels is computed and its diagonal is replaced
# by the correlation between each channel and its perturbed counterpart.
# Matrices are averaged over recordings per participant; the mean and standard
# deviation across participants are then plotted.
participants = [
    "P_149",
    "P_238",
    "P_407",
    "P_426",
    "P_577",
    "P_668",
    "P_711",
    "P_950",
    "P7_453",
    "P6_820",
]


def _corr_with_perturbed_diagonal(emg, perturbed_emg):
    """Return the channel correlation matrix of ``emg`` with a swapped diagonal.

    Args:
        emg: 2-D array indexed as (sample, channel).
        perturbed_emg: Array with the same shape as ``emg``.

    Returns:
        Square ``numpy.ndarray`` whose off-diagonal entries are the
        correlations between the columns of ``emg`` and whose ``i``-th
        diagonal entry is the correlation between column ``i`` of ``emg`` and
        column ``i`` of ``perturbed_emg``.
    """
    # atleast_2d keeps the single-channel case a 1x1 matrix.
    corr = np.atleast_2d(np.corrcoef(emg.T)).copy()
    self_corr = [
        np.corrcoef(emg[:, i], perturbed_emg[:, i])[0, 1]
        for i in range(emg.shape[1])
    ]
    np.fill_diagonal(corr, self_corr)
    return corr


# Candidate EMG files inside a recording folder, in order of preference.
emg_file_names = ("cropped_aligned_emg.npy", "cropped_emg.npy")

participant_mean_corrs = []
labels = None

for participant in participants:
    participant_dir = Path("data") / participant
    perturber_path = participant_dir / "online_trials" / "perturb" / "perturber.npy"
    config_path = participant_dir / "configs" / "modular.yaml"
    if not (perturber_path.exists() and config_path.exists()):
        continue

    # Perturber and channel selection depend only on the participant, so they
    # are loaded once rather than once per recording.
    perturber = np.load(perturber_path)
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)
    features = config["parameters"]["features"]["value"]
    int_features = [int(feature[1]) for feature in features]

    all_corrs = []
    for recording in recordings + test_recordings:
        data_dir = participant_dir / "recordings" / recording / "experiments" / "1"
        # Skip recordings that have neither file; a file that exists but cannot
        # be read raises instead of being silently dropped from the average.
        emg_path = next(
            (data_dir / name for name in emg_file_names if (data_dir / name).exists()),
            None,
        )
        if emg_path is None:
            continue
        emg = np.load(emg_path)[:, int_features]
        perturbed_emg = (perturber @ emg.T).T
        all_corrs.append(_corr_with_perturbed_diagonal(emg, perturbed_emg))
        # Tick labels follow the channel ids of the most recently processed
        # recording. NOTE(review): assumes every participant selects the same
        # channels — confirm.
        labels = [f"emg_{idx}" for idx in int_features]

    if all_corrs:
        participant_mean_corrs.append(np.mean(all_corrs, axis=0))

if not participant_mean_corrs:
    raise FileNotFoundError(
        "No EMG recordings found under data/<participant>/recordings; nothing to plot."
    )

# Compute grand mean and std across participants. np.stack raises if the
# participants' matrices do not all have the same shape.
all_participant_corrs = np.stack(participant_mean_corrs)
grand_mean = all_participant_corrs.mean(axis=0)
grand_std = all_participant_corrs.std(axis=0)

# Plot mean
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    grand_mean,
    annot=True,
    fmt=".2f",
    cmap="coolwarm",
    vmin=-1,
    vmax=1,
    xticklabels=labels,
    yticklabels=labels,
    cbar_kws={"label": "Mean Correlation"},
    ax=ax,
)
ax.set_title("Mean EMG-to-EMG Correlation (Diagonal: EMG vs Perturbed Self)")
ax.set_xlabel("EMG Channels")
ax.set_ylabel("EMG Channels")
fig.tight_layout()
plt.show()

# Plot std
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    grand_std,
    annot=True,
    fmt=".2f",
    cmap="YlOrBr",
    xticklabels=labels,
    yticklabels=labels,
    cbar_kws={"label": "Std. Dev. of Correlation"},
    ax=ax,
)
ax.set_title("Std. Dev. of EMG-to-EMG Correlation Across Participants")
ax.set_xlabel("EMG Channels")
ax.set_ylabel("EMG Channels")
fig.tight_layout()
plt.show()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# EMG channel correlations for a single participant, recomputed for every
# recording; the diagonal holds each channel's correlation with its perturbed
# counterpart.
participant = "P_950"

# Single place to repoint this cell at a different data checkout.
data_root = Path("/Users/jorisg/projects/biomech_PCP/data")
participant_dir = data_root / participant
perturber_path = participant_dir / "online_trials" / "perturb" / "perturber.npy"
config_path = participant_dir / "configs" / "modular.yaml"

# These inputs depend only on the participant, so they are loaded (and the
# feature list printed) once instead of once per recording.
perturber = np.load(perturber_path)
with open(config_path, "r") as f:
    config = yaml.safe_load(f)

features = config["parameters"]["features"]["value"]
print(features)

# The second element of every feature entry is used as the EMG column index.
int_features = [int(feature[1]) for feature in features]
channel_labels = [f"emg_{idx}" for idx in int_features]

for recording in recordings + test_recordings:
    data_dir = participant_dir / "recordings" / recording / "experiments" / "1"

    # Fall back to the plain crop only when the aligned file is missing; other
    # load errors are allowed to surface.
    try:
        emg = np.load(data_dir / "cropped_aligned_emg.npy")
    except FileNotFoundError:
        emg = np.load(data_dir / "cropped_emg.npy")
    emg = emg[:, int_features]

    perturbed_emg = (perturber @ emg.T).T

    # Positionally-labelled frame of the selected channels; kept as a cell
    # output because other cells may read it.
    emg_df = pd.DataFrame(emg, columns=[f"emg_{i}" for i in range(emg.shape[1])])

    # Compute emg-to-emg correlation matrix, labelled by channel id
    emg_corr = pd.DataFrame(
        np.corrcoef(emg.T), columns=channel_labels, index=channel_labels
    )

    # Compute correlation between each emg channel and its perturbed counterpart
    self_corr = [
        np.corrcoef(emg[:, i], perturbed_emg[:, i])[0, 1] for i in range(emg.shape[1])
    ]

    # Replace diagonal with emg-to-perturbed self correlation
    for i, value in enumerate(self_corr):
        emg_corr.iloc[i, i] = value

# NOTE(review): the heatmap is drawn once, after the loop, so it shows only the
# last recording in `recordings + test_recordings`. Move the plotting into the
# loop if one figure per recording is intended.
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    emg_corr,
    annot=True,
    fmt=".2f",
    cmap="coolwarm",
    vmin=-1,
    vmax=1,
    cbar_kws={"label": "Correlation"},
    ax=ax,
)
ax.set_title("EMG-to-EMG Correlation (Diagonal: EMG vs Perturbed Self)")
ax.set_xlabel("EMG Channels")
ax.set_ylabel("EMG Channels")
fig.tight_layout()
plt.show()

In [None]:
import seaborn as sns

# `corr_matrix` is not assigned in any cell above, so this cell raised a
# NameError on a fresh kernel. It is rebuilt here from the `emg`,
# `perturbed_emg` and `int_features` left by the preceding cell: entry (i, j)
# is the correlation between original channel i and perturbed channel j.
# NOTE(review): this definition is inferred from the axis labels below —
# confirm it matches the matrix that was originally plotted.
n_channels = emg.shape[1]
joint_corr = np.corrcoef(emg.T, perturbed_emg.T)
cross_labels = [f"emg_{idx}" for idx in int_features]
corr_matrix = pd.DataFrame(
    # Top-right block of the joint matrix: original rows vs perturbed columns.
    joint_corr[:n_channels, n_channels:],
    index=cross_labels,
    columns=cross_labels,
)

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    corr_matrix,
    annot=True,
    fmt=".2f",
    cmap="coolwarm",
    vmin=-1,
    vmax=1,
    cbar_kws={"label": "Correlation"},
    ax=ax,
)
ax.set_title("Correlation Matrix: EMG vs Perturbed EMG Channels")
ax.set_xlabel("Perturbed EMG Channels")
ax.set_ylabel("Original EMG Channels")
fig.tight_layout()
plt.show()

In [None]:
import numpy as np
import pandas as pd

# Recompute the correlation matrix from the `emg`, `perturbed_emg` and
# `int_features` left in the kernel by the preceding cells.
channel_labels = [f"emg_{idx}" for idx in int_features]

# Compute emg-to-emg correlation matrix, labelled directly by channel id so the
# cell does not need a separately built frame just to supply labels.
# atleast_2d keeps the single-channel case a 1x1 matrix.
emg_corr = pd.DataFrame(
    np.atleast_2d(np.corrcoef(emg.T)),
    columns=channel_labels,
    index=channel_labels,
)

# Compute correlation between each emg channel and its perturbed counterpart
self_corr = [
    np.corrcoef(emg[:, i], perturbed_emg[:, i])[0, 1] for i in range(emg.shape[1])
]

# Replace diagonal with emg-to-perturbed self correlation
for i, value in enumerate(self_corr):
    emg_corr.iloc[i, i] = value
# Visualize
import matplotlib.pyplot as plt
import seaborn as sns

# Annotated heatmap of `emg_corr` on a fixed [-1, 1] colour scale.
fig, ax = plt.subplots(figsize=(8, 6))
heatmap_options = {
    "annot": True,
    "fmt": ".2f",
    "cmap": "coolwarm",
    "vmin": -1,
    "vmax": 1,
    "cbar_kws": {"label": "Correlation"},
}
sns.heatmap(emg_corr, ax=ax, **heatmap_options)
ax.set_title("EMG-to-EMG Correlation (Diagonal: EMG vs Perturbed Self)")
ax.set_xlabel("EMG Channels")
ax.set_ylabel("EMG Channels")
fig.tight_layout()
plt.show()


In [None]:
import glob
from pathlib import Path

import yaml
from deepdiff import DeepDiff

# Report how each listed participant's modular.yaml differs from P_149's.
base_path = Path("data/P_149/configs/modular.yaml")
with open(base_path, "r") as f:
    base_config = yaml.safe_load(f)

# modular.yaml paths of the listed participants that exist on disk, excluding
# the baseline file itself.
candidate_paths = (Path(f"data/{p}/configs/modular.yaml") for p in participants)
all_configs = [
    candidate
    for candidate in candidate_paths
    if candidate != base_path and candidate.exists()
]

for config_path in all_configs:
    with open(config_path, "r") as f:
        other_config = yaml.safe_load(f)
    # Order-insensitive structural comparison against the baseline config.
    diff = DeepDiff(base_config, other_config, ignore_order=True)
    print(f"\n--- Differences with {config_path} ---")
    print(diff if diff else "No differences.")

In [None]:
# Cross-check the predefined participant list against the folders on disk.
data_root = Path("/Users/jorisg/projects/biomech_PCP/data")

# List the directory once instead of once per participant.
data_entries = os.listdir(data_root)
data_entry_names = set(data_entries)

for participant in participants:
    if participant not in data_entry_names:
        print(f"Participant {participant} not found in data directory.")

# NOTE(review): only entries starting with "P_" are considered, so folders
# named like "P7_453" or "P6_820" would never be reported here — confirm that
# this is intended.
for participant in data_entries:
    if participant.startswith("P_") and participant not in participants:
        print(
            f"Participant {participant} is not in the predefined list of participants."
        )

# TODO: Check if all participants have the same number of features

In [None]:
# copy /Users/jorisg/projects/biomech_PCP/data/P_149/configs/modular_initialization.yaml to all participants, change
# the first line from "name: P_149" to "name: {participant}"
from pathlib import Path
import yaml


def copy_initialization_config(participants, base_path):
    """Copy a base initialization config to every participant that lacks one.

    The YAML file at ``base_path`` is loaded once. For each participant whose
    ``data/<participant>/configs/modular_initialization.yaml`` (relative to the
    current working directory) does not exist yet, a copy is written with the
    ``name`` entry set to the participant id. Existing files are never
    overwritten.

    Args:
        participants: Iterable of participant ids (folder names under ``data``).
        base_path: Path of the YAML file used as the template.

    Returns:
        None. Progress is reported with ``print``.
    """
    with open(Path(base_path), "r") as f:
        base_config = yaml.safe_load(f)

    for participant in participants:
        participant_config_path = Path(
            f"data/{participant}/configs/modular_initialization.yaml"
        )
        if participant_config_path.exists():
            continue

        # Directories are deliberately not created here; a participant without
        # a configs folder is reported and skipped so that the remaining
        # participants are still processed.
        config_dir = participant_config_path.parent
        if not config_dir.is_dir():
            print(f"Skipping {participant}: {config_dir} does not exist")
            continue

        # Build a per-participant copy instead of mutating the loaded template;
        # overriding an existing key keeps its original position.
        participant_config = {**base_config, "name": participant}
        with open(participant_config_path, "w") as f:
            # sort_keys=False (PyYAML >= 5.1) keeps the template's key order;
            # the default would sort keys alphabetically in the copy.
            yaml.dump(participant_config, f, sort_keys=False)
        print(f"Copied config to {participant_config_path}")

# Use P_149's initialization config as the template for all listed participants.
copy_initialization_config(
    participants=participants,
    base_path="data/P_149/configs/modular_initialization.yaml",
)


In [None]:
# Evaluates sqrt(3) * sqrt(2 / (1 + a**2)) with a = 0.01.
# NOTE(review): this looks like the Kaiming-uniform bound factor (before the
# 1/sqrt(fan_in) scaling) for a leaky-ReLU negative slope of 0.01 — confirm.
np.sqrt(3.0) * np.sqrt(2.0 / (1 + 0.01**2))


In [None]:
python s4_train.py --person_dir P_149 --intact_hand Left --config_name modular_initialization -hs