In [1]:
import pandas as pd
import numpy as np
import pathlib as pl
import datetime as dt

home = pl.Path(r'C:\Users\kevin.omalley\OneDrive - University of Limerick\Documents\GitHub\fino2py\data')
p1 = pl.Path(r'C:\Users\kevin.omalley\OneDrive - University of Limerick\Documents\GitHub\fino2py\data\Participant 1_2022-10-27_09.07.37')
p2 = pl.Path(r'C:\Users\kevin.omalley\OneDrive - University of Limerick\Documents\GitHub\fino2py\data\Partcipant 59_2022-11-21_10.37.29')
time_stamps = pd.read_excel(r'C:\Users\kevin.omalley\OneDrive - University of Limerick\Documents\GitHub\fino2py\data\Timesheets (1).xlsx')

In [2]:
def add_seconds_to_time(time_str):
    '''This function adds seconds to the time string for incase the time string is missing seconds'''
    if len(time_str) == 5:
        time_str += ":00"
    return time_str

In [None]:
time_stamps.iloc[:, 1:] = time_stamps.iloc[:, 1:].applymap(lambda x: add_seconds_to_time(x) if isinstance(x, str) else x)
time_stamps.columns = [i.strip() for i in time_stamps.columns]

In [None]:
time_stamps.head().to_markdown()

In [3]:
# Some instances of 'Participant' are misspelled as 'Partcipant' in the file names. This is a quick fix to correct this.
# Define the directory path where you want to replace the string
directory = pl.Path(r'C:\Users\kevin.omalley\OneDrive - University of Limerick\Documents\GitHub\fino2py\data')

# Define the old string and the new string
old_str = 'Partcipant'
new_str = 'Participant'

# Walk through all directories and files within the specified directory
for path in directory.glob('**/*'):
    if old_str in path.name:
        # Replace the old string with the new string in the file or directory name
        new_name = path.name.replace(old_str, new_str)
        if new_name != path.name:
            # If the name has changed, rename the file or directory
            path.rename(path.with_name(new_name))
            print(f'Renamed {path.name} to {new_name}')


In [4]:
#This imports the raw data from the finometer and then calculates the average of each measure over the selected time period
# the default time period is 1 minute, but if you want to change this you can change the interval variable (e.g. interval = '30s' for 30 seconds)

def import_finometer_intervals(folder_path, interval = '1T', save_csv=False):
    '''This function imports the finometer data and then calculates the average of each measure over the selected time period'''

    if not isinstance(folder_path, pl.Path):#check if folder_path is a pathlib.Path object
        raise TypeError('folder_path must be a pathlib.Path object')
    elif not folder_path.exists(): #  and if it exists
        raise ValueError('folder_path does not exist')
    elif not folder_path.is_dir(): #  and is a directory 
        raise ValueError('folder_path is not a directory')
    elif len([x for x in folder_path.iterdir() if x.suffix == '.txt']) != 1:  # and if there is only one .txt file in the folder
        raise ValueError('''More than one .txt file found in folder, please ensure only one .txt file is present in the folder''')
    else:
        file = [x for x in folder_path.iterdir() if x.suffix == '.txt'][0]
    # read in the data from the .txt file
        df = pd.read_csv(
            file,
            sep=';',
            header=0,
            skiprows=8, # skip the first 8 rows
            skipfooter=1,
            engine='python',
            )

        # Drop the unnamed column at position 13
        df = df.drop(df.columns[13], axis=1)

        # set the 'Time (s)' column to a datetime object
        df['Time (s)'] = pd.to_datetime(df['Time (s)'], format='%H:%M:%S.%f')
        # drop the year from the datetime object
        # df['Time (s)'] = df['Time (s)'].dt.strftime('%H:%M:%S.%f')

        # set the 'Time (s)' column as the index
        df = df.set_index('Time (s)')

        # create the average of each measure over the selected time period
        df_munutes = df.resample(f'{interval}').mean()

        if save_csv: #if you want to save the csv file (which may be useful if you want to use the data in other ways)
            df_munutes.to_csv(folder_path / f"imported data for {file.stem.split('_')[0]}.csv", index=True)
            print(f"CSV saved for {file.stem.split('_')[0]}")



        return df_munutes


