In [10]:
import pandas as pd
import numpy as np
import ast

In [11]:
def get_stopping_data(df, split_by_block_num=False):
    """
    Prints summary metrics for the 'stop' and 'go' test trials of a stop-signal task.

    Only rows whose 'trial_id' is 'test_trial' are analysed. Metrics are computed either
    over all of those rows at once or separately for every value of 'block_num'.

    Input:
      df: DataFrame containing task data for a specific task for a single subject.
          Columns read: 'trial_id', 'condition', 'stop_acc', 'go_acc', 'rt', 'SSD',
          and 'block_num' when split_by_block_num is True.
      split_by_block_num (optional): Boolean flag to determine if results should be grouped
      by block number (default is False).

    Output:
      Nothing is returned. When split_by_block_num is False each metric is printed on its
      own line as "name: value". When it is True a table with one row per block and one
      column per metric is printed in full.

    Metrics Calculated:
      - stop_acc: Mean of 'stop_acc' over 'stop' trials.
      - go_acc: Mean of 'go_acc' over 'go' trials.
      - avg_go_rt: Mean 'rt' over 'go' trials whose 'go_acc' equals 1.
      - max_SSD: Maximum of 'SSD' over all test trials in the group.
      - min_SSD: Minimum of 'SSD' over all test trials in the group.
      - mean_SSD: Mean of 'SSD' over all test trials in the group.
      - stop_success: Same value as stop_acc (proportion, not a percentage).
      - stop_fail: 1 - stop_success.
      - go_success: Same value as go_acc (proportion, not a percentage).
      - stop_omission_rate: Proportion of 'stop' trials with a missing 'rt'.
      - go_omission_rate: Proportion of 'go' trials with a missing 'rt'.
    """
    test_trials__df = df[df['trial_id'] == 'test_trial']

    # Helper that computes every metric for one set of test trials
    def calculate_metrics(group):
        # Split once and reuse; every metric below is derived from these two subsets
        stop_trials = group[group['condition'] == 'stop']
        go_trials = group[group['condition'] == 'go']

        stop_acc = stop_trials['stop_acc'].mean()
        go_acc = go_trials['go_acc'].mean()

        return {
            "stop_acc": stop_acc,
            "go_acc": go_acc,
            "avg_go_rt": go_trials.loc[go_trials['go_acc'] == 1, 'rt'].mean(),
            # SSD statistics deliberately use the whole group, as before
            "max_SSD": group['SSD'].max(),
            "min_SSD": group['SSD'].min(),
            "mean_SSD": group['SSD'].mean(),
            "stop_success": stop_acc,
            "stop_fail": 1 - stop_acc,
            "go_success": go_acc,
            "stop_omission_rate": stop_trials['rt'].isna().mean(),
            "go_omission_rate": go_trials['rt'].isna().mean(),
        }

    if split_by_block_num:
        # Build a block x metric table. groupby().apply() with a dict-returning helper
        # yields a Series of dicts, which pandas prints truncated and hides most metrics.
        per_block = {
            block_num: calculate_metrics(block_trials)
            for block_num, block_trials in test_trials__df.groupby('block_num')
        }
        results = pd.DataFrame.from_dict(per_block, orient='index')
        results.index.name = 'block_num'
        # to_string() prints every column regardless of pandas display-width options
        print(results.to_string())
    else:
        results = calculate_metrics(test_trials__df)
        for key, value in results.items():
            print(f"{key}: {value}")

In [26]:
def calculate_attention_check_accuracy(df):
    """
    Prints the accuracy on attention-check trials.

    Attention checks are the rows whose 'trial_id' is 'test_attention_check'; accuracy is
    the mean of their 'correct_trial' values.

    Input:
      df: DataFrame containing task data for a specific task for a single subject.

    Output:
      Prints the overall attention check accuracy for the given task df. Nothing is returned.
    """
    is_attention_check = df['trial_id'] == 'test_attention_check'
    accuracy = df.loc[is_attention_check, 'correct_trial'].mean()
    print(accuracy)

