In [1]:
%matplotlib
#%matplotlib inline
import os
import csv
import fnmatch
import numpy as np
import datetime
import re 
import pandas as pd
import matplotlib.pyplot as plt
import math

pd.options.mode.use_inf_as_na = True


Using matplotlib backend: Qt5Agg


In [2]:
# Key names (column 1 of the "user looks at" log rows) that are treated as
# dwell-time adjustments.
KeyInclude = [
    'IncreaseTimeDwell',
    'DecreaseTimeDwell',
]

# Dwell time in effect before any adjustment, in milliseconds.
TimeDwellOrig = 800

# Fixation time subtracted when locating the start of a scratchpad look,
# in milliseconds.
TimeFixation = 300

In [3]:
def _dwellSelectionCompleted(progress):
    """Return True when a logged progress value represents a completed selection (== 1)."""
    # Rows read with csv.reader carry the progress as text, so compare numerically.
    try:
        return float(progress) == 1.0
    except (TypeError, ValueError):
        return False


def ComputeDwellTime(userKeys, keyInclude=None, timeDwellOrig=None, stepMs=100):
    """Return the dwell time in effect after replaying the user's dwell-time key selections.

    Parameters
    ----------
    userKeys : list of rows
        Rows of the "user looks at" log. Column 1 is the key name and column 2
        the selection progress (1 means the selection completed).
    keyInclude : collection of str, optional
        Key names that count as dwell-time adjustments. Defaults to the
        notebook-level ``KeyInclude``.
    timeDwellOrig : number, optional
        Dwell time before any adjustment. Defaults to the notebook-level
        ``TimeDwellOrig``.
    stepMs : number, optional
        Amount added or subtracted per completed selection (default 100).

    Returns
    -------
    number
        ``timeDwellOrig`` plus/minus ``stepMs`` for every completed
        increase/decrease selection; ``timeDwellOrig`` if there were none.
    """
    if keyInclude is None:
        keyInclude = KeyInclude
    if timeDwellOrig is None:
        timeDwellOrig = TimeDwellOrig

    # Always start from the original value so the result is defined even when
    # dwell keys were looked at but never fully selected.
    TimeDwellNew = timeDwellOrig

    for key in userKeys:
        if key[1] not in keyInclude:
            continue
        if not _dwellSelectionCompleted(key[2]):
            continue
        # Decide the direction from the key-name prefix so it cannot drift out
        # of sync with the exact spelling used in keyInclude.
        if key[1].startswith('Increase'):
            TimeDwellNew = TimeDwellNew + stepMs
        else:
            TimeDwellNew = TimeDwellNew - stepMs

    return TimeDwellNew

In [4]:
# Build a list holding the first log row of each epoch in which the user looked at the scratchpad

def _progressDropped(previous, current):
    """Return True when the progress value fell between two consecutive log rows."""
    # Log rows come from csv.reader, so the values are text; compare them as
    # numbers and only fall back to a raw comparison if they are not numeric.
    try:
        return float(previous) > float(current)
    except (TypeError, ValueError):
        return previous > current


def UserLookedAtScratchPad(keysLookedAt):
    """Return the first log row of every distinct look at the scratchpad.

    Rows whose key name (column 1) contains 'ScratchPad' are grouped into
    epochs: a new epoch starts whenever the progress value (column 2) is lower
    than in the previous scratchpad row. Only the first row of each epoch is
    kept, including the epoch that is still open when the log ends.

    Parameters
    ----------
    keysLookedAt : list of rows
        Rows of the "user looks at" log.

    Returns
    -------
    list of rows
        One row per epoch, in log order; empty if the scratchpad was never
        looked at.
    """
    # TODO: reduce the list to only the first and the last elements of the epoch
    scratchPadKeyTime = [key for key in keysLookedAt if 'ScratchPad' in key[1]]

    epochList = list()
    epochStart = None
    previousRow = None

    for row in scratchPadKeyTime:
        if previousRow is None:
            # first scratchpad row opens the first epoch
            epochStart = row
        elif _progressDropped(previousRow[2], row[2]):
            # progress restarted: close the running epoch and open a new one
            epochList.append(epochStart)
            epochStart = row
        previousRow = row

    # Flush the epoch that was still running when the log ended; without this
    # the last look at the scratchpad is silently lost.
    if previousRow is not None:
        epochList.append(epochStart)

    return epochList