In [5]:
a = import_finometer_intervals(p1, interval = '1T')

  df_munutes = df.resample(f'{interval}').mean()


In [6]:
a.head().to_markdown()

'| Time (s)            |   Systolic Pressure (mmHg) |   Diastolic Pressure (mmHg) |   Mean Pressure (mmHg) |   Heart rate (bpm) |   Stroke Volume (ml) |   Left Ventricular Ejection Time (ms) |   Pulse Interval (ms) |   Maximum Slope (mmHg/s) |   Cardiac Output (l/min) |   Total Peripheral Resistance Medical Unit (mmHg.min/l) |   Total Peripheral Resistance CGS (dyn.s/cm5) |\n|:--------------------|---------------------------:|----------------------------:|-----------------------:|-------------------:|---------------------:|--------------------------------------:|----------------------:|-------------------------:|-------------------------:|--------------------------------------------------------:|----------------------------------------------:|\n| 1900-01-01 09:07:00 |                    74.4815 |                     51.6667 |                 62     |            93.5556 |              16.0926 |                               135.926 |               642.778 |                  478.407 |   

In [None]:
# This cell creates the 1 minute intervals for each participant
# it saves a file for each person in their folder
[import_finometer_intervals(x, save_csv=True) for x in home.iterdir() if x.is_dir()]

In [20]:
def import_finometer_protocol_times(times_file_path, seconds = False):
    '''This function imports the protocol times from the .csv file and returns a dataframe with the protocol times for each participant
    
    file_path: pathlib.Path object
        The path to the .csv file containing the protocol times
    save_csv: boolean (optional)
        If True, the imported data will be saved as a .csv file in the same folder as the .csv file, this is not alway needed and should be used sparingly'''


    def add_seconds_to_time(time_str):
        '''This function adds seconds to the time string for incase the time string is missing seconds'''
        if len(time_str) == 5:
            time_str += ":00"
        return time_str
    
    def remove_seconds_from_time(time_str):
        '''This function removes seconds from the time string'''
        return time_str[:-3]
    
   

    if not isinstance(times_file_path, pl.Path):#check if folder_path is a pathlib.Path object
        raise TypeError('file_path must be a pathlib.Path object')
    elif not times_file_path.exists(): #  and if it exists
        raise ValueError('file_path does not exist')
    elif not times_file_path.is_file(): #  and is a file 
        raise ValueError('file_path is not a file')
    elif times_file_path.suffix != '.csv': #  and is a csv file
        raise ValueError('file_path is not an csv file')
    else:
        df = pd.read_csv(times_file_path, delimiter= ',')
        df.columns = [col.strip() for col in df.columns]
        cols_to_keep = ['Participant ID', 'Start of Baseline', 'End of Baseline', 'Start of Task 1', 'End of Task 1', 'Start of Recovery Period', 'End of Recovery Period']
        df = df[cols_to_keep].applymap(lambda x: str(x).strip('"') if isinstance(x, str) else x)

        if seconds:
            df = df.applymap(lambda x: add_seconds_to_time(x) if isinstance(x, str) else x)
        else:
            df = df.applymap(lambda x: remove_seconds_from_time(x) if isinstance(x, str) else x)




        return df

In [None]:
def import_finometer_demographics(file_path):
    '''
    A function to import the demographic data from a smartmedical/finopress
    finometer .txt file
    
    file_path =  should be a path to a .txt file, this can be string but should preferably be a pathlib object to allow for use on multiple operating
    systems
    '''

    if not isinstance(file_path, pl.PurePath):
        try:
            file_path = pl.Path(file_path)
        except TypeError:
            raise TypeError("""The 'file_path' must be a path object or a string that can be converted to a path object. Remember 'raw strings on windows devices""")
    
    try:
        with open(file_path, 'r') as file:
            data=[]
            for line in file:
                if line.startswith('Identification'):
                    data.append(line.strip().split(';'))
                    next_line = next(file).strip()
                    if next_line:
                        data.append(next_line.split(';'))
                    break
            
            if len(data) < 2:
                raise ValueError("File doesn't contain expected data")
            

            df = pd.DataFrame(data, columns= ['Participant ID', 'ID 2', 'Age (yrs)', 'Height (cm)', 'Weight (kg)', 'Gender', 'Procedure', 'Model number'])
            df = df.drop(0)
            df = df.drop('ID 2', axis = 1)


            return(df)      
                        

    except (TypeError, ValueError, FileNotFoundError) as err:
        print(str(file_path), type(err), err)
        return None


