In [3]:
import numpy as np
import pandas as pd
import os
from scipy.interpolate import interp1d
import CHEATCP_fxns_V3 as cf
from scipy.interpolate import splrep, splev
import matplotlib.pyplot as plt

In [4]:
BASE_DIR = r'C:\Users\LibraryUser\Downloads\Fall2024\BrainAndAction\CP\CP'
DATA_DIR = os.path.join(BASE_DIR, 'data')
RESULTS_DIR = os.path.join(BASE_DIR, 'results_final_oct10')
print(f"Results Directory: {RESULTS_DIR}")

MATFILES_DIR = os.path.join(DATA_DIR, 'matfiles')
MASTER_FILE = os.path.join(DATA_DIR, 'KINARMdataset_SubjectSummary_All Visits_OK_12-20-23.xlsx')
DEFAULTS = cf.define_defaults()
excel_output_path = os.path.join(RESULTS_DIR, 'Pvar_Values_BlockWise_oct14.xlsx')


Results Directory: C:\Users\LibraryUser\Downloads\Fall2024\BrainAndAction\CP\CP\results_final_oct10


In [7]:
try:
    mdf = pd.read_excel(open(MASTER_FILE, 'rb'), sheet_name='KINARM_AllVisitsMaster')
    print("Master Excel file loaded successfully.")
except FileNotFoundError:
    print(f"Master file not found at {MASTER_FILE}. Please check the path.")
    exit(1)
except Exception as e:
    print(f"An error occurred while loading the master file: {e}")
    exit(1)

try:
    all_df, allTrajs = cf.getDataCP(mdf, MATFILES_DIR, DEFAULTS)
    print("Successfully loaded trials data.")
except Exception as e:
    print(f"An error occurred while loading data with getDataCP: {e}")
    exit(1)


Master Excel file loaded successfully.
evaluating the subject : cpvib040
evaluated the kindData for sub : cpvib040
evaluating the subject : cpvib040
evaluated the kindData for sub : cpvib040
evaluating the subject : cpvib040
evaluated the kindData for sub : cpvib040
evaluating the subject : cpvib040
no peaks found
no peaks found
evaluated the kindData for sub : cpvib040
evaluating the subject : cpvib040
bad first peak
evaluated the kindData for sub : cpvib040
evaluating the subject : cpvib042
evaluated the kindData for sub : cpvib042
evaluating the subject : cpvib042
evaluated the kindData for sub : cpvib042
evaluating the subject : cpvib042
skipping C:\Users\LibraryUser\Downloads\Fall2024\BrainAndAction\CP\CP\data\matfiles\CHEAT-CPcpvib042Day3.mat
evaluating the subject : cpvib042
no peaks found
no peaks found
evaluated the kindData for sub : cpvib042
evaluating the subject : cpvib042
bad first peak
evaluated the kindData for sub : cpvib042
Successfully loaded trials data.


In [13]:
all_df = all_df.reset_index(drop=True)
num_points = 150  # Number of points for time normalization

conditions = ['Reaching'] #only reaching trials
durations = all_df['Duration'].unique() #should be 4 durations
affected_type_arms = all_df['Affected'].unique()

print(f'Conditions : {conditions} \n Durations : {durations} \n Arms Type : {affected_type_arms}')

Conditions : ['Reaching'] 
 Durations : [750 900 625 500] 
 Arms Type : ['Less Affected' 'More Affected']


In [15]:
pvar_results = []
subjects = all_df['subject'].unique()
print(subjects)

['cpvib040' 'cpvib042']


In [25]:
allTrajs.keys()

dict_keys(['cpvib040Day5', 'cpvib040Day2', 'cpvib040Day3', 'cpvib040Day1', 'cpvib040Day4', 'cpvib042Day5', 'cpvib042Day4', 'cpvib042Day2', 'cpvib042Day1'])

