# Data Preprocessing for Illusory Pitch Study

In [None]:
import numpy as np
import pandas as pd
import scipy.stats as stats
from glob import glob

def dprime_and_c(hit_rate, fa_rate):
    
    # Get corresponding z-scores for the hit rate and false alarm rate
    zH = stats.norm.ppf(hit_rate)
    zF = stats.norm.ppf(fa_rate)
    
    # Calculate d' and C using z-scores
    dprime = zH - zF
    C = -(zH + zF) / 2
    
    return dprime, C

### Load data

In [None]:
# Find all data files
datafiles = glob('../data/IPAD_*.csv')

# Load each data file and concatenate them into a single table
d = pd.concat((pd.read_csv(f) for f in datafiles))

# Select only main trial events
d = d[d.event == 'trial']

# Recode the pitch shift and key press as 0 for "lower" and 1 for "higher"
d = d.assign(answer = d['shift'] == '+',
            response = d['response'] == '+')

# Before version 1.1, the experiment always loaded the stimuli for difficulty 1.0, so manually overwrite recorded trial difficulty
d.loc[d.version < 1.1, 'difficulty'] = 1.0

### Calculate scores within each subject and condition

In [None]:
# Define conditions as (octave, offset) pairs
conditions = [(1, 425), (1, 500), (1, 575), (0.5, 425), (0.5, 500), (0.5, 575)]
shift_sizes = [1, 0.5]
intervals = [425, 500, 575]
offsets = [-15, 0, 15]

# Scores will be stored in a long-format table
scores = pd.DataFrame(columns=['subject', 'experimenter', 'version', 'jnd', 'shift_size', 'interval', 'offset',
                               'hit_rate', 'fa_rate', 'accuracy', 'perc_resp_low', 'dprime', 'C', 'rt'])
fits = pd.DataFrame(columns=['subject', 'experimenter', 'version', 'jnd', 'shift_size', 'C_intercept', 'C_slope'])

# Calculate scores for each subject
for s, subj in enumerate(d.subject.unique()):

    # Select all responses from the current subject
    subj_trials = d[d.subject == subj]
    experimenter = subj_trials.iloc[-1].experimenter
    exp_version = subj_trials.iloc[-1].version
    jnd = subj_trials.iloc[-1].jnd
    
    # Calculate scores within each condition
    C_values = np.full((len(intervals), len(shift_sizes)+1), np.nan)
    for i, interval in enumerate(intervals):
        
        interval_trials = subj_trials[subj_trials.interval == interval]
        
        # Calculate hit and false alarm rates, pooling across difficulties, using Hautus (1995) adjustment to avoid 0s and 1s
        hit_rate = (np.sum(interval_trials.answer & interval_trials.response) + .5) / (np.sum(interval_trials.answer) + 1)
        fa_rate = (np.sum(~interval_trials.answer & interval_trials.response) + .5) / (np.sum(~interval_trials.answer) + 1)

        # Calculate C based on the hit rate and false alarm rate
        _, C = dprime_and_c(hit_rate, fa_rate)
        C_values[i, -1] = C
        
        # Calculate scores for each probe timing offset and pitch shift size
        for j, shift_size in enumerate(shift_sizes):
            
            # Select all trials from the current condition
            trials = interval_trials[interval_trials.difficulty == shift_size]
            if len(trials) == 0:
                continue

            # Create dictionary to store scores from current subject and condition
            offset = offsets[i]
            condi_scores = dict(subject=subj, experimenter=experimenter, version=exp_version, shift_size=shift_size, interval=interval, offset=offset)
        
            # Calculate accuracy and the percent of the time the participant responded "lower"
            condi_scores['accuracy'] = np.mean(trials.correct)
            condi_scores['perc_resp_low'] = np.mean(~trials.response)
            
            # Calculate hit and false alarm rates using Hautus (1995) adjustment to avoid 0s and 1s
            condi_scores['hit_rate'] = (np.sum(trials.answer & trials.response) + .5) / (np.sum(trials.answer) + 1)
            condi_scores['fa_rate'] = (np.sum(~trials.answer & trials.response) + .5) / (np.sum(~trials.answer) + 1)
        
            # Calculate d' and C based on the hit rate and false alarm rate
            condi_scores['dprime'], condi_scores['C'] = dprime_and_c(condi_scores['hit_rate'], condi_scores['fa_rate'])
            C_values[i, j] = condi_scores['C']
    
            condi_scores['rt'] = trials.rt.mean()
            condi_scores['jnd'] = jnd
        
            # Add current scores as a row to the full table of scores
            scores.loc[len(scores.index)] = condi_scores
    
    # Calculate timing-induced bias by fitting a line across the three C values for different offsets, separately for each shift size
    for i in range(len(shift_sizes) + 1):
        
        # Skip current shift size if participant did not receive this difficulty
        if np.all(np.isnan(C_values[:, i])):
            continue
        
        shift_size = shift_sizes[i] if i < len(shift_sizes) else 'pooled'
        fit_data = dict(subject=subj, experimenter=experimenter, version=exp_version, shift_size=shift_size, jnd=jnd)
        
        # Fit a line across the three C values at the current shift size
        slope, intercept = np.polyfit(offsets, C_values[:, i], 1)
    
        # Add the slope and intercept of the line to the subject's data dictionary
        fit_data['C_intercept'] = intercept
        fit_data['C_slope'] = slope
        
        # Add subject's data to the overall dataframe
        fits.loc[len(fits.index)] = fit_data

In [None]:
scores.to_csv('../data/scores.csv', index=False)
fits.to_csv('../data/fits.csv', index=False)