In [5]:
# function to convert list of date and time into datetime format list

def timeConversion(timeStrList):
    """Parse a list of timestamp strings into a list of ``datetime`` objects.

    For each string, everything from the first '+' onwards is discarded, the
    last remaining character is dropped, and the ':', '.' and 'T' separators
    are replaced by '-' before parsing with the format
    year-month-day-hour-minute-second-microsecond.
    """
    def _parseStamp(stamp):
        beforeOffset = stamp.split('+', 1)[0]
        dashed = re.sub('[:.T]', '-', beforeOffset[:-1])
        return datetime.datetime.strptime(dashed, "%Y-%m-%d-%H-%M-%S-%f")

    return [_parseStamp(stamp) for stamp in timeStrList]

In [6]:
# This function returns the last datetime in `dates` that is strictly earlier than `date`, together with its index
def nearestTimePoint(dates, date):
    """Return the last entry of ``dates`` that is strictly earlier than ``date``.

    ``dates`` is scanned in order and the last element satisfying ``d < date``
    wins, so for a chronologically sorted list this is the closest earlier
    time point.

    Parameters
    ----------
    dates : list
        Candidate time points (anything comparable with ``date``).
    date :
        Reference time point.

    Returns
    -------
    tuple
        ``(value, index)`` where ``index`` is the position of the first element
        of ``dates`` equal to ``value``; ``(0, -1)`` when no element is earlier
        than ``date``.
    """
    found = False
    nearestTP = 0

    for d in dates:
        if d < date:
            nearestTP = d
            found = True

    # Explicit "nothing found" result instead of relying on a bare except
    # catching the unbound local.
    if not found:
        return 0, -1

    return nearestTP, dates.index(nearestTP)

In [7]:
# function to remove all invalid data in gazelog
def cleanGazeLog(gazeLog):
    """Return the rows of ``gazeLog`` that do not contain the cell 'Invalid'.

    The input list is left untouched; the returned list holds the same row
    objects in their original order.
    """
    return [row for row in gazeLog if 'Invalid' not in row]

In [8]:
def hampel(vals_orig, k, sd):
    '''
    Replace outliers in a sequence by the local rolling median (Hampel-style filter).

    vals_orig: values to filter (anything accepted by pd.DataFrame, e.g. a list or a
               pandas Series); the input is not modified
    k: size of window (including the sample; 7 is equal to 3 on either side of value)
    sd: number of scaled median-absolute-deviations a sample may differ from the
        rolling median before it is treated as an outlier

    Returns a pandas DataFrame in which +/-inf have been replaced by the previous
    valid value and every detected outlier by the rolling median at that position.
    '''
    # Obtained from: https://stackoverflow.com/questions/46819260/filtering-outliers-how-to-make-median-based-
    # hampel-function-faster

    # Work on a fresh frame so the caller's data is not edited
    vals = pd.DataFrame(vals_orig)
    # Treat infinities as missing and carry the last valid sample forward.
    # .ffill() is the supported spelling; fillna(method='ffill') is deprecated.
    vals = vals.replace([np.inf, -np.inf], np.nan).ffill()

    # Scale factor applied to the median absolute deviation
    L = 1.4826
    rolling_median = vals.rolling(window=k, min_periods=1, center=True).median()
    difference = np.abs(rolling_median - vals)
    # NOTE(review): this window is trailing and has no min_periods, so the
    # threshold is NaN for the first k-1 samples and they are never flagged.
    # Left unchanged because altering it would change the analysis results.
    median_abs_deviation = difference.rolling(k).median()
    threshold = sd * L * median_abs_deviation
    outlier_idx = difference > threshold
    vals[outlier_idx] = rolling_median[outlier_idx]
    return vals

