* [Go to input parameters](#inputParameters)

For more details about meanings of ids go to [this link](https://docs.google.com/spreadsheets/d/1uZLtoB3OijjfMOUJHBkoL542wfa4eLv_JgjrT5e1oGI/edit#gid=577660682)

In [28]:
import pandas as pd
import numpy as np
import os 
from os import listdir, remove
from os.path import isfile, join, exists


# -------------------------------------------------------

def ls(folder_path):
    """
    ls() list all the files in the input folder
    """
    files = [f for f in listdir(folder_path) if isfile(join(folder_path, f))]
    files.sort()
    return files


# -------------------------------------------------------

def saveFile(path, data, labels = '', fmt = '%.18e'):
    '''
    function to save data into either new file or appending to file.
    path: path to file
    data: either np.ndarray or pd.dataframe
    labels: use when data is numpy. When data is dataframe, labels are the names from input data
    fmt: use only when data is numpy
    '''
    if type(data) is np.ndarray:
        if not os.path.isfile( path ):
            # create new file        
            np.savetxt( path , data , delimiter = ';', header = labels, comments = '', fmt = fmt)
        else:
            # append to file
            with open(path, "a") as f:
                np.savetxt(f , data , delimiter = ';', comments = '', fmt = fmt )
    elif isinstance(data, pd.DataFrame):
        if not os.path.isfile( path ):
            # create new file        
            data.to_csv(path, na_rep = 'None', sep = ';', index = False)
        else:
            # append to file
            data.to_csv(path, na_rep = 'None', sep = ';', index = False, mode='a', header=False)

## General Data:
it takes out messages with id=0, i.e. the general data as phone type, starting time, etc.

In [29]:
def compute_general_data():
    df[ df['id_msg'] == 0 ]

## Nasa data:
it takes out data from the nasa tests

In [30]:
def compute_nasa():
    output_file = 'nasa_stats.csv'

    nasa = df[ df['id_msg']==32 ]

    if len( nasa ) < 18:

        print('Error in nasa. User did not finisht the test!')
    
    elif len( nasa ) > 18:

        print('Error in nasa. User did the test more than one time!')

    else:

        label_cols = 'type_test; id_user; level; Mental; Physic; Temporal; Performance; Effort; Frustration; Total'

        # low level
        level = 0
        # reshape data
        nasa_low = nasa[0:6].pivot(index = 'id_msg', columns="value", values="value_extra").to_numpy(dtype = float)
        # removing physical demand to compute and add total nasa
        nasa_low = np.append ( nasa_low, [[ np.sum( nasa_low[0, [0, 2, 3, 4, 5]] ) ]] , axis = 1)
        # truncate decimals
        nasa_low = np.around(nasa_low, 3)
        # add type-test, id-user and level
        nasa_low = np.append ( [[type_test, id_user, level]] , nasa_low, axis = 1)
        # saving file
        saveFile( output_folder + output_file ,nasa_low , label_cols , fmt='%s' )

        # med level
        level = 1
        nasa_med = nasa[6:12].pivot(index = 'id_msg', columns="value", values="value_extra").to_numpy(dtype = float)
        nasa_med = np.append ( nasa_med, [[ np.sum( nasa_med[0, [0, 2, 3, 4, 5]] ) ]] , axis = 1)
        nasa_med = np.around(nasa_med, 3)
        nasa_med = np.append ( [[type_test, id_user, level]] , nasa_med, axis = 1)
        saveFile( output_folder + output_file ,nasa_med , label_cols , fmt='%s' )

        # high level
        level = 2
        nasa_high = nasa[12:18].pivot(index = 'id_msg', columns="value", values="value_extra").to_numpy(dtype = float)
        nasa_high = np.append ( nasa_high, [[ np.sum( nasa_high[0, [0, 2, 3, 4, 5]] ) ]] , axis = 1)
        nasa_high = np.around(nasa_high, 3)
        nasa_high = np.append ( [[type_test, id_user, level]] , nasa_high, axis = 1)
        saveFile( output_folder + output_file , nasa_high , label_cols ,fmt='%s' )

## Stimuli perception:
it takes out users' answers from stimuli perception such as audio and vibration

In [31]:
def compute_stimuli_perpception():
    output_file = 'stimuli_perception.csv'

    stimuli_A = df[ df['id_msg'] == 33 ]
    stimuli_H = df[ df['id_msg'] == 34 ]
    volume = df[ df['msg'] == ' volume (%) ' ]['value'].values[0]

    if len(stimuli_A)!=1 or len(stimuli_H)!=1:
        print('Error in stimuli')
    else:
        # concatenate values
        stimuli = np.reshape( [ stimuli_A['value'].to_numpy(dtype = str) , stimuli_H['value'].to_numpy(dtype=str) ] , (1, 2) )
        # add type-test, id-user and volume
        stimuli = np.append ( [[type_test, id_user, volume]] , stimuli, axis = 1)
        # saving file
        label_cols = 'type_test; id_user; volume; Auditory; Haptic'
        saveFile( output_folder + output_file , stimuli , label_cols , fmt="%s" ) 

## Gaming score:
it analyses data from balls and patterns

In [32]:
def analyze_ball_data(df_level, level, nBallsInLoop):        
    ball_moves = df_level[ df_level['id_msg'].isin([10,11,24]) ] # dataframe including movement balls by system (10) and by user (11)
        
    test = []
    user = []
    level_task_1 = []
    system_ball_task_1 = []
    success_task_1 = []
    time_task_1 = []
    reaction_time_task_1 = []
    kBallsInLoop = nBallsInLoop
    nValidBalls = 0
    first_ball_pattern = True # flag indicating this is the first ball in a pattern loop
    
    for idx, ball in ball_moves.iterrows():
        type_move = ball['id_msg']
        if type_move == 24:
            # new pattern loop
            kBallsInLoop = nBallsInLoop
            first_ball_pattern = True
        else: 
            ball_time = ball['time']
            value     = int( ball['value'] )
            #
            if type_move == 10: # new ball position by system
                                                                                   
                if not first_ball_pattern: # if it is not the first system ball, it is allowed to save
                    # if expected_moves != 0:
                    if kBallsInLoop < 0:
                        # here means valid system balls were registered. Sometimes more balls can appear because delays in the app. But they are not valid
                        continue
                    #
                    # OUTPUT VALUES:
                    #
                    nValidBalls += 1
                    test.append( type_test )
                    user.append( id_user )
                    level_task_1.append( level )                    
                    system_ball_task_1.append( expected_moves )
                    # if nBallsInLoop-kBallsInLoop == 1:
                    #    print('>>{} : {} : {}'.format( nBallsInLoop-kBallsInLoop, start_time , expected_moves )) # todo: remove line
                    # else: 
                    #     print('>{} : {} : {}'.format( nBallsInLoop-kBallsInLoop, start_time , expected_moves )) # todo: remove line
                    #
                    if num_moves - abs(expected_moves) == 0 and user_position == 0: # success
                        # success_task_1[k_ball] = True
                        success_task_1.append( True )
                        time_task_1.append( user_time - start_time )
                    else: # not success
                        success_task_1.append( False )
                        time_task_1.append( None )
                    if num_moves > 0:
                        reaction_time_task_1.append( reaction_time )
                    else: # user never moved the ball
                        reaction_time_task_1.append( None )
                #
                # set values of variables to start analysis for current system ball position
                first_ball_pattern = False
                expected_moves = value
                start_time =  ball_time
                num_moves  = 0
                user_time  = 0
                first_user_move = True
                #
                kBallsInLoop -= 1

            elif type_move == 11: # new ball position by user
                num_moves += 1 # counting the user movements after each time system moved the ball
                user_time = ball_time
                user_position = value
                if first_user_move:            
                    first_user_move = False
                    reaction_time = user_time - start_time
    #
    dataset = pd.DataFrame( {'type_test': test,
                             'id_user'  : id_user,
                             'level'    : level_task_1, 
                             'system_ball': system_ball_task_1,
                             'success'  : success_task_1,
                             'time_ms'  : time_task_1,
                             'reaction_time_ms' : reaction_time_task_1
                            }
                          )
    
    return dataset, nValidBalls

# -------------------------------------------------------

def analyze_pattern_data(df_level, level):
    patterns_data = df_level.loc[ df_level['id_msg'].isin([20,21,23,24]) ]
    # pattern by system
    expected_pattern    = patterns_data[ patterns_data['id_msg'] == 20 ]['value_extra']
    expected_pattern_id = patterns_data[ patterns_data['id_msg'] == 20 ]['value']
    # pattern by user
    answered_pattern = patterns_data[ patterns_data['id_msg'] == 24 ]['value_extra']
    # level
    n_patterns = len( answered_pattern )
    level_task_2 = level * np.ones( n_patterns, dtype = np.int32 )
    # type-test
    test = [type_test]*n_patterns
    # id user
    user = id_user * np.ones( n_patterns, dtype = np.int32 )
    # user time
    start_time = patterns_data[ patterns_data['id_msg'] == 21 ]['time'].astype(np.float)
    end_time   = patterns_data[ patterns_data['id_msg'] == 23 ]['time'].astype(np.float)
    if len(start_time) != len(end_time):   
        # when user does not makes OK in answer-patterns
        #
        end_time_syst = patterns_data[ patterns_data['id_msg'] == 24 ]['time'].astype(np.float)
        #
        if len(end_time) == 0:
            # when user never does OK and system automatically goes to next pattern
            total_time = end_time_syst.values - start_time.values
        else:
            # when user does not OK by some answer patterns
            total_time = np.zeros( n_patterns )
            ix_start = 0
            ix_end = 0
            for tm in end_time_syst:
                if not ( min( np.abs( tm - end_time ) ) < 2000 ):
                    # print("no user answer at pattern: {}".format(ix_start))
                    total_time[ix_start] = tm - start_time.values[ix_start]
                    ix_start += 1        
                else:
                    total_time[ix_start] = end_time.values[ix_end] - start_time.values[ix_start]
                    ix_start += 1
                    ix_end   += 1
    else:
        # when user does OK by all answer patterns
        total_time = end_time.values - start_time.values
    #
    success = np.zeros( n_patterns, dtype=bool )
    for k in range( n_patterns ):
        success[k] = answered_pattern.iloc[k] == expected_pattern.iloc[k]
        if success[k] == False:
            total_time[k] = np.nan # None
    #
    dataset = pd.DataFrame( {'type_test' : test,
                             'id_user'   : user,
                             'level'     : level_task_2,                              
                             'success'   : success,
                             'total_time_ms': total_time,
                             'expected_pattern_id': expected_pattern_id.values,
                             'expected_pattern'   : expected_pattern.values,                         
                             'answered_pattern'   : answered_pattern.values,
                            }
                          )
    return dataset

In [33]:
def compute_gaming_score():
    output_file_ball = 'ball_data.csv'
    output_file_pattern = 'pattern_data.csv'

    start_level = df[ df['id_msg'] == 1 ].astype({'value': 'int32'})
    end_level   = df[ df['id_msg'] == 2 ].astype({'value': 'int32'})
    # number of ball by each pattern loop
    nBallsInLoop = 8
    expectedBalls = (nBallsInLoop*6 , nBallsInLoop*8 , nBallsInLoop*9)
    errors = 0 # count number of errors
    if len(start_level)!=3 or len(end_level)!=3:
        print('Error. user did the game more than one time or exit before ending')
    else:
        success_task_1 = np.zeros(3)
        total_task_1 = np.zeros(3)
        df_ball_results = [None] * 3
        df_patterns_results = [None] * 3
        for level in range(3):
            # slicing data by level
            s = (start_level[ start_level['value'] == level] ).index.values
            e = (end_level[ end_level['value'] == level] ).index.values
            df_level = df[s[0]:e[0]+1]
            # get_valid_ball_data removes balls in zero and system movements of the ball small than 3 secs
            df_ball_results[level] , numValidBalls = analyze_ball_data(df_level, level, nBallsInLoop)
            # analyse patterns data
            df_patterns_results[level] = analyze_pattern_data(df_level, level)
            #
            if numValidBalls == expectedBalls[level]:
                print('level: {} - {}balls : OK'.format(level , numValidBalls))            
            else:
                print('level: {} - {}balls : ERROR'.format(level , numValidBalls))
                errors += 1
        #    
        # appending results to one dataframe
        df_ball_results = df_ball_results[0].append( df_ball_results[1] ).append( df_ball_results[2] )
        df_patterns_results = df_patterns_results[0].append( df_patterns_results[1] ).append( df_patterns_results[2] )

        # save results
        saveFile(output_folder + output_file_ball, df_ball_results)
        saveFile(output_folder + output_file_pattern, df_patterns_results)
    #
    return errors

### Input parameters: <a class="anchor" id="inputParameters"></a>
Name format for input files is: log_##.txt, where ## hast to be an integer number

In [34]:
# ------------------------- INPUT PARAMETERS ------------------------------------

# run analysis
input_foldername = './../../../Data/Data_cogito/Raw_data/initial_tests/new/'
output_folder    = './../../../Data/Data_cogito/Processed_data/'
input_files = ['all'] # ['Log_02.txt'] , ['all']

# -------------------------------------------------------------------------------


if input_files[0] == 'all':
    files = ls(input_foldername)
else:
    files = input_files
#
nUsers = 0 # number of registered users
nGames = 0 # number of registered games
nErrors = 0
for file in files:
    print(file)
    id_user = int(file.split('.')[0].split('_')[1])
    col_names = [ 'time',    'id_msg',    'msg',    'value',    'value_extra' ]
    df = pd.read_csv(input_foldername + file , header = None, sep = '_', na_values = 'NA', names = col_names)
    # type test
    type_test = df[ df['msg'] == ' test version ' ]['value'].values[0]

    # number of registered games
    nTimesPlayed = len( df[ (df['id_msg']==2) & (df['value']==' 0') ] )
    nUsers += 1
    nGames += nTimesPlayed
    if nTimesPlayed == 1:
        print( 'nGames = {} : OK'.format(nTimesPlayed) )
    else:
        print( 'nGames = {} : ERROR'.format(nTimesPlayed) )
        nErrors += 1
    #
    # compute_general_data
    compute_stimuli_perpception()
    compute_nasa()
    nErrors += compute_gaming_score()
    print('')

if (nErrors != 0):
    print('ERROR! see above for details')
else:
    print("Everything seems to be: OK")
print('ready')

Log_010.txt
nGames = 1 : OK
level: 0 - 48balls : OK
level: 1 - 64balls : OK
level: 2 - 72balls : OK

Log_011.txt
nGames = 1 : OK
level: 0 - 48balls : OK
level: 1 - 64balls : OK
level: 2 - 72balls : OK

Log_012.txt
nGames = 1 : OK
level: 0 - 48balls : OK
level: 1 - 64balls : OK
level: 2 - 72balls : OK

Log_013.txt
nGames = 1 : OK
level: 0 - 48balls : OK
level: 1 - 64balls : OK
level: 2 - 72balls : OK

Log_014.txt
nGames = 1 : OK
level: 0 - 48balls : OK
level: 1 - 64balls : OK
level: 2 - 72balls : OK

Everything seems to be: OK
ready


**POST ANALYSIS:**

Copy these results to the excel file to the corresponding table

In [35]:
in_file = 'ball_data.csv'
in_folder = './../../../Data/Data_cogito/Processed_data/'

df = pd.read_csv(in_folder+in_file, sep = ';')
nB = len( df[df['type_test'] == ' Base']['id_user'].unique() )
nH = len( df[df['type_test'] == ' Haptic']['id_user'].unique() )
nA = len( df[df['type_test'] == ' Auditory']['id_user'].unique() )
nHA = len( df[df['type_test'] == ' HapticAuditory']['id_user'].unique() )

print('Base users: \t{}'.format(nB))
print('Haptic users: \t{}'.format(nH))
print('Auditory users: {}'.format(nA))
print('HapticAuditory users: {}'.format(nHA))
print('\nTotal users: \t{}'.format( nHA + nH + nA + nB ) )

Base users: 	1
Haptic users: 	1
Auditory users: 1
HapticAuditory users: 2

Total users: 	5
