Sophie van Dijke (s3567826) Assignment 3

In [1]:
from model import Model
from dmchunk import Chunk

import os
import math
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

Define noise, time_to_pulses, and pulses_to_time:

In [2]:
def noise(s):
    """Draw one sample of logistic-shaped noise scaled by ``s``.

    A uniform value ``u`` is drawn from [0.001, 0.999] and mapped through
    ``log((1 - u) / u)``, so the result is bounded by ``+/- s * log(999)``.

    Parameters
    ----------
    s : float
        Scale factor applied to the sample.

    Returns
    -------
    float
        The scaled noise value (0 when ``s`` is 0).
    """
    u = random.uniform(0.001, 0.999)
    odds = (1 - u) / u
    return s * math.log(odds)

In [3]:
def time_to_pulses(time, t_0=0.011, a =1.1, b = 0.015):
    """Convert a duration into the number of whole pulses that fit into it.

    The first pulse lasts ``t_0``; every following pulse is ``a`` times the
    previous one plus noise scaled by ``b * a * previous``.

    Parameters
    ----------
    time : float
        Duration to convert (same unit as ``t_0``).
    t_0 : float
        Length of the first pulse.
    a : float
        Multiplicative growth of the pulse length.
    b : float
        Noise scale relative to the pulse length.

    Returns
    -------
    int
        Number of complete pulses contained in ``time``; 0 when ``time`` is
        shorter than the first pulse.
    """
    pulses = 0
    pulse_duration = t_0
    # Only count a pulse when it fits completely into the remaining time.
    # Comparing against 0 instead would always count one extra, partial pulse
    # (and report 1 pulse for a duration of 0).
    while time >= pulse_duration:
        time -= pulse_duration
        pulses += 1
        pulse_duration = a * pulse_duration + noise(b * a * pulse_duration)
    return pulses

In [4]:
def pulses_to_time(pulses, t_0 = 0.011, a = 1.1, b = 0.015):
    """Convert a pulse count back into a duration.

    Pulse lengths are summed one by one: the first lasts ``t_0`` and each
    subsequent one is ``a`` times the previous plus noise scaled by
    ``b * a * previous``.

    Parameters
    ----------
    pulses : int or float
        Number of pulses to accumulate; the loop runs while the remaining
        count is above zero, so a fractional count is effectively rounded up.
    t_0 : float
        Length of the first pulse.
    a : float
        Multiplicative growth of the pulse length.
    b : float
        Noise scale relative to the pulse length.

    Returns
    -------
    float
        Accumulated duration (0 when ``pulses`` is not positive).
    """
    elapsed = 0
    current_pulse = t_0
    remaining = pulses
    while remaining > 0:
        elapsed += current_pulse
        remaining -= 1
        current_pulse = a * current_pulse + noise(b * a * current_pulse)
    return elapsed

Define priors and quadrants:

In [5]:
# Candidate stimulus durations for each prior; one value is drawn per trial.
p1 = np.array([.450, .525, .600, .675, .750])
p2 = np.array([.750, .825, .900, .975, 1.050])
p3 = np.array([.900, .975, 1.050, 1.125, 1.200])
p4 = np.array([.450, .525, .600, .675, .750, .825, .900, .975, 1.050, 1.125, 1.200])

# Lookup from prior number (1-4) to its array of durations.
prior = dict(enumerate((p1, p2, p3, p4), start=1))

# Quadrant labels; each quadrant is paired with one prior per participant.
quad = [
    "TR",
    "TL",
    "BR",
    "BL",
]

# Stimulus identifiers IMG_1 ... IMG_400.
stimulus = [f"IMG_{number}" for number in range(1, 401)]

Define model:

In [9]:
def _simulate_trial(m, selected_quad, intervals):
    """Run one reproduction trial on model ``m`` and return ``(ts, tr)``.

    Advances ``m.time`` through the trial phases, stores the perceived
    interval as a chunk, retrieves a blended pulse count and converts it
    back into the reproduced duration.
    """
    # fixation cross
    m.time += 0.3

    # indication of upcoming quadrant
    m.time += 0.375

    # stimulus presented
    ts = np.random.choice(intervals)
    m.time += ts

    # convert interval to pulses
    ts_pulses = time_to_pulses(ts)

    # add encounter to DM
    encounter = Chunk(name = "p-" + str(ts_pulses) + selected_quad,
                      slots = {"type": "interval", "pulses": ts_pulses, "quadrant": selected_quad})
    m.add_encounter(encounter)
    m.time += 0.05

    # delay
    m.time += 0.25

    # retrieve
    request = Chunk(name = "retrieve-interval", slots = {"type": "interval"})
    retrieved_pulses, latency = m.retrieve_blended_trace(request, "pulses")
    m.time += 0.05
    m.time += latency

    # participant replicates
    tr = pulses_to_time(retrieved_pulses)
    m.time += tr

    return ts, tr