In [None]:
df = pd.concat([import_finometer_demographics(f) for f in p1.parent.glob("**/*") if f.suffix == '.txt'], axis = 0, ignore_index=True)


In [54]:
def import_protocol_averages_feb_23(f, t1 = None, t2 = None, t3 = None, t4 = None, t5 = None, t6= None):
    '''A function that imports the averaged finomter files (which have already been processed from the raw data)
    to produce averages for each section of the experiment protocol'''

    
    if not isinstance(f, pl.Path):#check if folder_path is a pathlib.Path object
        raise TypeError('file_path must be a pathlib.Path object')
    elif not f.exists(): #  and if it exists
        raise ValueError('file_path does not exist')
    else:
    # check if the file is a csv or xlsx file
        if f.suffix == '.csv':
            try:
                intervals = pd.read_csv(f, index_col=0, parse_dates=True)
            except pd.errors.EmptyDataError:
                print(f"File {f} is empty")# might need to fix this but its a start
        elif f.suffix == '.xlsx':
            try:
                intervals = pd.read_excel(f, index_col=0, parse_dates=True)
            except pd.errors.EmptyDataError:
                print(f"File {f} is empty")# might need to fix this but its a start
        else:
            raise ValueError("File must be a Path object for a .csv or .xlsx file")


    intervals.index = pd.to_datetime(intervals.index).strftime('%H:%M:%S')
    intervals = intervals.replace('--', np.nan) #getting rid of blank values (converting them to 'nan' instead of just '--')
    intervals = intervals.dropna(how='all') #getting rid of rows that are completely blank

    def create_protocol_part_df(frame, start, end, tag, filename):
        '''A function that creates a dataframe representing a time period within a particular experimental protocol from a given DataFrame.
        Takes a DataFrame, start and end times for the portion of the experimental protocol.
        Returns a DataFrame with the mean values of the given columns during that portion of the study.'''
        
        period = frame.loc[start:end].dropna().T
        period['Average'] = period.mean(axis=1)
        b = pd.DataFrame(period['Average']).T
        b.columns = [f'{i}_{tag}' for i in b.columns]
        b['Participant ID'] = f'{filename}'
        # b = b.set_index('Participant ID')
        
        return b

    #creating the baseline dataframe
    baseline = create_protocol_part_df(intervals, t1, t2, 'bl', f.parent.stem.split('_')[0])

    # #creating the baseline dataframe
    # baseline = df.loc[t1:t2].dropna().T
    # baseline['Average'] = baseline.mean(axis = 1)
    # b =  pd.DataFrame(baseline['Average']).T
    # b.columns = [f'{i}_bl' for i in b.columns]
    # b['ID'] = f'{f.name}'


    # #creating the instruction dataframe
    # inst = df.loc[t3:t4].dropna().T
    # inst['Average'] = inst.mean(axis = 1)
    # ins =  pd.DataFrame(inst['Average']).T
    # ins.columns = [f'{i}_ins' for i in ins.columns]
    # ins['ID'] = f'{f.name}'
    
    # #creating the task dataframe
    # task = df.loc[t5:t6].dropna().T
    # task['Average'] = task.mean(axis = 1)
    # t =  pd.DataFrame(task['Average']).T
    # t.columns = [f'{i}_task' for i in t.columns]
    # t['ID'] = f'{f.name}'

    # #creating the recovery dataframe
    # rec = df.loc[t7:t8].dropna().T
    # rec['Average'] = rec.mean(axis = 1)
    # r =  pd.DataFrame(rec['Average']).T
    # r.columns = [f'{i}_rec' for i in r.columns]
    # r['ID'] = f'{f.name}'
    

    # # creating a dataframe from the smaller dataframes
    # data_merge = reduce(lambda left, right:     # Merge three pandas DataFrames
    #                  pd.merge(left , right,
    #                           on = ["ID"],
    #                           how = "outer"),
    #                  [b,ins,t,r])
    # data_merge.set_index('ID', inplace=True) # setting the 'ID' column as the index of the new merged dataframe
    

    
    return baseline # passing the new dataframe back to the user