In [12]:
def calculate_average_rt(df, condition_col='condition', test_trial='test_trial', correct_trial_col='correct_trial', factorial_condition=False, factorial_conditions=['cue_condition', 'task_condition'], split_by_block_num=False):
    """
    Prints the average reaction time (RT) of correct test trials, grouped by condition.

    Trials are kept when their 'trial_id' equals `test_trial` and their `correct_trial_col`
    value equals 1. The mean of 'rt' is then computed per group. Grouping uses either a
    single condition column or several factorial condition columns, optionally combined
    with 'block_num'.

    Input:
      df: DataFrame containing the task data.
      condition_col: Name of the column representing the condition. Default is 'condition'.
      test_trial: Value of the 'trial_id' column that marks the trials to analyse. Default is 'test_trial'.
      correct_trial_col: Column indicating if the trial was correctly executed. Default is 'correct_trial'.
      factorial_condition: Boolean to specify if the data has factorial conditions. Default is False.
      factorial_conditions: List of columns indicating factorial conditions. Default is ['cue_condition', 'task_condition'].
          The list is never modified by this function.
      split_by_block_num: Boolean to specify if results should be split by block number. Default is False.

    Output:
      Prints the average RT for the specified conditions. Nothing is returned.
    """
    correct_test_trials = df[(df['trial_id'] == test_trial) & (df[correct_trial_col] == 1)]

    # Work on a copy: appending 'block_num' to the argument itself would mutate the
    # caller's list (or the shared default list) and leak into every later call.
    if factorial_condition:
        grouping_columns = list(factorial_conditions)
    else:
        grouping_columns = [condition_col]

    if split_by_block_num:
        grouping_columns.append('block_num')

    rt_by_condition = correct_test_trials.groupby(grouping_columns)['rt'].mean()
    # Drop the 'rt' name so the printed footer stays "dtype: float64" as before
    rt_by_condition.name = None
    print(rt_by_condition)

In [43]:
def calculate_omission_rate(df, test_trial='test_trial', condition_col='condition', factorial_condition=False, factorial_conditions=['cue_condition', 'task_condition'], split_by_block_num=False, is_go_no_go=False):
    """
    Prints the omission rate of test trials, grouped by condition.

    Omission rate is the proportion of trials in a group whose 'rt' is missing (NaN).
    Grouping uses either a single condition column or several factorial condition columns,
    optionally combined with 'block_num'.

    Input:
      df: DataFrame containing task data for a specific task for a single subject.
      test_trial: Value of the 'trial_id' column that marks the trials to analyse. Default is 'test_trial'.
      condition_col: Name of the column representing the condition. Default is 'condition'.
      factorial_condition: Boolean to specify if the data has factorial conditions. Default is False.
      factorial_conditions: List of columns indicating factorial conditions. Default is ['cue_condition', 'task_condition'].
          The list is never modified by this function.
      split_by_block_num: Boolean to specify if results should be split by block number. Default is False.
      is_go_no_go: When True, only the entry labelled 'go' of the grouped result is printed. Default is False.

    Output:
      Prints the omission rate for the specified conditions. Nothing is returned.
    """
    test_trials = df[df['trial_id'] == test_trial]

    # Work on a copy: appending 'block_num' to the argument itself would mutate the
    # caller's list (or the shared default list) and leak into every later call.
    if factorial_condition:
        grouping_columns = list(factorial_conditions)
    else:
        grouping_columns = [condition_col]

    if split_by_block_num:
        grouping_columns.append('block_num')

    omission_rate = test_trials.groupby(grouping_columns)['rt'].agg(lambda rts: rts.isna().mean())
    # Drop the 'rt' name so the printed footer stays "dtype: float64" as before
    omission_rate.name = None

    if is_go_no_go:
        omission_rate = omission_rate['go']

    print(omission_rate)

In [14]:
def calculate_omission_rate__span(df, sequence_length=4):
    """
    Prints omission statistics for the 'span' task.

    Two kinds of omission are measured on 'test_response' trials, whose 'response' value is
    the string form of a list (e.g. "[1, 2]"):
    1) Completely empty responses (length 0), and
    2) Incomplete responses (length between 1 and sequence_length - 1).

    In addition, the omission rate of 'test_inter-stimulus' trials is computed as the
    proportion of those trials whose 'rt' is missing.

    Input:
      df: DataFrame containing task data for a specific task for a single subject.
      sequence_length (optional): Number of items in a complete response; shorter non-empty
          responses count as incomplete. Default is 4.

    Output:
      Prints the proportion of empty responses, the proportion of incomplete responses,
      and the omission rate for 'test_inter-stimulus' trials. Nothing is returned.
    """
    response_trials = df[df['trial_id'] == 'test_response']
    processing_trials = df[df['trial_id'] == 'test_inter-stimulus']

    # Parse each stored string into a real list, then keep only its length
    response_lengths = response_trials['response'].apply(ast.literal_eval).apply(len)

    # Means of boolean masks give the proportion of trials of each kind
    mean_empty = (response_lengths == 0).mean()
    mean_incomplete = ((response_lengths > 0) & (response_lengths < sequence_length)).mean()

    omission_rate_processing_trials = processing_trials['rt'].isna().mean()

    print(f"Mean number of empty responses: {mean_empty}")
    print(f"Mean number of incomplete responses: {mean_incomplete}")

    print(f"Omission rate processing trials: {omission_rate_processing_trials}")


