In [1]:
# Imports / global constants

# CSV files are located in the ../data directory

import pandas as pd
import glob
import os
import math
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator
import seaborn as sns
from scipy.signal import find_peaks, find_peaks_cwt, savgol_filter
import plotly.graph_objects as go

# Timestamp pattern: date, literal "T", time with microseconds, literal trailing "Z".
timeFormat = "%Y-%m-%dT%H:%M:%S.%fZ"

# Output directories. They are used as plain string prefixes in f-strings
# (e.g. f'{export_data}all_peaks.csv'), so the trailing slash is required.
export_data = "../export/data/"
export_img = "../export/img/complete/"
export_img_single = "../export/img/single/"
export_img_questionnaire = "../export/img/questionnaire/"

export_data_spss = "../export/spss/"

# Input directory holding the recorded CSV files.
path = "../data/"

path_questionnaires = "../questionnaires/"

# Display order of the conditions; the statistics helpers reindex their
# pivoted columns with this list so every table and plot uses the same order.
condition_names = ['No Feedback', 'Tactile Feedback', 'Visual Feedback', 'Combined Feedback']

# Apply seaborn's default theme globally to all following matplotlib figures.
sns.set_theme()

# Utility function: save data

In [4]:
def saveData(data_complete, export_dir=None):
    """Export the complete data set and its experiment-only subset as CSV.

    Two semicolon-separated files are written without the row index:

    * ``data_all.csv``        -- every row of ``data_complete``
    * ``data_experiment.csv`` -- only the rows with ``Block >= 0``

    Parameters
    ----------
    data_complete : pandas.DataFrame
        Frame to export; must contain a ``Block`` column.
    export_dir : str, optional
        Target directory. Defaults to the notebook-wide ``export_data``
        directory, which is where the files were written before.
    """
    if export_dir is None:
        # Resolve the default lazily so the shared constant stays the single
        # source of truth for the export location.
        export_dir = export_data

    data_complete.to_csv(os.path.join(export_dir, 'data_all.csv'), sep=";", index=False)

    # NOTE(review): negative block numbers presumably mark non-experiment
    # (e.g. practice) blocks -- confirm against the recording code.
    data_experiment_only = data_complete[data_complete['Block'] >= 0]

    data_experiment_only.to_csv(os.path.join(export_dir, 'data_experiment.csv'), sep=";", index=False)

def saveResults(results_complete, export_dir=None, invalid_proband_ids=(3,)):
    """Export the result table in four variants as semicolon-separated CSV.

    Files written (all without the row index):

    * ``results_all.csv``              -- every row
    * ``results_valid_all.csv``        -- rows of valid probands only
    * ``results_experiment.csv``       -- rows with ``BlockId >= 0``
    * ``results_experiment_valid.csv`` -- valid probands and ``BlockId >= 0``

    Parameters
    ----------
    results_complete : pandas.DataFrame
        Frame to export; must contain ``ProbandId`` and ``BlockId`` columns.
    export_dir : str, optional
        Target directory. Defaults to the notebook-wide ``export_data``
        directory, which is where the files were written before.
    invalid_proband_ids : iterable, optional
        Proband ids to exclude from the "valid" exports. Defaults to
        ``(3,)``, the id that was previously hard-coded.
    """
    if export_dir is None:
        export_dir = export_data

    def write(frame, filename):
        frame.to_csv(os.path.join(export_dir, filename), sep=";", index=False)

    is_valid = ~results_complete['ProbandId'].isin(list(invalid_proband_ids))
    results_valid = results_complete[is_valid]

    results_experiment_only = results_complete[results_complete['BlockId'] >= 0]
    results_valid_experiment_only = results_valid[results_valid['BlockId'] >= 0]

    write(results_complete, 'results_all.csv')
    write(results_valid, 'results_valid_all.csv')
    write(results_experiment_only, 'results_experiment.csv')
    write(results_valid_experiment_only, 'results_experiment_valid.csv')