In [9]:
def plot5sForUniqueScratchPadFixation(scratchPadLookedAtEpoch, scratchPadList, phraseList, GazeLog, TimeDwell, subjName):
    """Plot pupil traces around each scratchpad fixation and label each one correct/incorrect.

    For every entry of ``scratchPadLookedAtEpoch`` the gaze samples between the
    estimated start of the fixation (logged time minus progress * ``TimeDwell``
    minus ``TimeFixation``) and 5 s after the logged time are extracted. Pupil
    sizes are outlier-filtered with ``hampel``, smoothed with a centred
    9-sample moving mean and plotted relative to the first smoothed sample.
    The epoch is labelled 'C' when the most recent scratchpad text is contained
    in the most recent stimulus phrase, otherwise 'INC'.

    Parameters
    ----------
    scratchPadLookedAtEpoch : list of rows
        One row per fixation; column 0 is a timestamp string, column 2 the
        progress value (converted with ``float``).
    scratchPadList : list of rows
        Scratchpad log; column 0 timestamp string, column 1 the text.
    phraseList : list of rows
        Stimulus phrase log; column 0 timestamp string, column 1 the phrase.
    GazeLog : list of rows
        Gaze log; column 0 timestamp string. Columns 29/31 are read as
        left/right pupil size and columns 3-5 / 10-12 as left/right pupil
        position (column meaning taken from the original comments; confirm
        against the log header). Rows containing the cell 'Invalid' become NaN.
    TimeDwell : number
        Dwell time in milliseconds.
    subjName : str
        Used in the subplot titles.

    Returns
    -------
    list
        One ``[meanPupilLeft, meanPupilRight, 'C' | 'INC']`` entry per plotted
        epoch.
    """
    # NOTE(review): the time axis assumes 90 gaze samples per second, as in the
    # original code; confirm against the eye tracker's actual rate.
    sampleRateHz = 90

    def _gazeColumn(columnInd):
        # One float per gaze row; rows flagged 'Invalid' contribute NaN.
        return [float(row[columnInd]) if 'Invalid' not in row else np.nan for row in GazeLog]

    flagFirst = 0
    Veracity = list()

    timeUserLooked = timeConversion([item[0] for item in scratchPadLookedAtEpoch])
    timeScratchAll = timeConversion([item[0] for item in scratchPadList])
    timeToBeScratchedAll = timeConversion([item[0] for item in phraseList])
    timeGazeLog = timeConversion([item[0] for item in GazeLog])

    # Create list of pupil sizes from gazelog
    pupilLogL = _gazeColumn(29)
    pupilLogR = _gazeColumn(31)

    # Pupil distance (x,y,z) in User Coordinate system
    pupilDistLx = _gazeColumn(3)
    pupilDistLy = _gazeColumn(4)
    pupilDistLz = _gazeColumn(5)
    pupilDistRx = _gazeColumn(10)
    pupilDistRy = _gazeColumn(11)
    pupilDistRz = _gazeColumn(12)

    # Euclidean distance between the left and right pupil positions, per sample
    pupilDist = [
        math.sqrt((lx - rx)**2 + (ly - ry)**2 + (lz - rz)**2)
        for lx, ly, lz, rx, ry, rz in zip(pupilDistLx, pupilDistLy, pupilDistLz,
                                          pupilDistRx, pupilDistRy, pupilDistRz)
    ]

    fig = plt.figure()
    axL = fig.add_subplot(3, 1, 1)
    axR = fig.add_subplot(3, 1, 2)
    axDist = fig.add_subplot(3, 1, 3)

    for epochInd, timeLooked in enumerate(timeUserLooked):
        # Estimated moment the scratchpad fixation began
        timeScratchPadActive = (
            timeLooked
            - datetime.timedelta(milliseconds=float(scratchPadLookedAtEpoch[epochInd][2]) * TimeDwell)
            - datetime.timedelta(milliseconds=TimeFixation)
        )
        _, GazeLogStartInd = nearestTimePoint(timeGazeLog, timeScratchPadActive)

        timeWindowEnd = timeLooked + datetime.timedelta(seconds=5)
        _, GazeLogEndInd = nearestTimePoint(timeGazeLog, timeWindowEnd)

        # nearestTimePoint returns -1 when no gaze sample precedes the requested
        # time. Start at the first sample in that case instead of letting -1
        # wrap around to the end of the log.
        GazeLogStartInd = max(GazeLogStartInd, 0)

        if GazeLogEndInd <= GazeLogStartInd:
            # no usable gaze samples for this epoch
            print('one')
            continue

        gazeEpochL = pupilLogL[GazeLogStartInd:GazeLogEndInd]
        gazeEpochR = pupilLogR[GazeLogStartInd:GazeLogEndInd]
        interPupilDist = pupilDist[GazeLogStartInd:GazeLogEndInd]

        # Filter pupil sizes
        pupilWoOutlierL = hampel(gazeEpochL, 9, 3)
        pupilWoOutlierR = hampel(gazeEpochR, 9, 3)

        # Moving Mean of data without outliers:
        pupilMeanL = pupilWoOutlierL.rolling(window=9, min_periods=1, center=True).mean()
        pupilMeanR = pupilWoOutlierR.rolling(window=9, min_periods=1, center=True).mean()

        # After the filtering is done: CHANGE THIS
        # (numeric_only takes a bool; the original passed the type ``float``)
        pupilSizeL = pupilWoOutlierL.mean(numeric_only=True)[0]
        pupilSizeR = pupilWoOutlierR.mean(numeric_only=True)[0]

        # Relative change with respect to the first smoothed sample of the epoch
        pupilPlotL = (pupilMeanL - pupilMeanL[0][0])/pupilMeanL[0][0]
        pupilPlotR = (pupilMeanR - pupilMeanR[0][0])/pupilMeanR[0][0]

        # Find what had been typed and what should have been typed just before
        # this fixation
        _, phraseScratchedInd = nearestTimePoint(timeScratchAll, timeLooked)
        _, phraseToBeScratchedInd = nearestTimePoint(timeToBeScratchedAll, timeLooked)

        if phraseScratchedInd < 0:
            phraseScratched = ''
        else:
            phraseScratched = scratchPadList[phraseScratchedInd][1]

        if phraseToBeScratchedInd < 0:
            # No stimulus phrase precedes this fixation, so there is nothing to
            # compare against (index -1 would silently pick the LAST phrase).
            continue
        phraseToBeScratched = phraseList[phraseToBeScratchedInd][1]

        # are they the same? yes/no -> save in list with the time
        if phraseScratched in phraseToBeScratched:
            if 'THE EXPERIMENT IS NOW DONE' in phraseToBeScratched:
                continue
            verdict = 'C'
            laterLineStyle = '--'
        else:
            verdict = 'INC'
            laterLineStyle = ':'

        Veracity.append([pupilSizeL, pupilSizeR, verdict])

        # Integer arange divided by the rate always has exactly one time value
        # per sample; a float-step arange can come out one element too long.
        x = np.arange(len(pupilPlotL)) / sampleRateHz
        traceLabel = str(epochInd) + ' ' + verdict

        if flagFirst == 0:
            # the first plotted epoch is drawn with the default (solid) line
            axL.plot(x, pupilPlotL, label = traceLabel)
            axR.plot(x, pupilPlotR, label = traceLabel)
            flagFirst = 1
        else:
            axL.plot(x, pupilPlotL, laterLineStyle, label = traceLabel)
            axR.plot(x, pupilPlotR, laterLineStyle, label = traceLabel)

        # Plot inter pupil distance
        axDist.plot(x, interPupilDist, laterLineStyle, label = traceLabel)

    axL.set_title('%s : Left' %subjName)
    axL.set_xlabel('Time [in s]')
    axL.set_ylabel('Relative Pupil dilation [a.u.]')
    axL.legend()
    axR.set_title('%s : Right' %subjName)
    axR.set_xlabel('Time [in s]')
    axR.set_ylabel('Relative Pupil dilation [a.u.]')
    axDist.set_title('%s : Inter-pupil distance' %subjName)
    axDist.set_xlabel('Time [in s]')
    axDist.set_ylabel('Inter-pupil distance')
    plt.tight_layout()
    axR.legend()
    return Veracity