In [35]:
for subject in subjects:
    subject_df = all_df[all_df['subject'] == subject]
    days = subject_df['day'].unique()
    
    for day in days:
        day_df = subject_df[subject_df['day'] == day].reset_index(drop=True)
        key = f"{subject}{day}"
        print(f'Key : {key}')
        if key not in allTrajs:
            print(f"Trajectories for {key} not found. Skipping.")
            continue 
        
        traj_list = allTrajs[key]
        
        if len(traj_list) != len(day_df):
            print(f"Mismatch in number of trajectories for {key}. Expected {len(day_df)}, got {len(traj_list)}. Skipping.")
            continue
        
        day_df['trajData'] = traj_list
        
        for condition in conditions:
            for arm in affected_type_arms:
                for duration in durations:
                    block_df = day_df[
                        (day_df['Condition'] == condition) &
                        (day_df['Affected'] == arm) &
                        (day_df['Duration'] == duration)
                    ]
                    
                    # if len(block_df) < 2:
                    #     print(f"Not enough trials for Subject: {subject}, Day: {day}, "
                    #           f"Condition: {condition}, Arm: {arm}, Duration: {duration}. Skipping.")
                    #     continue 
                    
                    norm_traj_x = []
                    norm_traj_y = []
                    
                    for idx, trial in block_df.iterrows():
                        trajData = trial['trajData']
                        HandX = np.array(trajData.get('HandX_filt', []))
                        HandY = np.array(trajData.get('HandY_filt', []))
                        
                        if len(HandX) == 0 or len(HandY) == 0:
                            print(f"Empty trajectory for trial index {idx}. Skipping.")
                            continue
                        
                        if np.isnan(HandX).any() or np.isnan(HandY).any():
                            print(f"NaN values found in trajectory for trial index {idx}. Skipping.")
                            continue
                        
                        RT = trial['RT']
                        CT = trial['CT']
                        
                        if pd.isna(RT) or pd.isna(CT):
                            print(f"NaN RT or CT for trial index {idx}. Skipping.")
                            continue 
                        
                        RT = int(RT)
                        CT = int(CT)
                        
                        if RT < 0 or CT > len(HandX) or RT >= CT:
                            print(f"Invalid RT ({RT}) or CT ({CT}) for trial index {idx}. Skipping.")
                            continue 
                        # Align the trajectory from movement onset (RT) to movement end (CT)
                        HandX_aligned = HandX[RT:CT]
                        HandY_aligned = HandY[RT:CT]
                        
                        if len(HandX_aligned) < 10:
                            print(f"Short trajectory for trial index {idx}. Skipping.")
                            continue  # Skip short trajectories
                        
                        time = np.linspace(0, 1, len(HandX_aligned))
                        new_time = np.linspace(0, 1, num_points)
                        
                        try:
                            tck_x = splrep(time, HandX_aligned, s=0)
                            tck_y = splrep(time, HandY_aligned, s=0)
                            HandX_norm = splev(new_time, tck_x)
                            HandY_norm = splev(new_time, tck_y)
                            # HandX_norm = np.interp(new_time, time, HandX_aligned)
                            # HandY_norm = np.interp(new_time, time, HandY_aligned)
                        except Exception as e:
                            print(f"Interpolation error for trial index {idx}: {e}. Skipping.")
                            continue  # Skip if interpolation fails
                        
                        norm_traj_x.append(HandX_norm)
                        norm_traj_y.append(HandY_norm)
                    
                    norm_traj_x = np.array(norm_traj_x)  
                    norm_traj_y = np.array(norm_traj_y)
                    
                    if norm_traj_x.shape[0] < 2:
                        print(f"Not enough valid trials for Pvar calculation in Subject: {subject}, Day: {day}, "
                              f"Condition: {condition}, Arm: {arm}, Duration: {duration}. Skipping.")
                        continue
                    
                    Pvar, mean_traj_x, mean_traj_y = generate_pvar(norm_traj_x, norm_traj_y)
                    pvar_results.append({
                        'Subject': subject,
                        'Day': day,
                        'Condition': condition,
                        'Arm': arm,
                        'Duration': duration,
                        'Pvar': Pvar
                    })