# utility functions: load/save peaks

In [5]:
def loadPeaks(recoverArrayColumns=(), export_dir=None):
    """Load ``all_peaks.csv`` and restore array-valued columns.

    Parameters
    ----------
    recoverArrayColumns : iterable of str, optional
        Columns whose cells hold arrays serialized as ``"[v1|v2|...]"``
        (the format written by ``savePeaks``). Each cell is parsed back
        into a 1-D float ``numpy.ndarray``.
    export_dir : str, optional
        Directory containing ``all_peaks.csv``. Defaults to the
        notebook-wide ``export_data`` directory.

    Returns
    -------
    pandas.DataFrame
        The loaded frame. The file is read with default index handling, so
        an index column written by ``savePeaks`` shows up as a regular
        (unnamed) column.
    """
    if export_dir is None:
        export_dir = export_data

    result = pd.read_csv(os.path.join(export_dir, 'all_peaks.csv'), sep=";")

    def parse_array(item):
        # Drop the surrounding brackets, then split the numbers on "|".
        return np.fromstring(item.replace('[', '').replace(']', ''), dtype=float, sep="|")

    # when importing: convert the serialized strings back to numpy arrays
    for col in recoverArrayColumns:
        result[col] = result[col].apply(parse_array)

    return result

def savePeaks(result_all, writeArrayColumns=(), export_dir=None):
    """Write the peak table to ``all_peaks.csv`` (semicolon-separated).

    Array-valued columns are serialized as ``"[v1|v2|...]"`` strings because
    the CSV writer cannot round-trip cells holding numpy arrays. The frame
    passed in is left untouched: serialization happens on a copy.

    Parameters
    ----------
    result_all : pandas.DataFrame
        Peak table to export.
    writeArrayColumns : iterable of str, optional
        Columns whose cells are numpy arrays and need string serialization.
    export_dir : str, optional
        Target directory. Defaults to the notebook-wide ``export_data``
        directory, which is where the file was written before.
    """
    if export_dir is None:
        export_dir = export_data

    def serialize(item):
        # The explicit threshold stops numpy from abbreviating long arrays
        # with "..." (default cut-off: 1000 elements), which would silently
        # drop values from the export.
        return np.array2string(item, separator="|", threshold=np.size(item) + 1)

    # Work on a copy so the caller keeps real arrays instead of strings and
    # the function can safely be called more than once on the same frame.
    serialized = result_all.copy() if len(writeArrayColumns) else result_all
    for col in writeArrayColumns:
        serialized[col] = serialized[col].apply(serialize)

    # The index is written as the first column (unchanged file layout).
    serialized.to_csv(os.path.join(export_dir, 'all_peaks.csv'), sep=";")

In [None]:
def getLastLayerChange(index, data=None):
    """Return the most recent layer change that occurred before ``index``.

    Parameters
    ----------
    index : scalar
        Index label; only rows whose index is strictly smaller are considered.
    data : pandas.DataFrame, optional
        Frame with ``LayerChange`` and ``Date`` columns. Defaults to the
        notebook-global ``data_complete`` (the previous, implicit behaviour).

    Returns
    -------
    tuple
        ``(date, idx)`` -- the ``Date`` value and the index label of the last
        row before ``index`` whose ``LayerChange`` is non-zero.

    Raises
    ------
    IndexError
        If there is no layer change before ``index``.
    """
    if data is None:
        data = data_complete

    changes = data[(data['LayerChange'] != 0) & (data.index < index)]

    if changes.empty:
        # Same exception type as the bare ``values[0]`` lookup raised before,
        # but with a message that names the actual problem.
        raise IndexError(f'no layer change found before index {index}')

    last_change = changes.tail(1)

    date = last_change['Date'].values[0]
    idx = last_change.index.values[0]

    return (date, idx)

