Reads and parses fNIRS data and extracts per-channel features for use with traditional ML techniques.

In [1]:
import pandas as pd
import numpy as np
import sklearn as sk
import os
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVC

In [2]:
# Marker labels that are compared against the "Stimulus_Label" column to find
# the rows where each task block starts and ends.
# The two baseline labels are not referenced by any other cell in this notebook.
BASELINE_START = "baselinestart"
BASELINE_END = "baselineend"
EASY_START = "easystart"
EASY_END = "easyend"
HARD_START = "hardstart"
HARD_END = "hardend"

In [3]:
# Paths to the S902 example session, resolved against the current working
# directory: the fNIRS recording first, then its marker file.
fnirs_path, marker_path = (
    os.path.join(os.getcwd(), "data/S902/2015-02-26_11-24-48-120", file_name)
    for file_name in ("fNIRSdata.txt", "markers.txt")
)

First, read the fNIRS data file and the marker file.

In [4]:
"""Gets the row blocks for easy and hard tasks
"""
def read_data(fnirs_path, marker_path):
    """Load the fNIRS recording and its marker file and join them on timestamp.

    Both files are parsed as tab-separated text after skipping their first
    four lines.

    :param fnirs_path: path of the fNIRS data file
    :param marker_path: path of the marker file
    :return: DataFrame holding every fNIRS row; marker columns are filled only
             on rows whose "Matlab_now" value also occurs in the marker file
             (left join)
    """
    fnirs_df, marker_df = [
        pd.read_csv(path, sep='\t', skiprows=range(4), index_col=False)
        for path in (fnirs_path, marker_path)
    ]
    return fnirs_df.merge(marker_df, on="Matlab_now", how="left")

In [5]:
def get_row_blocks(merged_df):
    """Locate the easy and hard task blocks from their start/end markers.

    The n-th start marker is paired with the n-th end marker of the same
    difficulty; surplus markers on either side are dropped by zip.

    :param merged_df: DataFrame with a "Stimulus_Label" column
    :return: 2-tuple (easy_rows, hard_rows), each a list of
             (start_index, end_index) pairs taken from merged_df.index
    """
    labels = merged_df.Stimulus_Label

    def rows_labelled(marker):
        # Index values of every row carrying exactly this marker label.
        return merged_df.index[labels == marker].tolist()

    easy_rows = list(zip(rows_labelled(EASY_START), rows_labelled(EASY_END)))
    hard_rows = list(zip(rows_labelled(HARD_START), rows_labelled(HARD_END)))
    return (easy_rows, hard_rows)

In [6]:
"""Return subset of df determined by the indices of the row blocks
"""
def get_subsets(merged_df, row_blocks):
    """Slice the merged frame into one table per task block.

    Every table holds the "Matlab_now" column followed by the 16 detector
    channels (A-DC1..A-DC8, B-DC1..B-DC8), with "Matlab_now" shifted so that
    the first row of the block is at 0.

    :param merged_df: DataFrame containing "Matlab_now" and all 16 channel columns
    :param row_blocks: iterable of (start_row, end_row) positional row pairs;
                       the end row itself is excluded from the slice
    :return: list of DataFrames, one per non-empty block. Each is an
             independent copy, so merged_df is never modified.
    :raises KeyError: if any of the expected columns is missing from merged_df
    """
    column_names = ["Matlab_now"] + [
        "{}-DC{}".format(probe, channel)
        for probe in "AB"
        for channel in range(1, 9)
    ]
    column_indices = [merged_df.columns.get_loc(name) for name in column_names]

    tables = []
    for row_block in row_blocks:
        start_row, end_row = row_block[0], row_block[1]
        # Copy explicitly: assigning into a bare iloc slice triggers pandas'
        # SettingWithCopyWarning and leaves it unclear whether merged_df is touched.
        block = merged_df.iloc[start_row:end_row, column_indices].copy()
        if block.empty:
            # A zero-length block has no first timestamp to rebase against.
            continue
        block["Matlab_now"] = block["Matlab_now"] - block["Matlab_now"].iloc[0]
        tables.append(block)
    return tables