In [15]:
def calculate_average_accuracy(df, correct_trial_col='correct_trial', condition_col='condition', test_trial='test_trial', factorial_condition=False, factorial_conditions=[], split_by_block_num=False):
    """
    Prints the average accuracy of test trials, grouped by condition.

    Trials are kept when their 'trial_id' equals `test_trial`. Accuracy is the mean of
    `correct_trial_col` per group. Grouping uses either a single condition column or
    several factorial condition columns, optionally combined with 'block_num'.

    Input:
      df: DataFrame containing task data for a specific task for a single subject.
      correct_trial_col (optional): Name of the column indicating correct trials (default is 'correct_trial').
      condition_col (optional): Name of the main condition column for grouping (default is 'condition').
      test_trial (optional): Value of the 'trial_id' column that marks the trials to analyse (default is 'test_trial').
      factorial_condition (optional): Boolean flag indicating if factorial conditions should be used for grouping (default is False).
      factorial_conditions (optional): List of columns to be used for factorial grouping (default is an empty list).
          The list is never modified by this function.
      split_by_block_num (optional): Boolean flag to determine if results should be split by block number (default is False).

    Output:
      Prints the average accuracy grouped by the specified conditions. Nothing is returned.
    """
    test_trials = df[df['trial_id'] == test_trial]

    # Work on a copy: appending 'block_num' to the argument itself would mutate the
    # caller's list (or the shared default list) and leak into every later call.
    if factorial_condition:
        grouping_columns = list(factorial_conditions)
    else:
        grouping_columns = [condition_col]

    if split_by_block_num:
        grouping_columns.append('block_num')

    accuracy_by_condition = test_trials.groupby(grouping_columns)[correct_trial_col].mean()

    print(accuracy_by_condition)


In [17]:
# Load one sample CSV export per RDoC task; each frame is bound to a `<task>__df` name.
# Paths are relative to the notebook's working directory.
# NOTE(review): the file names contain ':' which is not allowed in Windows file names - confirm target platforms.
ax_cpt__df = pd.read_csv('./data/ax_cpt_rdoc_23-10-25-16:51.json.csv') # metrics are computed on 'test_probe' trials rather than 'test_trial'
cued_ts__df = pd.read_csv('./data/cued_task_switching_rdoc_23-10-25-17:36.json.csv') # grouped by the factorial columns cue_condition & task_condition
flanker__df = pd.read_csv('./data/flanker_rdoc_23-10-25-18:02.json.csv')
go_nogo__df = pd.read_csv('./data/go_nogo_rdoc_23-10-25-18:20.json.csv')
n_back__df = pd.read_csv('./data/n_back_rdoc_23-10-25-18:33.json.csv') # grouped by the 'delay' column instead of 'condition'
span__df = pd.read_csv('./data/span_rdoc__behavioral_23-10-25-21:45.json.csv') 
spatial_ts__df = pd.read_csv('./data/spatial_task_switching_rdoc_23-10-25-20:33.json.csv')
spatial_cueing__df = pd.read_csv('./data/spatial_cueing_rdoc_23-10-25-20:56.json.csv')
stroop__df = pd.read_csv('./data/stroop_rdoc_23-10-25-18:44.json.csv')
stop_signal__df = pd.read_csv('./data/stop_signal_rdoc_23-10-25-19:22.json.csv')
visual_search__df = pd.read_csv('./data/visual_search_rdoc_23-10-25-19:10.json.csv')

In [47]:
# Tasks that work with the default arguments of every metric function.
# Each entry pairs a task frame with whether its omission rate is reported in this cell;
# go/no-go is excluded here because its omission rate needs is_go_no_go=True.
default_argument_tasks = [
    (flanker__df, True),
    (go_nogo__df, False),
    (spatial_cueing__df, True),
    (stroop__df, True),
]

for task__df, report_omissions in default_argument_tasks:
    calculate_average_accuracy(task__df)
    calculate_average_rt(task__df)
    if report_omissions:
        calculate_omission_rate(task__df)

condition
congruent      0.616667
incongruent    0.516667
Name: correct_trial, dtype: float64
condition
congruent      579.702703
incongruent    611.483871
dtype: float64
condition
congruent      0.350000
incongruent    0.416667
dtype: float64
condition
go      0.685185
nogo    0.888889
Name: correct_trial, dtype: float64
condition
go      360.891892
nogo           NaN
dtype: float64
condition
doublecue    0.361111
invalid      0.388889
nocue        0.319444
valid        0.425926
Name: correct_trial, dtype: float64
condition
doublecue    384.538462
invalid      432.428571
nocue        408.000000
valid        376.869565
dtype: float64
condition
doublecue    0.611111
invalid      0.611111
nocue        0.680556
valid        0.574074
dtype: float64
condition
congruent      0.390244
incongruent    0.379747
Name: correct_trial, dtype: float64
condition
congruent      544.0
incongruent    651.1
dtype: float64
condition
congruent      0.512195
incongruent    0.569620
dtype: float64


