-
Notifications
You must be signed in to change notification settings - Fork 2
/
libmri.py
252 lines (196 loc) · 9.57 KB
/
libmri.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
#
# MRC CBU PYTHON MEG WRAPPER
#
# This library is intended to allow for easy communication between our
# experiment and the MEG setup at the MRC Cognition and Brain Sciences Unit.
# It is based on the MATLAB class for doing the same thing by Tibor Auer and
# Johan Carlin (https://github.com/MRC-CBU/megsync/blob/master/MEGSynchClass.m)
# The implementation is intended for the CBU setup only, and uses the custom
# National Instruments setup in the MEG lab.
#
# Author: Edwin Dalmaijer
# Email: Edwin.Dalmaijer@mrc-cbu.cam.ac.uk
# Date: 2018-10-24
# Last update: 2018-11-21
import copy
import time
import nidaqmx
from nidaqmx.constants import LineGrouping
class MRITriggerBox:
def __init__(self, device="Dev1"):
"""Initialises the MRI Trigger Box. Note that the button box buttons'
names and channels are hard-coded into the initialisation function.
The same is true for the scanner pulse channel.
Keyword Arguments
device - String indicating the name of the device.
Don't mess with this unless you know what
you're doing. Default = "Dev1"
"""
# Channel for scanner pulse.
self._scan_chan = "port0/line0"
# Create a list with all short-hand button names.
self._button_list = ["B1", "B2", "B3", "B4"]
# Set all the channel names for the buttons.
self._button_channels = "port0/line1:4"
# INITIALISE
print("\nInitialising connection to the NI box...")
# Initialise the system.
system = nidaqmx.system.System.local()
# Present the available devices.
print("\tAvailable NI devices:")
for dev_name in system.devices:
print("\t\t%s" % (dev_name))
# Connect to the passed device.
self._dev_name = device
self._dev = system.devices[self._dev_name]
print("\nConnection established with '%s'!" % (self._dev_name))
print("\tDevice: %s" % (self._dev))
print("\nChannel details:")
print("Scanner pulse: %s" % (self._scan_chan))
print("Buttons: %s" % (self._buttons))
def get_button_state(self, button_list=None):
"""Returns a single sample from the button channels.
Keyword Arguments
button_list - List of button names, or None to automatically
poll all the buttons. Default = None
Returns
button_list, state - button_list is a list of all button names, and
state is a list of the associated Booleans.
"""
# Create a new Task to listen in on the button channels.
with nidaqmx.Task() as task:
# Add the digital input (di) channels.
task.di_channels.add_di_chan( \
"%s/%s" % (self._dev_name, self._button_channels), \
line_grouping=LineGrouping.CHAN_PER_LINE)
# Get a single sample from the digital input channels.
state = task.read(number_of_samples_per_channel=1, \
timeout=0.001)
# Unwrap state, which is a list of lists of samples.
# Example: [[False], [False], [False], [False], [False]]
# We want instead: [False, False, False, False, False]
for i, b in enumerate(state):
state[i] = b[0]
# Select all buttons if no specific ones are requested.
if button_list is None:
return copy.deepcopy(self._button_list), state
# Return only the requested buttons.
else:
l = []
for b in button_list:
if b not in self._button_list:
raise Exception("ERROR: Unknown button '%s'; available buttons: %s" \
% (b, self._button_list))
i = self._button_list.index(b)
l.append(state[i])
return button_list, l
def wait_for_sync(self, timeout=None):
"""Waits until the MRI scanner sends a sync pulse, or until a timeout
occurs. Make sure to set the timeout rather high if you're expecting
to wait for longer periods of time before starting BOLD acquisition.
Keyword Arguments
timeout - Float or int that indicates the timeout in
seconds. If no button is pressed within the
timeout, this function will return. The
timeout can be None, meaning no timeout will
occur. Default = 10.0
Returns
time, triggered, timed_out
- time is a float value that reflects the time
in seconds at the time the button press was
detected.
triggered is a bool indicating whether the
scanner sent a pulse.
timed_out is a bool indicating whether a
timeout occured before receiving a pulse.
"""
# Get the starting time.
t0 = time.time()
# Create a new Task to listen in on the button channels.
with nidaqmx.Task() as task:
# Add the digital input (di) channels.
task.di_channels.add_di_chan("%s/%s" % \
(self._dev_name, self._scan_chan))
# Start the task (reduces timing inefficiency in read function).
task.start()
# Loop until a signal or a timeout happens.
triggered = False
timed_out = False
while not triggered and not timed_out:
# Get a single sample from the digital input channels.
state = task.read(number_of_samples_per_channel=1, timeout=1.0)
t1 = time.time()
# Check the sample (turns False when triggered).
if not state[0]:
triggered = True
break
# Check the time.
if timeout is not None:
if t1 - t0 > timeout:
timed_out = True
# Stop the task.
task.stop()
return t1, triggered, timed_out
def wait_for_button_press(self, allowed=None, timeout=None):
"""Waits for a button press.
Keyword Arguments
allowed - List of strings with allowed button names, or
None to allow all buttons. Default = None
timeout - Float or int that indicates the timeout in
seconds. If no button is pressed within the
timeout, this function will return. The
timeout can be None, meaning no timeout will
occur. Default = None
Returns
button, time - button is a string that indicates the pressed
button's name (only the first-pressed button
is counted), or None if no button was pressed
before a timeout occured.
time is a float value that reflects the time
in seconds at the time the button press was
detected.
"""
# Get the indices of the allowed buttons.
if allowed is not None:
allow = []
for b in allowed:
if b not in self._button_list:
raise Exception("ERROR: Unknown button '%s'; available buttons: %s" \
% (b, self._button_list))
allow.append(self._button_list.index(b))
else:
allow = range(len(self._button_list))
# Get the starting time.
t0 = time.time()
t1 = time.time()
# Create a new Task to listen in on the button channels. Using a with
# statement will automatically close the Task if an error happens
# during execution, leaving the NI box in a better state.
with nidaqmx.Task() as task:
# Add the digital input (di) channels.
task.di_channels.add_di_chan( \
"%s/%s" % (self._dev_name, self._button_channels), \
line_grouping=LineGrouping.CHAN_PER_LINE)
# Start the task (this will reduce timing inefficience when
# calling the task.read function).
task.start()
# Run until a timeout or a button press occurs.
button = None
pressed = False
while not pressed and t1 - t0 < timeout:
# Get a single sample from the digital input channels.
state = task.read(number_of_samples_per_channel=1, \
timeout=0.001)
# Get a timestamp for the sample.
t1 = time.time()
# Check whether any of the allowed buttons were pressed.
for i in allow:
if state[i][0]:
pressed = True
button = self._button_list[i]
break
# Stop the task.
task.stop()
return button, t1