In [2]:
def add_margin(ax,x=0.05,y=0.05):
    """Widen the visible range of ``ax`` on all four sides.

    ``x`` and ``y`` are fractions of the current axis span that get added on
    each side of the respective axis (default: 5 % per side).
    """
    # Read both ranges before touching either axis.
    x_low, x_high = ax.get_xlim()
    y_low, y_high = ax.get_ylim()

    pad_x = (x_high - x_low) * x
    pad_y = (y_high - y_low) * y

    ax.set_xlim(x_low - pad_x, x_high + pad_x)
    ax.set_ylim(y_low - pad_y, y_high + pad_y)

# Compute and Draw Statistics

* compute statistics for specified column, grouped by:
  * condition
  * block
* sorted by given condition order
* plots for 
  * column over trials for each condition (line plot)
  * condition in regard to block (line plot and regression plot)
  * descriptive statistics for each condition (box plot)
  * condition per block and vice versa (bar chart) 

In [4]:
def _saveFigureFormats(fig, path_stem, formats=('png', 'svg')):
    """Save ``fig`` once per format as ``<path_stem>.<format>``."""
    for extension in formats:
        fig.savefig(f'{path_stem}.{extension}')


def computeStatistics(data_complete, columnName = 'DurationMS', filePrefix='duration', title='Duration', label_y='Time (s)'):
    """Compute, display, export and plot statistics of one column per condition.

    Tables (displayed and, except for the descriptive statistics, written to
    ``export_data`` as semicolon-separated CSV without the row index):

    * ``<filePrefix>_allTrials.csv``            -- value per ``countCondition``, one column per condition
    * ``<filePrefix>_trialsBlockCondition.csv`` -- mean per condition (rows) and block (columns)
    * ``<filePrefix>_trialsBlock.csv``          -- mean per ``TrialId``, one column per condition

    Figures (shown and saved to ``export_img`` as PNG and SVG): line plot over
    all trials, line plot and regression plot over the trials of a block, box
    plot per condition, and two bar charts of the block/condition means.

    Parameters
    ----------
    data_complete : pandas.DataFrame
        Needs the columns ``Condition``, ``BlockId``, ``TrialId``,
        ``countCondition`` and ``columnName``.
    columnName : str
        Column to analyse.
    filePrefix : str
        Prefix of every exported file name.
    title : str
        Human-readable name of the measure, used in the plot titles.
    label_y : str
        Y-axis label of all plots.
        NOTE(review): the defaults pair the column 'DurationMS' with the
        label 'Time (s)' -- confirm the unit of the plotted values.

    Notes
    -----
    The function changes the global seaborn theme (font scale 1.5) before
    drawing the regression plot; figures created afterwards inherit it.
    """
    # transform conditions into columns (count_condition as index to prevent NaN fillers)
    # reindex to sort columns according to defined order
    condition_duration = data_complete.pivot_table(columns=['Condition'], values=[columnName], index=['countCondition'])[columnName].reindex(columns=condition_names)

    display(condition_duration)

    condition_duration.to_csv(rf'{export_data}{filePrefix}_allTrials.csv', sep=";", index=False)

    # compute the mean of the column per condition and block number.
    # The column is selected *before* aggregating, so non-numeric columns of
    # the frame are never averaged (recent pandas versions raise on them).
    # unstack to get conditions as columns and sort by given column order
    condition_duration_block = data_complete.groupby(['Condition', 'BlockId'])[columnName].mean().unstack(level=0).reindex(columns=condition_names)

    # stack again and unstack to pivot the whole table (conditions become rows)
    condition_block_duration = condition_duration_block.stack().unstack(level = 0)

    display(condition_block_duration)

    condition_block_duration.to_csv(rf'{export_data}{filePrefix}_trialsBlockCondition.csv', sep=";", index=False)

    # descriptive statistics per condition
    durations_desc = condition_duration.describe()

    display(durations_desc)

    # transform conditions into columns, but compute mean value for associated trial index to get "learning effect"
    # (values= restricts the aggregation to the analysed column)
    # reindex to sort columns according to defined order
    durations = data_complete.pivot_table(index='TrialId', columns=['Condition'], values=columnName).reindex(columns=condition_names)

    display(durations)

    durations.to_csv(rf'{export_data}{filePrefix}_trialsBlock.csv', sep=";", index=False)

    # --- line plot: every trial, one line per condition -----------------
    fig0, ax0 = plt.subplots(figsize=(25,8))

    condition_duration.plot(ax = ax0)

    ax0.set_xlabel('Trial')
    ax0.set_ylabel(label_y)
    ax0.set_title(f'{title} of each trial by Condition')

    _saveFigureFormats(fig0, rf'{export_img}{filePrefix}_allTrials')

    plt.show()

    # --- line plot: mean per trial number within a block ----------------
    fig1, ax1 = plt.subplots(figsize=(25,8))

    durations.plot(ax = ax1)

    ax1.set_xlabel('Trial Number')
    ax1.set_ylabel(label_y)
    # tick positions 0..20 are fixed here, independent of the data
    ax1.set_xticks(np.arange(0, 21, step=1))
    ax1.set_title(f'{title} over the Block')

    _saveFigureFormats(fig1, rf'{export_img}{filePrefix}_trialsBlock-plot')

    plt.show()

    # --- regression plot over the trials of a block ---------------------
    # NOTE: this changes the global seaborn theme for all later figures.
    sns.set_theme()
    sns.set(font_scale = 1.5)
    g = sns.lmplot(x='TrialId', y=columnName, hue='Condition', hue_order=condition_names, data = data_complete, height=8, aspect=2, x_estimator=np.mean)
    g.set(title=f'{title} over the Block')
    g.set(xlabel='Trial Number')
    g.set(ylabel=label_y)
    g.set(xticks=np.arange(0, 21, step=1))

    for ax in g.axes.flat:
        add_margin(ax,x=0.05,y=0.01)

    _saveFigureFormats(g, rf'{export_img}{filePrefix}_trialsBlock-reg')

    plt.show()

    # --- box plot: distribution per condition ---------------------------
    fig2, ax2 = plt.subplots(figsize=(9,9))

    condition_duration.boxplot(ax = ax2)

    ax2.set_xlabel('Condition')
    ax2.set_ylabel(label_y)
    ax2.yaxis.grid(False)
    ax2.xaxis.grid(False)
    ax2.set_title(f'Median {title} per Condition')

    _saveFigureFormats(fig2, rf'{export_img}{filePrefix}_trialsCondition-box')

    plt.show()

    # --- bar chart: blocks on the x-axis, one bar per condition ---------
    fig3, ax3 = plt.subplots(figsize=(25,8))

    condition_duration_block.plot(ax = ax3, kind="bar")

    ax3.set_xlabel('Block')
    ax3.set_ylabel(label_y)
    # the bars show the per-block means computed above, hence "Mean"
    ax3.set_title(f'Mean Trial {title} per Block and Condition')

    _saveFigureFormats(fig3, rf'{export_img}{filePrefix}_trialsBlockCondition-bar')

    plt.show()

    # --- bar chart: conditions on the x-axis, one bar per block ---------
    fig4, ax4 = plt.subplots(figsize=(25,8))

    condition_block_duration.plot(ax = ax4, kind="bar")

    ax4.set_xlabel('Condition')
    ax4.set_ylabel(label_y)
    ax4.set_title(f'Mean Trial {title} per Condition and Block')

    _saveFigureFormats(fig4, rf'{export_img}{filePrefix}_trialsConditionBlock-bar')

    plt.show()