In [7]:
"""Perform linear fit and calculate linear fit coefficients and mean
    :param table: pandas df, subset to examine
    :return: Dictionary of
                key: Column name
                value: 3-tuple (a, b, mean), where y = ax + b
"""
def extract_feature(table):
    """Fit a straight line to every channel of a single block.

    The first column of the table is skipped; every remaining column is fitted
    against "Matlab_now" with a degree-1 least-squares polynomial.

    :param table: pandas DataFrame whose "Matlab_now" column is the x axis
    :return: dict mapping column name -> (a, b, mean), where y = a*x + b is the
             fitted line and mean is the column mean
    """
    times = table["Matlab_now"].values

    def summarise(channel):
        # polyfit returns coefficients highest power first: [slope, intercept].
        slope, intercept = np.polyfit(times, table[channel].values, 1)
        return (slope, intercept, table[channel].mean())

    return {channel: summarise(channel) for channel in table.columns[1:]}

In [8]:
"""runs polyfit on the timeseries data
    :param tables: table of blocks of easy / hard tasks
    :param difficulty: 0 - easy, 1 - hard; labeling process
    
    :return: Dictionary of key: channel
                           value: (gradient, mean, difficulty)
"""
def extract_features(tables, difficulty):
    """Collect a labelled (gradient, mean) feature for every block and channel.

    For each table, every column after the first is fitted against
    "Matlab_now" with a degree-1 polynomial; only the gradient is kept.

    :param tables: iterable of block DataFrames for one difficulty level
    :param difficulty: label stored with every feature (0 - easy, 1 - hard)
    :return: dict mapping channel name -> list of
             (gradient, mean, difficulty) tuples, one per table
    """
    features_by_channel = {}
    for block in tables:
        times = block["Matlab_now"].values
        for channel in block.columns[1:]:
            samples = block[channel]
            # Index 0 of the polyfit result is the highest-power coefficient,
            # i.e. the slope of the fitted line.
            gradient = np.polyfit(times, samples.values, 1)[0]
            features_by_channel.setdefault(channel, []).append(
                (gradient, samples.mean(), difficulty)
            )
    return features_by_channel
        

In [9]:
"""Extract features from given dataset
    :param data_path: Directory containing the files
    
    :return: gets all the easy and hard features from a given dataset
"""
def get_features_for_dataset(data_path):
    """Run the full read -> split -> feature-extraction pipeline for one session.

    :param data_path: directory containing "fNIRSdata.txt" and "markers.txt";
                      it is joined onto the current working directory, so an
                      absolute path is used as-is
    :return: 2-tuple (easy_features, hard_features) as produced by
             extract_features with labels 0 and 1 respectively
    """
    session_dir = os.path.join(os.getcwd(), data_path)
    merged = read_data(
        os.path.join(session_dir, "fNIRSdata.txt"),
        os.path.join(session_dir, "markers.txt"),
    )

    easy_blocks, hard_blocks = get_row_blocks(merged)
    easy_features = extract_features(get_subsets(merged, easy_blocks), 0)
    hard_features = extract_features(get_subsets(merged, hard_blocks), 1)
    return easy_features, hard_features

In [10]:
"""Be able to merge a number of different datasets
    :param feature_dict_list: List of features to merge together
    
    :return merged dictionary
"""
def merge_features(feature_dict_list):
    """Concatenate the per-channel feature lists of several datasets.

    :param feature_dict_list: iterable of dicts mapping channel name -> list of
                              feature tuples
    :return: new dict mapping each channel name to the features of all inputs,
             in input order. The returned lists are freshly built, so changing
             the result never changes any input dict.
    """
    merged = {}
    for feature_dict in feature_dict_list:
        for channel, features in feature_dict.items():
            # Extend a list owned by `merged` rather than storing the caller's
            # list object, which would alias the input on first occurrence.
            merged.setdefault(channel, []).extend(features)
    return merged

In [11]:
# Root folder containing one directory per subject. The default is the location
# this notebook was originally run from; set the FNIRS_DATA_ROOT environment
# variable to point at a different copy of the data.
DATA_ROOT = os.environ.get("FNIRS_DATA_ROOT", "/Users/sjjin/workspace/hci_lab/data")