def class_experiment(n_participants, bl = 13, p_trial = 16, trials = 48, out_path = 'output_file.csv'):
    """Simulate the interval-reproduction experiment and write it to a CSV file.

    For every participant a fresh ``Model`` is created and each quadrant is
    paired with a distinct, randomly chosen prior. Block 0 is a practice block
    of ``p_trial`` trials with the stimulus ``"circle"``; the remaining
    ``bl - 1`` blocks have ``trials`` trials each. Odd-numbered blocks draw new
    stimuli, even-numbered blocks re-use stimuli from the preceding odd block
    and record the interval that stimulus was first shown with (``ts_stim``).

    Parameters
    ----------
    n_participants : int
        Number of simulated participants.
    bl : int
        Total number of blocks, including the practice block.
    p_trial : int
        Number of practice trials (block 0).
    trials : int
        Number of trials in every non-practice block.
    out_path : str
        Destination of the CSV file.

    Returns
    -------
    None
        The result is only written to ``out_path``. Rows are labelled from 1
        and there is exactly one row per simulated trial.
    """
    columns = ["student_nr", "Block_nr", "practice", "quadrant", "stimulus", "prior", "ts", "repr_error",
               "sub_id", "trial_index", "tr", "ts_1", "ts_2", "ts_3", "ts_4", "ts_5", "ts_6", "ts_7", "qrep", "ts_stim"]
    n_lags = 7  # number of ts_<i> history columns

    # Collect rows in a list and build the frame once; this avoids both slow
    # row-wise .loc assignment and the empty rows a preallocated frame leaves.
    records = []

    for p in range(1, n_participants + 1):
        # initialize model
        m = Model()

        # pair every quadrant with a distinct, randomly chosen prior
        quad_prior = dict(zip(quad, random.sample(list(prior), len(quad))))

        # presented intervals of this participant, oldest first
        ts_history = []
        # stimuli of the most recent odd block and the interval each one was
        # first shown with (per participant, so no cross-participant leakage)
        stim_list = []
        stim_ts = {}

        for b in range(bl):
            is_practice = b == 0
            new_stimuli = (not is_practice) and b % 2 == 1
            n_trials = p_trial if is_practice else trials
            practice = "yes" if is_practice else "no"

            # each quadrant may be used at most an equal share of the block
            # (4 of 16 practice trials, 12 of 48 regular trials by default)
            quad_cap = math.ceil(n_trials / len(quad))
            quad_counter = {q: 0 for q in quad}
            previous_quad = None

            if new_stimuli:
                stim_list = []
                stim_ts = {}

            for t in range(1, n_trials + 1):
                if is_practice:
                    stim = "circle"
                    ts_stim = "NA"
                elif new_stimuli:
                    stim = random.choice(stimulus)
                    stim_list.append(stim)
                    # no previous ts_stim
                    ts_stim = "NA"
                else:
                    stim = random.choice(stim_list)
                    # interval this stimulus had in the previous block
                    ts_stim = stim_ts[stim]

                # draw quadrants until one is found that is not yet exhausted
                selected_quad = random.choice(quad)
                while quad_counter[selected_quad] >= quad_cap:
                    selected_quad = random.choice(quad)
                quad_counter[selected_quad] += 1

                selected_prior = quad_prior[selected_quad]
                ts, tr = _simulate_trial(m, selected_quad, prior[selected_prior])

                if new_stimuli:
                    # keep the first interval if a stimulus is drawn twice
                    stim_ts.setdefault(stim, ts)

                # quadrant repetition is only defined within a block
                if t == 1:
                    qrep = "NA"
                elif selected_quad == previous_quad:
                    qrep = "rep"
                else:
                    qrep = "swi"

                # ts of the 1..n_lags preceding trials of this participant
                lags = [ts_history[-i] if len(ts_history) >= i else 'NA' for i in range(1, n_lags + 1)]
                trial_index = len(ts_history) + 1

                records.append([p, b, practice, selected_quad, stim, f'p{selected_prior}', ts, tr - ts, p,
                                trial_index, tr, *lags, qrep, ts_stim])

                ts_history.append(ts)
                previous_quad = selected_quad

    dat = pd.DataFrame(records, columns=columns)
    # keep the 1-based row labels used for trial rows
    dat.index = range(1, len(dat) + 1)

    # Save DataFrame to CSV file
    dat.to_csv(out_path)

In [11]:
# Simulate 35 participants; the result is written to output_file.csv.
class_experiment(35)