-
Notifications
You must be signed in to change notification settings - Fork 83
/
meg_bids.py
483 lines (419 loc) · 17.8 KB
/
meg_bids.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
# Authors: Mainak Jas <mainak.jas@telecom-paristech.fr>
# Alexandre Gramfort <alexandre.gramfort@telecom-paristech.fr>
# Teon Brooks <teon.brooks@gmail.com>
# Chris Holdgraf <choldgraf@berkeley.edu>
# Stefan Appelhoff <stefan.appelhoff@mailbox.org>
#
# License: BSD (3-clause)
import os
import shutil as sh
import pandas as pd
from collections import defaultdict, OrderedDict
import numpy as np
from mne import read_events, find_events
from mne.io.constants import FIFF
from mne.io.pick import channel_type
from mne.io import BaseRaw
from mne.channels.channels import _unit2human
from mne.externals.six import string_types
from datetime import datetime
from warnings import warn
from .utils import (make_bids_filename, make_bids_folders,
make_dataset_description, _write_json)
from .io import _parse_ext, _read_raw
ALLOWED_KINDS = ['meg', 'ieeg']
orientation = {'.sqd': 'ALS', '.con': 'ALS', '.fif': 'RAS', '.gz': 'RAS',
'.pdf': 'ALS', '.ds': 'ALS'}
units = {'.sqd': 'm', '.con': 'm', '.fif': 'm', '.gz': 'm', '.pdf': 'm',
'.ds': 'cm'}
manufacturers = {'.sqd': 'KIT/Yokogawa', '.con': 'KIT/Yokogawa',
'.fif': 'Elekta', '.gz': 'Elekta', '.pdf': '4D Magnes',
'.ds': 'CTF'}
def _channels_tsv(raw, fname, verbose):
"""Create a channels.tsv file and save it.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
fname : str
Filename to save the channels.tsv to.
verbose : bool
Set verbose output to true or false.
"""
map_chs = defaultdict(lambda: 'OTHER')
map_chs.update(grad='MEGGRAD', mag='MEGMAG', stim='TRIG', eeg='EEG',
ecog='ECOG', seeg='SEEG', eog='EOG', ecg='ECG', misc='MISC',
resp='RESPONSE', ref_meg='REFMEG')
map_desc = defaultdict(lambda: 'Other type of channel')
map_desc.update(grad='Gradiometer', mag='Magnetometer',
stim='Trigger',
eeg='ElectroEncephaloGram',
ecog='Electrocorticography',
seeg='StereoEEG',
ecg='ElectroCardioGram',
eog='ElectrOculoGram', misc='Miscellaneous',
ref_meg='Reference channel')
status, ch_type, description = list(), list(), list()
for idx, ch in enumerate(raw.info['ch_names']):
status.append('bad' if ch in raw.info['bads'] else 'good')
ch_type.append(map_chs[channel_type(raw.info, idx)])
description.append(map_desc[channel_type(raw.info, idx)])
low_cutoff, high_cutoff = (raw.info['highpass'], raw.info['lowpass'])
units = [_unit2human.get(ch_i['unit'], 'n/a') for ch_i in raw.info['chs']]
n_channels = raw.info['nchan']
sfreq = raw.info['sfreq']
df = pd.DataFrame(OrderedDict([
('name', raw.info['ch_names']),
('type', ch_type),
('units', units),
('description', description),
('sampling_frequency', ['%.2f' % sfreq] * n_channels),
('low_cutoff', ['%.2f' % low_cutoff] * n_channels),
('high_cutoff', ['%.2f' % high_cutoff] * n_channels),
('status', status)]))
df.to_csv(fname, sep='\t', index=False)
if verbose:
print(os.linesep + "Writing '%s'..." % fname + os.linesep)
print(df.head())
return fname
def _events_tsv(events, raw, fname, event_id, verbose):
"""Create an events.tsv file and save it.
Parameters
----------
events : array, shape = (n_events, 3)
The first column contains the event time in samples and the third
column contains the event id. The second column is ignored for now but
typically contains the value of the trigger channel either immediately
before the event or immediately after.
raw : instance of Raw
The data as MNE-Python Raw object.
fname : str
Filename to save the events.tsv to.
event_id : dict | None
Dictionary mapping a brief description key to an event id (value). For
example {'Go': 1, 'No Go': 2}.
verbose : bool
Set verbose output to true or false.
"""
first_samp = raw.first_samp
sfreq = raw.info['sfreq']
events[:, 0] -= first_samp
df = pd.DataFrame(np.c_[events[:, 0], np.zeros(events.shape[0]),
events[:, 2]],
columns=['onset', 'duration', 'condition'])
if event_id:
event_id_map = {v: k for k, v in event_id.items()}
df.condition = df.condition.map(event_id_map)
df.onset /= sfreq
df = df.fillna('n/a')
df.to_csv(fname, sep='\t', index=False)
if verbose:
print(os.linesep + "Writing '%s'..." % fname + os.linesep)
print(df.head())
return fname
def _scans_tsv(raw, raw_fname, fname, verbose):
"""Create a scans.tsv file and save it.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
raw_fname : str
Relative path to the raw data file.
fname : str
Filename to save the scans.tsv to.
verbose : bool
Set verbose output to true or false.
"""
# get MEASurement date from the data info
meas_date = raw.info['meas_date']
if isinstance(meas_date, (np.ndarray, list)):
meas_date = meas_date[0]
if meas_date is None:
acq_time = 'n/a'
else:
acq_time = datetime.fromtimestamp(
meas_date).strftime('%Y-%m-%dT%H:%M:%S')
df = pd.DataFrame({'filename': ['%s' % raw_fname],
'acq_time': [acq_time]})
df.to_csv(fname, sep='\t', index=False)
if verbose:
print(os.linesep + "Writing '%s'..." % fname + os.linesep)
print(df.head())
return fname
def _coordsystem_json(raw, unit, orient, manufacturer, fname, verbose):
"""Create a coordsystem.json file and save it.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
unit : str
Units to be used in the coordsystem specification.
orient : str
Used to define the coordinate system for the head coils.
manufacturer : str
Used to define the coordinate system for the MEG sensors.
fname : str
Filename to save the coordsystem.json to.
verbose : bool
Set verbose output to true or false.
"""
dig = raw.info['dig']
coords = dict()
fids = {d['ident']: d for d in dig if d['kind'] ==
FIFF.FIFFV_POINT_CARDINAL}
if fids:
if FIFF.FIFFV_POINT_NASION in fids:
coords['NAS'] = fids[FIFF.FIFFV_POINT_NASION]['r'].tolist()
if FIFF.FIFFV_POINT_LPA in fids:
coords['LPA'] = fids[FIFF.FIFFV_POINT_LPA]['r'].tolist()
if FIFF.FIFFV_POINT_RPA in fids:
coords['RPA'] = fids[FIFF.FIFFV_POINT_RPA]['r'].tolist()
hpi = {d['ident']: d for d in dig if d['kind'] == FIFF.FIFFV_POINT_HPI}
if hpi:
for ident in hpi.keys():
coords['coil%d' % ident] = hpi[ident]['r'].tolist()
coord_frame = set([dig[ii]['coord_frame'] for ii in range(len(dig))])
if len(coord_frame) > 1:
err = 'All HPI and Fiducials must be in the same coordinate frame.'
raise ValueError(err)
fid_json = {'MEGCoordinateSystem': manufacturer,
'MEGCoordinateUnits': unit, # XXX validate this
'HeadCoilCoordinates': coords,
'HeadCoilCoordinateSystem': orient,
'HeadCoilCoordinateUnits': unit # XXX validate this
}
_write_json(fid_json, fname)
return fname
def _sidecar_json(raw, task, manufacturer, fname, kind, verbose):
"""Create a sidecar json file depending on the kind and save it.
The sidecar json file provides meta data about the data of a certain kind.
Parameters
----------
raw : instance of Raw
The data as MNE-Python Raw object.
task : str
Name of the task the data is based on.
manufacturer : str
Used to define the coordinate system for the MEG sensors.
fname : str
Filename to save the sidecar json to.
kind : str
Type of the data as in ALLOWED_KINDS.
verbose : bool
Set verbose output to true or false.
"""
sfreq = raw.info['sfreq']
powerlinefrequency = raw.info.get('line_freq', None)
if powerlinefrequency is None:
warn('No line frequency found, defaulting to 50 Hz')
powerlinefrequency = 50
n_megchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_MEG_CH])
n_megrefchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_REF_MEG_CH])
n_eegchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_EEG_CH])
n_ecogchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_ECOG_CH])
n_seegchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_SEEG_CH])
n_eogchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_EOG_CH])
n_ecgchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_ECG_CH])
n_emgchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_EMG_CH])
n_miscchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_MISC_CH])
n_stimchan = len([ch for ch in raw.info['chs']
if ch['kind'] == FIFF.FIFFV_STIM_CH])
# Define modality-specific JSON dictionaries
ch_info_json_common = [
('TaskName', task),
('Manufacturer', manufacturer),
('PowerLineFrequency', powerlinefrequency)]
ch_info_json_meg = [
('SamplingFrequency', sfreq),
("DewarPosition", "XXX"),
("DigitizedLandmarks", False),
("DigitizedHeadPoints", False),
("SoftwareFilters", "n/a"),
('MEGChannelCount', n_megchan),
('MEGREFChannelCount', n_megrefchan)]
ch_info_json_ieeg = [
('ECOGChannelCount', n_ecogchan),
('SEEGChannelCount', n_seegchan)]
ch_info_ch_counts = [
('EEGChannelCount', n_eegchan),
('EOGChannelCount', n_eogchan),
('ECGChannelCount', n_ecgchan),
('EMGChannelCount', n_emgchan),
('MiscChannelCount', n_miscchan),
('TriggerChannelCount', n_stimchan)]
# Stitch together the complete JSON dictionary
ch_info_json = ch_info_json_common
append_kind_json = ch_info_json_meg if kind == 'meg' else ch_info_json_ieeg
ch_info_json += append_kind_json
ch_info_json += ch_info_ch_counts
ch_info_json = OrderedDict(ch_info_json)
_write_json(ch_info_json, fname, verbose=verbose)
return fname
def raw_to_bids(subject_id, task, raw_file, output_path, session_id=None,
run=None, kind='meg', events_data=None, event_id=None,
hpi=None, electrode=None, hsp=None, config=None,
overwrite=True, verbose=True):
"""Walk over a folder of files and create BIDS compatible folder.
Parameters
----------
subject_id : str
The subject name in BIDS compatible format ('01', '02', etc.)
task : str
Name of the task the data is based on.
raw_file : str | instance of mne.Raw
The raw data. If a string, it is assumed to be the path to the raw data
file. Otherwise it must be an instance of mne.Raw
output_path : str
The path of the BIDS compatible folder
session_id : str | None
The session name in BIDS compatible format.
run : int | None
The run number for this dataset.
kind : str, one of ('meg', 'ieeg')
The kind of data being converted. Defaults to "meg".
events_data : str | array | None
The events file. If a string, a path to the events file. If an array,
the MNE events array (shape n_events, 3). If None, events will be
inferred from the stim channel using `find_events`.
event_id : dict
The event id dict
hpi : None | str | list of str
Marker points representing the location of the marker coils with
respect to the MEG Sensors, or path to a marker file.
If list, all of the markers will be averaged together.
electrode : None | str
Digitizer points representing the location of the fiducials and the
marker coils with respect to the digitized head shape, or path to a
file containing these points.
hsp : None | str | array, shape = (n_points, 3)
Digitizer head shape points, or path to head shape file. If more than
10`000 points are in the head shape, they are automatically decimated.
config : str | None
A path to the configuration file to use if the data is from a BTi
system.
overwrite : bool
If the file already exists, whether to overwrite it.
verbose : bool
If verbose is True, this will print a snippet of the sidecar files. If
False, no content will be printed.
"""
if isinstance(raw_file, string_types):
# We must read in the raw data
raw = _read_raw(raw_file, electrode=electrode, hsp=hsp, hpi=hpi,
config=config, verbose=verbose)
_, ext = _parse_ext(raw_file, verbose=verbose)
elif isinstance(raw_file, BaseRaw):
# Only parse the filename for the extension
# Assume that if no filename attr exists, it's a fif file.
raw = raw_file
if hasattr(raw, 'filenames'):
_, ext = _parse_ext(raw.filenames[0], verbose=verbose)
else:
ext = '.fif'
else:
raise ValueError('raw_file must be an instance of str or BaseRaw, '
'got %s' % type(raw_file))
data_path = make_bids_folders(subject=subject_id, session=session_id,
kind=kind, root=output_path,
overwrite=overwrite,
verbose=verbose)
if session_id is None:
ses_path = data_path
else:
ses_path = make_bids_folders(subject=subject_id, session=session_id,
root=output_path,
overwrite=False,
verbose=verbose)
# create filenames
scans_fname = make_bids_filename(
subject=subject_id, session=session_id, suffix='scans.tsv',
prefix=ses_path)
coordsystem_fname = make_bids_filename(
subject=subject_id, session=session_id,
suffix='coordsystem.json', prefix=data_path)
data_meta_fname = make_bids_filename(
subject=subject_id, session=session_id,
suffix='%s.json' % kind, prefix=data_path)
raw_file_bids = make_bids_filename(
subject=subject_id, session=session_id, task=task, run=run,
suffix='%s%s' % (kind, ext), prefix=data_path)
events_tsv_fname = make_bids_filename(
subject=subject_id, session=session_id, task=task,
run=run, suffix='events.tsv', prefix=data_path)
channels_fname = make_bids_filename(
subject=subject_id, session=session_id, task=task, run=run,
suffix='channels.tsv', prefix=data_path)
# Read in Raw object and extract metadata from Raw object if needed
if kind == 'meg':
orient = orientation[ext]
unit = units[ext]
manufacturer = manufacturers[ext]
else:
orient = 'n/a'
unit = 'n/a'
manufacturer = 'n/a'
# save stuff
if kind == 'meg':
_scans_tsv(raw, raw_file_bids, scans_fname, verbose)
_coordsystem_json(raw, unit, orient, manufacturer, coordsystem_fname,
verbose)
make_dataset_description(output_path, name=" ",
verbose=verbose)
_sidecar_json(raw, task, manufacturer, data_meta_fname, kind, verbose)
_channels_tsv(raw, channels_fname, verbose)
events = _read_events(events_data, raw)
if len(events) > 0:
_events_tsv(events, raw, events_tsv_fname, event_id, verbose)
# for FIF, we need to re-save the file to fix the file pointer
# for files with multiple parts
if ext in ['.fif', '.gz']:
raw.save(raw_file_bids, overwrite=overwrite)
else:
if os.path.exists(raw_file_bids):
if overwrite:
os.remove(raw_file_bids)
sh.copyfile(raw_file, raw_file_bids)
else:
raise ValueError('"%s" already exists. Please set overwrite to'
' True.' % raw_file_bids)
return output_path
def _read_events(events_data, raw):
"""Read in events data.
Parameters
----------
events_data : str | array | None
The events file. If a string, a path to the events file. If an array,
the MNE events array (shape n_events, 3). If None, events will be
inferred from the stim channel using `find_events`.
raw : instance of Raw
The data as MNE-Python Raw object.
Returns
-------
events : array, shape = (n_events, 3)
The first column contains the event time in samples and the third
column contains the event id. The second column is ignored for now but
typically contains the value of the trigger channel either immediately
before the event or immediately after.
"""
if isinstance(events_data, string_types):
events = read_events(events_data).astype(int)
elif isinstance(events_data, np.ndarray):
if events_data.ndim != 2:
raise ValueError('Events must have two dimensions, '
'found %s' % events.ndim)
if events_data.shape[1] != 3:
raise ValueError('Events must have second dimension of length 3, '
'found %s' % events.shape[1])
events = events_data
else:
events = find_events(raw, min_duration=0.001)
return events