easy_902, hard_902 = get_features_for_dataset(os.path.join(DATA_ROOT, "S902", "2015-02-26_11-24-48-120"))
easy_903, hard_903 = get_features_for_dataset(os.path.join(DATA_ROOT, "S903", "2015-02-27_13-20-42-120"))
easy_904, hard_904 = get_features_for_dataset(os.path.join(DATA_ROOT, "S904", "2015-02-27_15-30-27-120"))
easy_905, hard_905 = get_features_for_dataset(os.path.join(DATA_ROOT, "S905", "2015-03-02_13-14-35-120"))
easy_906, hard_906 = get_features_for_dataset(os.path.join(DATA_ROOT, "S906", "2015-03-05_11-17-38-120"))

# Subjects 902-905 form the training set; subject 906 is held out for testing.
train_set_easy = merge_features([easy_902, easy_903, easy_904, easy_905])
train_set_hard = merge_features([hard_902, hard_903, hard_904, hard_905])
test_set_easy = easy_906
test_set_hard = hard_906

In [18]:
# Inspect subject 902's easy-task features: each channel maps to a list of
# (gradient, mean, difficulty) tuples, one per task block.
easy_902

{'A-DC1': [(-1.9842354651893872, 1851.1723163841807, 0),
  (-0.385560576605165, 1839.039660056657, 0),
  (-1.4744157008315235, 1845.822033898305, 0),
  (-1.0931883410503638, 1817.6694915254238, 0),
  (-1.7961728065377498, 1811.0593220338983, 0),
  (-0.6437460444081015, 1786.8079096045199, 0),
  (-2.1574255803972515, 1698.6468926553673, 0),
  (-2.555757838543809, 1690.6581920903955, 0),
  (-1.2804169098528093, 1688.7457627118645, 0),
  (-1.637228643331756, 1649.8813559322034, 0),
  (-2.0496615292282887, 1633.542372881356, 0)],
 'A-DC2': [(-0.5301678979574658, 466.3539548022602, 0),
  (0.03565681988451774, 466.92322946175625, 0),
  (-0.37802892178750386, 473.23615819209033, 0),
  (-0.3478285853894544, 463.9810734463278, 0),
  (-0.5880481489664198, 463.2426553672317, 0),
  (-0.07784988550674674, 455.79322033898245, 0),
  (-0.563777045730348, 435.1573446327683, 0),
  (-0.6162256908126753, 433.54491525423697, 0),
  (-0.48424899585849107, 432.4709039548023, 0),
  (-0.3801447364315316, 421.59

In [12]:
# Inspect the merged hard-task training features (subjects 902-905): each
# channel maps to a list of (gradient, mean, difficulty) tuples.
train_set_hard

{'A-DC1': [(-1.5119116861200568, 1845.1016949152543, 1),
  (-1.5948071471089478, 1853.7570621468926, 1),
  (-3.1064212056519027, 1748.9717514124293, 1),
  (0.17950770219292914, 1733.231638418079, 1),
  (-1.45897070025529, 1740.4350282485875, 1),
  (0.1632198320712152, 1734.4632768361582, 1),
  (-0.5527536092613057, 1733.906779661017, 1),
  (0.9387527105081365, 1731.2429378531074, 1),
  (2.108896951098061, 1686.822033898305, 1),
  (-0.8114469302834943, 1691.7082152974504, 1),
  (-0.43565311363210757, 1672.584745762712, 1),
  (-0.6482782364935261, 1388.0934844192634, 1),
  (2.4835279229410183, 1415.3841807909605, 1),
  (-10.603477966492468, 1609.4943502824858, 1),
  (-2.146617228703112, 1456.1892655367233, 1),
  (-2.8710011257673203, 1286.276836158192, 1),
  (-1.0438818835764565, 1183.5282485875707, 1),
  (0.8540996355000346, 1142.7090395480227, 1),
  (-1.951528377537686, 1175.5593220338983, 1),
  (4.207569767769286, 1011.3378531073446, 1),
  (-3.76912318841775, 1029.0365439093487, 1),
 

In [13]:
"""
"""
def plot_feature(table, feature_dict, column_name):
    """Plot one channel of a block together with its fitted line.

    Note: a later cell defines plot_feature again; once the notebook has run
    top to bottom, that later definition is the one in effect.

    :param table: block DataFrame with a "Matlab_now" column
    :param feature_dict: dict mapping column name -> sequence whose first two
                         items are the slope and intercept of the fitted line
    :param column_name: channel to plot
    """
    times = table["Matlab_now"]
    observed = table[column_name]
    coefficients = feature_dict[column_name]
    fitted = coefficients[0] * times + coefficients[1]

    # Raw signal first, fitted line second, both on the current pyplot figure.
    for series in (observed, fitted):
        plt.plot(times, series)
    plt.xlabel("time(s)")
    plt.ylabel("Light intensity")
    plt.title(column_name)
    plt.show()

In [14]:
"""
"""
def plot_feature(table, feature_dict, column_name):
    """Plot one channel of a block with its fitted line on a fresh figure.

    Also prints the point returned by get_axis_limits for the finished axes.

    :param table: block DataFrame with a "Matlab_now" column
    :param feature_dict: dict mapping column name -> sequence whose first two
                         items are the slope and intercept of the fitted line
    :param column_name: channel to plot
    """
    times = table["Matlab_now"]
    observed = table[column_name]
    coefficients = feature_dict[column_name]
    fitted = coefficients[0] * times + coefficients[1]

    ax = plt.figure().add_subplot(111)
    for series in (observed, fitted):
        ax.plot(times, series)
    ax.set_xlabel("time(s)")
    ax.set_ylabel("Light intensity")
    ax.set_title(column_name)
    # Printed after plotting so the limits reflect the drawn data.
    print(get_axis_limits(ax))
    plt.show()

In [16]:
def get_axis_limits(ax, scale=0.9):
    """Return the upper x and y limits of an axes, each multiplied by scale.

    :param ax: object exposing get_xlim() and get_ylim(), each returning a
               (lower, upper) pair
    :param scale: factor applied to both upper limits
    :return: 2-tuple (scaled upper x limit, scaled upper y limit)
    """
    x_upper = ax.get_xlim()[1]
    y_upper = ax.get_ylim()[1]
    return x_upper * scale, y_upper * scale

In [17]:
def plot_all_features(table, feature_dict, save_path="fig1.png"):
    """Plot every channel of a block with its fitted line on a 4x4 grid.

    Each panel shows the raw signal, the line y = a*x + b, and text giving the
    fit equation and the channel mean. The figure is saved and then shown.

    :param table: block DataFrame; the first column is skipped and every
                  following column is plotted against "Matlab_now". Only the
                  first 16 such columns fit on the grid.
    :param feature_dict: dict mapping column name -> (a, b, mean)
    :param save_path: file the figure is written to (default "fig1.png")
    :raises KeyError: if a plotted column has no entry in feature_dict
    """
    x = table["Matlab_now"]
    channel_names = table.columns[1:].values
    fig, axes = plt.subplots(nrows=4, ncols=4, figsize=(45, 45))
    panels = list(axes.flat)  # row-major order, matching the nested-loop layout

    # zip stops at the shorter of the two, so no manual counter is needed.
    for panel, column_name in zip(panels, channel_names):
        feature = feature_dict[column_name]
        observed = table[column_name]
        fitted = feature[0] * x + feature[1]

        panel.plot(x, observed)
        panel.plot(x, fitted)
        panel.set_xlabel("time(s)")
        panel.set_ylabel("Light intensity")
        panel.title.set_text(column_name)

        equation_text = "y = {:9.4f}*x + {:9.4f}".format(feature[0], feature[1])
        mean_text = "mean = {:9.4f}".format(feature[2])
        panel.text(0.3, 0.9, equation_text, transform=panel.transAxes, size=20, weight='bold')
        panel.text(0.3, 0.8, mean_text, transform=panel.transAxes, size=20, weight='bold')

    # Hide panels that have no channel so they do not appear as empty axes.
    for panel in panels[len(channel_names):]:
        panel.set_visible(False)

    fig.savefig(save_path)
    plt.show()


In [None]:
# The original call used `tables` and `z`, which no cell in this notebook
# defines, so it raised NameError on a fresh kernel. Build both inputs here from
# the S902 session paths set near the top of the notebook.
# NOTE(review): plotting the first easy block is an assumption; confirm which
# block was meant to be shown.
example_df = read_data(fnirs_path, marker_path)
example_easy_rows = get_row_blocks(example_df)[0]
example_tables = get_subsets(example_df, example_easy_rows)
example_features = extract_feature(example_tables[0])
plot_all_features(example_tables[0], example_features)