-
Notifications
You must be signed in to change notification settings - Fork 0
/
parametersearch_MPI.py
417 lines (332 loc) · 15.6 KB
/
parametersearch_MPI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
"""
ACT-R basic reader. This does not do any parsing, it just sees a word, retrieves it, moves on.
It searches for parameters.
The location of the last word in each line is known (it is stored as an extra column in the analyzed dataset). This simplifies production rules, and in particular, shift to a new line.
Running the file requires MPI.
"""
import warnings
import sys
import pandas as pd
import pyactr as actr
import math
from simpy.core import EmptySchedule
import numpy as np
import re
import scipy.stats as stats
import scipy
from pymc3 import Model, Gamma, Normal, HalfNormal, Uniform, find_MAP, Slice, sample, summary, Metropolis, traceplot
from pymc3.backends.base import merge_traces
import theano
import theano.tensor as T
from theano.compile.ops import as_op
from mpi4py import MPI
import matplotlib.pyplot as pp
warnings.filterwarnings(action="ignore", category=UserWarning)
#########files##########
MATERIALS = "materials_final.csv"
DATA = "summed_eyetracking_parameter_search.csv"
#########info needed for calculation of activation##########
SEC_IN_YEAR = 365*24*3600
SEC_IN_TIME = 15*SEC_IN_YEAR
USED_WORDS = 112.5 #number of words in the corpus from which frequencies are taken (in millions)
###########parameters for ACT-R##################
DECAY = 0.5
RETRIEVAL_THRESHOLD = -50 #low number so that retrieval never fails (could be changed later)
OPTIMIZED_LEARNING = False #parameter, often used in ACT-R, that speeds up base-level calculation, B; we don't need it because we will calculate B directly here, not case by case in pyactr; therefore, for every word, calculation is done only once
EMMA_NOISE = False #do we want to assume landing and timing estimates in the EMMA eye mvt model to be deterministic, or noisy?
#############parameters for sampling##################
NDRAWS = 100 #how many draws?
CHAIN = 0 #chain number
def load_texts(lfile=MATERIALS):
"""
Loads the basic eye tracking text.
"""
workbook = pd.ExcelFile(lfile)
worksheet = workbook.parse()
return worksheet
def load_file(lfile, index_col=None, sep=","):
"""
Loads file as a list
"""
csvfile = pd.read_csv(lfile, index_col=index_col, header=0, sep=sep)
return csvfile
##############ACT-R model, basics#####################
environment = actr.Environment(size=(1366, 768), focus_position=(0, 0))
actr.chunktype("read", "state word")
actr.chunktype("parsing", "top")
actr.chunktype("word", "form cat")
#the model with basic parameters set up
parser = actr.ACTRModel(environment, subsymbolic=True, optimized_learning=OPTIMIZED_LEARNING, retrieval_threshold=RETRIEVAL_THRESHOLD, decay=DECAY, emma_noise=EMMA_NOISE, emma_landing_site_noise=EMMA_NOISE)
parser.productionstring(name="attend word", string="""
=g>
isa read
state start
=visual_location>
isa _visuallocation
?visual>
state free
buffer empty
==>
=g>
isa read
state start
+visual>
isa _visual
cmd move_attention
screen_pos =visual_location""") #this rule is used at the beginning and when a new line is started
parser.productionstring(name="retrieve word", string="""
=g>
isa read
state start
=visual>
isa _visual
value =val
?retrieval>
state free
==>
=g>
isa read
state parse
word =val
~visual>
+retrieval>
isa word
form =val""")
def reading(grouped, rank, declchunks):
"""
Main function, running reading.
"""
for name, group in grouped: #name is name of the sentence in materials, group = sentence
#print(name)
stimulus = {}
stimuli = []
y_position = 0 #this will check if some text appears on the same screen or not
freq = {}
positions = {}
lastwords = {} #storing last words in each line
#the following loop runs through a sentence word by word and creates a stimulus out of the sentence, recording the exact position; it also stores each word in decl. memory with the correct activation
for idx, i in enumerate(group.index):
if group.IA_TOP[i] < y_position:
stimuli.append(stimulus)
stimulus = {}
word = str(group.WORD[i])
pos = str(group.PART_OF_SPEECH[i])
stimulus[i] = {'text': word, 'position': (group.IA_CENTER[i], group.IA_TOP[i]), "vis_delay": len(word)}
positions[(int(group.IA_CENTER[i]), int(group.IA_TOP[i]))] = [idx, 0] #dict to record RTs
lastwords[str(group.IA_TOP[i])] = str(group.LAST_WORD[i]) #dict for last words in line
last_position = (group.IA_CENTER[i], group.IA_TOP[i]) #last word in sentence; when reached, stop sim
#add word into memory
if not actr.makechunk("", typename="word", form=word, cat=pos) in declchunks:
freq[word] = group.FREQUENCY[i]
if freq[word] == 0:
freq[word] = 1
word_freq = freq[word] * USED_WORDS/100 #BNC - 100 millions; estimated use by entering adulthood - 112.5 millions; we have to multiply by 1.125)
time_interval = SEC_IN_TIME / word_freq
chunk_times = np.arange(start=-time_interval, stop=-(time_interval*word_freq)-1, step=-time_interval)
declchunks[actr.makechunk("", typename="word", form=word, cat=pos)] = math.log(np.sum((0-chunk_times) ** (-DECAY)))
y_position = group.IA_TOP[i]
#the following 2 rules signal whether the reader should move to a new line or not depending on the x position
for y in lastwords:
tempstring = "\
=g>\
isa read\
state parse\
?retrieval>\
buffer full\
=visual_location>\
isa _visuallocation\
screen_y =ypos\
screen_y " + y + "\
screen_x ~ " + lastwords[y] + "\
==>\
=g>\
isa read\
state start\
?visual_location>\
attended False\
+visual_location>\
isa _visuallocation\
screen_x lowest\
screen_y =ypos\
~retrieval>"
parser.productionstring(name="move eyes in the line " + y, string=tempstring)
tempstring = "\
=g>\
isa read\
state parse\
?retrieval>\
buffer full\
=visual_location>\
isa _visuallocation\
screen_y =ypos\
screen_y " + y + "\
screen_x " + lastwords[y] + "\
==>\
=g>\
isa read\
state start\
?visual_location>\
attended False\
+visual_location>\
isa _visuallocation\
screen_x lowest\
screen_y onewayclosest\
~retrieval>"
parser.productionstring(name="move eyes to a new line" + y, string=tempstring)
#in the following part, the parser is made ready for reading - modules and buffers are initialized, focus is placed on the 1st word in sentence
stimuli.append(stimulus)
parser.decmems = {}
parser.set_decmem({x: np.array([]) for x in declchunks})
parser.decmem.activations.update(declchunks)
parser.retrievals = {}
parser.set_retrieval("retrieval")
parser.visbuffers = {}
environment.current_focus = [stimuli[0][min(stimuli[0])]['position'][0],stimuli[0][min(stimuli[0])]['position'][1]] #focus on the first word
parser.visualBuffer("visual", "visual_location", parser.decmem, finst=80)
parser.goals = {}
parser.set_goal("g")
parser.set_goal("g2")
parser.goals["g"].add(actr.chunkstring(string="""
isa read
state start"""))
parser.goals["g2"].add(actr.chunkstring(string="""
isa parsing
top None"""))
parser.goals["g2"].delay = 0.2
#simulation is started
sim = parser.simulation(realtime=False, trace=False, gui=True, environment_process=environment.environment_process, stimuli=stimuli, triggers='A', times=100)
#variables that are used in recording eye fixation times are initialized
last_time = 0
cf = tuple(environment.current_focus)
generated_list = []
#we'll run a loop in which we proceed event by event; we record eye fixation times (by recording the time until cf (eye focus) changes)); any break signals the end of simulation; break could arise not only when the sentence is finished; it could also arise if something went wrong, e.g., if time was higher than 100 seconds (if, for example, parameter values were very high) or if deck of events got empty
while True:
if sim.show_time() > 100:
generated_list = [len(group.WORD) - 2] + [0] * (len(group.WORD) - 2) #0 if getting stuck
break
try:
#next event
sim.step()
#print(sim.current_event)
except (EmptySchedule, OverflowError):
generated_list = [len(group.WORD) - 2] + [10000] * (len(group.WORD) - 2) #10000 if not finishing or overflowing (too much time)
break
if not positions:
break
if cf[0] != environment.current_focus[0] or cf[1] != environment.current_focus[1]:
positions[cf][1] = 1000*(sim.show_time() - last_time) #time in milliseconds
last_time = sim.show_time()
cf = tuple(environment.current_focus)
if cf == last_position:
break
#after simulation, we collect eye fixation times in a simple list that can be transferred using MPI
if not generated_list:
ordered_keys = sorted(list(positions), key=lambda x:positions[x][0]) #keys ordered from first word to last word
generated_list = [len(group.WORD)-2] + [positions[x][1] for x in ordered_keys][1:-1] #first and last words ignored
assert len(generated_list) == len(group.WORD) - 1, "In %s, the length of generated RTs would be %s, expected number of words is %s. This is illegal mismatch" %(name, len(generated_list) + 1, len(group.WORD))
comm.Send(bytearray(name, 'utf-8'), dest=0, tag=0)
sent_list = np.array(generated_list, dtype=np.float)
comm.Send([sent_list, MPI.FLOAT], dest=0, tag=1)
#when we are done with all simulations, we send info to the master
comm.Send(bytearray('DONE', 'utf-8'), dest=0, tag=0)
return declchunks
def repeated_reading(grouped, rank):
"""
Function for slaves: when receiving True from master, start reading.
"""
declchunks = {} #this variable stores all chunks in declarative memory; this speeds up simulation since words that were created once do not have to be re-created again
while True:
received_list = np.empty(3, dtype=np.float)
#the slave receives information about three parameters from the master and uploads the parser model accordingly
comm.Recv([received_list, MPI.FLOAT], 0, rank)
if received_list[0] == -1:
break
parser.model_parameters["latency_factor"] = received_list[0]
parser.model_parameters["latency_exponent"] = received_list[1]
parser.model_parameters["eye_mvt_angle_parameter"] = received_list[2]
declchunks = reading(grouped, rank, declchunks)
@as_op(itypes=[T.dscalar, T.dscalar, T.dscalar, T.dscalar, T.dscalar], otypes=[T.dvector])
def model(lf, le, emap, intercept, sigma):
"""
Workings of the model, run by the master that distributes the work to the slaves.
"""
sent_list = np.array([lf, le, emap], dtype = np.float)
#get slaves to work
for i in range(1, comm.Get_size()):
comm.Send([sent_list, MPI.FLOAT], dest=i, tag=i)
value_dict = {}
skipped = set()
#collect from slaves
while True:
if len(skipped) == comm.Get_size() - 1:
break
for i in range(1, comm.Get_size()):
if i in skipped:
continue
#receive string -- name
buf = bytearray(100)
m = comm.Irecv(buf, i, 0)
status = MPI.Status()
m.Wait(status)
length_buffer = status.Get_count(MPI.CHAR)
name = buf[:length_buffer].decode('utf-8')
if name == "DONE":
skipped.add(i)
continue
#receive list -- RTs
received_list = np.empty(200, dtype=np.float)
comm.Recv([received_list, MPI.FLOAT], i, 1)
value_dict[name] = received_list[1:int(received_list[0])+1]
return np.array([val for x in sorted(sorted(value_dict.keys(), key=lambda x:int(x[2:])), key=lambda x:int(x[0])) for val in value_dict[x] ]) #reorder sample in the right way -- x[2:] order sentences; x[0] order parts of et file
##########EXECUTION FROM HERE ON###############
worksheet = load_file(MATERIALS)
data = load_file(DATA)
data_sen_id = list(data.SENTENCE_ID)
Y = data.RT #We record actual reaction times as a vector
#materials has all sentences, but we use only a subset for param estimation; so select only the relevant subset
worksheet = worksheet.groupby('SENTENCE_ID', sort=False).filter(lambda x:x['SENTENCE_ID'].iloc[0] in data_sen_id)
sentences = [x for x, _ in worksheet.groupby('SENTENCE_ID', sort=False)]
n_sentences = len(sentences)
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
N_GROUPS = comm.Get_size() - 1 #Groups used for simulation - one less than used cores
if rank == 0: #master
print("CHAIN ", CHAIN)
print("DRAWS", NDRAWS)
print("How many sentences used?", n_sentences)
basic_model = Model()
#Below we specify the Bayesian model
with basic_model:
#Priors
le = HalfNormal('le', sd=0.5, testval=abs(np.random.randn())/2)
lf = Gamma('lf', alpha=1, beta=5, testval=abs(np.random.randn()/4))
#emvt = Gamma('emvt', alpha=1, beta=3, testval=abs(np.random.randn()))
emap = HalfNormal('emap', sd=1.0, testval=abs(np.random.randn()/2))
intercept = Normal('intercept', mu=50, sd=25)
sigma = HalfNormal('sigma', sd=20)
# Expected value of outcome
mu = intercept + model(lf, le, emap, intercept, sigma)
# Likelihood (sampling distribution) of observations
Normal('Y_obs', mu=mu, sd=50, observed=Y)
step = Metropolis(basic_model.vars)
trace = sample(NDRAWS, step=step, njobs=1, init='auto', tune=100)
traceframe = pd.DataFrame.from_dict({"intercept": trace['intercept'], "le": trace['le'], "lf": trace['lf'], "emap": trace["emap"], "sigma": trace["sigma"]})
print(traceframe)
traceframe.to_csv("output_" + str(CHAIN) + "_" + str(NDRAWS) + "parametersearch.csv", sep=",", encoding="utf-8")
traceplot(trace)
pp.savefig("plot_" + str(CHAIN) + "_" + str(NDRAWS) + "parametersearch.png")
#stop slaves from work
sent_list = np.array([-1, -1, -1], dtype = np.float)
for i in range(1, comm.Get_size()):
comm.Send([sent_list, MPI.FLOAT], dest=i, tag=i)
else: #slave
#the following part selects sentences that will be used for each slave
left = round((rank-1)*n_sentences/N_GROUPS)
right = round(rank*n_sentences/N_GROUPS)
if right != n_sentences:
worksheet = worksheet.groupby('SENTENCE_ID', sort=False).filter(lambda x:x['SENTENCE_ID'].iloc[0] in sentences[left:right]).groupby('SENTENCE_ID', sort=False)
else:
worksheet = worksheet.groupby('SENTENCE_ID', sort=False).filter(lambda x:x['SENTENCE_ID'].iloc[0] in sentences[left:]).groupby('SENTENCE_ID', sort=False)
repeated_reading(worksheet, rank)
print(rank, "STOPPED")
MPI.Finalize()