In [55]:
d = import_protocol_averages_feb_23(pl.Path(r'C:\Users\kevin.omalley\OneDrive - University of Limerick\Documents\GitHub\fino2py\data\Participant 1_2022-10-27_09.07.37\imported data for Participant 1.csv'))

In [56]:
d.head()

Unnamed: 0,Systolic Pressure (mmHg)_bl,Diastolic Pressure (mmHg)_bl,Mean Pressure (mmHg)_bl,Heart rate (bpm)_bl,Stroke Volume (ml)_bl,Left Ventricular Ejection Time (ms)_bl,Pulse Interval (ms)_bl,Maximum Slope (mmHg/s)_bl,Cardiac Output (l/min)_bl,Total Peripheral Resistance Medical Unit (mmHg.min/l)_bl,Total Peripheral Resistance CGS (dyn.s/cm5)_bl,Participant ID
Average,199.787891,155.626108,173.729357,89.63786,17.403839,242.961308,680.683736,1110.357707,1.559994,7.332134,9776.167137,Participant 1


In [None]:
#This is the first finometer function, but the use case has somehwat changed, I'm going to keep it here for now, but it will be removed in the future
#The function needs to be about reshaping the data, which probably needs to be applied to the data after `import_finomteter_intervals` has been run
 

def import_finometer_txt(folder_path, save_csv=False, zero_time=False):
    """
    Imports raw data from the finopress finometer .txt file and returns a pandas dataframe, resahped from tall to wide format.
    The function will only work if there is only one .txt file in the folder.
    i: pathlib.Path object
    save_csv: boolean, if True, will save a csv file of the imported data into the participant's folder
    zero_time: boolean, if True, will convert the time column to a timedelta object and set the first timepoint to 0
    """

   
    if not isinstance(folder_path, pl.Path):#check if folder_path is a pathlib.Path object
        raise TypeError('folder_path must be a pathlib.Path object')
    elif not folder_path.exists(): #  and if it exists
        raise ValueError('folder_path does not exist')
    elif not folder_path.is_dir(): #  and is a directory 
        raise ValueError('folder_path is not a directory')
    elif len([x for x in folder_path.iterdir() if x.suffix == '.txt']) != 1:  # and if there is only one .txt file in the folder
        raise ValueError('''More than one .txt file found in folder, please ensure only one .txt file is present in the folder''')
    else:
        file = [x for x in folder_path.iterdir() if x.suffix == '.txt'][0]
    # read in the data from the .txt file
        df = pd.read_csv(
            file,
            sep=';',
            header=0,
            skiprows=8, # skip the first 8 rows
            skipfooter=1,
            engine='python',
            )

        # Drop the unnamed column at position 13
        df = df.drop(df.columns[13], axis=1)



        # set the 'Time (s)' column to a datetime object        
        df['Time (s)'] = pd.to_datetime(df['Time (s)'], format='%H:%M:%S.%f')
        # drop the year from the datetime object
        df['Time (s)'] = df['Time (s)'].dt.strftime('%H:%M:%S.%f')

        # check if the user wants to set the first timepoint to 0
        if zero_time:
            min_time = df['Time (s)'].min()
            df['Time Since 0'] = df['Time (s)'] - min_time
            df['Time Since 0'] = df['Time Since 0'].dt.total_seconds()
            df['']
        else:
            df = df.set_index('Time (s)')

        alligned = df.groupby(df.index)[df.columns[1:]].max().stack().to_frame().T
        alligned.index = [file.stem.split('_')[0]]

        if save_csv:
            alligned.to_csv(folder_path / f"imported data for {file.stem.split('_')[0]}.csv")
            print(f"CSV saved for {file.stem.split('_')[0]}")



        return alligned