-
Notifications
You must be signed in to change notification settings - Fork 11
/
b0shim.py
1418 lines (1231 loc) · 75.9 KB
/
b0shim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
"""
This file includes CLIs for shimming by fitting fieldmaps for static and realtime shimming. It groups them along with
the gradient method in a st_shim CLI with the argument being:
- fieldmap_static
- fieldmap_realtime
- gradient_realtime
"""
import click
import copy
import json
import nibabel as nib
import numpy as np
import logging
import os
from matplotlib.figure import Figure
from shimmingtoolbox import __dir_config_scanner_constraints__
from shimmingtoolbox.cli.realtime_shim import gradient_realtime
from shimmingtoolbox.coils.coil import Coil, ScannerCoil, get_scanner_constraints, restrict_sph_constraints
from shimmingtoolbox.pmu import PmuResp
from shimmingtoolbox.shim.sequencer import ShimSequencer, RealTimeSequencer
from shimmingtoolbox.shim.sequencer import shim_max_intensity, define_slices
from shimmingtoolbox.shim.sequencer import extend_fmap_to_kernel_size, parse_slices, new_bounds_from_currents
from shimmingtoolbox.utils import create_output_dir, set_all_loggers, timeit
from shimmingtoolbox.shim.shim_utils import phys_to_gradient_cs, shim_to_phys_cs
from shimmingtoolbox.shim.shim_utils import ScannerShimSettings
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
AVAILABLE_ORDERS = [-1, 0, 1, 2]
@click.group(context_settings=CONTEXT_SETTINGS,
help="Shim according to the specified algorithm as an argument e.g. st_b0shim xxxxx")
def b0shim_cli():
pass
@click.command(context_settings=CONTEXT_SETTINGS)
@click.option('--coil', 'coils', nargs=2, multiple=True, type=(click.Path(exists=True), click.Path(exists=True)),
help="Pair of filenames containing the coil profiles followed by the filename to the constraints "
"e.g. --coil a.nii cons.json. If you have more than one coil, use this option more than once. "
"The coil profiles and the fieldmaps (--fmap) must have matching units (if fmap is in Hz, the coil "
"profiles must be in Hz/unit_shim). If using the scanner's gradient/shim coils, the coil profiles "
"must be in Hz/unit_shim and fieldmaps must be in Hz. If you want to shim using the scanner's "
"gradient/shim coils, use the `--scanner-coil-order` option. For an example of a constraint file, "
f"see: {__dir_config_scanner_constraints__}")
@click.option('--fmap', 'fname_fmap', required=True, type=click.Path(exists=True),
help="Static B0 fieldmap.")
@click.option('--anat', 'fname_anat', type=click.Path(exists=True), required=True,
help="Anatomical image to apply the correction onto.")
@click.option('--mask', 'fname_mask_anat', type=click.Path(exists=True), required=False,
help="Mask defining the spatial region to shim.")
@click.option('--scanner-coil-order', 'scanner_coil_order', type=click.STRING, default='-1', show_default=True,
help="Spherical harmonics orders to be used in optimization. "
f"Available orders: {AVAILABLE_ORDERS}. "
"Orders should be writen with a coma separating the values. (i.e. 0,1,2)"
"The 0th order is the f0 frequency.")
@click.option('--scanner-coil-constraints', 'fname_sph_constr', type=click.Path(), default="",
help=f"Constraints for the scanner coil. Example file located: {__dir_config_scanner_constraints__}")
@click.option('--slices', type=click.Choice(['interleaved', 'sequential', 'volume', 'auto']), required=False,
default='auto', show_default=True,
help="Define the slice ordering. If set to 'auto', automatically parse the target image.")
@click.option('--slice-factor', 'slice_factor', type=click.INT, required=False, default=1, show_default=True,
help="Number of slices per shimmed group. Used when '--slices' is not set to 'auto'. For example, if the "
"'--slice-factor' value is '3', then with the 'sequential' mode, shimming will be performed "
"independently on the following groups: {0,1,2}, {3,4,5}, etc. With the mode 'interleaved', "
"it will be: {0,2,4}, {1,3,5}, etc.")
@click.option('--optimizer-method', 'method', type=click.Choice(['least_squares', 'pseudo_inverse', 'quad_prog']),
required=False, default='quad_prog', show_default=True,
help="Method used by the optimizer. LS, and QP will respect the constraints,"
"PS will not respect the constraints")
@click.option('--regularization-factor', 'reg_factor', type=click.FLOAT, required=False, default=0.0, show_default=True,
help="Regularization factor for the current when optimizing. A higher coefficient will penalize higher "
"current values while 0 provides no regularization. Not relevant for 'pseudo-inverse' "
"optimizer_method.")
@click.option('--optimizer-criteria', 'opt_criteria', type=click.Choice(['mse', 'mae']), required=False,
default='mse', show_default=True,
help="Criteria of optimization for the optimizer 'least_squares'."
" mse: Mean Squared Error, mae: Mean Absolute Error")
@click.option('--mask-dilation-kernel-size', 'dilation_kernel_size', type=click.INT, required=False, default='3',
show_default=True,
help="Number of voxels to consider outside of the masked area. For example, when doing dynamic shimming "
"with a linear gradient, the coefficient corresponding to the gradient orthogonal to a single "
"slice cannot be estimated: there must be at least 2 (ideally 3) points to properly estimate the "
"linear term. When using 2nd order or more, more dilation is necessary.")
@click.option('--fatsat', type=click.Choice(['auto', 'yes', 'no']), default='auto', show_default=True,
help="Describe what to do with a fat saturation pulse. 'auto': It will parse the NIfTI file "
"for a fat-sat pulse and add shim coefficients of 0s before every shim group when using "
"'chronological-...' output-file-format-coil. 'no': It will not add 0s. 'yes': It will add 0s.")
@click.option('-o', '--output', 'path_output', type=click.Path(), default=os.path.abspath(os.curdir),
show_default=True, help="Directory to output coil text file(s).")
@click.option('--output-file-format-coil', 'o_format_coil',
type=click.Choice(['slicewise-ch', 'slicewise-coil', 'chronological-ch', 'chronological-coil']),
default='slicewise-coil',
show_default=True, help="Syntax used to describe the sequence of shim events for custom coils. "
"Use 'slicewise' to output in row 1, 2, 3, etc. the shim coefficients for slice "
"1, 2, 3, etc. Use 'chronological' to output in row 1, 2, 3, etc. the shim value "
"for trigger 1, 2, 3, etc. The trigger is an event sent by the scanner and "
"captured by the controller of the shim amplifier. Use 'ch' to output one "
"file per coil channel (coil1_ch1.txt, coil1_ch2.txt, etc.). Use 'coil' to "
"output one file per coil system (coil1.txt, coil2.txt). In the latter case, "
"all coil channels are encoded across multiple columns in the text file.")
@click.option('--output-file-format-scanner', 'o_format_sph',
type=click.Choice(['slicewise-ch', 'slicewise-coil', 'chronological-ch', 'chronological-coil',
'gradient']),
default='slicewise-coil',
show_default=True, help="Syntax used to describe the sequence of shim events for scanner coils. "
"Use 'slicewise' to output in row 1, 2, 3, etc. the shim coefficients for slice "
"1, 2, 3, etc. Use 'chronological' to output in row 1, 2, 3, etc. the shim value "
"for trigger 1, 2, 3, etc. The trigger is an event sent by the scanner and "
"captured by the controller of the shim amplifier. If there is a fat saturation "
"pulse in the anat sequence, shim weights of 0s are included in the output "
"text file before each slice coefficients. Use 'ch' to output one "
"file per coil channel (coil1_ch1.txt, coil1_ch2.txt, etc.). Use 'coil' to "
"output one file per coil system (coil1.txt, coil2.txt). In the latter case, "
"all coil channels are encoded across multiple columns in the text file. Use "
"'gradient' to output the 1st order in the Gradient CS, otherwise, it outputs in "
"the Shim CS.")
@click.option('--output-value-format', 'output_value_format', type=click.Choice(['delta', 'absolute']), default='delta',
show_default=True,
help="Coefficient values for the scanner coil. delta: Outputs the change of shim coefficients. "
"absolute: Outputs the absolute coefficient by taking into account the current shim settings. "
"This is effectively initial + shim. Scanner coil coefficients will be in the Shim coordinate "
"system unless the option --output-file-format is set to gradient. The delta value format should be "
"used in that case.")
@click.option('-v', '--verbose', type=click.Choice(['info', 'debug']), default='info', help="Be more verbose")
@timeit
def dynamic(fname_fmap, fname_anat, fname_mask_anat, method, opt_criteria, slices, slice_factor, coils,
dilation_kernel_size, scanner_coil_order, fname_sph_constr, fatsat, path_output, o_format_coil,
o_format_sph, output_value_format, reg_factor, verbose):
""" Static shim by fitting a fieldmap. Use the option --optimizer-method to change the shimming algorithm used to
optimize. Use the options --slices and --slice-factor to change the shimming order/size of the slices.
Example of use: st_b0shim dynamic --coil coil1.nii coil1_config.json --coil coil2.nii coil2_config.json
--fmap fmap.nii --anat anat.nii --mask mask.nii --optimizer-method least_squares
"""
scanner_coil_order = parse_orders(scanner_coil_order)
# Set logger level
set_all_loggers(verbose)
# Load the fieldmap
nii_fmap_orig = nib.load(fname_fmap)
# Make sure the fieldmap has the appropriate dimensions
if nii_fmap_orig.get_fdata().ndim != 3:
if nii_fmap_orig.get_fdata().ndim == 2:
nii_fmap = nib.Nifti1Image(nii_fmap_orig.get_fdata()[..., np.newaxis], nii_fmap_orig.affine,
header=nii_fmap_orig.header)
nii_fmap = extend_fmap_to_kernel_size(nii_fmap, dilation_kernel_size, path_output)
else:
raise ValueError("Fieldmap must be 2d or 3d")
else:
# Extend the fieldmap if there are axes that have less voxels than the kernel size. This is done since we are
# fitting a fieldmap to coil profiles and having a small number of voxels can lead to errors in fitting
# (2 voxels in one dimension can differentiate order 1 at most), the parameter allows to have at least the
# size of the kernel for each dimension This is usually useful in the through plane direction where we could
# have less slices. To mitigate this, we create a 3d volume by replicating the slices on the edges.
extending = False
for i_axis in range(3):
if nii_fmap_orig.shape[i_axis] < dilation_kernel_size:
extending = True
break
if extending:
nii_fmap = extend_fmap_to_kernel_size(nii_fmap_orig, dilation_kernel_size, path_output)
else:
nii_fmap = copy.deepcopy(nii_fmap_orig)
# Prepare the output
create_output_dir(path_output)
# Load the anat
nii_anat = nib.load(fname_anat)
dim_info = nii_anat.header.get_dim_info()
if dim_info[2] is None:
logger.warning("The slice encoding direction is not specified in the NIfTI header, Shimming Toolbox will "
"assume it is in the third dimension.")
else:
if dim_info[2] != 2:
# # Reorient nifti so that the slice is the last dim
# anat = nii_anat.get_fdata()
# # TODO: find index of dim_info
# index_in = 0
# index_out = 2
#
# # Swap axis in the array
# anat = np.swapaxes(anat, index_in, index_out)
#
# # Affine must change
# affine = copy.deepcopy(nii_anat.affine)
# affine[:, index_in] = nii_anat.affine[:, index_out]
# affine[:, index_out] = nii_anat.affine[:, index_in]
# affine[index_out, 3] = nii_anat.affine[index_in, 3]
# affine[index_in, 3] = nii_anat.affine[index_out, 3]
#
# nii_reorient = nib.Nifti1Image(anat, affine, header=nii_anat.header)
# nib.save(nii_reorient, os.path.join(path_output, 'anat_reorient.nii.gz'))
# Slice must be the 3rd dimension of the file
# TODO: Reorient nifti so that the slice is the 3rd dim
raise RuntimeError("Slice encode direction must be the 3rd dimension of the NIfTI file.")
# Load anat json
fname_anat_json = fname_anat.rsplit('.nii', 1)[0] + '.json'
with open(fname_anat_json) as json_file:
json_anat_data = json.load(json_file)
# Load mask
if fname_mask_anat is not None:
nii_mask_anat = nib.load(fname_mask_anat)
else:
# If no mask is provided, shim the whole anat volume
nii_mask_anat = nib.Nifti1Image(np.ones_like(nii_anat.get_fdata()), nii_anat.affine, header=nii_anat.header)
if logger.level <= getattr(logging, 'DEBUG'):
# Save inputs
list_fname = [fname_fmap, fname_anat, fname_mask_anat]
_save_nii_to_new_dir(list_fname, path_output)
# Open json of the fmap
fname_json = fname_fmap.split('.nii')[0] + '.json'
# Read from json file
if os.path.isfile(fname_json):
with open(fname_json) as json_file:
json_fm_data = json.load(json_file)
else:
raise OSError("Missing fieldmap json file")
# Error out for unsupported inputs. If file format is in gradient CS, it must be 1st order and the output format be
# delta. Only Siemens gradient coordinate system has been defined
if o_format_sph == 'gradient':
if output_value_format != 'delta':
raise ValueError(f"Unsupported output value format: {output_value_format} for output file format: "
f"{o_format_sph}")
if not (scanner_coil_order == [0, 1] or scanner_coil_order == [1]):
raise ValueError(f"Unsupported scanner coil order: {scanner_coil_order} for output file format: "
f"{o_format_sph}")
if json_fm_data.get('Manufacturer') != 'Siemens':
raise NotImplementedError(f"Unsupported manufacturer: {json_fm_data.get('Manufacturer')} for output file"
f"format: {o_format_sph}")
# Read the current shim settings from the scanner
scanner_shim_settings = ScannerShimSettings(json_fm_data)
options = {'scanner_shim': scanner_shim_settings.shim_settings}
# Load the coils
list_coils = _load_coils(coils, scanner_coil_order, fname_sph_constr, nii_fmap, options['scanner_shim'],
json_fm_data.get('Manufacturer'), json_fm_data.get('ManufacturersModelName'))
# Get the shim slice ordering
n_slices = nii_anat.shape[2]
if slices == 'auto':
list_slices = parse_slices(fname_anat)
else:
list_slices = define_slices(n_slices, slice_factor, slices)
logger.info(f"The slices to shim are:\n{list_slices}")
# Get shimming coefficients
# 1 ) Create the Shimming sequencer object
sequencer = ShimSequencer(nii_fmap_orig, nii_anat, nii_mask_anat, list_slices, list_coils,
method=method,
opt_criteria=opt_criteria,
mask_dilation_kernel='sphere',
mask_dilation_kernel_size=dilation_kernel_size,
reg_factor=reg_factor,
path_output=path_output)
# 2) Launch shim sequencer
coefs = sequencer.shim()
# Output
# Load output options
options['fatsat'] = _get_fatsat_option(json_anat_data, fatsat)
list_fname_output = []
end_channel = 0
for i_coil, coil in enumerate(list_coils):
# Figure out the start and end channels for a coil to be able to select it from the coefs
n_channels = coil.dim[3]
start_channel = end_channel
end_channel = start_channel + n_channels
# Select the coefficients for a coil
coefs_coil = copy.deepcopy(coefs[:, start_channel:end_channel])
# If it's a scanner
if type(coil) == ScannerCoil:
manufacturer = json_anat_data['Manufacturer']
# If outputting in the gradient CS, it must be the 1st order, it must be in the delta CS and Siemens
# The check has already been done earlier in the program to avoid processing and throw an error afterwards.
# Therefore, we can only check for the o_format_sph.
if o_format_sph == 'gradient':
logger.debug("Converting Siemens scanner coil from Shim CS (LAI) to Gradient CS")
# First convert to RAS
orders = tuple([order for order in scanner_coil_order if order != 0])
for i_shim in range(coefs.shape[0]):
# Convert coefficient
coefs_coil[i_shim, 1:] = shim_to_phys_cs(coefs_coil[i_shim, 1:], manufacturer, orders)
# Convert coef of 1st order sph harmonics to Gradient coord system
coefs_freq, coefs_phase, coefs_slice = phys_to_gradient_cs(coefs_coil[:, 1],
coefs_coil[:, 2],
coefs_coil[:, 3], fname_anat)
coefs_coil[:, 1] = coefs_freq
coefs_coil[:, 2] = coefs_phase
coefs_coil[:, 3] = coefs_slice
else:
# If the output format is absolute, add the initial coefs
if output_value_format == 'absolute':
initial_coefs = scanner_shim_settings.concatenate_shim_settings(scanner_coil_order)
for i_channel in range(n_channels):
# abs_coef = delta + initial
coefs_coil[:, i_channel] = coefs_coil[:, i_channel] + initial_coefs[i_channel]
list_fname_output += _save_to_text_file_static(coil, coefs_coil, list_slices, path_output,
o_format_sph, options, coil_number=i_coil,
default_coefs=initial_coefs)
continue
list_fname_output += _save_to_text_file_static(coil, coefs_coil, list_slices, path_output, o_format_sph,
options, coil_number=i_coil)
else:
list_fname_output += _save_to_text_file_static(coil, coefs_coil, list_slices, path_output, o_format_coil,
options, coil_number=i_coil)
logger.info(f"Coil txt file(s) are here:\n{os.linesep.join(list_fname_output)}")
logger.info(f"Plotting figure(s)")
sequencer.eval(coefs)
logger.info(f" Plotting currents")
# Plot the coefs after outputting the currents to the text file
end_channel = 0
for i_coil, coil in enumerate(list_coils):
# Figure out the start and end channels for a coil to be able to select it from the coefs
n_channels = coil.dim[3]
start_channel = end_channel
end_channel = start_channel + n_channels
if type(coil) != ScannerCoil:
# Select the coefficients for a coil
coefs_coil = copy.deepcopy(coefs[:, start_channel:end_channel])
# Plot a figure of the coefficients
_plot_coefs(coil, list_slices, coefs_coil, path_output, i_coil,
bounds=[bound for bounds in coil.coef_channel_minmax.values() for bound in bounds])
logger.info(f"Finished plotting figure(s)")
def _save_to_text_file_static(coil, coefs, list_slices, path_output, o_format, options, coil_number,
default_coefs=None):
"""o_format can either be 'slicewise-ch', 'slicewise-coil', 'chronological-ch', 'chronological-coil', 'gradient'"""
n_channels = coil.dim[3]
list_fname_output = []
if o_format[-5:] == '-coil':
fname_output = os.path.join(path_output, f"coefs_coil{coil_number}_{coil.name}.txt")
with open(fname_output, 'w', encoding='utf-8') as f:
# (len(slices) x n_channels)
if o_format == 'chronological-coil':
# Output per shim (chronological), output all channels for a particular shim, then repeat
for i_shim in range(len(list_slices)):
# If fatsat pulse, set shim coefs to 0
if options['fatsat']:
for i_channel in range(n_channels):
if default_coefs is None:
# Output 0 (delta)
f.write(f"{0:.1f}, ")
else:
# Output initial coefs (absolute)
f.write(f"{default_coefs[i_channel]:.6f}, ")
f.write(f"\n")
for i_channel in range(n_channels):
f.write(f"{coefs[i_shim, i_channel]:.6f}, ")
f.write("\n")
elif o_format == 'slicewise-coil':
# Output per slice, output all channels for a particular slice, then repeat
# Assumes all slices are in list_slices once which is the case for sequential, interleaved and
# volume
n_slices = np.sum([len(a_shim) for a_shim in list_slices])
for i_slice in range(n_slices):
i_shim = [list_slices.index(a_shim) for a_shim in list_slices if i_slice in a_shim][0]
for i_channel in range(n_channels):
f.write(f"{coefs[i_shim, i_channel]:.6f}, ")
f.write("\n")
list_fname_output.append(os.path.abspath(fname_output))
elif o_format[-3:] == '-ch':
# Write a file for each channel
for i_channel in range(n_channels):
fname_output = os.path.abspath(os.path.join(path_output,
f"coefs_coil{coil_number}_ch{i_channel}_{coil.name}.txt"))
if o_format == 'chronological-ch':
with open(fname_output, 'w', encoding='utf-8') as f:
# Each row will have one coef representing the shim in chronological order
for i_shim in range(len(list_slices)):
# If fatsat pulse, set shim coefs to 0
if options['fatsat']:
if default_coefs is None:
# Output 0 (delta)
f.write(f"{0:.1f},\n")
else:
# Output initial coefs (absolute)
f.write(f"{default_coefs[i_channel]:.6f},\n")
f.write(f"{coefs[i_shim, i_channel]:.6f},\n")
if o_format == 'slicewise-ch':
with open(fname_output, 'w', encoding='utf-8') as f:
# Each row will have one coef representing the shim in slicewise order
n_slices = np.sum([len(a_tuple) for a_tuple in list_slices])
for i_slice in range(n_slices):
i_shim = [list_slices.index(i) for i in list_slices if i_slice in i][0]
f.write(f"{coefs[i_shim, i_channel]:.6f}\n")
list_fname_output.append(os.path.abspath(fname_output))
else: # o_format == 'gradient':
for i_channel in range(n_channels):
# Make sure there are 4 channels
if n_channels != 4:
raise RuntimeError("Gradient output format should only be used with 1st order scanner coils")
name = {0: 'f0',
1: 'x',
2: 'y',
3: 'z'}
fname_output = os.path.join(path_output, f"{name[i_channel]}shim_gradients.txt")
with open(fname_output, 'w', encoding='utf-8') as f:
n_slices = np.sum([len(a_tuple) for a_tuple in list_slices])
for i_slice in range(n_slices):
i_shim = [list_slices.index(i) for i in list_slices if i_slice in i][0]
if i_channel == 0:
# f0, Output is in Hz
f.write(f"corr_vec[0][{i_slice}]= "
f"{coefs[i_shim, i_channel]:.6f}\n")
else:
# For Gx, Gy, Gz: Divide by 1000 for mT/m
f.write(f"corr_vec[0][{i_slice}]= "
f"{coefs[i_shim, i_channel] / 1000:.6f}\n")
# Static shimming does not have a a riro component
f.write(f"corr_vec[1][{i_slice}]= "
f"{0:.12f}\n")
# Arbitrarily chose a mean pressure of 2000 to satisfy the sequence
f.write(f"corr_vec[2][{i_slice}]= {2000:.3f}\n")
list_fname_output.append(os.path.abspath(fname_output))
return list_fname_output
@click.command(context_settings=CONTEXT_SETTINGS)
@click.option('--coil', 'coils_static', nargs=2, multiple=True, type=(click.Path(exists=True), click.Path(exists=True)),
help="Pair of filenames containing the coil profiles followed by the filename to the constraints "
"e.g. --coil a.nii cons.json. If you have more than one coil, use this option more than once. "
"The coil profiles and the fieldmaps (--fmap) must have matching units (if fmap is in Hz, the coil "
"profiles must be in Hz/unit_shim). If you only want to shim using the scanner's gradient/shim "
"coils, use the `--scanner-coil-order` option. For an example of a constraint file, "
f"see: {__dir_config_scanner_constraints__}")
@click.option('--coil-riro', 'coils_riro', nargs=2, multiple=True,
type=(click.Path(exists=True), click.Path(exists=True)), required=False,
help="Pair of filenames containing the coil profiles followed by the filename to the constraints "
"e.g. --coil a.nii cons.json. If you have more than one coil, use this option more than once. "
"The coil profiles and the fieldmaps (--fmap) must have matching units (if fmap is in Hz, the coil "
"profiles must be in Hz/unit_shim). If this option is used, these coil profiles will be used for "
"the RIRO optimization, otherwise, the coils from the --coil options will be used."
"If you only want to shim using the scanner's gradient/shim "
"coils, use the `--scanner-coil-order` option. For an example of a constraint file, "
f"see: {__dir_config_scanner_constraints__}")
@click.option('--fmap', 'fname_fmap', required=True, type=click.Path(exists=True),
help="Timeseries of B0 fieldmap.")
@click.option('--anat', 'fname_anat', type=click.Path(exists=True), required=True,
help="Anatomical image to apply the correction onto.")
@click.option('--resp', 'fname_resp', type=click.Path(exists=True), required=True,
help="Siemens respiratory file containing pressure data.")
@click.option('--mask-static', 'fname_mask_anat_static', type=click.Path(exists=True), required=False,
help="Mask defining the static spatial region to shim.")
@click.option('--mask-riro', 'fname_mask_anat_riro', type=click.Path(exists=True), required=False,
help="Mask defining the time varying (i.e. RIRO, Respiration-Induced Resonance Offset) "
"region to shim.")
@click.option('--scanner-coil-order', 'scanner_coil_order_static', type=click.STRING, default='-1', show_default=True,
help="Spherical harmonics orders to be used in static optimization. "
f"Available orders: {AVAILABLE_ORDERS}. "
"Orders should be writen with a coma separating the values. (i.e. 0,1,2)"
"The 0th order is the f0 frequency.")
@click.option('--scanner-coil-order-riro', 'scanner_coil_order_riro', type=click.STRING, default=None,
show_default=True,
help="Spherical harmonics orders to be used in RIRO optimization. If not set, the same orders as "
"--scanner-coil-order will be used for RIRO"
f"Available orders: {AVAILABLE_ORDERS}. "
"Orders should be writen with a coma separating the values. (i.e. 0,1,2)"
"The 0th order is the f0 frequency.")
@click.option('--scanner-coil-constraints', 'fname_sph_constr', type=click.Path(), default="",
help=f"Constraints for the scanner coil. Example file located: {__dir_config_scanner_constraints__}")
@click.option('--slices', type=click.Choice(['interleaved', 'sequential', 'volume', 'auto']), required=False,
default='auto', show_default=True,
help="Define the slice ordering. If set to 'auto', automatically parse the target image.")
@click.option('--slice-factor', 'slice_factor', type=click.INT, required=False, default=1, show_default=True,
help="Number of slices per shimmed group. Used when '--slices' is not set to 'auto'. For example, if the "
"'--slice-factor' value is '3', then with the 'sequential' mode, shimming will be performed "
"independently on the following groups: {0,1,2}, {3,4,5}, etc. With the mode 'interleaved', "
"it will be: {0,2,4}, {1,3,5}, etc.")
@click.option('--optimizer-method', 'method', type=click.Choice(['least_squares', 'pseudo_inverse',
'quad_prog']), required=False,
default='quad_prog', show_default=True,
help="Method used by the optimizer. LS and QP will respect the constraints,"
"PS will not respect the constraints")
@click.option('--optimizer-criteria', 'opt_criteria', type=click.Choice(['mse', 'mae']), required=False,
default='mse', show_default=True,
help="Criteria of optimization for the optimizer 'least_squares'."
" mse: Mean Squared Error, mae: Mean Absolute Error")
@click.option('--regularization-factor', 'reg_factor', type=click.FLOAT, required=False, default=0.0, show_default=True,
help="Regularization factor for the current when optimizing. A higher coefficient will penalize higher "
"current values while 0 provides no regularization. Not relevant for 'pseudo-inverse' "
"optimizer_method.")
@click.option('--mask-dilation-kernel-size', 'dilation_kernel_size', type=click.INT, required=False, default='3',
show_default=True,
help="Number of voxels to consider outside of the masked area. For example, when doing dynamic shimming "
"with a linear gradient, the coefficient corresponding to the gradient orthogonal to a single "
"slice cannot be estimated: there must be at least 2 (ideally 3) points to properly estimate the "
"linear term. When using 2nd order or more, more dilation is necessary.")
@click.option('--fatsat', type=click.Choice(['auto', 'yes', 'no']), default='auto', show_default=True,
help="Describe what to do with a fat saturation pulse. 'auto': It will parse the NIfTI file "
"for a fat-sat pulse and add shim coefficients of 0s before every shim group when using "
"'chronological-...' output-file-format-coil. 'no': It will not add 0s. 'yes': It will add 0s.")
@click.option('-o', '--output', 'path_output', type=click.Path(), default=os.path.abspath(os.curdir),
show_default=True, help="Directory to output coil text file(s).")
@click.option('--output-file-format-coil', 'o_format_coil',
type=click.Choice(['slicewise-ch', 'chronological-ch']), default='slicewise-ch', show_default=True,
help="Syntax used to describe the sequence of shim events. "
"Use 'slicewise' to output in row 1, 2, 3, etc. the shim coefficients for slice "
"1, 2, 3, etc. Use 'chronological' to output in row 1, 2, 3, etc. the shim value "
"for trigger 1, 2, 3, etc. The trigger is an event sent by the scanner and "
"captured by the controller of the shim amplifier. For both 'slicewice' and 'chronological', "
"there will be one output file per coil channel (coil1_ch1.txt, coil1_ch2.txt, etc.). The static, "
"time-varying and mean pressure are encoded in the columns of each file.")
@click.option('--output-file-format-scanner', 'o_format_sph',
type=click.Choice(['slicewise-ch', 'chronological-ch', 'gradient']), default='slicewise-ch',
show_default=True,
help="Syntax used to describe the sequence of shim events. "
"Use 'slicewise' to output in row 1, 2, 3, etc. the shim coefficients for slice "
"1, 2, 3, etc. Use 'chronological' to output in row 1, 2, 3, etc. the shim value "
"for trigger 1, 2, 3, etc. The trigger is an event sent by the scanner and "
"captured by the controller of the shim amplifier. In both cases, there will be one output "
"file per coil channel (coil1_ch1.txt, coil1_ch2.txt, etc.). The static, "
"time-varying and mean pressure are encoded in the columns of each file. Use "
"'gradient' to output the scanner 1st order in the Gradient CS, otherwise, it outputs "
"in the Shim CS.")
@click.option('--output-value-format', 'output_value_format', type=click.Choice(['delta', 'absolute']),
default='delta', show_default=True,
help="Coefficient values for the scanner coil. delta: Outputs the change of shim coefficients. "
"absolute: Outputs the absolute coefficient by taking into account the current shim settings. "
"This is effectively initial + shim. Scanner coil coefficients will be in the Shim coordinate "
"system unless the option --output-file-format is set to gradient. The delta value format should be "
"used in that case.")
@click.option('-v', '--verbose', type=click.Choice(['info', 'debug']), default='info', help="Be more verbose")
@timeit
def realtime_dynamic(fname_fmap, fname_anat, fname_mask_anat_static, fname_mask_anat_riro, fname_resp, method,
opt_criteria, slices, slice_factor, coils_static, coils_riro, dilation_kernel_size,
scanner_coil_order_static, scanner_coil_order_riro, fname_sph_constr, fatsat, path_output,
o_format_coil, o_format_sph, output_value_format, reg_factor, verbose):
""" Realtime shim by fitting a fieldmap to a pressure monitoring unit. Use the option --optimizer-method to change
the shimming algorithm used to optimize. Use the options --slices and --slice-factor to change the shimming
order/size of the slices.
Example of use: st_b0shim realtime-dynamic --coil coil1.nii coil1_config.json --coil coil2.nii coil2_config.json
--fmap fmap.nii --anat anat.nii --mask-static mask.nii --resp trace.resp --optimizer-method least_squares
"""
# Set coils and scanner order for riro if none were indicated
if scanner_coil_order_riro is None:
scanner_coil_order_riro = scanner_coil_order_static
scanner_coil_order_static = parse_orders(scanner_coil_order_static)
scanner_coil_order_riro = parse_orders(scanner_coil_order_riro)
# Set logger level
set_all_loggers(verbose)
# Prepare the output
create_output_dir(path_output)
# Load the fieldmap
nii_fmap_orig = nib.load(fname_fmap)
# Make sure the fieldmap has the appropriate dimensions
if nii_fmap_orig.get_fdata().ndim != 4:
raise ValueError("Fieldmap must be 4d (dim1, dim2, dim3, t)")
# Extend the fieldmap if there are axes that have less voxels than the kernel size. This is done since we are
# fitting a fieldmap to coil profiles and having a small number of voxels can lead to errors in fitting (2 voxels
# in one dimension can differentiate order 1 at most), the parameter allows to have at least the size of the kernel
# for each dimension This is usually useful in the through plane direction where we could have less slices.
# To mitigate this, we create a 3d volume by replicating the slices on the edges.
extending = False
for i_axis in range(3):
if nii_fmap_orig.shape[i_axis] < dilation_kernel_size:
extending = True
break
if extending:
nii_fmap = extend_fmap_to_kernel_size(nii_fmap_orig, dilation_kernel_size, path_output)
else:
nii_fmap = copy.deepcopy(nii_fmap_orig)
# Load the anat
nii_anat = nib.load(fname_anat)
dim_info = nii_anat.header.get_dim_info()
if dim_info[2] != 2:
# Slice must be the 3rd dimension of the file
# TODO: Reorient nifti so that the slice is the 3rd dim
raise RuntimeError("Slice encode direction must be the 3rd dimension of the NIfTI file.")
# Load anat json
fname_anat_json = fname_anat.rsplit('.nii', 1)[0] + '.json'
with open(fname_anat_json) as json_file:
json_anat_data = json.load(json_file)
# Load static mask
if fname_mask_anat_static is not None:
nii_mask_anat_static = nib.load(fname_mask_anat_static)
else:
# If no mask is provided, shim the whole anat volume
nii_mask_anat_static = nib.Nifti1Image(np.ones_like(nii_anat.get_fdata()), nii_anat.affine,
header=nii_anat.header)
# Load riro mask
if fname_mask_anat_riro is not None:
nii_mask_anat_riro = nib.load(fname_mask_anat_riro)
else:
# If no mask is provided, shim the whole anat volume
nii_mask_anat_riro = nib.Nifti1Image(np.ones_like(nii_anat.get_fdata()), nii_anat.affine,
header=nii_anat.header)
# Open json of the fmap
fname_json = fname_fmap.split('.nii')[0] + '.json'
# Read from json file
if os.path.isfile(fname_json):
with open(fname_json) as json_file:
json_fm_data = json.load(json_file)
else:
raise OSError("Missing fieldmap json file")
# Error out for unsupported inputs. If file format is in gradient CS, it must be 1st order and the output format be
# delta.
if o_format_sph == 'gradient':
if output_value_format == 'absolute':
raise ValueError(f"Unsupported output value format: {output_value_format} for output file format: "
f"{o_format_sph}")
if not (scanner_coil_order_static == [0, 1] or scanner_coil_order_static == [1]) or \
not (scanner_coil_order_riro == [0, 1] or scanner_coil_order_riro == [1]):
raise ValueError(f"Unsupported scanner coil order: {scanner_coil_order_static} for output file format: "
f"{o_format_sph}")
if json_fm_data['Manufacturer'] != 'Siemens':
raise ValueError(f"Unsupported manufacturer: {json_fm_data['manufacturer']} for output file format: "
f"{o_format_sph}")
# Read the current shim settings from the scanner
scanner_shim_settings = ScannerShimSettings(json_fm_data)
options = {'scanner_shim': scanner_shim_settings.shim_settings}
# Load the coils
list_coils_static = _load_coils(coils_static, scanner_coil_order_static, fname_sph_constr, nii_fmap,
options['scanner_shim'], json_fm_data['Manufacturer'],
json_fm_data['ManufacturersModelName'])
list_coils_riro = _load_coils(coils_riro, scanner_coil_order_riro, fname_sph_constr, nii_fmap,
options['scanner_shim'], json_fm_data['Manufacturer'],
json_fm_data['ManufacturersModelName'])
if logger.level <= getattr(logging, 'DEBUG'):
# Save inputs
list_fname = [fname_fmap, fname_anat, fname_mask_anat_static, fname_mask_anat_riro]
_save_nii_to_new_dir(list_fname, path_output)
# Get the shim slice ordering
n_slices = nii_anat.shape[2]
if slices == 'auto':
list_slices = parse_slices(fname_anat)
else:
list_slices = define_slices(n_slices, slice_factor, slices)
logger.info(f"The slices to shim are: {list_slices}")
# Load PMU
pmu = PmuResp(fname_resp)
# 1 ) Create the real time pmu sequencer object
sequencer = RealTimeSequencer(nii_fmap_orig, json_fm_data, nii_anat, nii_mask_anat_static,
nii_mask_anat_riro,
list_slices, pmu, list_coils_static, list_coils_riro,
method=method,
opt_criteria=opt_criteria,
mask_dilation_kernel='sphere',
mask_dilation_kernel_size=dilation_kernel_size,
reg_factor=reg_factor,
path_output=path_output)
# 2) Launch the sequencer
out = sequencer.shim()
coefs_static, coefs_riro, mean_p, p_rms = out
# Output
# Load output options
options['fatsat'] = _get_fatsat_option(json_anat_data, fatsat)
# Get common coils between static and riro // Comparison based on coil name
coil_static_only = [coil for coil in list_coils_static if coil not in list_coils_riro]
coil_riro_only = [coil for coil in list_coils_riro if coil not in list_coils_static]
list_coils_common = [coil for coil in list_coils_static if coil in list_coils_riro]
# Create a list of all coils used in optimization
all_coils = list_coils_common + coil_static_only + coil_riro_only
index = 0
coil_indexes_static = {}
for coil in list_coils_static:
if type(coil) == Coil:
coil_indexes_static[coil.name] = [index, index + len(coil.coef_channel_minmax['coil'])]
index += len(coil.coef_channel_minmax['coil'])
else:
coil_indexes_static[coil.name] = {}
for key in coil.coef_channel_minmax:
coil_indexes_static[coil.name][key] = [index, index + len(coil.coef_channel_minmax[key])]
index += len(coil.coef_channel_minmax[key])
index = 0
coil_indexes_riro = {}
for coil in list_coils_riro:
if type(coil) == Coil:
coil_indexes_riro[coil.name] = [index, index + len(coil.coef_channel_minmax['coil'])]
index += len(coil.coef_channel_minmax['coil'])
else:
coil_indexes_riro[coil.name] = {}
for key in coil.coef_channel_minmax:
coil_indexes_riro[coil.name][key] = [index, index + len(coil.coef_channel_minmax[key])]
index += len(coil.coef_channel_minmax[key])
list_fname_output = []
for i_coil, coil in enumerate(all_coils):
# Figure out the start and end channels for a coil to be able to select it from the coefs
# If it's a scanner
if type(coil) == ScannerCoil:
if coil in list_coils_common:
keys = [str(order) for order in AVAILABLE_ORDERS
if (order != -1 and (str(order) in coil_indexes_riro[coil.name]
or str(order) in coil_indexes_static[coil.name]))]
elif coil in coil_static_only:
keys = [str(order) for order in AVAILABLE_ORDERS
if (order != -1 and str(order) in coil_indexes_static[coil.name])]
elif coil in coil_riro_only:
keys = [str(order) for order in AVAILABLE_ORDERS
if (order != -1 and str(order) in coil_indexes_riro[coil.name])]
for key in keys:
if coil in list_coils_riro:
if key in coil_indexes_riro[coil.name]:
coefs_coil_riro = copy.deepcopy(
coefs_riro[:, coil_indexes_riro[coil.name][key][0]:coil_indexes_riro[coil.name][key][1]])
else:
coefs_coil_riro = np.zeros_like(coefs_static[:, coil_indexes_static[coil.name][key][0]:
coil_indexes_static[coil.name][key][1]])
else:
coefs_coil_riro = np.zeros_like(
coefs_static[:, coil_indexes_static[coil.name][key][0]:coil_indexes_static[coil.name][key][1]])
if coil in list_coils_static:
if key in coil_indexes_static[coil.name]:
coefs_coil_static = copy.deepcopy(coefs_static[:, coil_indexes_static[coil.name][key][0]:
coil_indexes_static[coil.name][key][1]])
else:
coefs_coil_static = np.zeros_like(coefs_coil_riro)
else:
coefs_coil_static = np.zeros_like(coefs_coil_riro)
manufacturer = json_anat_data['Manufacturer']
# If outputting in the gradient CS, it must be the 1st order and must be in the delta CS and Siemens
# The check has already been done earlier in the program to avoid processing and throw an error
# afterwards.
# Therefore, we can only check for the o_format_sph.
if o_format_sph == 'gradient':
if key == '0':
save_coefs_static = coefs_coil_static
if coefs_coil_riro is not None:
save_coefs_riro = coefs_coil_riro
else:
save_coefs_riro = None
has0 = True
continue
elif key == '1' and has0:
save_coefs_static = np.concatenate((save_coefs_static, coefs_coil_static), axis=1)
if save_coefs_riro is not None:
save_coefs_riro = np.concatenate((save_coefs_riro, coefs_coil_riro), axis=1)
elif coefs_coil_riro is not None:
save_coefs_riro = coefs_coil_riro
else:
raise ValueError("Orders do not match gradient")
coefs_coil_static = save_coefs_static
coefs_coil_riro = save_coefs_riro
logger.debug("Converting scanner coil from Shim CS to Gradient CS")
orders_static = tuple([order for order in scanner_coil_order_static if order != 0])
orders_riro = tuple([order for order in scanner_coil_order_riro if order != 0])
# First convert coefficients from Shim CS to RAS
for i_shim in range(coefs_coil_static.shape[0]):
# Convert coefficient
coefs_coil_static[i_shim, 1:] = shim_to_phys_cs(coefs_coil_static[i_shim, 1:], manufacturer,
orders_static)
coefs_coil_riro[i_shim, 1:] = shim_to_phys_cs(coefs_coil_riro[i_shim, 1:], manufacturer,
orders_riro)
# RAS to gradient
coefs_st_freq, coefs_st_phase, coefs_st_slice = phys_to_gradient_cs(
coefs_coil_static[:, 1],
coefs_coil_static[:, 2],
coefs_coil_static[:, 3],
fname_anat)
coefs_coil_static[:, 1] = coefs_st_freq
coefs_coil_static[:, 2] = coefs_st_phase
coefs_coil_static[:, 3] = coefs_st_slice
coefs_riro_freq, coefs_riro_phase, coefs_riro_slice = phys_to_gradient_cs(
coefs_coil_riro[:, 1],
coefs_coil_riro[:, 2],
coefs_coil_riro[:, 3],
fname_anat)
coefs_coil_riro[:, 1] = coefs_riro_freq
coefs_coil_riro[:, 2] = coefs_riro_phase
coefs_coil_riro[:, 3] = coefs_riro_slice
else:
# If the output format is absolute, add the initial coefs
if output_value_format == 'absolute' and coefs_coil_static is not None:
initial_coefs = scanner_shim_settings.concatenate_shim_settings(scanner_coil_order_static)
for i_channel in range(coefs_coil_static.shape[-1]):
# abs_coef = delta + initial
coefs_coil_static[:, i_channel] = coefs_coil_static[:, i_channel] + initial_coefs[i_channel]
# riro does not change
list_fname_output += _save_to_text_file_rt(coil, coefs_coil_static, coefs_coil_riro, mean_p,
list_slices, path_output, o_format_sph, options,
i_coil, int(key) ** 2,
default_st_coefs=initial_coefs)
continue
list_fname_output += _save_to_text_file_rt(coil, coefs_coil_static, coefs_coil_riro, mean_p,
list_slices,
path_output, o_format_sph, options, i_coil, int(key) ** 2)
else: # Custom coil
if coil in list_coils_riro:
coefs_coil_riro = copy.deepcopy(
coefs_riro[:, coil_indexes_riro[coil.name][0]:coil_indexes_riro[coil.name][1]])
else:
coefs_coil_riro = np.zeros_like(
coefs_static[:, coil_indexes_static[coil.name][0]:coil_indexes_static[coil.name][1]])
if coil in list_coils_static:
coefs_coil_static = copy.deepcopy(
coefs_static[:, coil_indexes_static[coil.name][0]:coil_indexes_static[coil.name][1]])
else:
coefs_coil_static = np.zeros_like(coefs_coil_riro)
list_fname_output += _save_to_text_file_rt(coil, coefs_coil_static, coefs_coil_riro, mean_p, list_slices,
path_output, o_format_coil, options, i_coil, 0)
logger.info(f"Coil txt file(s) are here:\n{os.linesep.join(list_fname_output)}")
logger.info(f"Plotting figure(s)")
sequencer.eval(coefs_static, coefs_riro, mean_p, p_rms)
logger.info(f"Plotting Currents")
# Plot the coefs after outputting the currents to the text file
end_channel = 0
for i_coil, coil in enumerate(all_coils):
# Figure out the start and end channels for a coil to be able to select it from the coefs
if type(coil) != ScannerCoil:
if coil in list_coils_riro:
coefs_coil_riro = copy.deepcopy(
coefs_riro[:, coil_indexes_riro[coil.name][0]:coil_indexes_riro[coil.name][1]])
else:
coefs_coil_riro = None
if coil in list_coils_static:
coefs_coil_static = copy.deepcopy(
coefs_static[:, coil_indexes_static[coil.name][0]:coil_indexes_static[coil.name][1]])
else:
coefs_coil_static = np.zeros_like(coefs_coil_riro)
# Plot a figure of the coefficients
_plot_coefs(coil, list_slices, coefs_coil_static, path_output, i_coil, coefs_coil_riro,
pres_probe_max=pmu.max - mean_p, pres_probe_min=pmu.min - mean_p,
bounds=[bound for bounds in coil.coef_channel_minmax.values() for bound in bounds])
logger.info(f"Finished plotting figure(s)")
def _save_to_text_file_rt(coil, currents_static, currents_riro, mean_p, list_slices, path_output, o_format,
options, coil_number, channel_start, default_st_coefs=None):
"""o_format can either be 'chronological-ch', 'chronological-coil', 'gradient'"""
list_fname_output = []
if currents_riro is not None:
n_channels = currents_riro.shape[-1]
else:
n_channels = currents_static.shape[-1]
# Write a file for each channel
for i_channel in range(n_channels):
if o_format == 'chronological-ch':
fname_output = os.path.join(path_output,
f"coefs_coil{coil_number}_ch{channel_start + i_channel}_{coil.name}.txt")
with open(fname_output, 'w', encoding='utf-8') as f:
# Each row will have 3 coef representing the static, riro and mean_p in chronological order
for i_shim in range(len(list_slices)):
# If fatsat pulse, set shim coefs to 0 and output mean pressure
if options['fatsat']:
if default_st_coefs is None:
# Output 0 (delta)
f.write(f"{0:.1f}, {0:.1f}, {mean_p:.4f},\n")
else:
# Output initial coefs (absolute)
f.write(f"{default_st_coefs[i_channel]:.1f}, {0:.1f}, {mean_p:.4f},\n")
if currents_static is not None:
f.write(f"{currents_static[i_shim, i_channel]:.6f}, ")
if currents_riro is not None:
f.write(f"{currents_riro[i_shim, i_channel]:.12f}, ")
f.write(f"{mean_p:.4f},\n")
elif o_format == 'slicewise-ch':
fname_output = os.path.join(path_output,
f"coefs_coil{coil_number}_ch{channel_start + i_channel}_{coil.name}.txt")
with open(fname_output, 'w', encoding='utf-8') as f:
# Each row will have one coef representing the static, riro and mean_p in slicewise order
n_slices = np.sum([len(a_tuple) for a_tuple in list_slices])
for i_slice in range(n_slices):
i_shim = [list_slices.index(i) for i in list_slices if i_slice in i][0]
if currents_static is not None:
f.write(f"{currents_static[i_shim, i_channel]:.6f}, ")
if currents_riro is not None:
f.write(f"{currents_riro[i_shim, i_channel]:.12f}, ")
f.write(f"{mean_p:.4f},\n")
else: # o_format == 'gradient':
# Make sure there are 4 channels
if n_channels != 4:
raise RuntimeError("Gradient output format should only be used with 1st order scanner coils")
name = {0: 'f0',
1: 'x',
2: 'y',
3: 'z'}
fname_output = os.path.join(path_output, f"{name[i_channel]}shim_gradients.txt")
with open(fname_output, 'w', encoding='utf-8') as f:
n_slices = np.sum([len(a_tuple) for a_tuple in list_slices])
for i_slice in range(n_slices):
i_shim = [list_slices.index(i) for i in list_slices if i_slice in i][0]
if i_channel == 0:
# f0, Output is in Hz
if currents_static is not None:
f.write(f"corr_vec[0][{i_slice}]= "
f"{currents_static[i_shim, i_channel]:.6f}\n")
if currents_riro is not None:
f.write(f"corr_vec[1][{i_slice}]= "
f"{currents_riro[i_shim, i_channel]:.12f}\n")
f.write(f"corr_vec[2][{i_slice}]= {mean_p:.3f}\n")
else:
# For Gx, Gy, Gz: Divide by 1000 for mT/m
if currents_static is not None:
f.write(f"corr_vec[0][{i_slice}]= "
f"{currents_static[i_shim, i_channel] / 1000:.6f}\n")
if currents_riro is not None:
f.write(f"corr_vec[1][{i_slice}]= "
f"{currents_riro[i_shim, i_channel] / 1000:.12f}\n")
f.write(f"corr_vec[2][{i_slice}]= {mean_p:.3f}\n")
list_fname_output.append(os.path.abspath(fname_output))
return list_fname_output
def parse_orders(orders: str):
orders = orders.split(',')
try:
orders = [int(order) for order in orders]
orders.sort()
if any(order not in AVAILABLE_ORDERS for order in orders):