In [10]:
def _readLogRows(path, description, dropLastRow=False):
    """Read a CSV log and return its rows without the header row.

    The file is always closed, also when reading fails. With ``dropLastRow``
    the final row is discarded as well. Returns None (after printing a
    message) when the file cannot be opened or parsed.
    """
    try:
        # newline='' is what the csv module documents for files given to csv.reader
        with open(path, encoding='utf-8', newline='') as logFile:
            rows = list(csv.reader(logFile))
    except (OSError, UnicodeError, csv.Error) as err:
        print('error in opening the %s file: %s' % (description, err))
        return None
    # Slice by position; list.remove(rows[-1]) would delete the FIRST row equal
    # to the last one, which is not necessarily the last row.
    return rows[1:-1] if dropLastRow else rows[1:]


# NOTE(review): absolute, machine-specific path; adjust to where the data lives.
dataRoot = r'C:\DTU\Data\201805_HealthnRehab\TypingData'

# The subject name is the part of the folder path between 'TypingData' and 'OptiKey'
subjNamePattern = re.compile('(?<=TypingData)(.*)(?=OptiKey)')

for root, dirs, subfolder in os.walk(dataRoot):

    # Only leaf folders are processed
    if dirs:
        continue

    if 'notCompleted' in root or 'notInclude' in root: # Some subjects do not have gaze log and have been marked as
        #notInclude
        continue
    if 'tb' in root or 'joha' in root or 'ae' in root:
        continue

    userKeys = None
    scratchPad = None
    gazeLog = None
    stimPhrase = None

    for file in subfolder:
        filePath = os.path.join(root, file)

        if fnmatch.fnmatch(file, 'user_look*'):
            rows = _readLogRows(filePath, 'user looks at log')
            if rows is not None:
                userKeys = rows
        elif fnmatch.fnmatch(file, 'ScratchPad*'):
            rows = _readLogRows(filePath, 'scratchpad log')
            if rows is not None:
                scratchPad = rows
        elif fnmatch.fnmatch(file, 'PhraseLog*'):
            rows = _readLogRows(filePath, 'phrase log')
            if rows is not None:
                stimPhrase = rows
        elif fnmatch.fnmatch(file, 'GazeLog*'):
            # the last row of the gaze log is dropped as well as the header
            rows = _readLogRows(filePath, 'gaze log', dropLastRow=True)
            if rows is not None:
                gazeLog = rows
        else:
            continue

        # Wait until all four logs of this folder have been read
        if userKeys is None or scratchPad is None or stimPhrase is None or gazeLog is None:
            continue

        # Compute dwell time
        TimeDwell = ComputeDwellTime(userKeys)

        # call function to check when scratchpad is looked at and save it in a list
        scratchPadKeyTime = UserLookedAtScratchPad(userKeys)

        # The gaze log is not cleaned here: invalid samples are turned into NaN
        # and filtered per epoch inside the plotting function.

        # for every element, find the time closest and previous to it, and check what was typed
        # AND what should have been typed
        # Also, add gaze data to epoch, but first subtract the fixation time and complete the pupil data filtering as
        # per Per's paper

        subjMatches = subjNamePattern.findall(root)
        # fall back to the folder path when it does not follow the expected layout
        subjName = subjMatches[0] if subjMatches else root
        print(subjName)
        WrittenTruthListAll = plot5sForUniqueScratchPadFixation(scratchPadKeyTime, scratchPad, stimPhrase, gazeLog, TimeDwell, subjName)
        # this folder is done; move on to the next one
        break


\May15\akt_MS\
\May15\jl_DT\
\May15\KEA_MS\
\May15\lone_DT\
\May15\mcc_MS\
\May15\MK_DT\
\May15\ok_MS\
\May15\pt_DT\
\May15\sc_MS\
\May15\sh_MS\
\May16\hc_MS\
\May16\ib_MS\
\May16\km_DT\
\May16\ma_DT\
\May16\pgba_DT\
\May16\smn_DT\
\May17\eo_DT\
\May17\jek_MS\
\May17\jg_DT\
\May17\lg_MS\
\May17\lr_MS\




\May17\mm_MS\
\May17\snk_DT\