# boxplot / descriptive stats

In [8]:
def generateBoxPlotStats(data, groupCol, indexColumns, statsColumn, xLabel, yLabel, title, filename, save= False, show = False, reindexColumns = False, outliers = False, ylim = ()):
    """Display descriptive statistics of ``statsColumn`` per group and draw a box plot.

    Two tables are displayed: ``describe()`` of the raw values per group, and
    ``describe()`` of the values after averaging them per ``indexColumns``
    entry. The box plot is drawn from the second (averaged) table.

    Parameters
    ----------
    data : pandas.DataFrame
        Source data.
    groupCol : str or list of str
        Column(s) defining the groups; each group becomes one box.
    indexColumns : str or list of str
        Column(s) whose combinations form the rows of the pivot table, i.e.
        the units over which ``statsColumn`` is averaged before plotting.
    statsColumn : str
        Column to describe and plot.
    xLabel, yLabel, title : str
        Axis labels and plot title.
    filename : str
        File name stem; the figure is saved as ``<export_img><filename>-box.png``
        and ``.svg`` when ``save`` is true.
    save : bool
        Write the figure to disk.
    show : bool
        Call ``plt.show()`` at the end; otherwise the figure is left open.
    reindexColumns : bool
        Order the group columns according to ``condition_names``.
    outliers : bool
        Draw outlier markers (fliers) in the box plot.
    ylim : sequence
        Optional ``(lower, upper)`` y-axis limits; ignored unless it has
        exactly two entries.
    """
    grouped = data.groupby(groupCol)

    desc = grouped[statsColumn].describe().transpose()

    if reindexColumns:
        desc = desc.reindex(columns=condition_names)

    display(desc)

    # Aggregate only the requested column: without ``values`` every remaining
    # column is averaged, which recent pandas versions reject as soon as the
    # frame contains a non-numeric column.
    desc_pivot = data.pivot_table(columns=groupCol, index=indexColumns, values=statsColumn)

    if reindexColumns:
        desc_pivot = desc_pivot.reindex(columns=condition_names)

    display(desc_pivot.describe())

    fig1, ax1 = plt.subplots(figsize=(8,8))

    sns.boxplot(data = desc_pivot, ax = ax1, showfliers=outliers)

    ax1.set_xlabel(xLabel)
    ax1.set_ylabel(yLabel)
    ax1.yaxis.grid(False)
    ax1.xaxis.grid(False)
    if len(ylim) == 2:
        ax1.set_ylim(ylim[0], ylim[1])

    ax1.set_title(title)

    if save:
        fig1.savefig(rf'{export_img}{filename}-box.png')
        fig1.savefig(rf'{export_img}{filename}-box.svg')

    if show:
        plt.show()