Key : cpvib040Day5
NaN RT or CT for trial index 13. Skipping.
NaN RT or CT for trial index 18. Skipping.
NaN RT or CT for trial index 19. Skipping.
NaN RT or CT for trial index 35. Skipping.
Key : cpvib040Day2
NaN RT or CT for trial index 31. Skipping.
NaN RT or CT for trial index 34. Skipping.
NaN RT or CT for trial index 35. Skipping.
NaN RT or CT for trial index 36. Skipping.
NaN RT or CT for trial index 37. Skipping.
NaN RT or CT for trial index 62. Skipping.
NaN RT or CT for trial index 64. Skipping.
Key : cpvib040Day3
NaN RT or CT for trial index 40. Skipping.
NaN RT or CT for trial index 45. Skipping.
NaN RT or CT for trial index 65. Skipping.
NaN RT or CT for trial index 68. Skipping.
Key : cpvib040Day1
NaN RT or CT for trial index 5. Skipping.
NaN RT or CT for trial index 22. Skipping.
NaN RT or CT for trial index 27. Skipping.
NaN RT or CT for trial index 30. Skipping.
NaN RT or CT for trial index 32. Skipping.
NaN RT or CT for trial index 33. Skipping.
NaN RT or CT for trial

In [29]:
def generate_pvar(norm_traj_x, norm_traj_y):
    """
    Computing Pvar as the sum of standard deviations across all data points as per vanthiels
    """
    mean_traj_x = np.mean(norm_traj_x, axis=0)
    mean_traj_y = np.mean(norm_traj_y, axis=0)
    
    deviations_x = norm_traj_x - mean_traj_x
    deviations_y = norm_traj_y - mean_traj_y
    
    std_dev_x = np.std(deviations_x, axis=0)
    std_dev_y = np.std(deviations_y, axis=0)
    
    Pvar = np.sum(std_dev_x + std_dev_y)
    
    return Pvar,mean_traj_x, mean_traj_y
    

In [31]:
def plot_trials_and_mean(subject, condition, arm, duration, norm_traj_x, norm_traj_y, mean_traj_x, mean_traj_y, results_dir):
    plt.figure(figsize=(10, 8))
    
    for i in range(norm_traj_x.shape[0]):
        plt.plot(norm_traj_x[i], norm_traj_y[i], alpha=0.3, color='blue')
    
    plt.plot(mean_traj_x, mean_traj_y, color='red', linewidth=2, label='Mean Trajectory')
    
    plt.title(f"Subject: {subject} - Condition: {condition}, Arm: {arm}, Duration: {duration}")
    plt.xlabel("Hand X Position")
    plt.ylabel("Hand Y Position")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plot_filename = f"{subject}_{condition}_{arm}_{duration}_trials_mean.png"
    plot_path = os.path.join(results_dir, plot_filename)
    plt.savefig(plot_path)
    plt.close()
    
    print(f"Plot saved for Subject: {subject}, Condition: {condition}, Arm: {arm}, Duration: {duration} at {plot_path}")


In [39]:
plot_trials_and_mean(
    subject=subject,
    condition=condition,
    arm=arm,
    duration=duration,
    norm_traj_x=norm_traj_x,
    norm_traj_y=norm_traj_y,
    mean_traj_x=mean_traj_x,
    mean_traj_y=mean_traj_y,
    results_dir=RESULTS_DIR
)


Plot saved for Subject: cpvib042, Condition: Reaching, Arm: More Affected, Duration: 500 at C:\Users\LibraryUser\Downloads\Fall2024\BrainAndAction\CP\CP\results_final_oct10\cpvib042_Reaching_More Affected_500_trials_mean.png
