In [1]:
"""
Created on Tuesday 1 May 2023
Author: ZAW
"""
#import libraries
import numpy as np
import pandas as pd
import os
from scipy.optimize import curve_fit
import warnings
warnings.simplefilter("ignore")

### Load the subject and stiumuls repect to their speed
path = os.getcwd()
dirname = os.path.dirname(path)
# for HC
data_file = 'data/four_degHC.csv'
# # for PD
#data_file = 'data/PD/one_deg.csv'

data_path = os.path.join(dirname,data_file)
# data = pd.read_csv(data_path)
data= pd.read_csv(data_path)

#import CSV data for stimulus
path_sti = os.getcwd()
dirname_sti = os.path.dirname(path_sti)
data_file_sti = 'data/4_degSti.xlsx'
data_path_sti = os.path.join(dirname_sti,data_file_sti)
rawdata = pd.read_excel(data_path_sti)
data_st = np.array(rawdata[1:],dtype=np.float)
time = data_st[:,1]
position = data_st[:,0]

FirstTrialHC = []

# List of subject codes (e.g. PD001, PD002, etc.)
subject_codes = list(set([col[3:] for col in data.columns if col.startswith('x1_')]))

for subject_code in subject_codes:
    ### Load the subject and stiumuls repect to their speed
    path = os.getcwd()
    dirname = os.path.dirname(path)
    # for HC
    data_file = 'data/four_degHC.csv'
    # # for PD
    #data_file = 'data/PD/one_deg.csv'

    data_path = os.path.join(dirname,data_file)
    # data = pd.read_csv(data_path)
    data= pd.read_csv(data_path)
    print('File '+ str(subject_code)+ ' is started.') 
    x_column = f'x1_{subject_code}'
    y_column = f'y1_{subject_code}'

    # Check if both columns exist in the DataFrame before accessing them
    if x_column in data.columns and y_column in data.columns:
        data_hc = data[[x_column, y_column]]
    else:
        print(f"Columns for subject code {subject_code} not found. Skipping...")
        continue

    y_data = data_hc[x_column]
    x_data = data_hc[y_column]

    # data_hc = data[['x1_PD001','y1_PD001']] # change the ID each subject code(e.g PD001 or PD003)
    # y_data = data_hc[data_hc.columns[0]]
    # x_data = data_hc[data_hc.columns[1]]


    # Trigonometric Functions
    # Trigonometric functions

    # Define the function
    def func(x, a, b, c):  #Position as a function of time.
        return a*(2/np.pi)*np.arcsin(np.sin(np.pi*(b*x+c)))

    #initial guesses( This has to change depend on stimulus speed)
    #for 1 degree per second: [10, 0.05, 0]
    #for 2 degrees per second: [10, 0.1, 0]
    #for 4 degrees per second: [10, 0.2, 0]
    #for 6 degrees per second: [10, 0.3, 0]
    #for 8 degrees per second: [10, 0.4, 0]
    InitialGuess = [10, 0.2, 0] # for one degree

    # Perform curve fitting
    popt, pcov = curve_fit(func,position,time, p0=InitialGuess)

    # Extract the optimal values of a, b, and c
    a, b, c = popt
    # print("a =", a)
    # print("b =", b)
    # print("c =", c)

    # Fit the curve
    fit_time = func(y_data,a,b,c)
    # plt.plot(y_data,fit_time)
    # plt.show()

    # Find the residual
    # Different(aka residual)
    diff = x_data - fit_time

    dt_array = np.array(diff)
    dt_array = pd.DataFrame(dt_array,columns=['diff'])
    window_size = 2
    dt_array['Moving_Average'] = dt_array['diff'].rolling(window=window_size).mean()
    #plt.plot(y_data,dt_array['Moving_Average'])
    # create dataframe
    data = {'Time':y_data,'POS':dt_array['Moving_Average']}
    df = pd.DataFrame(data)

    # The whole dataset
    # Plot specific range
    x_start = 10.01
    x_end = 90.00

    # Filter the data points within the disired range using boolean indexing
    mask = (df['Time'] >= x_start) & (df['Time'] <= x_end)
    x_data_range = df.loc[mask]

    data = {'Time':x_data_range['Time'],'POS':x_data_range['POS']}
    df_test = pd.DataFrame(data)

    # Determine the integration window
    def check_sign(x):
        if x > 0:
            return "Positive"
        else:
            return "Negative"
    df_test['Sign'] = df_test['POS'].apply(check_sign)

    # Condition window width 
    def calculate_start_end(row):
        global last_positive, last_negative
        if row['Sign'] == 'Positive':
            if row.name == 0 or df_test.loc[row.name - 1, 'Sign'] != 'Positive':
                last_positive = row['Time']
                return last_positive, '', '', ''
            elif row.name < len(df_test) - 1 and df_test.loc[row.name + 1, 'Sign'] != 'Positive':
                pos_end = row['Time']
                last_positive = ''
                return '', pos_end, '', ''
            else:
                return '', '', '', ''
        elif row['Sign'] == 'Negative':
            if row.name == 0 or df_test.loc[row.name - 1, 'Sign'] != 'Negative':
                last_negative = row['Time']
                return '', '', last_negative, ''
            elif row.name < len(df_test) - 1 and df_test.loc[row.name + 1, 'Sign'] != 'Negative':
                last_negative = row['Time']
                return '', '', '', last_negative
            else:
                return '', '', '', ''
        else:
            return '', '', '', ''

    # Initialize the last positive and negative values to empty strings 
    last_positive = ''
    last_negative = ''

    # Reset the index of the DataFrame
    df_test = df_test.reset_index(drop=True)

    # Apply the custom function to create new columns
    df_test['PosTim_Start'], df_test['PosTim_End'], df_test['NegTim_Start'], df_test['NegTim_End'] = zip(*df_test.apply(calculate_start_end, axis=1))

    # Fill the empty cells with an empty string
    df_test['PosTim_Start'] = df_test['PosTim_Start'].fillna('') 
    df_test['PosTim_End'] = df_test['PosTim_End'].fillna('')
    df_test['NegTim_Start'] = df_test['NegTim_Start'].fillna('')
    df_test['NegTim_End'] = df_test['NegTim_End'].fillna('')

    # Print the resulting dataframe 
    #print(df_test)


    # Positive time start trimming
    # Select the non-empty values in the 'Pos_Start' column
    pos_start_values = df_test.loc[df_test['PosTim_Start'] != '', 'PosTim_Start'].values

    # Round the values in the 'Pos_Start' column to two decimal places
    # rounded_pos_start_values = []
    # for value in pos_start_values:
    #     rounded_pos_start_values.append(round(value - 0.003, 2))

    # Convert the list of rounded values back to a NumPy array
    postim_start_values = np.array(pos_start_values)

    # Print the non-empty values
    #print(postim_start_values)


    # Positive time end triming
    # Select the non-empty values in the 'Pos_End' column
    pos_end_values = df_test.loc[df_test['PosTim_End'] != '', 'PosTim_End'].values

    # Round the values in the 'Pos_End' column to two decimal places
    # rounded_pos_end_values = []
    # for value in pos_end_values:
    #     rounded_pos_end_values.append(round(value - 0.02, 2))

    # Convert the list of rounded values back to a NumPy array
    postim_end_values = np.array(pos_end_values)

    # Print the non-empty values
    #print(postim_end_values)


    # Reload the raw data for mapping
    data = data_hc.rename(columns={data_hc.columns.values[0]:"Position",
                                data_hc.columns.values[1]:'Time'})


    # Map the start positive time to POS in raw data
    # Create a dictionary from the mapping list
    mapping_list = postim_start_values

    # Map the values to the 'POS' column in raw
    mapped_posSt = data.loc[data['Position'].isin(mapping_list),'Time']


    # Map the end positive time to POS in raw data
    # Create a dictionary from the mapping list
    mapping_list = postim_end_values

    # Map the values to the 'POS' column in raw
    mapped_posEnd = data.loc[data['Position'].isin(mapping_list),'Time']


    # Velocity param positive dataframe
    # # column miss match
    # Check lengths and truncate longer column 
    if len(postim_start_values) > len(mapped_posSt): 
        postim_start_values = postim_start_values[:len(mapped_posSt)] 
    elif len(mapped_posSt) > len(postim_start_values):
        mapped_posSt = mapped_posSt[:len(postim_start_values)]      

    # Now columns have equal length 
    data_param = {'positive_x1':postim_start_values,'positive_y1':mapped_posSt}             
    param_vel_start = pd.DataFrame(data_param)

    data_param= {'positive_x2':postim_end_values,'positive_y2':mapped_posEnd}
    param_vel_end = pd.DataFrame(data_param)

    # Reset the indices of the DataFrames
    param_vel_start = param_vel_start.reset_index(drop=True)
    param_vel_end = param_vel_end.reset_index(drop=True)

    # Concatenate the DataFrames horizontally
    param_vel_positive_final = pd.concat([param_vel_start,param_vel_end],axis=1)
    #param_vel_positive_final['Del_X'] = param_vel_positive_final['positive_x2'] - param_vel_positive_final['positive_x1']

    param_vel_positive_final['Del_X'] = (param_vel_positive_final['positive_x2'] - 
                                        param_vel_positive_final['positive_x1']).where(~param_vel_positive_final['positive_x2'].isnull(), np.nan)



    # # Remove short duration SWJ from 50 ms to 400 ms
    param_vel_positive_final = param_vel_positive_final[(param_vel_positive_final['Del_X'] > 0.4) & 
                                                        (param_vel_positive_final['Del_X'] < 2.00)]


    # iterate over the rows of the dataframe
    prev_positive_x2 = None
    for i, row in param_vel_positive_final.iterrows():
        # check if this is not the first row
        if prev_positive_x2 is not None:
            # check if positive_x1 is lower than the previous positive_x2
            if row['positive_x1'] < prev_positive_x2:
                # remove this row from the dataframe
                param_vel_positive_final = param_vel_positive_final.drop(i)
            else:
                # update prev_positive_x2 if positive_x1 is greater than or equal to prev_positive_x2
                prev_positive_x2 = row['positive_x2']
        else:
            # initialize prev_positive_x2 with the first row value
            prev_positive_x2 = row['positive_x2']


    # Velocity for positive pieak calculation
    # Calculate the difference between y2 and y1 divided by the difference between x2 and x1
    param_vel_positive_final['slope'] = (param_vel_positive_final['positive_y2'] - param_vel_positive_final['positive_y1']) / (param_vel_positive_final['positive_x2'] - param_vel_positive_final['positive_x1'])

    # abs
    param_vel_positive_final['slope'] = abs(param_vel_positive_final['slope'])

    # Reindexing
    param_vel_positive_final = param_vel_positive_final.reset_index(drop=True)

    # Remove unwant values
    #param_gain_positive_final.loc[param_gain_positive_final['slope'] > 1.09, 'slope'] = np.nan  

    # Calculate the average slope
    average_positive_slope = abs(param_vel_positive_final['slope'].mean())

    # Print the average slope
    #print(average_positive_slope)


    # Negative time start trimming
    # Select the non-empty values in the 'Pos_Start' column
    neg_start_values = df_test.loc[df_test['NegTim_Start'] != '', 'NegTim_Start'].values

    # Round the values in the 'Pos_Start' column to two decimal places
    rounded_neg_start_values = []
    for value in neg_start_values:
        rounded_neg_start_values.append(round(value - 0.003, 2))

    # Convert the list of rounded values back to a NumPy array
    negtim_start_values = np.array(rounded_neg_start_values)

    # Print the non-empty values
    #print(negtim_start_values)


    # Negative Time End Triming
    # Select the non-empty values in the 'Pos_Start' column
    neg_end_values = df_test.loc[df_test['NegTim_End'] != '', 'NegTim_End'].values

    # Round the values in the 'Pos_Start' column to two decimal places
    # rounded_neg_end_values = []
    # for value in neg_end_values:
    #     rounded_neg_end_values.append(round(value - 0.02, 2))

    # Convert the list of rounded values back to a NumPy array
    negtim_end_values = np.array(neg_end_values)


    # Map the start negative time to POS in raw data
    # Create a dictionary from the mapping list
    mapping_list = negtim_start_values

    # Map the values to the 'POS' column in raw
    mapped_negSt = data.loc[data['Position'].isin(mapping_list),'Time']



    # Ma the end of negative time to pos in raw data
    # Create a dictionary from the mapping list
    mapping_list = negtim_end_values

    # Map the values to the 'POS' column in raw
    mapped_negEnd = data.loc[data['Position'].isin(mapping_list),'Time']


    # Velocity param negative datafrmae
    # column miss match
    # Check lengths and truncate longer column 
    if len(negtim_start_values) > len(mapped_negSt): 
        negtim_start_values = negtim_start_values[:len(mapped_negSt)] 
    elif len(mapped_negSt) > len(negtim_start_values):
        mapped_negSt = mapped_negSt[:len(negtim_start_values)]  


    data_param = {'negative_x1':negtim_start_values,'negative_y1':mapped_negSt}             
    param_vel_start = pd.DataFrame(data_param)
    data_param= {'negative_x2':negtim_end_values,'negative_y2':mapped_negEnd}
    param_vel_end = pd.DataFrame(data_param)

    # Reset the indices of the DataFrames
    param_vel_start = param_vel_start.reset_index(drop=True)
    param_vel_end = param_vel_end.reset_index(drop=True)

    # Concatenate the DataFrames horizontally
    param_vel_negative_final = pd.concat([param_vel_start,param_vel_end],axis=1)
    param_vel_negative_final['Del_X'] = param_vel_negative_final['negative_x2'] - param_vel_negative_final['negative_x1']
    #param_vel_negative_final = param_vel_negative_final.fillna(method='ffill')
    #param_vel_negative_final

    # Remove short duration
    # param_vel_negative_final = param_vel_negative_final[(param_vel_negative_final['Del_X'] > 0.3) & 
    #                                                     (param_vel_negative_final['Del_X'] < 2.07)]
    param_vel_negative_final = param_vel_negative_final[param_vel_negative_final['Del_X'] < 2.07]
    #                                                     
    param_vel_negative_final['Del_X'] = (param_vel_negative_final['negative_x2'] - 
                                        param_vel_negative_final['negative_x1']).where(~param_vel_negative_final['negative_x2'].isnull(), np.nan)



    # # Remove short duration SWJ from 50 ms to 400 ms
    param_vel_negative_final = param_vel_negative_final[(param_vel_negative_final['Del_X'] > 0.4) & 
                                                        (param_vel_negative_final['Del_X'] < 2.00)]


    # iterate over the rows of the dataframe
    prev_negative_x2 = None
    for i, row in param_vel_negative_final.iterrows():
        # check if this is not the first row
        if prev_negative_x2 is not None:
            # check if positive_x1 is lower than the previous positive_x2
            if row['negative_x1'] < prev_negative_x2:
                # remove this row from the dataframe
                param_vel_negative_final = param_vel_negative_final.drop(i)
            else:
                # update prev_positive_x2 if positive_x1 is greater than or equal to prev_positive_x2
                prev_megatove_x2 = row['negative_x2']
        else:
            # initialize prev_positive_x2 with the first row value
            prev_negative_x2 = row['negative_x2']

    # Velocity for Negative peak calculation
    # Calculate the difference between y2 and y1 divided by the difference between x2 and x1
    param_vel_negative_final['slope'] = (param_vel_negative_final['negative_y2']
                                        - param_vel_negative_final['negative_y1']) / (param_vel_negative_final['negative_x2'] 
                                        - param_vel_negative_final['negative_x1'])

    # abs
    param_vel_negative_final['slope'] = abs(param_vel_negative_final['slope'])

    param_vel_negative_final['slope'].replace([np.inf, -np.inf], np.nan, inplace=True)

    # Reindexing
    param_vel_negative_final = param_vel_negative_final.reset_index(drop=True)

    # Remove unwant values
    #param_gain_negative_final.loc[(param_gain_negative_final['slope'] < 0.80) |  
    #                              (param_gain_negative_final['slope'] > 1.09), 
    #                              'slope'] = np.nan
    # Calculate the average slope
    average_negative_slope = abs(param_vel_negative_final['slope'].mean())

    # Print the average slope
    #print(average_negative_slope)


    # Total velocity for both positive and negative
    Final_Vel = pd.concat([param_vel_positive_final[['Del_X','slope']],
                        param_vel_negative_final[['Del_X','slope']]], axis=0, join='outer')

    # Counter chck with duration and velocity
    Final_Vel['Checked_velocity'] = Final_Vel['slope'] * Final_Vel['Del_X']
    Vel = Final_Vel['Checked_velocity'].sum()/Final_Vel['Del_X'].sum()

    # Calculate gain
    speed = 4 # stimulus speed
    total_gain = Vel/speed
    #print("The gain for this subject is:",total_gain)
    result = {"SubjectID":subject_code ,"gain":total_gain}
    FirstTrialHC.append(result)
    print('File '+ str(subject_code)+ ' is done.') 

# Create a DataFrame from the list of dictionaries
FirstTrialHC_df = pd.DataFrame(FirstTrialHC)
FirstTrialHC_df .to_csv('FourFirstTrialHC_df .csv',index=False)
#print(FirstTrialHC_df)


FileNotFoundError: [Errno 2] No such file or directory: '/data/two_degHC.csv'