diff --git a/docs/source/api_reference/api.rst b/docs/source/api_reference/api.rst index 48c144f07..d051026df 100644 --- a/docs/source/api_reference/api.rst +++ b/docs/source/api_reference/api.rst @@ -56,9 +56,9 @@ Shim .. Error while importing sklearn and skimage .. automodule:: shimmingtoolbox.shim.realtime_shim :members: - -.. automodule:: shimmingtoolbox.optimizer.sequential - :members: +.. Error while importing sklearn (Linear Regression) + .. automodule:: shimmingtoolbox.shim.sequencer + :members: Optimizer --------- diff --git a/shimmingtoolbox/coils/coordinates.py b/shimmingtoolbox/coils/coordinates.py index 6cd2e5e10..d16fcb46f 100644 --- a/shimmingtoolbox/coils/coordinates.py +++ b/shimmingtoolbox/coils/coordinates.py @@ -158,7 +158,7 @@ def resample_from_to(nii_from_img, nii_to_vox_map, order=2, mode='nearest', cval from_img = nii_from_img.get_fdata() if from_img.ndim == 2: - nii_from_img_3d = nib.Nifti1Image(np.expand_dims(from_img, -1), nii_from_img.affine) + nii_from_img_3d = nib.Nifti1Image(np.expand_dims(from_img, -1), nii_from_img.affine, header=nii_from_img.header) nii_resampled = nib_resample_from_to(nii_from_img_3d, nii_to_vox_map, order=order, mode=mode, cval=cval, out_class=out_class) @@ -176,7 +176,7 @@ def resample_from_to(nii_from_img, nii_to_vox_map, order=2, mode='nearest', cval cpus = mp.cpu_count() if cpus == 1: for it in range(nt): - nii_from_img_3d = nib.Nifti1Image(from_img[..., it], nii_from_img.affine) + nii_from_img_3d = nib.Nifti1Image(from_img[..., it], nii_from_img.affine, header=nii_from_img.header) nii_resampled_3d = nib_resample_from_to(nii_from_img_3d, nii_to_vox_map, order=order, mode=mode, cval=cval, out_class=out_class) resampled_4d[..., it] = nii_resampled_3d.get_fdata() @@ -184,7 +184,7 @@ def resample_from_to(nii_from_img, nii_to_vox_map, order=2, mode='nearest', cval # Create inputs for multiprocessing inputs = [] for it in range(nt): - nii_from_img_3d = nib.Nifti1Image(from_img[..., it], nii_from_img.affine) + nii_from_img_3d = nib.Nifti1Image(from_img[..., it], nii_from_img.affine, header=nii_from_img.header) inputs.append((nii_from_img_3d, nii_to_vox_map, order, mode, cval, out_class)) with mp.Pool(cpus - 1) as pool: @@ -194,7 +194,7 @@ def resample_from_to(nii_from_img, nii_to_vox_map, order=2, mode='nearest', cval for it in range(from_img.shape[3]): resampled_4d[..., it] = output[it] - nii_resampled = nib.Nifti1Image(resampled_4d, nii_to_vox_map.affine) + nii_resampled = nib.Nifti1Image(resampled_4d, nii_to_vox_map.affine, header=nii_to_vox_map.header) else: raise NotImplementedError("Dimensions of input can only be 2D, 3D or 4D") diff --git a/shimmingtoolbox/masking/mask_utils.py b/shimmingtoolbox/masking/mask_utils.py new file mode 100644 index 000000000..f9c6c22b5 --- /dev/null +++ b/shimmingtoolbox/masking/mask_utils.py @@ -0,0 +1,191 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +import nibabel as nib +import numpy as np +from scipy.ndimage import binary_dilation +from scipy.ndimage import binary_opening +from scipy.ndimage import generate_binary_structure +from scipy.ndimage import iterate_structure + +from shimmingtoolbox.coils.coordinates import resample_from_to + + +def resample_mask(nii_mask_from, nii_target, from_slices): + """ + Select the appropriate slices from ``nii_mask_from`` using ``from_slices`` and resample onto ``nii_target`` + + Args: + nii_mask_from (nib.Nifti1Image): Mask to resample from. False or 0 signifies not included. + nii_target (nib.Nifti1Image): Target image to resample onto. + from_slices (tuple): Tuple containing the slices to select from nii_mask_from. + + Returns: + nib.Nifti1Image: Mask resampled with nii_target.shape and nii_target.affine. + """ + + mask_from = nii_mask_from.get_fdata() + + # Initialize a sliced mask and select the slices from from_slices + sliced_mask = np.full_like(mask_from, fill_value=False) + sliced_mask[:, :, from_slices] = mask_from[:, :, from_slices] + + # Create nibabel object + nii_mask = nib.Nifti1Image(sliced_mask.astype(int), nii_mask_from.affine, header=nii_mask_from.header) + + # Resample the mask onto nii_target + nii_mask_target = resample_from_to(nii_mask, nii_target, order=0, mode='grid-constant', cval=0) + + # dilate the mask to add more pixels in particular directions + mask_dilated = dilate_binary_mask(nii_mask_target.get_fdata(), 'line', 3) + nii_mask_dilated = nib.Nifti1Image(mask_dilated, nii_mask_target.affine, header=nii_mask_target.header) + + ####### + # Debug TODO: REMOVE + import os + nib.save(nii_mask, os.path.join(os.curdir, f"fig_mask_{from_slices[0]}.nii.gz")) + nib.save(nii_mask_from, os.path.join(os.curdir, "fig_mask_roi.nii.gz")) + nib.save(nii_mask_target, os.path.join(os.curdir, f"fig_mask_res{from_slices[0]}.nii.gz")) + nib.save(nii_mask_dilated, os.path.join(os.curdir, f"fig_mask_dilated{from_slices[0]}.nii.gz")) + ####### + + return nii_mask_dilated + + +def dilate_binary_mask(mask, shape='cross', size=3): + """ + Dilates a binary mask according to different shapes and kernel size + + Args: + mask (numpy.ndarray): 3d array containing the binary mask. + shape (str): 3d kernel to perform the dilation. Allowed shapes are: 'sphere', 'cross', 'line', 'cube'. + 'line' uses 3 line kernels to extend in each directions by "(size - 1) / 2" only if that direction + is smaller than (size - 1) / 2 + size (int): length of a side of the 3d kernel. + + Returns: + numpy.ndarray: Dilated mask. + + Notes: + Kernels for + 'cross': + array([[[0., 0., 0.], + [0., 1., 0.], + [0., 0., 0.]], + [[0., 1., 0.], + [1., 1., 1.], + [0., 1., 0.]], + [[0., 0., 0.], + [0., 1., 0.], + [0., 0., 0.]]]) + + 'sphere' size 5: + [[[False False False False False] + [False False False False False] + [False False True False False] + [False False False False False] + [False False False False False]] + [[False False False False False] + [False False True False False] + [False True True True False] + [False False True False False] + [False False False False False]] + [[False False True False False] + [False True True True False] + [ True True True True True] + [False True True True False] + [False False True False False]] + [[False False False False False] + [False False True False False] + [False True True True False] + [False False True False False] + [False False False False False]] + [[False False False False False] + [False False False False False] + [False False True False False] + [False False False False False] + [False False False False False]]] + + 'cube': + array([[[ True, True, True], + [ True, True, True], + [ True, True, True]], + [[ True, True, True], + [ True, True, True], + [ True, True, True]], + [[ True, True, True], + [ True, True, True], + [ True, True, True]]]) + + 'line': + array([[[0., 0., 0.], + [0., 1., 0.], + [0., 0., 0.]], + [[0., 0., 0.], + [0., 1., 0.], + [0., 0., 0.]], + [[0., 0., 0.], + [0., 1., 0.], + [0., 0., 0.]]]) + + """ + + if size % 2 == 0 or size < 3: + raise ValueError("Size must be odd and greater or equal to 3") + # Find the middle pixel, will always work since we check size is odd + mid_pixel = int((size - 1) / 2) + + if shape == 'sphere': + # Define kernel to perform the dilation + struct_sphere_size1 = generate_binary_structure(3, 1) + struct = iterate_structure(struct_sphere_size1, mid_pixel) + + # Dilate + mask_dilated = binary_dilation(mask, structure=struct) + + elif shape == 'cross': + struct = np.zeros([size, size, size]) + struct[:, mid_pixel, mid_pixel] = 1 + struct[mid_pixel, :, mid_pixel] = 1 + struct[mid_pixel, mid_pixel, :] = 1 + + # Dilate + mask_dilated = binary_dilation(mask, structure=struct) + + elif shape == 'cube': + # Define kernel to perform the dilation + struct_cube_size1 = generate_binary_structure(3, 3) + struct = iterate_structure(struct_cube_size1, mid_pixel) + + # Dilate + mask_dilated = binary_dilation(mask, structure=struct) + + elif shape == 'line': + + struct_dim1 = np.zeros([size, size, size]) + struct_dim1[:, mid_pixel, mid_pixel] = 1 + # Finds where the structure fits + open1 = binary_opening(mask, structure=struct_dim1) + # Select Everything that does not fit within the structure and erode along a dim + dim1 = binary_dilation(np.logical_and(np.logical_not(open1), mask), structure=struct_dim1) + + struct_dim2 = np.zeros([size, size, size]) + struct_dim2[mid_pixel, :, mid_pixel] = 1 + # Finds where the structure fits + open2 = binary_opening(mask, structure=struct_dim2) + # Select Everything that does not fit within the structure and erode along a dim + dim2 = binary_dilation(np.logical_and(np.logical_not(open2), mask), structure=struct_dim2) + + struct_dim3 = np.zeros([size, size, size]) + struct_dim3[mid_pixel, mid_pixel, :] = 1 + # Finds where the structure fits + open3 = binary_opening(mask, structure=struct_dim3) + # Select Everything that does not fit within the structure and erode along a dim + dim3 = binary_dilation(np.logical_and(np.logical_not(open3), mask), structure=struct_dim3) + + mask_dilated = np.logical_or(np.logical_or(np.logical_or(dim1, dim2), dim3), mask) + + else: + raise ValueError("Use of non supported algorithm for dilating the mask") + + return mask_dilated.astype(int) diff --git a/shimmingtoolbox/optimizer/basic_optimizer.py b/shimmingtoolbox/optimizer/basic_optimizer.py index bd9de0023..b1b051931 100644 --- a/shimmingtoolbox/optimizer/basic_optimizer.py +++ b/shimmingtoolbox/optimizer/basic_optimizer.py @@ -17,7 +17,7 @@ class Optimizer(object): """ Optimizer object that stores coil profiles and optimizes an unshimmed volume given a mask. Use optimize(args) to optimize a given mask. - For basic optimizer, uses unbounded pseudo-inverse. + For basic optimizer, uses *unbounded* pseudo-inverse. Attributes: coils (ListCoil): List of Coil objects containing the coil profiles and related constraints @@ -43,20 +43,44 @@ def __init__(self, coils: ListCoil, unshimmed, affine): logging.basicConfig(filename='test_optimizer.log', filemode='w', level=logging.DEBUG) self.coils = coils + self.unshimmed = np.array([]) + self.unshimmed_affine = [] + self.merged_coils = [] + self.merged_bounds = [] + self.set_unshimmed(unshimmed, affine) + def set_unshimmed(self, unshimmed, affine): + """ + Set the unshimmed array to a new array. Resamples coil profiles accordingly. + + Args: + unshimmed (numpy.ndarray): 3d array of unshimmed volume + affine: (numpy.ndarray): 4x4 array containing the qform affine transformation for the unshimmed array + """ # Check dimensions of unshimmed map if unshimmed.ndim != 3: raise ValueError(f"Unshimmed profile has {unshimmed.ndim} dimensions, expected 3 (dim1, dim2, dim3)") - self.unshimmed = unshimmed # Check dimensions of affine if affine.shape != (4, 4): raise ValueError("Shape of affine matrix should be 4x4") + + # Define coil profiles if unshimmed or affine is different than previously + if (self.unshimmed.shape != unshimmed.shape) or not np.all(self.unshimmed_affine == affine): + self.merged_coils, self.merged_bounds = self.merge_coils(unshimmed, affine) + + self.unshimmed = unshimmed self.unshimmed_affine = affine - # Define coil profiles - merged_coils, merged_bounds = self.merge_coils(unshimmed, affine) - self.merged_coils = merged_coils + def set_merged_bounds(self, merged_bounds): + """ + Changes the default bounds set in the coil profile + + Args: + merged_bounds: Concatenated coil profile bounds + """ + if len(self.merged_bounds) != len(merged_bounds): + raise ValueError(f"Size of merged bounds: must match the number of total channel: {len(self.merged_bounds)}") self.merged_bounds = merged_bounds def optimize(self, mask): @@ -99,7 +123,6 @@ def merge_coils(self, unshimmed, affine): """ coil_profiles_list = [] - bounds = [] # Define the nibabel unshimmed array nii_unshimmed = nib.Nifti1Image(unshimmed, affine) @@ -109,15 +132,48 @@ def merge_coils(self, unshimmed, affine): # Resample a coil on the unshimmed image resampled_coil = resample_from_to(nii_coil, nii_unshimmed).get_fdata() + coil_profiles_list.append(resampled_coil) + coil_profiles = np.concatenate(coil_profiles_list, axis=3) + + bounds = self.merge_bounds() + + return coil_profiles, bounds + + def merge_bounds(self): + """ + Merge the coil profile bounds into a single array. + + Returns: + list: list of bounds corresponding to each merged coils + """ + + bounds = [] + for coil in self.coils: # Concat coils and bounds - coil_profiles_list.append(resampled_coil) for a_bound in coil.coef_channel_minmax: bounds.append(a_bound) - coil_profiles = np.concatenate(coil_profiles_list, axis=3) + return bounds - return coil_profiles, bounds + def initial_guess_mean_bounds(self): + """ + Calculates the initial guess from the bounds, sets it to the mean of the bounds + + Returns: + np.ndarray: 1d array (n_channels) of coefficient representing the initial guess + + """ + current_0 = [] + for bounds in self.merged_bounds: + avg = np.mean(bounds) + + if np.isnan(avg): + current_0.append(0) + else: + current_0.append(avg) + + return np.array(current_0) def _check_sizing(self, mask): """ diff --git a/shimmingtoolbox/optimizer/lsq_optimizer.py b/shimmingtoolbox/optimizer/lsq_optimizer.py index 63906e081..fc1ecb343 100644 --- a/shimmingtoolbox/optimizer/lsq_optimizer.py +++ b/shimmingtoolbox/optimizer/lsq_optimizer.py @@ -8,6 +8,10 @@ class LsqOptimizer(Optimizer): + """ Optimizer object that stores coil profiles and optimizes an unshimmed volume given a mask. + Use optimize(args) to optimize a given mask. The algorithm uses a least squares solver to find the best shim. + It supports bounds for each channel as well as a bound for the absolute sum of the channels. + """ def _residuals(self, coef, unshimmed_vec, coil_mat): """ @@ -68,7 +72,7 @@ def _apply_sum_constraint(inputs, indexes, coef_sum_max): start_index = end_index # Set up output currents - currents_0 = np.zeros(n_channels) + currents_0 = self.initial_guess_mean_bounds() # Optimize currents_sp = opt.minimize(self._residuals, currents_0, diff --git a/shimmingtoolbox/optimizer/sequential.py b/shimmingtoolbox/optimizer/sequential.py deleted file mode 100644 index 055500d35..000000000 --- a/shimmingtoolbox/optimizer/sequential.py +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- - -import numpy as np -from typing import List - -from shimmingtoolbox.optimizer.lsq_optimizer import LsqOptimizer -from shimmingtoolbox.optimizer.basic_optimizer import Optimizer -from shimmingtoolbox.coils.coil import Coil - -ListCoil = List[Coil] - -supported_optimizers = { - 'least_squares': LsqOptimizer, - 'pseudo_inverse': Optimizer -} - - -def sequential_zslice(unshimmed, affine, coils: ListCoil, mask, z_slices, method='least_squares'): - """ - Performs shimming slice by slice using one of the supported optimizers - - Args: - unshimmed (numpy.ndarray): 3D B0 map - affine (np.ndarray): 4x4 array containing the affine transformation for the unshimmed array - coils (ListCoil): List of Coils containing the coil profiles - mask (numpy.ndarray): 3D mask used for the optimizer (only consider voxels with non-zero values). - z_slices (numpy.ndarray): 1D array containing z slices to shim - method (str): Supported optimizer: 'least_squares', 'pseudo_inverse' - Returns: - numpy.ndarray: Coefficients to shim (channels x z_slices.size) - """ - - # Select and initialize the optimizer - optimizer = select_optimizer(method, unshimmed, affine, coils) - - # Optimize slice by slice - currents = optimize_slicewise(optimizer, mask, z_slices) - - return currents - - -def select_optimizer(method, unshimmed, affine, coils: ListCoil): - """ - Select and initialize the optimizer - - Args: - method (str): Supported optimizer: 'least_squares', 'pseudo_inverse' - unshimmed (numpy.ndarray): 3D B0 map - affine (np.ndarray): 4x4 array containing the affine transformation for the unshimmed array - coils (ListCoil): List of Coils containing the coil profiles - - Returns: - Optimizer: Initialized Optimizer object - """ - - # global supported_optimizers - if method in supported_optimizers: - optimizer = supported_optimizers[method](coils, unshimmed, affine) - else: - raise KeyError(f"Method: {method} is not part of the supported optimizers") - - return optimizer - - -def optimize_slicewise(optimizer: Optimizer, mask, z_slices): - """ - Shim slicewise in the specified ROI - - Args: - optimizer (Optimizer): Initialized Optimizer object - mask (numpy.ndarray): 3D mask used for the optimizer (only consider voxels with non-zero values). - z_slices (numpy.ndarray): 1D array containing z slices to shim - - Returns: - numpy.ndarray: Coefficients to shim (channels x z_slices.size) - """ - # Count number of channels - n_channels = optimizer.merged_coils.shape[3] - - z_slices.reshape(z_slices.size) - currents = np.zeros((z_slices.size, n_channels)) - for i in range(z_slices.size): - sliced_mask = np.full_like(mask, fill_value=False) - z = z_slices[i] - sliced_mask[:, :, z:z+1] = mask[:, :, z:z+1] - currents[i, :] = optimizer.optimize(sliced_mask) - - return currents diff --git a/shimmingtoolbox/pmu.py b/shimmingtoolbox/pmu.py index 514e36282..cee823588 100644 --- a/shimmingtoolbox/pmu.py +++ b/shimmingtoolbox/pmu.py @@ -37,6 +37,8 @@ def __init__(self, fname_pmu): self.stop_time_mdh = attributes['stop_time_mdh'] self.start_time_mpcu = attributes['start_time_mpcu'] self.stop_time_mpcu = attributes['stop_time_mpcu'] + self.max = attributes['max'] + self.min = attributes['min'] def read_resp(self, fname_pmu): """ @@ -114,7 +116,9 @@ def read_resp(self, fname_pmu): 'start_time_mdh': start_time_mdh, 'stop_time_mdh': stop_time_mdh, 'start_time_mpcu': start_time_mpcu, - 'stop_time_mpcu': stop_time_mpcu + 'stop_time_mpcu': stop_time_mpcu, + 'max': 4095, + 'min': 0 } return attributes @@ -131,10 +135,10 @@ def interp_resp_trace(self, acquisition_times): Returns: numpy.ndarray: 1D array with interpolated times """ - if np.any(self.start_time_mdh > acquisition_times) or np.all(self.stop_time_mdh < acquisition_times): + if np.any(self.start_time_mdh > acquisition_times) or np.any(self.stop_time_mdh < acquisition_times): raise RuntimeError("acquisition_times don't fit within time limits for resp trace") - raster = float(self.stop_time_mdh - self.start_time_mdh) / len(self.data-1) + raster = float(self.stop_time_mdh - self.start_time_mdh) / (len(self.data)-1) times = (self.start_time_mdh + raster * np.arange(len(self.data))) # ms interp_data = np.interp(acquisition_times, times, self.data) diff --git a/shimmingtoolbox/shim/sequencer.py b/shimmingtoolbox/shim/sequencer.py new file mode 100644 index 000000000..170e1cd0b --- /dev/null +++ b/shimmingtoolbox/shim/sequencer.py @@ -0,0 +1,311 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +import numpy as np +from typing import List +from sklearn.linear_model import LinearRegression +import nibabel as nib +import logging + +from shimmingtoolbox.optimizer.lsq_optimizer import LsqOptimizer +from shimmingtoolbox.optimizer.basic_optimizer import Optimizer +from shimmingtoolbox.coils.coil import Coil +from shimmingtoolbox.load_nifti import get_acquisition_times +from shimmingtoolbox.pmu import PmuResp +from shimmingtoolbox.masking.mask_utils import resample_mask + +ListCoil = List[Coil] + +logger = logging.getLogger(__name__) + +supported_optimizers = { + 'least_squares': LsqOptimizer, + 'pseudo_inverse': Optimizer +} + + +def shim_sequencer(nii_fieldmap, nii_anat, nii_mask_anat, slices, coils: ListCoil, method='least_squares'): + """ + Performs shimming according to slices using one of the supported optimizers and coil profiles. + + Args: + nii_fieldmap (nibabel.Nifti1Image): Nibabel object containing fieldmap data in 3d. + nii_anat (nibabel.Nifti1Image): Nibabel object containing anatomical data in 3d. + nii_mask_anat (nibabel.Nifti1Image): 3D anat mask used for the optimizer to shim in the region of interest. + (only consider voxels with non-zero values) + slices (list): 1D array containing tuples of dim3 slices to shim according to the anat, where the shape of anat + is: (dim1, dim2, dim3). Refer to shimmingtoolbox.shim.sequencer:define_slices(). + coils (ListCoil): List of Coils containing the coil profiles. The coil profiles and the fieldmaps must have + matching units (if fmap is in Hz, the coil profiles must be in hz/unit_shim). + Refer to shimmingtoolbox.coils.coil:Coil(). + method (str): Supported optimizer: 'least_squares', 'pseudo_inverse'. Note: refer to their specific + implementation to know limits of the methods in: shimmingtoolbox.optimizer + + Returns: + numpy.ndarray: Coefficients to shim (len(slices) x channels) + """ + + # Make sure fieldmap has the appropriate dimensions + fieldmap = nii_fieldmap.get_fdata() + affine_fieldmap = nii_fieldmap.affine + if fieldmap.ndim != 3: + raise ValueError("Fieldmap must be 3d (dim1, dim2, dim3)") + + # Make sure anat has the appropriate dimensions + anat = nii_anat.get_fdata() + if anat.ndim != 3: + raise ValueError("Anatomical image must be in 3d") + + # Make sure the mask has the appropriate dimensions + mask = nii_mask_anat.get_fdata() + if mask.ndim != 3: + raise ValueError("Mask image must be in 3d") + + # Make sure shape and affine of mask are the same as the anat + if not np.all(mask.shape == anat.shape): + raise ValueError(f"Shape of mask: {mask.shape} must be the same as the shape of anat: {anat.shape}") + if not np.all(nii_mask_anat.affine == nii_anat.affine): + raise ValueError(f"Affine of mask: {nii_mask_anat.affine} must be the same as the affine of anat: " + f"{nii_anat.affine}") + + # Select and initialize the optimizer + optimizer = select_optimizer(method, fieldmap, affine_fieldmap, coils) + + # Optimize slice by slice + currents = optimize(optimizer, nii_mask_anat, slices) + + return currents + + +def shim_realtime_pmu_sequencer(nii_fieldmap, json_fmap, nii_anat, nii_static_mask, nii_riro_mask, slices, + pmu: PmuResp, coils: ListCoil, opt_method='least_squares'): + """ + Performs realtime shimming using one of the supported optimizers and an external respiratory trace. + + Args: + nii_fieldmap (nibabel.Nifti1Image): Nibabel object containing fieldmap data in 4d where the 4th dimension is the + timeseries. + json_fmap (dict): Dict of the json sidecar corresponding to the fieldmap data (Used to find the acquisition + timestamps). + nii_anat (nibabel.Nifti1Image): Nibabel object containing anatomical data in 3d. + nii_static_mask (nibabel.Nifti1Image): 3D anat mask used for the optimizer to shim the region for the static + component. + nii_riro_mask (nibabel.Nifti1Image): 3D anat mask used for the optimizer to shim the region for the riro + component. + slices (list): 1D array containing tuples of dim3 slices to shim according to the anat where the shape of anat: + (dim1, dim2, dim3). Refer to shimmingtoolbox.shim.sequencer:define_slices(). + pmu (PmuResp): Filename of the file of the respiratory trace. + coils (ListCoil): List of Coils containing the coil profiles. The coil profiles and the fieldmaps must have + matching units (if fmap is in Hz, the coil profiles must be in hz/unit_shim). + opt_method (str): Supported optimizer: 'least_squares', 'pseudo_inverse'. + + Returns: + (tuple): tuple containing: + + * numpy.ndarray: Static coefficients to shim (len(slices) x channels) e.g. [Hz] + * numpy.ndarray: Static coefficients to shim (len(slices) x channels) e.g. [Hz/unit_pressure] + * float: Mean pressure of the respiratory trace. + * float: Root mean squared of the pressure. This is provided to compare results between scans, multiply the + riro coefficients by rms of the pressure to do so. + """ + # Note: We technically dont need the anat if we use the nii_mask. However, this is a nice safety check to make sur + # the mask is indeed in the dimension of the anat and not the fieldmap + + # Make sure fieldmap has the appropriate dimensions + fieldmap = nii_fieldmap.get_fdata() + affine_fieldmap = nii_fieldmap.affine + if fieldmap.ndim != 4: + raise RuntimeError("Fieldmap must be 4d (dim1, dim2, dim3, t)") + + # Make sure anat has the appropriate dimensions + anat = nii_anat.get_fdata() + if anat.ndim != 3: + raise RuntimeError("Anatomical image must be in 3d") + + # Make sure masks have the appropriate dimensions + static_mask = nii_static_mask.get_fdata() + if static_mask.ndim != 3: + raise RuntimeError("static_mask image must be in 3d") + riro_mask = nii_riro_mask.get_fdata() + if riro_mask.ndim != 3: + raise RuntimeError("riro_mask image must be in 3d") + + # Make sure shape and affine of masks are the same as the anat + if not (np.all(riro_mask.shape == anat.shape) and np.all(static_mask.shape == anat.shape)): + raise ValueError(f"Shape of riro mask: {riro_mask.shape} and static mask: {static_mask.shape} " + f"must be the same as the shape of anat: {anat.shape}") + if not(np.all(nii_riro_mask.affine == nii_anat.affine) and np.all(nii_static_mask.affine == nii_anat.affine)): + raise ValueError(f"Affine of riro mask: {nii_riro_mask.affine} and static mask: {nii_static_mask.affine} " + f"must be the same as the affine of anat: {nii_anat.affine}") + + # Fetch PMU timing + acq_timestamps = get_acquisition_times(nii_fieldmap, json_fmap) + # TODO: deal with saturation + # fit PMU and fieldmap values + acq_pressures = pmu.interp_resp_trace(acq_timestamps) + + # regularization --> static, riro + # field(i_vox) = riro(i_vox) * (acq_pressures - mean_p) + static(i_vox) + mean_p = np.mean(acq_pressures) + pressure_rms = np.sqrt(np.mean((acq_pressures - mean_p) ** 2)) + reg = LinearRegression().fit(acq_pressures.reshape(-1, 1) - mean_p, fieldmap.reshape(-1, fieldmap.shape[-1]).T) + + # static/riro contains a 3d matrix of static/riro coefficients in the fieldmap space + static = reg.intercept_.reshape(fieldmap.shape[:-1]) + riro = reg.coef_.reshape(fieldmap.shape[:-1]) # [unit_shim/unit_pressure], ex: [Hz/unit_pressure] + + # Static shim + optimizer = select_optimizer(opt_method, static, affine_fieldmap, coils) + currents_static = optimize(optimizer, nii_static_mask, slices) + + # Use the currents to define a list of new bounds for the riro optimization + bounds = new_bounds_from_currents(currents_static, optimizer.merged_bounds) + + # Riro shim + # We multiply by the max offset of the siemens pmu [max - min = 4095] so that the bounds take effect on the maximum + # value that the pressure probe can acquire. The equation "riro(i_vox) * (acq_pressures - mean_p)" becomes + # "riro(i_vox) * max_offset" which is the maximum shim we will have. We solve for that to make sure the coils can + # support it. The units of riro * max_offset are: [unit_shim], ex: [Hz] + max_offset = max((pmu.max - pmu.min) - mean_p, mean_p) + + # Set the riro map to shim + optimizer.set_unshimmed(riro * max_offset, affine_fieldmap) + currents_max_riro = optimize(optimizer, nii_riro_mask, slices, shimwise_bounds=bounds) + # Once the currents are solved, we divide by max_offset to return to units of + # [unit_shim/unit_pressure], ex: [Hz/unit_pressure] + currents_riro = currents_max_riro / max_offset + + # Multiplying by the RMS of the pressure allows to make abstraction of the tightness of the bellow + # between scans. This allows to compare results between scans. + # currents_riro_rms = currents_riro * pressure_rms + # [unit_shim/unit_pressure] * rms_pressure, ex: [Hz/unit_pressure] * rms_pressure + + return currents_static, currents_riro, mean_p, pressure_rms + + +def new_bounds_from_currents(currents, old_bounds): + """ + Uses the currents to determine the appropriate bound for a next optimization. It assumes that + "old_current + next_current < old_bound". + + Args: + currents: 2D array (n_shims x n_channels). Direct output from ``optimize``. + old_bounds: 1d list (n_channels) of the merged bounds of the previous optimization. + + Returns: + list: 2d list (n_shims x n_channels) of bounds (min, max) corresponding to each shim and channel. + """ + + new_bounds = [] + for i_shim in range(currents.shape[0]): + shim_bound = [] + for i_channel in range(len(old_bounds)): + a_bound = old_bounds[i_channel] - currents[i_shim, i_channel] + shim_bound.append(tuple(a_bound)) + new_bounds.append(shim_bound) + + return new_bounds + + +def select_optimizer(method, unshimmed, affine, coils: ListCoil): + """ + Select and initialize the optimizer + + Args: + method (str): Supported optimizer: 'least_squares', 'pseudo_inverse' + unshimmed (numpy.ndarray): 3D B0 map + affine (np.ndarray): 4x4 array containing the affine transformation for the unshimmed array + coils (ListCoil): List of Coils containing the coil profiles + + Returns: + Optimizer: Initialized Optimizer object + """ + + # global supported_optimizers + if method in supported_optimizers: + optimizer = supported_optimizers[method](coils, unshimmed, affine) + else: + raise KeyError(f"Method: {method} is not part of the supported optimizers") + + return optimizer + + +def optimize(optimizer: Optimizer, nii_mask_anat, slices_anat, shimwise_bounds=None): + + # Count number of channels + n_channels = optimizer.merged_coils.shape[3] + + # Count shims to perform + n_shims = len(slices_anat) + + # Initialize + currents = np.zeros((n_shims, n_channels)) + + # For each shim + for i in range(n_shims): + # Create nibabel object of the unshimmed map + nii_unshimmed = nib.Nifti1Image(optimizer.unshimmed, optimizer.unshimmed_affine) + + # Create mask in the fieldmap coordinate system from the anat roi mask and slice anat mask + # TODO: deal with 1 slice fieldmap, + sliced_mask_resampled = resample_mask(nii_mask_anat, nii_unshimmed, slices_anat[i]).get_fdata() + + # If new bounds are included, change them for each shim + if shimwise_bounds is not None: + optimizer.set_merged_bounds(shimwise_bounds[i]) + + # Optimize using the mask + currents[i, :] = optimizer.optimize(sliced_mask_resampled) + + return currents + + +def define_slices(n_slices: int, factor: int, method='interleaved'): + """ + Define the slices to shim according to the output convention. + + Args: + n_slices (int): Number of total slices. + factor (int): Number of slices per shim. + method (str): Defines how the slices should be sorted, supported methods include: 'interleaved', 'sequential'. + See Examples for more details. + + Returns: + list: 1D list containing tuples of z slices to shim. + + Examples: + + :: + + slices = define_slices(10, 2, 'interleaved') + print(slices) # [(0, 5), (1, 6), (2, 7), (3, 8), (4, 9)] + + slices = define_slices(20, 5, 'sequential') + print(slices) # [(0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12, 13, 14), (15, 16, 17, 18, 19)] + + """ + if n_slices <= 0: + return [tuple()] + + slices = [] + n_shims = n_slices // factor + leftover = n_slices % factor + + if method == 'interleaved': + for i_shim in range(n_shims): + slices.append(tuple(range(i_shim, n_shims * factor, n_shims))) + + elif method == 'sequential': + for i_shim in range(n_shims): + slices.append(tuple(range(i_shim * factor, (i_shim + 1) * factor, 1))) + + else: + raise NotImplementedError("Not a supported method to define slices") + + if leftover != 0: + slices.append(tuple(range(n_shims * factor, n_slices))) + logger.warning(f"When defining the slices to shim, there are leftover slices since the factor used and number " + f"of slices is not perfectly dividable. Make sure the last tuple of slices is " + f"appropriate: {slices}") + + return slices diff --git a/test/masking/test_mask_utils.py b/test/masking/test_mask_utils.py new file mode 100644 index 000000000..c04aea724 --- /dev/null +++ b/test/masking/test_mask_utils.py @@ -0,0 +1,108 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- +import os + +import nibabel as nib +import numpy as np +import pytest + +from shimmingtoolbox.masking.mask_utils import dilate_binary_mask, resample_mask +from shimmingtoolbox.masking.shapes import shapes +from shimmingtoolbox import __dir_testing__ + +dummy_image = np.zeros([10, 10, 10]) +dummy_image[2, 2, 3] = 1 +dummy_image[7:10, 2, 5] = 1 +dummy_image[2, 6:8, 6:8] = 1 + + +@pytest.mark.parametrize( + "input_mask,", [( + dummy_image, + )] +) +class TestDilaceBinaryMask(object): + def test_dilate_binary_mask_default(self, input_mask): + """Default is the cross""" + dilated = dilate_binary_mask(input_mask[0]) + + # Expected slice 7 + expected_slice = np.zeros([10, 10]) + expected_slice[2, 5:9] = 1 + expected_slice[1:4, 6:8] = 1 + + assert np.all(expected_slice == dilated[..., 7]) + + def test_dilate_binary_mask_sphere(self, input_mask): + + dilated = dilate_binary_mask(input_mask[0], shape='sphere', size=5) + + # Expected slice 8 + expected_slice = np.zeros([10, 10]) + expected_slice[2, 5:9] = 1 + expected_slice[1:4, 6:8] = 1 + + assert np.all(expected_slice == dilated[..., 8]) + + def test_dilate_binary_mask_cube(self, input_mask): + + dilated = dilate_binary_mask(input_mask[0], shape='cube') + + # Expected slice 7 + expected_slice = np.zeros([10, 10]) + expected_slice[1:4, 5:9] = 1 + + assert np.all(expected_slice == dilated[..., 7]) + + def test_dilate_binary_mask_line(self, input_mask): + + dilated = dilate_binary_mask(input_mask[0], shape='line') + + # Expected slice in x,z plane 2 + expected_slice = np.zeros([10, 10]) + expected_slice[7:10, 4:7] = 1 + expected_slice[2, 2:5] = 1 + expected_slice[1:4, 3] = 1 + + assert np.all(expected_slice == dilated[:, 2, :]) + + def test_dilate_binary_mask_wrong_size(self, input_mask): + + with pytest.raises(ValueError, match="Size must be odd and greater or equal to 3"): + dilate_binary_mask(input_mask[0], size=4) + + def test_dilate_binary_mask_wrong_shape(self, input_mask): + + with pytest.raises(ValueError, match="Use of non supported algorithm for dilating the mask"): + dilate_binary_mask(input_mask[0], 'abc') + + +def test_resample_mask(): + """Test for function that resamples a mask""" + # Fieldmap + fname_fieldmap = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'nifti', 'sub-example', 'fmap', + 'sub-example_fieldmap.nii.gz') + nii_fieldmap = nib.load(fname_fieldmap) + nii_target = nib.Nifti1Image(nii_fieldmap.get_fdata()[..., 0], nii_fieldmap.affine, header=nii_fieldmap.header) + + # anat image + fname_anat = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'nifti', 'sub-example', 'anat', + 'sub-example_unshimmed_e1.nii.gz') + nii_anat = nib.load(fname_anat) + + # Set up mask + # static + nx, ny, nz = nii_anat.shape + static_mask = shapes(nii_anat.get_fdata(), 'cube', + center_dim1=int(nx / 2), + center_dim2=int(ny / 2), + len_dim1=5, len_dim2=5, len_dim3=nz) + + nii_mask_static = nib.Nifti1Image(static_mask.astype(int), nii_anat.affine, header=nii_anat.header) + + nii_mask_res = resample_mask(nii_mask_static, nii_target, (0,)) + + expected = np.full_like(nii_target.get_fdata(), fill_value=False) + expected[24:28, 26:29, 0] = 1 + + assert np.all(nii_mask_res.get_fdata() == expected) diff --git a/test/test_masking.py b/test/masking/test_shape_threshold.py similarity index 100% rename from test/test_masking.py rename to test/masking/test_shape_threshold.py diff --git a/test/shim/test_realtime_shim.py b/test/shim/test_realtime_shim.py index d4586e61e..0c9f59c63 100644 --- a/test/shim/test_realtime_shim.py +++ b/test/shim/test_realtime_shim.py @@ -71,9 +71,9 @@ def test_default(self): self.json) assert np.isclose(static_zcorrection[0], 0.12928646689120157) - assert np.isclose(riro_zcorrection[0], -0.007959437710556651) - assert np.isclose(mean_p, 1326.3179660207873) - assert np.isclose(pressure_rms, 1493.9468284155396) + assert np.isclose(riro_zcorrection[0], -0.008013590565377253) + assert np.isclose(mean_p, 1326.7410085020922) + assert np.isclose(pressure_rms, 1494.6380477845253) def test_mask(self): """Test realtime_shim mask parameter""" @@ -87,9 +87,9 @@ def test_mask(self): nii_mask_anat_riro=self.nii_mask_riro) assert np.isclose(static_zcorrection[0], 0.2766538103967352) - assert np.isclose(riro_zcorrection[0], -0.051144561917725075) - assert np.isclose(mean_p, 1326.318) - assert np.isclose(pressure_rms, 1493.9468284155396) + assert np.isclose(riro_zcorrection[0], -0.05118665738437744) + assert np.isclose(mean_p, 1326.7410085020922) + assert np.isclose(pressure_rms, 1494.6380477845253) def test_output_figure(self): """Test realtime_shim output figures parameter""" diff --git a/test/shim/test_sequencer.py b/test/shim/test_sequencer.py new file mode 100644 index 000000000..aea0a8f4a --- /dev/null +++ b/test/shim/test_sequencer.py @@ -0,0 +1,708 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +from shimmingtoolbox import __dir_testing__ +from shimmingtoolbox.coils.siemens_basis import siemens_basis +from shimmingtoolbox.coils.coil import Coil +from shimmingtoolbox.coils.coordinates import generate_meshgrid +from shimmingtoolbox.load_nifti import get_acquisition_times +from shimmingtoolbox.masking.shapes import shapes +from shimmingtoolbox.optimizer.basic_optimizer import Optimizer +from shimmingtoolbox.pmu import PmuResp +from shimmingtoolbox.shim.sequencer import shim_sequencer +from shimmingtoolbox.shim.sequencer import shim_realtime_pmu_sequencer +from shimmingtoolbox.shim.sequencer import define_slices +from shimmingtoolbox.shim.sequencer import resample_mask +from shimmingtoolbox.simulate.numerical_model import NumericalModel + +import numpy as np +import pytest +import os +import nibabel as nib +import json +from matplotlib.figure import Figure + +DEBUG = False + + +def create_fieldmap(n_slices=3): + # Set up 2-dimensional unshimmed fieldmaps + num_vox = 100 + model_obj = NumericalModel('shepp-logan', num_vox=num_vox) + model_obj.generate_deltaB0('linear', [0.025, 2]) + tr = 0.025 # in s + te = [0.004, 0.008] # in s + model_obj.simulate_measurement(tr, te) + phase_meas1 = model_obj.get_phase() + phase_e1 = phase_meas1[:, :, 0, 0] + phase_e2 = phase_meas1[:, :, 0, 1] + b0_map = ((phase_e2 - phase_e1) / (te[1] - te[0])) / (2 * np.pi) + + # Construct a 3-dimensional synthetic field map by stacking different z-slices along the 3rd dimension. Each + # slice is subjected to a manipulation of model_obj across slices (e.g. rotation, squared) in order to test + # various shim configurations. + unshimmed = np.zeros([num_vox, num_vox, n_slices]) + for i_n in range(n_slices // 3): + unshimmed[:, :, 3 * i_n] = b0_map + unshimmed[:, :, (3 * i_n) + 1] = (np.rot90(unshimmed[:, :, 0]) + unshimmed[:, :, 0]) / 2 + unshimmed[:, :, (3 * i_n) + 2] = unshimmed[:, :, 0] ** 2 + + nii_fmap = nib.Nifti1Image(unshimmed, create_unshimmed_affine()) + + return nii_fmap + + +def create_unshimmed_affine(): + # return np.array([[0., 0., 3., 1], + # [-2.91667008, 0., 0., 2], + # [0., 2.91667008, 0., 3], + # [0., 0., 0., 1.]]) + return np.eye(4) + + +def create_constraints(max_coef, min_coef, sum_coef, n_channels=8): + # Set up bounds for output currents + bounds = [] + for _ in range(n_channels): + bounds.append((min_coef, max_coef)) + + constraints = { + "coef_sum_max": sum_coef, + "coef_channel_minmax": bounds + } + return constraints + + +def create_coil(n_x, n_y, n_z, constraints, coil_affine, n_channel=8): + # Set up spherical harmonics coil profile + mesh_x, mesh_y, mesh_z = generate_meshgrid((n_x, n_y, n_z), coil_affine) + profiles = siemens_basis(mesh_x, mesh_y, mesh_z) + + # Define coil1 + coil = Coil(profiles[..., :n_channel], coil_affine, constraints) + return coil + + +nz = 3 # Must be multiple of 3 +nii_to_shim = create_fieldmap() + +# Create coil profiles +unshimmed_affine = create_unshimmed_affine() +coil_affine = unshimmed_affine * 2 +coil_affine[3, 3] = 1 +# Coil with same #of pixel and same affine as fieldmap +coil1 = create_coil(100, 100, nz, create_constraints(1000, -1000, 2000), unshimmed_affine) +affine = coil_affine * 0.75 +affine[3, 3] = 1 +# Coil with different affine and different # of pixels +coil2 = create_coil(150, 120, nz + 10, create_constraints(500, -500, 1500), affine) + +# Create anat +anat = np.ones((50, 50, 3)) +nii_anat = nib.Nifti1Image(anat, affine=affine) + +# Create mask +static_mask = shapes(anat, 'cube', len_dim1=10, len_dim2=10, len_dim3=nz) +nii_mask = nib.Nifti1Image(static_mask.astype(int), nii_anat.affine, header=nii_anat.header) + + +@pytest.mark.parametrize( + "nii_fieldmap,nii_anat,nii_mask,sph_coil,sph_coil2", [( + nii_to_shim, + nii_anat, + nii_mask, + coil1, + coil2, + )] +) +class TestSequencer(object): + """Tests for shim_sequencer""" + def test_shim_sequencer_lsq(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + # Optimize + slices = define_slices(nii_anat.shape[2], 1) + + currents = shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [sph_coil], method='least_squares') + + assert_results(nii_fieldmap, nii_anat, nii_mask, [sph_coil], currents, slices) + + def test_shim_sequencer_pseudo(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + # Optimize + slices = define_slices(nii_anat.shape[2], 1) + + currents = shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [sph_coil], method='pseudo_inverse') + + assert_results(nii_fieldmap, nii_anat, nii_mask, [sph_coil], currents, slices) + + def test_shim_sequencer_2_coils_lsq(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + # Optimize + slices = define_slices(nii_anat.shape[2], 1) + currents = shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [sph_coil, sph_coil2], + method='least_squares') + + assert_results(nii_fieldmap, nii_anat, nii_mask, [sph_coil, sph_coil2], currents, slices) + + def test_shim_sequencer_coefs_are_none(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + # Coil with None constraints + coil = create_coil(5, 5, nz, create_constraints(None, None, None), affine) + + # Optimize + slices = define_slices(nii_anat.shape[2], 1) + + currents = shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [coil], method='least_squares') + + assert_results(nii_fieldmap, nii_anat, nii_mask, [coil], currents, slices) + + def test_shim_sequencer_slab_slices(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + """Test for slices arranged as a slab""" + # Optimize + slices = define_slices(nii_anat.shape[2], nii_anat.shape[2]) + + currents = shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [sph_coil], method='least_squares') + + assert_results(nii_fieldmap, nii_anat, nii_mask, [sph_coil], currents, slices) + + def test_shim_sequencer_dynamic_slices(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + """Test for slices arranged for dynamic shimming""" + # Optimize + slices = [(0,), (1,), (2,)] + + currents = shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [sph_coil], method='least_squares') + + assert_results(nii_fieldmap, nii_anat, nii_mask, [sph_coil], currents, slices) + + def test_shim_sequencer_multi_slices(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + """Test for slices arranged for multi slice""" + # Optimize + slices = [(0, 2), (1,)] + currents = shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [sph_coil], method='least_squares') + + assert_results(nii_fieldmap, nii_anat, nii_mask, [sph_coil], currents, slices) + + def test_shim_sequencer_wrong_optimizer(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + + # Optimize + slices = [(0, 2), (1,)] + method = 'abc' + with pytest.raises(KeyError, match=f"Method: {method} is not part of the supported optimizers"): + shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [sph_coil], method=method) + + # TODO: tests for wrong inputs + + # def test_speed_huge_matrix(self, nii_fieldmap, nii_anat, nii_mask, sph_coil, sph_coil2): + # # Create 1 huge coil which essentially is siemens basis concatenated 4 times + # coils = [sph_coil, sph_coil, sph_coil, sph_coil] + # + # coil_profiles_list = [] + # + # for coil in coils: + # # Concat coils and bounds + # coil_profiles_list.append(coil.profile) + # + # coil_profiles = np.concatenate(coil_profiles_list, axis=3) + # constraints = create_constraints(1000, -1000, 5000, 32) + # + # huge_coil = Coil(coil_profiles, sph_coil.affine, constraints) + # + # slices = define_slices(nii_anat.shape[2], 1) + # currents = shim_sequencer(nii_fieldmap, nii_anat, nii_mask, slices, [huge_coil], method='least_squares') + # + # assert_results(nii_fieldmap, nii_anat, nii_mask, [huge_coil], currents, slices) + + +def assert_results(nii_fieldmap, nii_anat, nii_mask, coil, currents, slices): + # Calculate theoretical shimmed map + unshimmed = nii_fieldmap.get_fdata() + opt = Optimizer(coil, unshimmed, nii_fieldmap.affine) + + if DEBUG: + # Save fieldmap + fname_fieldmap_2 = os.path.join(os.curdir, 'fig_fieldmap.nii.gz') + nib.save(nii_fieldmap, fname_fieldmap_2) + + # Save anat + fname_anat = os.path.join(os.curdir, 'fig_anat.nii.gz') + nib.save(nii_anat, fname_anat) + + # Save anat mask + fname_mask = os.path.join(os.curdir, 'fig_anat_mask.nii.gz') + nib.save(nii_mask, fname_mask) + + # Save coil profiles as nifti + fname_coil = os.path.join(os.curdir, 'fig_coil_orig.nii.gz') + nii_coil = nib.Nifti1Image(coil[0].profile, coil[0].affine) + nib.save(nii_coil, fname_coil) + + # save resampled coil profiles + fname_coil_res = os.path.join(os.curdir, 'fig_coil_resampled.nii.gz') + nii_coil = nib.Nifti1Image(opt.merged_coils, opt.unshimmed_affine, header=nii_fieldmap.header) + nib.save(nii_coil, fname_coil_res) + + correction_per_channel = np.zeros(opt.merged_coils.shape + (len(slices),)) + shimmed = np.zeros(unshimmed.shape + (len(slices),)) + mask_fieldmap = np.zeros(unshimmed.shape + (len(slices),)) + for i_shim in range(len(slices)): + correction_per_channel[..., i_shim] = currents[i_shim] * opt.merged_coils + correction = np.sum(correction_per_channel[..., i_shim], axis=3, keepdims=False) + shimmed[..., i_shim] = unshimmed + correction + + mask_fieldmap[..., i_shim] = resample_mask(nii_mask, nii_fieldmap, slices[i_shim]).get_fdata() + + sum_shimmed = np.sum(np.abs(mask_fieldmap[..., i_shim] * shimmed[..., i_shim])) + sum_unshimmed = np.sum(np.abs(mask_fieldmap[..., i_shim] * unshimmed)) + + print(f"\nshimmed: {sum_shimmed}, unshimmed: {sum_unshimmed}, current: \n{currents[i_shim, :]}") + assert sum_shimmed <= sum_unshimmed + + if DEBUG: + # Save correction + fname_correction = os.path.join(os.curdir, 'fig_correction.nii.gz') + nii_correction = nib.Nifti1Image(correction_per_channel, opt.unshimmed_affine) + nib.save(nii_correction, fname_correction) + + # Save resampled masks + fname_res_mask = os.path.join(os.curdir, f"fig_mask_res.nii.gz") + nii_res_mask = nib.Nifti1Image(mask_fieldmap, nii_fieldmap.affine, header=nii_fieldmap.header) + nib.save(nii_res_mask, fname_res_mask) + + +def test_shim_realtime_pmu_sequencer_fake_data(): + """Test on the shim_realtime_pmu_sequencer using simulated data""" + + # anat image + fname_anat = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'nifti', 'sub-example', 'anat', + 'sub-example_unshimmed_e1.nii.gz') + nii_anat = nib.load(fname_anat) + + # fake[..., 0] contains the original linear fieldmap. This repeats the linear fieldmap over the 3rd dim and scale + # down + nz = 3 + fake = create_fieldmap(n_slices=nz).get_fdata() + fake_temp = np.zeros([100, 100, nz, 4]) + lin = np.repeat(fake[:, :, 0, np.newaxis], nz, axis=2) / 10 + fake_temp[..., 0] = fake + lin + fake_temp[..., 1] = fake + fake_temp[..., 2] = fake - lin + fake_temp[..., 3] = fake + fake_affine = nii_anat.affine * 0.75 + fake_affine[3, 3] = 1 + nii_fieldmap = nib.Nifti1Image(fake_temp, fake_affine) + + # Set up mask + # static + nx, ny, nz = nii_anat.shape + static_mask = shapes(nii_anat.get_fdata(), 'cube', len_dim1=5, len_dim2=5, len_dim3=nz) + + nii_mask_static = nib.Nifti1Image(static_mask.astype(int), nii_anat.affine, header=nii_anat.header) + riro_mask = static_mask + nii_mask_riro = nib.Nifti1Image(riro_mask.astype(int), nii_anat.affine, header=nii_anat.header) + + # Pmu + fname_resp = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'PMUresp_signal.resp') + pmu = PmuResp(fname_resp) + # Change pmu so that it uses fake data. The fake data is essentially a sinusoid with 4 points + pmu.data = np.array([3000, 2000, 1000, 2000]) + pmu.stop_time_mdh = 750 + pmu.start_time_mdh = 0 + + # Define a dummy json data with the bare minimum fields and calculate the pressures pressure + json_data = {'RepetitionTime': 250 / 1000, 'AcquisitionTime': "00:00:00.000000"} + acq_timestamps = get_acquisition_times(nii_fieldmap, json_data) + acq_pressures = pmu.interp_resp_trace(acq_timestamps) + + # Create Coil + coil_affine = nii_fieldmap.affine + coil = create_coil(150, 150, nz + 10, create_constraints(np.inf, -np.inf, np.inf), coil_affine) + + # Define the slices to shim with the proper convention + slices = define_slices(nii_anat.shape[2], 1, method='sequential') + + # Find optimal currents + output = shim_realtime_pmu_sequencer(nii_fieldmap, json_data, nii_anat, nii_mask_static, nii_mask_riro, + slices, pmu, [coil], opt_method='least_squares') + currents_static, currents_riro, mean_p, p_rms = output + + currents_riro_rms = currents_riro * p_rms + + print(f"\nSlices: {slices}" + f"\nFieldmap affine:\n{nii_fieldmap.affine}\n" + f"Coil affine:\n{coil_affine}\n" + f"Static currents:\n{currents_static}\n" + f"Riro currents * p_rms:\n{currents_riro_rms}\n") + + # Calculate theoretical shimmed map + # shim + unshimmed = nii_fieldmap.get_fdata() + nii_target = nib.Nifti1Image(nii_fieldmap.get_fdata()[..., 0], nii_fieldmap.affine, header=nii_fieldmap.header) + opt = Optimizer([coil], unshimmed[..., 0], nii_fieldmap.affine) + shape = unshimmed.shape + (len(slices),) + shimmed_static_riro = np.zeros(shape) + shimmed_static = np.zeros(shape) + shimmed_riro = np.zeros(shape) + masked_shim_static_riro = np.zeros(shape) + masked_shim_static = np.zeros(shape) + masked_shim_riro = np.zeros(shape) + masked_unshimmed = np.zeros(shape) + masked_fieldmap = np.zeros(unshimmed[..., 0].shape + (len(slices),)) + shim_trace_static_riro = [] + shim_trace_static = [] + shim_trace_riro = [] + unshimmed_trace = [] + for i_shim in range(len(slices)): + # Calculate static correction + correction_static = np.sum(currents_static[i_shim] * opt.merged_coils, axis=3, keepdims=False) + + # Calculate the riro coil profiles + riro_profile = np.sum(currents_riro[i_shim] * opt.merged_coils, axis=3, keepdims=False) + + masked_fieldmap[..., i_shim] = resample_mask(nii_mask_static, nii_target, slices[i_shim]).get_fdata() + for i_t in range(nii_fieldmap.shape[3]): + # Apply the static and riro correction + correction_riro = riro_profile * (acq_pressures[i_t] - mean_p) + shimmed_static[..., i_t, i_shim] = unshimmed[..., i_t] + correction_static + shimmed_static_riro[..., i_t, i_shim] = shimmed_static[..., i_t, i_shim] + correction_riro + shimmed_riro[..., i_t, i_shim] = unshimmed[..., i_t] + correction_riro + + # Calculate masked shim + masked_shim_static[..., i_t, i_shim] = masked_fieldmap[..., i_shim] * shimmed_static[..., i_t, i_shim] + masked_shim_static_riro[..., i_t, i_shim] = masked_fieldmap[..., i_shim] * shimmed_static_riro[..., i_t, i_shim] + masked_shim_riro[..., i_t, i_shim] = masked_fieldmap[..., i_shim] * shimmed_riro[..., i_t, i_shim] + masked_unshimmed[..., i_t, i_shim] = masked_fieldmap[..., i_shim] * unshimmed[..., i_t] + + # Calculate the sum over the ROI + sum_shimmed_static = np.sum(np.abs(masked_shim_static[..., i_t, i_shim])) + sum_shimmed_static_riro = np.sum(np.abs(masked_shim_static_riro[..., i_t, i_shim])) + sum_shimmed_riro = np.sum(np.abs(masked_shim_riro[..., i_t, i_shim])) + sum_unshimmed = np.sum(np.abs(masked_unshimmed[..., i_t, i_shim])) + print(f"\ni_shim: {i_shim}, t: {i_t}" + f"\nshimmed static: {sum_shimmed_static}, shimmed static+riro: {sum_shimmed_static_riro}, " + f"unshimmed: {sum_unshimmed}\n" + f"Static currents:\n{currents_static[i_shim]}\n" + f"Riro currents:\n{currents_riro[i_shim] * (acq_pressures[i_t] - mean_p)}\n") + + # Create a 1D list of the sum of the shimmed and unshimmed maps + shim_trace_static.append(sum_shimmed_static) + shim_trace_static_riro.append(sum_shimmed_static_riro) + shim_trace_riro.append(sum_shimmed_riro) + unshimmed_trace.append(sum_unshimmed) + + assert sum_shimmed_static_riro <= sum_unshimmed + + if DEBUG: + + # reshape to slice x timepoint + nt = unshimmed.shape[3] + n_shim = len(slices) + shim_trace_static = np.array(shim_trace_static).reshape(n_shim, nt) + shim_trace_static_riro = np.array(shim_trace_static_riro).reshape(n_shim, nt) + shim_trace_riro = np.array(shim_trace_riro).reshape(n_shim, nt) + unshimmed_trace = np.array(unshimmed_trace).reshape(n_shim, nt) + + # Plot and save debug outputs + i_slice = 0 + i_shim = 0 + i_t = 0 + plot_static_riro(masked_unshimmed, masked_shim_static, masked_shim_static_riro, unshimmed, shimmed_static, + shimmed_static_riro, i_slice=i_slice, i_shim=i_shim, i_t=i_t) + plot_currents(currents_static, currents_riro_rms) + plot_shimmed_trace(unshimmed_trace, shim_trace_static, shim_trace_riro, shim_trace_static_riro) + plot_pressure_points(acq_pressures) + save_nii(nii_fieldmap, coil, opt, nii_mask_static) + print_rt_metrics(unshimmed, shimmed_static, shimmed_static_riro, shimmed_riro, masked_fieldmap) + + +def test_shim_realtime_pmu_sequencer_rt_zshim_data(): + # Fieldmap + fname_fieldmap = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'nifti', 'sub-example', 'fmap', + 'sub-example_fieldmap.nii.gz') + nii_fieldmap = nib.load(fname_fieldmap) + + # anat image + fname_anat = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'nifti', 'sub-example', 'anat', + 'sub-example_unshimmed_e1.nii.gz') + nii_anat = nib.load(fname_anat) + + # Set up mask + # static + nx, ny, nz = nii_anat.shape + static_mask = shapes(nii_anat.get_fdata(), 'cube', len_dim1=5, len_dim2=5, len_dim3=nz) + + nii_mask_static = nib.Nifti1Image(static_mask.astype(int), nii_anat.affine, header=nii_anat.header) + riro_mask = static_mask + nii_mask_riro = nib.Nifti1Image(riro_mask.astype(int), nii_anat.affine, header=nii_anat.header) + + # Pmu + fname_resp = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'PMUresp_signal.resp') + pmu = PmuResp(fname_resp) + + # Path for json file + fname_json = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'nifti', 'sub-example', 'fmap', + 'sub-example_magnitude1.json') + with open(fname_json) as json_file: + json_data = json.load(json_file) + + # Calc pressure + acq_timestamps = get_acquisition_times(nii_fieldmap, json_data) + acq_pressures = pmu.interp_resp_trace(acq_timestamps) + + # Create Coil + coil_affine = nii_fieldmap.affine + coil = create_coil(150, 150, nz + 10, create_constraints(np.inf, -np.inf, np.inf), coil_affine) + + # Define the slices to shim with the proper convention + slices = define_slices(nii_anat.shape[2], 5, method='sequential') + + # Find optimal currents + output = shim_realtime_pmu_sequencer(nii_fieldmap, json_data, nii_anat, nii_mask_static, nii_mask_riro, slices, pmu, + [coil], opt_method='least_squares') + currents_static, currents_riro, mean_p, p_rms = output + + # Scale according to rms + currents_riro_rms = currents_riro * p_rms + + # Print some outputs + print(f"\nSlices: {slices}" + f"\nFieldmap affine:\n{nii_fieldmap.affine}\n" + f"Coil affine:\n{coil_affine}\n" + f"Static currents:\n{currents_static}\n" + f"Riro currents * p_rms:\n{currents_riro_rms}\n") + + # Calculate theoretical shimmed map + # shim + unshimmed = nii_fieldmap.get_fdata() + nii_target = nib.Nifti1Image(nii_fieldmap.get_fdata()[..., 0], nii_fieldmap.affine, header=nii_fieldmap.header) + opt = Optimizer([coil], unshimmed[..., 0], nii_fieldmap.affine) + shape = unshimmed.shape + (len(slices),) + shimmed_static_riro = np.zeros(shape) + shimmed_static = np.zeros(shape) + shimmed_riro = np.zeros(shape) + masked_shim_static_riro = np.zeros(shape) + masked_shim_static = np.zeros(shape) + masked_shim_riro = np.zeros(shape) + masked_unshimmed = np.zeros(shape) + masked_fieldmap = np.zeros(unshimmed[..., 0].shape + (len(slices),)) + shim_trace_static_riro = [] + shim_trace_static = [] + shim_trace_riro = [] + unshimmed_trace = [] + for i_shim in range(len(slices)): + # Calculate static correction + correction_static = np.sum(currents_static[i_shim] * opt.merged_coils, axis=3, keepdims=False) + + # Calculate the riro coil profiles + riro_profile = np.sum(currents_riro[i_shim] * opt.merged_coils, axis=3, keepdims=False) + + masked_fieldmap[..., i_shim] = resample_mask(nii_mask_static, nii_target, slices[i_shim]).get_fdata() + for i_t in range(nii_fieldmap.shape[3]): + # Apply the static and riro correction + correction_riro = riro_profile * (acq_pressures[i_t] - mean_p) + shimmed_static[..., i_t, i_shim] = unshimmed[..., i_t] + correction_static + shimmed_static_riro[..., i_t, i_shim] = shimmed_static[..., i_t, i_shim] + correction_riro + shimmed_riro[..., i_t, i_shim] = unshimmed[..., i_t] + correction_riro + + # Calculate masked shim + masked_shim_static[..., i_t, i_shim] = masked_fieldmap[..., i_shim] * shimmed_static[..., i_t, i_shim] + masked_shim_static_riro[..., i_t, i_shim] = masked_fieldmap[..., i_shim] * shimmed_static_riro[..., i_t, i_shim] + masked_shim_riro[..., i_t, i_shim] = masked_fieldmap[..., i_shim] * shimmed_riro[..., i_t, i_shim] + masked_unshimmed[..., i_t, i_shim] = masked_fieldmap[..., i_shim] * unshimmed[..., i_t] + + # Calculate the sum over the ROI + sum_shimmed_static = np.sum(np.abs(masked_shim_static[..., i_t, i_shim])) + sum_shimmed_static_riro = np.sum(np.abs(masked_shim_static_riro[..., i_t, i_shim])) + sum_shimmed_riro = np.sum(np.abs(masked_shim_riro[..., i_t, i_shim])) + sum_unshimmed = np.sum(np.abs(masked_unshimmed[..., i_t, i_shim])) + print(f"\ni_shim: {i_shim}, t: {i_t}" + f"\nshimmed static: {sum_shimmed_static}, shimmed static+riro: {sum_shimmed_static_riro}, " + f"unshimmed: {sum_unshimmed}\n" + f"Static currents:\n{currents_static[i_shim]}\n" + f"Riro currents:\n{currents_riro[i_shim] * (acq_pressures[i_t] - mean_p)}\n") + + # Create a 1D list of the sum of the shimmed and unshimmed maps + shim_trace_static.append(sum_shimmed_static) + shim_trace_static_riro.append(sum_shimmed_static_riro) + shim_trace_riro.append(sum_shimmed_riro) + unshimmed_trace.append(sum_unshimmed) + + assert sum_shimmed_static_riro < sum_unshimmed + + if DEBUG: + # reshape to slice x timepoint + nt = unshimmed.shape[3] + n_shim = len(slices) + shim_trace_static = np.array(shim_trace_static).reshape(n_shim, nt) + shim_trace_static_riro = np.array(shim_trace_static_riro).reshape(n_shim, nt) + shim_trace_riro = np.array(shim_trace_riro).reshape(n_shim, nt) + unshimmed_trace = np.array(unshimmed_trace).reshape(n_shim, nt) + + i_slice = 0 + i_shim = 0 + i_t = 0 + plot_static_riro(masked_unshimmed, masked_shim_static, masked_shim_static_riro, unshimmed, shimmed_static, + shimmed_static_riro, i_slice=i_slice, i_shim=i_shim, i_t=i_t) + plot_currents(currents_static, currents_riro_rms) + plot_shimmed_trace(unshimmed_trace, shim_trace_static, shim_trace_riro, shim_trace_static_riro) + plot_pressure_points(acq_pressures) + save_nii(nii_fieldmap, coil, opt, nii_mask_static) + print_rt_metrics(unshimmed, shimmed_static, shimmed_static_riro, shimmed_riro, masked_fieldmap) + + +def plot_shimmed_trace(unshimmed_trace, shim_trace_static, shim_trace_riro, shim_trace_static_riro): + """plot shimmed and unshimmed sum over the roi for each shim""" + + min_value = min( + shim_trace_static_riro[:, :].min(), + shim_trace_static[:, :].min(), + shim_trace_riro[:, :].min(), + unshimmed_trace[:, :].min() + ) + max_value = max( + shim_trace_static_riro[:, :].max(), + shim_trace_static[:, :].max(), + shim_trace_riro[:, :].max(), + unshimmed_trace[:, :].max() + ) + + fig = Figure(figsize=(10, 50)) + n_shim = len(unshimmed_trace) + for i_shim in range(n_shim): + ax = fig.add_subplot(n_shim, 1, i_shim + 1) + ax.plot(shim_trace_static_riro[i_shim, :], label='shimmed static + riro') + ax.plot(shim_trace_static[i_shim, :], label='shimmed static') + ax.plot(shim_trace_riro[i_shim, :], label='shimmed_riro') + ax.plot(unshimmed_trace[i_shim, :], label='unshimmed') + ax.set_xlabel('Timepoints') + ax.set_ylabel('Sum over the ROI') + ax.legend() + ax.set_ylim(min_value, max_value) + ax.set_title(f"Unshimmed vs shimmed values: slice {i_shim}") + fname_figure = os.path.join(os.curdir, 'fig_trace_shimmed_vs_unshimmed.png') + fig.savefig(fname_figure) + + +def plot_static_riro(masked_unshimmed, masked_shim_static, masked_shim_static_riro, unshimmed, shimmed_static, + shimmed_static_riro, i_t=0, i_slice=0, i_shim=0): + """Plot Static and RIRO fieldmap for a perticular fieldmap slice, anat shim and timepoint""" + + min_value = min(masked_shim_static_riro[..., i_slice, i_t, i_shim].min(), + masked_shim_static[..., i_slice, i_t, i_shim].min(), + masked_unshimmed[..., i_slice, i_t, i_shim].min()) + max_value = max(masked_shim_static_riro[..., i_slice, i_t, i_shim].max(), + masked_shim_static[..., i_slice, i_t, i_shim].max(), + masked_unshimmed[..., i_slice, i_t, i_shim].max()) + + fig = Figure(figsize=(10, 10)) + ax = fig.add_subplot(2, 3, 1) + im = ax.imshow(np.rot90(masked_shim_static_riro[..., i_slice, i_t, i_shim]), vmin=min_value, vmax=max_value) + fig.colorbar(im) + ax.set_title("masked_shim static + riro") + ax = fig.add_subplot(2, 3, 2) + im = ax.imshow(np.rot90(masked_shim_static[..., i_slice, i_t, i_shim]), vmin=min_value, vmax=max_value) + fig.colorbar(im) + ax.set_title("masked_shim static") + ax = fig.add_subplot(2, 3, 3) + im = ax.imshow(np.rot90(masked_unshimmed[..., i_slice, i_t, i_shim]), vmin=min_value, vmax=max_value) + fig.colorbar(im) + ax.set_title("masked_unshimmed") + + ax = fig.add_subplot(2, 3, 4) + im = ax.imshow(np.rot90(shimmed_static_riro[..., i_slice, i_t, i_shim])) + fig.colorbar(im) + ax.set_title("shim static + riro") + ax = fig.add_subplot(2, 3, 5) + im = ax.imshow(np.rot90(shimmed_static[..., i_slice, i_t, i_shim])) + fig.colorbar(im) + ax.set_title(f"shim static: shim:{i_shim}") + ax = fig.add_subplot(2, 3, 6) + im = ax.imshow(np.rot90(unshimmed[..., i_slice, i_t])) + fig.colorbar(im) + ax.set_title(f"unshimmed slice: {i_slice}, timepoint: {i_t}") + fname_figure = os.path.join(os.curdir, 'fig_realtime_masked_shimmed_vs_unshimmed.png') + fig.savefig(fname_figure) + + +def plot_currents(static, riro=None): + """Plot evolution of currents through shims""" + fig = Figure(figsize=(10, 10)) + ax = fig.add_subplot(111) + ax.plot(static[:, 0], label='Static dim0 currents through shims') + ax.plot(static[:, 1], label='Static dim1 currents through shims') + ax.plot(static[:, 2], label='Static dim2 currents through shims') + if riro is not None: + ax.plot(riro[:, 0], label='Riro dim0 currents through shims') + ax.plot(riro[:, 1], label='Riro dim1 currents through shims') + ax.plot(riro[:, 2], label='Riro dim2 currents through shims') + ax.set_xlabel('i_shims') + ax.set_ylabel('Currrents') + ax.legend() + ax.set_title("Currents through shims") + fname_figure = os.path.join(os.curdir, 'fig_currents.png') + fig.savefig(fname_figure) + + +def plot_pressure_points(acq_pressures): + """Plot respiratory trace pressure points""" + fig = Figure(figsize=(10, 10)) + ax = fig.add_subplot(111) + ax.plot(acq_pressures, label='pressures') + ax.legend() + ax.set_ylim(0, 4095) + ax.set_title("Pressures vs time points") + fname_figure = os.path.join(os.curdir, 'fig_trace_pressures.png') + fig.savefig(fname_figure) + + +def save_nii(nii_fieldmap, coil, opt, nii_mask): + """Save relevant nifti files""" + # save mask + fname_mask = os.path.join(os.curdir, 'fig_mask.nii.gz') + nib.save(nii_mask, fname_mask) + + # Save fieldmap + fname_fieldmap_2 = os.path.join(os.curdir, 'fig_fieldmap.nii.gz') + nib.save(nii_fieldmap, fname_fieldmap_2) + + # Save coil profiles as nifti + fname_coil = os.path.join(os.curdir, 'fig_coil_orig.nii.gz') + nii_coil = nib.Nifti1Image(coil.profile, coil.affine) + nib.save(nii_coil, fname_coil) + + # save resampled coil profiles + fname_coil_res = os.path.join(os.curdir, 'fig_coil_resampled.nii.gz') + nii_coil = nib.Nifti1Image(opt.merged_coils, opt.unshimmed_affine) + nib.save(nii_coil, fname_coil_res) + + +def print_rt_metrics(unshimmed, shimmed_static, shimmed_static_riro, shimmed_riro, masked_fieldmap): + """Print to the console metrics about the realtime and static shim. These metrics isolate temporal and static + components + Temporal: Compute the STD across time pixelwise, and then compute the mean across pixels. + Static: Compute the MEAN across time pixelwise, and then compute the STD across pixels. + """ + + unshimmed_repeat = np.repeat(unshimmed[..., np.newaxis], masked_fieldmap.shape[-1], axis=-1) + mask_repeats = np.repeat(masked_fieldmap[:, :, :, np.newaxis, :], unshimmed.shape[3], axis=3) + ma_unshimmed = np.ma.array(unshimmed_repeat, mask=mask_repeats == False) + ma_shim_static = np.ma.array(shimmed_static, mask=mask_repeats == False) + ma_shim_static_riro = np.ma.array(shimmed_static_riro, mask=mask_repeats == False) + ma_shim_riro = np.ma.array(shimmed_riro, mask=mask_repeats == False) + + # Temporal + temp_shim_static = np.ma.mean(np.ma.std(ma_shim_static, 3)) + temp_shim_static_riro = np.ma.mean(np.ma.std(ma_shim_static_riro, 3)) + temp_shim_riro = np.ma.mean(np.ma.std(ma_shim_riro, 3)) + temp_unshimmed = np.ma.mean(np.ma.std(ma_unshimmed, 3)) + + # Static + static_shim_static = np.ma.std(np.ma.mean(ma_shim_static, 3)) + static_shim_static_riro = np.ma.std(np.ma.mean(ma_shim_static_riro, 3)) + static_shim_riro = np.ma.std(np.ma.mean(ma_shim_riro, 3)) + static_unshimmed = np.ma.std(np.ma.mean(ma_unshimmed, 3)) + print(f"\nTemporal: Compute the STD across time pixelwise, and then compute the mean across pixels." + f"\ntemp_shim_static: {temp_shim_static}" + f"\ntemp_shim_static_riro: {temp_shim_static_riro}" + f"\ntemp_shim_riro: {temp_shim_riro}" + f"\ntemp_unshimmed: {temp_unshimmed}" + f"\nStatic: Compute the MEAN across time pixelwise, and then compute the STD across pixels." + f"\nstatic_shim_static: {static_shim_static}" + f"\nstatic_shim_static_riro: {static_shim_static_riro}" + f"\nstatic_shim_riro: {static_shim_riro}" + f"\nstatic_unshimmed: {static_unshimmed}") diff --git a/test/test_pmu.py b/test/test_pmu.py index 11c095629..b0f2ee01f 100644 --- a/test/test_pmu.py +++ b/test/test_pmu.py @@ -133,3 +133,27 @@ def test_timing_images(): # # fname_figure = os.path.join(__dir_shimmingtoolbox__, 'mask.png') # fig.savefig(fname_figure) + + +def test_pmu_fake_data(): + + # Get pressure values + fname_pmu = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'PMUresp_signal.resp') + pmu = PmuResp(fname_pmu) + + pmu.data = np.array([3000, 2000, 1000, 2000, 3000, 2000, 1000, 2000, 3000, 2000]) + pmu.stop_time_mdh = 250 * (len(pmu.data) - 1) + pmu.start_time_mdh = 0 + + json_data = {'RepetitionTime': 250 / 1000, 'AcquisitionTime': "00:00:00.000000"} + + # Fieldmap + fname_fieldmap = os.path.join(__dir_testing__, 'realtime_zshimming_data', 'nifti', 'sub-example', 'fmap', + 'sub-example_fieldmap.nii.gz') + nii_fieldmap = nib.load(fname_fieldmap) + + # Calc pressure + acq_timestamps = get_acquisition_times(nii_fieldmap, json_data) + acq_pressures = pmu.interp_resp_trace(acq_timestamps) + + assert np.all(acq_pressures == pmu.data) diff --git a/test/test_sequential_zslice.py b/test/test_sequential_zslice.py deleted file mode 100644 index c31f26a9c..000000000 --- a/test/test_sequential_zslice.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- - -import numpy as np -import pytest - -from shimmingtoolbox.optimizer.sequential import sequential_zslice -from shimmingtoolbox.coils.siemens_basis import siemens_basis -from shimmingtoolbox.simulate.numerical_model import NumericalModel -from shimmingtoolbox.masking.shapes import shapes -from shimmingtoolbox.coils.coil import Coil -from shimmingtoolbox.optimizer.basic_optimizer import Optimizer -from shimmingtoolbox.coils.coordinates import generate_meshgrid - - -def create_unshimmed(): - # Set up 2-dimensional unshimmed fieldmaps - num_vox = 100 - model_obj = NumericalModel('shepp-logan', num_vox=num_vox) - model_obj.generate_deltaB0('linear', [0.0, 20]) - tr = 0.025 # in s - te = [0.004, 0.008] # in s - model_obj.simulate_measurement(tr, te) - phase_meas1 = model_obj.get_phase() - phase_e1 = phase_meas1[:, :, 0, 0] - phase_e2 = phase_meas1[:, :, 0, 1] - b0_map = ((phase_e2 - phase_e1) / (te[1] - te[0])) / (2 * np.pi) - - # Construct a 3-dimensional synthetic field map by stacking different z-slices along the 3rd dimension. Each - # slice is subjected to a manipulation of model_obj across slices (e.g. rotation, squared) in order to test - # various shim configurations. - unshimmed = np.zeros([num_vox, num_vox, nz]) - for i_n in range(nz // 3): - unshimmed[:, :, 3 * i_n] = b0_map - unshimmed[:, :, (3 * i_n) + 1] = (np.rot90(unshimmed[:, :, 0]) + unshimmed[:, :, 0]) / 2 - unshimmed[:, :, (3 * i_n) + 2] = unshimmed[:, :, 0] ** 2 - - return unshimmed - - -def create_unshimmed_affine(): - return np.array([[0., 0., 3., 1], - [-2.91667008, 0., 0., 2], - [0., 2.91667008, 0., 3], - [0., 0., 0., 1.]]) - - -def create_constraints(max_coef, min_coef, sum_coef, n_channels=8): - # Set up bounds for output currents - bounds = [] - for _ in range(n_channels): - bounds.append((min_coef, max_coef)) - - constraints = { - "coef_sum_max": sum_coef, - "coef_channel_minmax": bounds - } - return constraints - - -def create_coil(n_x, n_y, n_z, constraints, coil_affine): - # Set up spherical harmonics coil profile - mesh_x, mesh_y, mesh_z = generate_meshgrid((n_x, n_y, n_z), coil_affine) - profiles = siemens_basis(mesh_x, mesh_y, mesh_z) - - # Define coil1 - coil = Coil(profiles, coil_affine, constraints) - return coil - - -def create_mask(ref): - # Set up mask - mask = shapes(ref, 'cube', len_dim1=40, len_dim2=40, len_dim3=nz) - return mask - - -nz = 3 # Must be multiple of 3 -a_unshimmed = create_unshimmed() -affine = np.eye(4) * 2 -affine[3, 3] = 1 -coil1 = create_coil(150, 150, nz + 10, create_constraints(1000, -1000, 2000), affine) -affine = np.eye(4) * 0.75 -affine[3, 3] = 1 -coil2 = create_coil(150, 120, nz + 10, create_constraints(500, -500, 1500), affine) - - -@pytest.mark.parametrize( - "unshimmed,un_affine,sph_coil,sph_coil2,mask", [( - a_unshimmed, - create_unshimmed_affine(), - coil1, - coil2, - create_mask(a_unshimmed) - ) - ] -) -class TestSequentialZSlice(object): - def test_zslice_lsq(self, unshimmed, un_affine, sph_coil, sph_coil2, mask): - # Optimize - z_slices = np.array(range(unshimmed.shape[2])) - currents = sequential_zslice(unshimmed, un_affine, [sph_coil], mask, z_slices, method='least_squares') - - assert_results([sph_coil], unshimmed, un_affine, currents, mask, z_slices) - - def test_zslice_pseudo(self, unshimmed, un_affine, sph_coil, sph_coil2, mask): - # Optimize - z_slices = np.array(range(unshimmed.shape[2])) - currents = sequential_zslice(unshimmed, un_affine, [sph_coil], mask, z_slices, - method='pseudo_inverse') - - assert_results([sph_coil], unshimmed, un_affine, currents, mask, z_slices) - - def test_zslice_2_coils_lsq(self, unshimmed, un_affine, sph_coil, sph_coil2, mask): - - # Optimize - z_slices = np.array(range(unshimmed.shape[2])) - currents = sequential_zslice(unshimmed, un_affine, [sph_coil, sph_coil2], mask, - z_slices, method='least_squares') - - assert_results([sph_coil, sph_coil2], unshimmed, un_affine, currents, mask, z_slices) - - def test_coefs_are_none(self, unshimmed, un_affine, sph_coil, sph_coil2, mask): - - coil = create_coil(5, 5, nz, create_constraints(None, None, None), affine) - - # Optimize - z_slices = np.array(range(unshimmed.shape[2])) - currents = sequential_zslice(unshimmed, un_affine, [coil], mask, z_slices, method='least_squares') - - assert_results([coil], unshimmed, un_affine, currents, mask, z_slices) - - # def test_speed_huge_matrix(self, unshimmed, un_affine, sph_coil, sph_coil2, mask): - # # Create 1 huge coil which essentially is siemens basis concatenated 4 times - # coils = [sph_coil, sph_coil, sph_coil, sph_coil] - # - # coil_profiles_list = [] - # - # for coil in coils: - # # Concat coils and bounds - # coil_profiles_list.append(coil.profile) - # - # coil_profiles = np.concatenate(coil_profiles_list, axis=3) - # constraints = create_constraints(1000, 2000, 32) - # - # huge_coil = Coil(coil_profiles, sph_coil.affine, constraints) - # - # z_slices = np.array(range(unshimmed.shape[2])) - # currents = sequential_zslice(unshimmed, un_affine, [huge_coil], mask, z_slices) - # - # # Calculate theoretical shimmed map - # opt = Optimizer([huge_coil], unshimmed, un_affine) - # shimmed = unshimmed + np.sum(currents * opt.merged_coils, axis=3, keepdims=False) - # - # for i_slice in z_slices: - # sum_shimmed = np.sum(np.abs(mask[:, :, i_slice] * shimmed[:, :, i_slice])) - # sum_unshimmed = np.sum(np.abs(mask[:, :, i_slice] * unshimmed[:, :, i_slice])) - # # print(f"\nshimmed: {sum_shimmed}, unshimmed: {sum_unshimmed}, current: \n{currents[i_slice, :]}") - # assert sum_shimmed <= sum_unshimmed - - -def assert_results(coil, unshimmed, un_affine, currents, mask, z_slices): - # Calculate theoretical shimmed map - opt = Optimizer(coil, unshimmed, un_affine) - shimmed = unshimmed + np.sum(currents * opt.merged_coils, axis=3, keepdims=False) - - for i_slice in z_slices: - sum_shimmed = np.sum(np.abs(mask[:, :, i_slice] * shimmed[:, :, i_slice])) - sum_unshimmed = np.sum(np.abs(mask[:, :, i_slice] * unshimmed[:, :, i_slice])) - print(f"\nshimmed: {sum_shimmed}, unshimmed: {sum_unshimmed}, current: \n{currents[i_slice, :]}") - assert sum_shimmed < sum_unshimmed