In [44]:
# These should be the only tasks that need non-default arguments.
# Most tasks share one layout (a 'condition' column and trials labelled 'test_trial');
# the tasks below deviate from it. Attention checks are always handled the same way.

### ax-cpt: the trials to score are labelled 'test_probe' instead of 'test_trial'
ax_cpt_kwargs = dict(test_trial='test_probe')
calculate_average_accuracy(ax_cpt__df, **ax_cpt_kwargs)
calculate_average_rt(ax_cpt__df, **ax_cpt_kwargs)
calculate_omission_rate(ax_cpt__df, **ax_cpt_kwargs)

### cued ts: cue_condition and task_condition live in separate columns, so group factorially
cued_ts_kwargs = dict(factorial_condition=True, factorial_conditions=['cue_condition', 'task_condition'])
calculate_average_accuracy(cued_ts__df, **cued_ts_kwargs)
calculate_average_rt(cued_ts__df, **cued_ts_kwargs)
calculate_omission_rate(cued_ts__df, **cued_ts_kwargs)
### Note: spatial_ts apparently does not need this, since its conditions are already combined (e.g. tstay_cstay) before the data is exported.

### gonogo: the omission rate is only reported for go trials
calculate_omission_rate(go_nogo__df, is_go_no_go=True)

### nback: group by 'delay' instead of the 'match' / 'mismatch' condition
n_back_kwargs = dict(condition_col='delay')
calculate_average_accuracy(n_back__df, **n_back_kwargs)
calculate_average_rt(n_back__df, **n_back_kwargs)
calculate_omission_rate(n_back__df, **n_back_kwargs)

### span (simple & operation; i.e. storage-only and same-domain)
calculate_average_accuracy(span__df, test_trial='test_response')
calculate_average_rt(span__df, test_trial='test_inter-stimulus', correct_trial_col='correct_response')
# Omissions need a dedicated function: an empty test_response is [] and an incomplete one has fewer than 4 items
calculate_omission_rate__span(span__df)

### stop signal
get_stopping_data(stop_signal__df)

### visual search: group factorially by feature/conjunction condition and load (num_stimuli)
visual_search_kwargs = dict(factorial_condition=True, factorial_conditions=['condition', 'num_stimuli'])
calculate_average_accuracy(visual_search__df, **visual_search_kwargs)
calculate_average_rt(visual_search__df, **visual_search_kwargs)
calculate_omission_rate(visual_search__df, **visual_search_kwargs)

condition
AX    0.333333
AY    0.400000
BX    0.533333
BY    0.400000
Name: correct_trial, dtype: float64
condition
AX    554.550000
AY    542.916667
BX    462.312500
BY    486.583333
dtype: float64
condition
AX    0.533333
AY    0.566667
BX    0.366667
BY    0.566667
dtype: float64
cue_condition  task_condition
na             na                1.000000
stay           stay              0.708333
switch         stay              0.541667
               switch            0.604167
Name: correct_trial, dtype: float64
cue_condition  task_condition
na             na                563.666667
stay           stay              682.823529
switch         stay              706.923077
               switch            664.068966
dtype: float64
cue_condition  task_condition
na             na                0.000000
stay           stay              0.208333
switch         stay              0.312500
               switch            0.270833
dtype: float64
0.3148148148148148
delay
1.0    0.861538
2.0    

## Note about modifying function calls
The tasks below need non-default arguments for the accuracy, RT, and/or omission-rate calculations:

### ax-cpt
```python
calculate_average_accuracy(ax_cpt__df, test_trial='test_probe')  # need different test_trial than test_trial, must be test_probe instead
```

### cued ts
```python 
calculate_average_accuracy(cued_ts__df, factorial_condition=True, factorial_conditions=['cue_condition', 'task_condition']) # need to use factorial for cue_condition and task_condition since separate cols
```
#### Note: spatial_ts does not appear to need this, because its conditions are already combined into a single value (e.g. tstay_cstay) before the data is exported.

### nback  
```python
calculate_average_accuracy(n_back__df, condition_col='delay') # need to use delay instead of 'match' , 'mismatch' condition 
calculate_average_rt(n_back__df, condition_col='delay') # need to use delay instead of 'match' , 'mismatch' condition 
```

### span (simple & operation; i.e. storage-only and same-domain)
```python
calculate_average_accuracy(span__df, test_trial='test_response')
calculate_average_rt(span__df, test_trial='test_inter-stimulus', correct_trial_col='correct_response')
calculate_omission_rate__span(span__df) # need something different for omissions since no response in test_response is [] and incomplete is [].length < 4
```

### stop signal
```python
get_stopping_data(stop_signal__df)
```

### visual search
```python
calculate_average_accuracy(visual_search__df, factorial_condition=True, factorial_conditions=['condition', 'num_stimuli']) # need to use factorial for load and feature/conjunction
```