# compute z-diff for peaks 

* first, peaks are ordered by date
* peaks are grouped by trial number; the first peak of each trial has diff = 0
* every other peak gets the difference in z to the peak before it

In [2]:
def computeDifferenceZ(peaks):
    """Add the z-difference between consecutive peaks of the same trial.

    The peaks are sorted by ``Date``. Two columns are added:

    * ``Diff_Trial`` -- difference of ``Trial`` to the previous row
      (set to 1 for the very first row)
    * ``Diff_Z``     -- difference of ``Peak_Z`` to the previous row, or 0
      where ``Diff_Trial`` is non-zero (i.e. for the first peak of a trial)

    Parameters
    ----------
    peaks : pandas.DataFrame
        Needs the columns ``Date``, ``Peak_Z`` and ``Trial``. Not modified.

    Returns
    -------
    pandas.DataFrame
        Sorted copy with a fresh 0..n-1 index; the previous index is kept as
        a regular column (``reset_index`` default), plus the two new columns.
    """
    peaks = peaks.sort_values(['Date']).reset_index()
    peaks['Diff_Z'] = peaks['Peak_Z'].diff()
    peaks['Diff_Trial'] = peaks['Trial'].diff()

    # The first row has no predecessor; flag it as the start of a trial.
    # Guarded because assigning to label 0 of an empty frame would *create*
    # a row and return one bogus peak for empty input.
    if not peaks.empty:
        peaks.loc[0, 'Diff_Trial'] = 1

    # A change of the trial number means this peak opens a new trial.
    peaks.loc[peaks['Diff_Trial'] != 0, 'Diff_Z'] = 0

    return peaks