# An investigation of PyLinac's MV kV iso code

From https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/winston_lutz.py

PyLinac has the following license:

Modifications have been made to the PyLinac code. Please see the original repository (https://github.com/jrkerns/pylinac) for the original code.

In [None]:
import os
import datetime
from glob import glob
from collections import namedtuple

import numpy as np
import pandas as pd
import scipy.ndimage

import pydicom
from pylinac import WinstonLutz

In [None]:
data_root = r'S:\Physics\Programming\data\MVISO'

In [None]:
data_record = glob(os.path.join(data_root, 'iView*.xlsx'))[0]
dicom_files = np.array(glob(os.path.join(data_root, '*.dcm')))

In [None]:
record = pd.read_excel(data_record, skiprows=4)
timestamps_initial = record['Datetime']
timestamps = timestamps_initial[timestamps_initial.notnull()].values
gantry = record['Gantry'][timestamps_initial.notnull()].values
colimator = record['Col'][timestamps_initial.notnull()].values
turntable = record['TT'][timestamps_initial.notnull()].values
beam = record['Energy'][timestamps_initial.notnull()].values

In [None]:
datasets = np.array([
    pydicom.read_file(dicom_file, force=True)
    for dicom_file in dicom_files
])

In [None]:
# np.random.shuffle(datasets)

In [None]:
acquisition_datetimes = np.array([
    datetime.datetime.strptime(dataset.AcquisitionDate + dataset.AcquisitionTime, '%Y%m%d%H%M%S.%f')
    for dataset in datasets
], dtype=np.datetime64)

In [None]:
diff_map = np.abs(acquisition_datetimes[None,:] - timestamps[:, None]) < np.timedelta64(2, 's')
timestamp_index, acquisition_index = np.where(diff_map)

In [None]:
assert len(set(acquisition_index)) == len(acquisition_index)
assert len(acquisition_index) == len(acquisition_datetimes)

In [None]:
datasets = datasets[acquisition_index]
dicom_files = dicom_files[acquisition_index]
timestamps = timestamps[timestamp_index]
gantry = gantry[timestamp_index]
colimator = colimator[timestamp_index]
turntable = turntable[timestamp_index]
beam = beam[timestamp_index]

acquisition_datetimes = np.array([
    datetime.datetime.strptime(dataset.AcquisitionDate + dataset.AcquisitionTime, '%Y%m%d%H%M%S.%f')
    for dataset in datasets
], dtype=np.datetime64)

diff_map = np.abs(acquisition_datetimes[None,:] - timestamps[:, None]) < np.timedelta64(2, 's')
timestamp_index, acquisition_index = np.where(diff_map)

assert np.all(timestamp_index == acquisition_index)

In [None]:
pixel_arrays = [
    dataset.pixel_array
    for dataset in datasets
]

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/image.py#L358-L377
    
def crop(pixel_array, pixels):    
    pixel_array = pixel_array[pixels:, :]
    pixel_array = pixel_array[:-pixels, :]
    pixel_array = pixel_array[:, pixels:]
    pixel_array = pixel_array[:, :-pixels]

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/winston_lutz.py#L570-L591

def clean_edges(pixel_array, window_size):
    
    def has_noise(pixel_array, window_size):
        near_min, near_max = np.percentile(pixel_array, [5, 99.5])
        img_range = near_max - near_min
        
        top = pixel_array[:window_size, :]
        left = pixel_array[:, :window_size]
        bottom = pixel_array[-window_size:, :]
        right = pixel_array[:, -window_size:]
        
        edge_array = np.concatenate((top.flatten(), left.flatten(), bottom.flatten(), right.flatten()))
        edge_too_low = edge_array.min() < (near_min - img_range / 10)
        edge_too_high = edge_array.max() > (near_max + img_range / 10)
        
        return edge_too_low or edge_too_high

    safety_stop = np.min(pixel_array.shape)/10
    
    while has_noise(pixel_array, window_size) and safety_stop > 0:
        crop(pixel_array, window_size)
        safety_stop -= 1

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/image.py#L446-L459

def as_binary(pixel_array, threshold):
    return np.where(pixel_array >= threshold, 1, 0)

In [None]:
Point = namedtuple('x', 'y')

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/winston_lutz.py#L593-L614

def find_field_centroid(pixel_array):
    min, max = np.percentile(pixel_array, [5, 99.9])
    threshold_array = as_binary(pixel_array, (max - min)/2 + min)

    cleaned_img = scipy.ndimage.binary_erosion(threshold_array)
    [*edges] = bounding_box(cleaned_img)
    edges[0] -= 10
    edges[1] += 10
    edges[2] -= 10
    edges[3] += 10
    coords = scipy.ndimage.measurements.center_of_mass(threshold_img)
    p = Point(x=coords[-1], y=coords[0])

    return p, edges

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/profile.py#L931-L1089

def peak_detect(values: np.ndarray, threshold: Union[float, int]=None, min_distance: Union[float, int]=10,
                max_number: int=None, search_region: Tuple[float, float]=(0.0, 1.0),
                find_min_instead: bool=False) -> Tuple[np.ndarray, np.ndarray]:
    """Find the peaks or valleys of a 1D signal.
    Uses the difference (np.diff) in signal to find peaks. Current limitations include:
        1) Only for use in 1-D data; 2D may be possible with the gradient function.
        2) Will not detect peaks at the very edge of array (i.e. 0 or -1 index)
    Parameters
    ----------
    values : array-like
        Signal values to search for peaks within.
    threshold : int, float
        The value the peak must be above to be considered a peak. This removes "peaks"
        that are in a low-value region.
        If passed an int, the actual value is the threshold.
        E.g. when passed 15, any peak less with a value <15 is removed.
        If passed a float, it will threshold as a percent. Must be between 0 and 1.
        E.g. when passed 0.4, any peak <40% of the maximum value will be removed.
    min_distance : int, float
        If passed an int, parameter is the number of elements apart a peak must be from neighboring peaks.
        If passed a float, must be between 0 and 1 and represents the ratio of the profile to exclude.
        E.g. if passed 0.05 with a 1000-element profile, the minimum peak width will be 0.05*1000 = 50 elements.
    max_number : int
        Specify up to how many peaks will be returned. E.g. if 3 is passed in and 5 peaks are found, only the 3 largest
        peaks will be returned.
    find_min_instead : bool
        If False (default), peaks will be returned.
        If True, valleys will be returned.
    Returns
    -------
    max_vals : numpy.array
        The values of the peaks found.
    max_idxs : numpy.array
        The x-indices (locations) of the peaks.
    Raises
    ------
    ValueError
        If float not between 0 and 1 passed to threshold.
    """
    peak_vals = []  # a list to hold the y-values of the peaks. Will be converted to a numpy array
    peak_idxs = []  # ditto for x-values (index) of y data.

    if find_min_instead:
        values = -values

    """Limit search to search region"""
    left_end = search_region[0]
    if is_float_like(left_end):
        left_index = int(left_end*len(values))
    elif is_int_like(left_end):
        left_index = left_end
    else:
        raise ValueError(f"{left_end} must be a float or int")

    right_end = search_region[1]
    if is_float_like(right_end):
        right_index = int(right_end * len(values))
    elif is_int_like(right_end):
        right_index = right_end
    else:
        raise ValueError(f"{right_end} must be a float or int")

    # minimum peak spacing calc
    if isinstance(min_distance, float):
        if 0 > min_distance >= 1:
            raise ValueError("When min_peak_width is passed a float, value must be between 0 and 1")
        else:
            min_distance = int(min_distance * len(values))

    values = values[left_index:right_index]

    """Determine threshold value"""
    if isinstance(threshold, float) and threshold < 1:
        data_range = values.max() - values.min()
        threshold = threshold * data_range + values.min()
    elif isinstance(threshold, float) and threshold >= 1:
        raise ValueError("When threshold is passed a float, value must be less than 1")
    elif threshold is None:
        threshold = values.min()

    """Take difference"""
    values_diff = np.diff(values.astype(float))  # y and y_diff must be converted to signed type.

    """Find all potential peaks"""
    for idx in range(len(values_diff) - 1):
        # For each item of the diff array, check if:
        # 1) The y-value is above the threshold.
        # 2) The value of y_diff is positive (negative for valley search), it means the y-value changed upward.
        # 3) The next y_diff value is zero or negative (or positive for valley search); a positive-then-negative diff value means the value
        # is a peak of some kind. If the diff is zero it could be a flat peak, which still counts.

        # 1)
        if values[idx + 1] < threshold:
            continue

        y1_gradient = values_diff[idx] > 0
        y2_gradient = values_diff[idx + 1] <= 0

        # 2) & 3)
        if y1_gradient and y2_gradient:
            # If the next value isn't zero it's a single-pixel peak. Easy enough.
            if values_diff[idx + 1] != 0:
                peak_vals.append(values[idx + 1])
                peak_idxs.append(idx + 1 + left_index)
            # elif idx >= len(y_diff) - 1:
            #     pass
            # Else if the diff value is zero, it could be a flat peak, or it could keep going up; we don't know yet.
            else:
                # Continue on until we find the next nonzero diff value.
                try:
                    shift = 0
                    while values_diff[(idx + 1) + shift] == 0:
                        shift += 1
                        if (idx + 1 + shift) >= (len(values_diff) - 1):
                            break
                    # If the next diff is negative (or positive for min), we've found a peak. Also put the peak at the center of the flat
                    # region.
                    is_a_peak = values_diff[(idx + 1) + shift] < 0
                    if is_a_peak:
                        peak_vals.append(values[int((idx + 1) + np.round(shift / 2))])
                        peak_idxs.append((idx + 1 + left_index) + np.round(shift / 2))
                except IndexError:
                    pass

    # convert to numpy arrays
    peak_vals = np.array(peak_vals)
    peak_idxs = np.array(peak_idxs)

    """Enforce the min_peak_distance by removing smaller peaks."""
    # For each peak, determine if the next peak is within the min peak width range.
    index = 0
    while index < len(peak_idxs) - 1:

        # If the second peak is closer than min_peak_distance to the first peak, find the larger peak and remove the other one.
        if peak_idxs[index] > peak_idxs[index + 1] - min_distance:
            if peak_vals[index] > peak_vals[index + 1]:
                idx2del = index + 1
            else:
                idx2del = index
            peak_vals = np.delete(peak_vals, idx2del)
            peak_idxs = np.delete(peak_idxs, idx2del)
        else:
            index += 1

    """If Maximum Number passed, return only up to number given based on a sort of peak values."""
    if max_number is not None and len(peak_idxs) > max_number:
        sorted_peak_vals = peak_vals.argsort()  # sorts low to high
        peak_vals = peak_vals[sorted_peak_vals[-max_number:]]
        peak_idxs = peak_idxs[sorted_peak_vals[-max_number:]]

    # If we were looking for minimums, convert the values back to the original sign
    if find_min_instead:
        peak_vals = -peak_vals

    return peak_vals, peak_idxs

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/profile.py#L221-L248

def get_initial_peak(values):
    lf_edge = 0.2
    rt_edge = 0.8
    while True:
        _, initial_peak_arr = peak_detect(values, max_number=1, search_region=(lf_edge, rt_edge))
        try:
            initial_peak = initial_peak_arr[0]
            break
        except IndexError:
            lf_edge -= 0.01
            rt_edge -= 0.01
            if lf_edge < 0:
                raise ValueError("A reasonable initial peak was not found in the profile. Ensure peak is not at profile edge")

    return initial_peak

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/profile.py#L309-L323

def values_left_interp(self):
    ydata_f = interp1d(self._indices, self._values_left, kind=self.interpolation_type)
    y_data = ydata_f(self._indices_interp)
    return y_data

def values_right_interp(self):
    ydata_f = interp1d(self._indices, self._values_right, kind=self.interpolation_type)
    y_data = ydata_f(self._indices_interp)
    return y_data

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/profile.py#L250-L307

interpolation_factor = 100

def penumbra_point(self, side: str='left', x: int=50, kind: str='index'):
    # get peak
    peak = get_initial_peak(values)
    
    search_index = int(peak * interpolation_factor)

    # get y-data
    if side == LEFT:
        y_data = self._values_left_interp
    else:
        y_data = self._values_right_interp

    # get threshold
    max_point = y_data.max()
    threshold = max_point * (x / 100)

    # find the index, moving 1 element at a time until the value is encountered
    found = False
    at_end = False
    try:
        while not found and not at_end:
            if y_data[search_index] < threshold:
                found = True
                search_index -= 1 if side == RIGHT else -1
            elif search_index == 0:
                at_end = True
            search_index += 1 if side == RIGHT else -1
    except IndexError:
        raise IndexError("The point of interest was beyond the profile; i.e. the profile may be cut off on the side")

    search_index /= interpolation_factor

    return search_index

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/profile.py#L343-L362

def fwxm(self, x: int=50) -> float:
    li = self._penumbra_point(LEFT, x)
    ri = self._penumbra_point(RIGHT, x)
    fwxm = np.abs(ri - li)
    return fwxm

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/profile.py#L364-L379
    
def fwxm_center(pixel_array, x: int=50, interpolate: bool=False, kind: str='index') -> float:
    """Return the center index of the FWXM.
    See Also
    --------
    fwxm() : Further parameter info
    """
    fwxm = self.fwxm(x, interpolate=interpolate)
    li = self._penumbra_point(LEFT, x, interpolate)
    fwxmcen = np.abs(li + fwxm / 2)
    if not interpolate:
        fwxmcen = int(round(fwxmcen))
    if kind == VALUE:
        return self.values[fwxmcen] if not interpolate else self._values_interp[int(fwxmcen*self.interpolation_factor)]
    else:
        return fwxmcen

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/image.py#L397-L400

def invert(pixel_array):
    return -pixel_array + pixel_array.max() + pixel_array.min()

In [None]:
# https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/winston_lutz.py#L616-L659

def find_bb(pixel_array):
    # get initial starting conditions
    hmin, hmax = np.percentile(pixel_array, [5, 99.9])
    spread = hmax - hmin
    max_thresh = hmax
    lower_thresh = hmax - spread / 1.5
    # search for the BB by iteratively lowering the low-pass threshold value until the BB is found.
    found = False
    while not found:
        try:
            binary_arr = np.logical_and((max_thresh > pixel_array), (pixel_array >= lower_thresh))
            labeled_arr, num_roi = ndimage.measurements.label(binary_arr)
            roi_sizes, bin_edges = np.histogram(labeled_arr, bins=num_roi + 1)
            bw_bb_img = np.where(labeled_arr == np.argsort(roi_sizes)[-3], 1, 0)

            if not is_round(bw_bb_img):
                raise ValueError
            if not is_modest_size(bw_bb_img, find_field_centroid(pixel_array)):
                raise ValueError
            if not is_symmetric(bw_bb_img):
                raise ValueError
        except (IndexError, ValueError):
            max_thresh -= 0.05 * spread
            if max_thresh < hmin:
                raise ValueError("Unable to locate the BB. Make sure the field edges do not obscure the BB and that there is no artifacts in the images.")
        else:
            found = True

    # determine the center of mass of the BB
    inv_img = invert(pixel_array)
    
    x_arr = np.abs(np.average(bw_bb_img, weights=inv_img, axis=0))
    x_com = SingleProfile(x_arr).fwxm_center(interpolate=True)
    y_arr = np.abs(np.average(bw_bb_img, weights=inv_img, axis=1))
    y_com = SingleProfile(y_arr).fwxm_center(interpolate=True)
    
    return Point(x_com, y_com)

In [None]:
diff_map

In [None]:
diff_map

In [None]:
acquisition_datetimes[29]
timestamps.values[0]

In [None]:
np.timedelta64(1, 's')

In [None]:
acquisition_times

In [None]:
np.array(timestamps.values)

## Notes

"values left" and and "values right" don't actually return the left and right values. https://github.com/jrkerns/pylinac/blob/95d3ea6b8f853beb4c9729f36b5451bbc4e7e2a7/pylinac/core/profile.py#L186-L200