In [None]:
import pandas as pd
import numpy as np
from xray_stats import load_process as lp

from PIL import Image
import os

from scipy.interpolate import interp1d
import scipy.optimize as opt
from scipy.spatial.distance import pdist, squareform
from scipy.ndimage import generic_filter
from scipy.ndimage import convolve

import datashader as ds
import datashader.transfer_functions as tf
import matplotlib.pyplot as plt
import gc

# X-Ray Segmentation
In this notebook, I provide an example of experimental segmentation performed to identify soil vs non-soil pixels. The technique I use is indicator kriging based segmentation technique as developed by Oh and Lindquist (1999)

Oh, Wonho, and Brent Lindquist. "Image thresholding by indicator kriging." IEEE Transactions on Pattern Analysis and Machine Intelligence 21.7 (1999): 590-602.

The first two cells below are the functions needed to segment a soil scan. After that, the remaining cells (a) segment a sample soil scan, (b) visualize the "void" voxels in 2D, and (c) clear the memory - important if you want to re-run without a memory crash.

In [None]:
def calculate_cdf_fast(array, num_array, bins=10000):
    """
    Calculate the Cumulative Distribution Function (CDF) for a list of numbers in a numpy array.
    This version uses a histogram and interpolation for speed.

    Parameters:
    array: numpy array
    num_array: list or numpy array of numbers for which CDF is calculated
    bins: number of bins to use in the histogram

    Returns:
    cdf_values: list of floats
        The CDF values for the numbers
    """
    
    # Calculate the histogram
    counts, bin_edges = np.histogram(array, bins=bins, density=True)

    # Calculate the CDF
    cdf = np.cumsum(counts)
    cdf = cdf / cdf[-1]  # normalize to 1

    # Use interpolation to find the CDF values for num_array
    cdf_values = np.interp(num_array, bin_edges[1:], cdf)

    return cdf_values

# Load all PILs
def tiff_stack_array(stack_index):
    total_images, tiff_files_sorted, path_to_tiff_stack = lp.get_tiff_stack(stack_index)
    tiff_stack = []
    for tiff_index in range(total_images):
        tiff_stack.append(Image.open(os.path.join(path_to_tiff_stack, tiff_files_sorted[tiff_index])))
    return total_images, tiff_stack

# Convert a subset to np arrays
def get_tiff_stack_np_array(tiff_stack, begin=None, end=None):
    if begin is None and end is None:
        np_array = np.stack([np.array(img) for img in tiff_stack])
    else:
        np_array = np.stack([np.array(img) for img in tiff_stack[begin:end]])
    return np_array

# Get the values needed to normalize by container density, as well as the centers for masking core later
def get_norm_and_core_centers(tiff_stack):
    
    tiff_total = len(tiff_stack)
    
    # ignore the top and bottom edges of the tiff stack
    low_tiff = round(tiff_total*0.1)
    high_tiff = round(tiff_total*0.9)
    
    # arrays to collect core center for whole stack interpolation
    interp_inds = []
    xcs = []
    ycs = []
    
    # arrays to calculate median outside and container attenuations for normalization
    outer_noise_attenuation = [];
    container_attenuation = [];
    
    i = -1;
    subset_indices = np.linspace(low_tiff, high_tiff - 1, num=50, dtype=int)
    
    # Subsample the tiffs to get average of background noise and container density
    for tiff_index in subset_indices:
        i = i+1
        image = np.array(tiff_stack[tiff_index])
        mask_container, mask_outside, mask_core = lp.get_masks_pre(image)
        xc, yc, r = lp.estimate_circle_from_image_pre(image)
        interp_inds.append(tiff_index)
        xcs.append(xc)
        ycs.append(yc)
        
        outer_noise_attenuation.append(np.nanmean(image[mask_outside]))
        container_attenuation.append(np.nanmean(image[mask_container]))
        lp.print_progress(i, len(subset_indices))
        
    outer_median = np.median(outer_noise_attenuation)
    container_median = np.median(container_attenuation)
    fx = interp1d(interp_inds,xcs,fill_value="extrapolate")
    fy = interp1d(interp_inds,ycs,fill_value="extrapolate")
    
    xc_s = fx(list(range(0, tiff_total)))
    yc_s = fy(list(range(0, tiff_total)))
    
    return outer_median, container_median, xc_s, yc_s

# Normalize image by container density
def scale_and_norm(np_image, outer_median, container_median):
    return (np_image-outer_median)/(container_median-outer_median)*1.022
  
## Thresholding
# Thresholding - smoothed indicator function (eq 21 in Oh and Lindquist)
def get_threshold_masks_soft_global(image,T0,T1,sample_set=None):
    
    if sample_set is None:
        sample_set = image
        
    # Standard deviations of the thresholded populations
    sig_0 = np.std(sample_set[sample_set<T0])
    sig_1 = np.std(sample_set[sample_set>T1])
    
    s0_l = 0
    s1_r = 0
    
    s0_r = (sig_0*T1+sig_1*T0)/(sig_0+sig_1)
    s1_l = (sig_0*T1+sig_1*T0)/(sig_0+sig_1)
    
    # cdfs for soft indicators
    F_lims = calculate_cdf_fast(sample_set.flatten(), [T0-s0_l,T0+s0_r,T1-s1_l,T1+s1_r], bins=10000)
    F_low_softs = calculate_cdf_fast(sample_set.flatten(), image[(image>=(T0-s0_l))&(image<=(T0+s0_r))], bins=10000)
    F_high_softs = calculate_cdf_fast(sample_set.flatten(), image[(image>=(T1-s1_l))&(image<=(T1+s1_r))], bins=10000)
    
    # void/air (0) threshold indicator
    i_T0 = image*0;
    i_T0[image<(T0-s0_l)] = 1
    i_T0[image>(T0+s0_r)] = 0
    i_T0[(image>=(T0-s0_l))&(image<=(T0+s0_r))] = (F_lims[1]-F_low_softs)/(F_lims[1]-F_lims[0])
    
    # solid/soil (1) threshold indicator
    i_T1 = image*0;
    i_T1[image<(T1-s1_l)] = 1
    i_T1[image>(T1+s1_r)] = 0
    i_T1[(image>=(T1-s1_l))&(image<=(T1+s1_r))] = (F_lims[3]-F_high_softs)/(F_lims[3]-F_lims[2])
    
    return i_T0, i_T1, F_lims, s0_l, s1_r, s0_r, s1_l

def get_threshold_masks_soft(image,T0,T1,F_lims, s0_l, s1_r, s0_r, s1_l,sample_set=None):
    
    if sample_set is None:
        sample_set = image

    F_low_softs = calculate_cdf_fast(sample_set.flatten(), image[(image>=(T0-s0_l))&(image<=(T0+s0_r))], bins=100)
    F_high_softs = calculate_cdf_fast(sample_set.flatten(), image[(image>=(T1-s1_l))&(image<=(T1+s1_r))], bins=100)
    
    # void/air (0) threshold indicator
    i_T0 = image*0;
    i_T0[image<(T0-s0_l)] = 1
    i_T0[image>(T0+s0_r)] = 0
    i_T0[(image>=(T0-s0_l))&(image<=(T0+s0_r))] = (F_lims[1]-F_low_softs)/(F_lims[1]-F_lims[0])
    
    # solid/soil (1) threshold indicator
    i_T1 = image*0;
    i_T1[image<(T1-s1_l)] = 1
    i_T1[image>(T1+s1_r)] = 0
    i_T1[(image>=(T1-s1_l))&(image<=(T1+s1_r))] = (F_lims[3]-F_high_softs)/(F_lims[3]-F_lims[2])
    
    return i_T0, i_T1

# Kriging
    # Build Array
def get_neighbor_coordinates(r):
    """
    Generate coordinates within a radius r.

    Parameters:
    r: int
        Radius for the generation of coordinates.

    Returns:
    neighbor_coordinates: numpy array
        An array of tuples with the coordinates within the given radius.
    """
    neighbor_coordinates = np.array([(dx, dy) for dx in range(-r, r+1) for dy in range(-r, r+1) if dx**2 + dy**2 <= r**2])
    # Find the index of [0, 0]
    index = np.where((neighbor_coordinates == [0, 0]).all(axis=1))[0]
    # Delete [0, 0] from the current position and append it at the end
    neighbor_coordinates = np.delete(neighbor_coordinates, index, axis=0)
    neighbor_coordinates = np.append(neighbor_coordinates, [[0, 0]], axis=0)
    return neighbor_coordinates
  
def create_kernel(neighbor_coordinates, weights):
    """
    Create a kernel matrix based on the extents of the coordinates and assign weights.

    Parameters:
    neighbor_coordinates: numpy array
        An array of tuples with the coordinates.
    weights: numpy array
        An array of weights with the same length as neighbor_coordinates.

    Returns:
    kernel: numpy array
        A kernel matrix with weights assigned to corresponding neighbor_coordinates.
    """
    
    # Find the minimum and maximum coordinates in each dimension
    min_coords = np.min(neighbor_coordinates, axis=0)
    max_coords = np.max(neighbor_coordinates, axis=0)
    
    # The size of the kernel is the difference between the min and max coordinates, plus 1
    # We add 1 because the coordinates are 0-indexed
    kernel_size = max_coords - min_coords + 1
    
    # Create the empty kernel
    kernel = np.zeros(kernel_size, dtype=float)
    
    # Shift the coordinates so that the minimum coordinate is at (0, 0)
    shifted_coordinates = neighbor_coordinates - min_coords
    
    # Assign the weights to the corresponding cells in the kernel
    for coord, weight in zip(shifted_coordinates, weights):
        kernel[tuple(coord)] = weight
    
    return kernel

def calculate_average_empirical_semivariogram(xray_set, max_lag_distance=12):
    semis = [];
    for i in range(xray_set.shape[0]):
        semis.append(calculate_empirical_semivariogram(xray_set[i], max_lag_distance=max_lag_distance))
    empirical_semivariogram = np.mean(semis,axis=0)
    return empirical_semivariogram

def calculate_empirical_semivariogram(xray_data, max_lag_distance=12):
    # Flattening the 2D data to 1D
    data_1d = xray_data.flatten()

    # Getting the coordinates of the voxels
    y,x = np.indices(xray_data.shape)
    coordinates = np.vstack([x.flatten(), y.flatten()]).T

    # Calculating the distances and differences between every pair of voxels
    distances = pdist(coordinates, metric='euclidean')
    differences = pdist(data_1d[:, None], metric='euclidean')

    # Only considering pairs within the max lag distance
    mask = distances <= max_lag_distance

    distances = distances[mask]
    differences = differences[mask]

    # Binning the distances and differences into the lags
    bins = np.arange(0, max_lag_distance+1)
    bin_indices = np.digitize(distances, bins)

    # Calculating the empirical semivariogram
    empirical_semivariogram = np.array([np.mean(differences[bin_indices == i]**2) / 2.0 for i in range(1, len(bins))])
    empirical_semivariogram[0] = 0;
    return empirical_semivariogram

def fit_semivariogram_model(empirical_semivariogram, model, initial_guess):
    # Define a cost function for the residual error minimization problem
    def cost_function(params):
        return np.sum((empirical_semivariogram - model(np.arange(len(empirical_semivariogram)), *params))**2)

    # Use simulated annealing to find the optimal parameters
    result = opt.basinhopping(cost_function, initial_guess, niter=1000)

    return result.x

# Define a model for the semivariogram, wer are using gaussian
def varimodel(h, nugget, sill, range):
    # Gaussian model
    return nugget + sill * (1 - np.exp(-(h / range) ** 2))

def solve_kriging(CC):
    """
    Solve the Indicator Kriging system of equations.

    Parameters:
    CC: numpy array of shape (n+1, n+1)
        Covariance matrix where the last row and last column (except the very last element) 
        represent covariances between the observed locations and the estimation location, 
        and the rest of the matrix represents covariances between the observed locations.

    Returns:
    weights: numpy array of shape (n,)
        Weights assigned to each observed value.
    lagrange_multiplier: float
        Lagrange multiplier enforcing the unbiasedness condition.
    """
    
    # Extract the covariances between the observed locations
    C = CC[:-1, :-1]
    # Extract the covariances between the observed locations and the estimation location
    c = CC[:-1, -1]
    # Add a row and a column of ones to the matrix, and a 1 at the end of the vector
    # This is for the Lagrange multiplier that enforces the unbiasedness condition
    C = np.vstack((C, np.ones(C.shape[1])))
    C = np.hstack((C, np.ones((C.shape[0], 1))))
    c = np.append(c, 1)

    # Solve the kriging system
    #solution = np.linalg.solve(C, c)
    solution, residuals, rank, s = np.linalg.lstsq(C, c, rcond=None)

    # The weights are all but the last element of the solution
    weights = solution[:-1]
    
    # Negative weights in the solution of (7) have the potential of producing negative probabilities in (6).
    # If negative weights occur, we adjust the weights using the simple, robust, a posteriori scheme proposed in [29].   
    # Let $x_1, ..., p$ denote the subset of locations where the weights are negative, 
    indices = np.where(weights < 0)
     
    if len(indices)>0:
        # $\bar{w}$ denote the average magnitude of the negative weights, and $C$ denote the average covariance.
        negabsmean = abs(np.nanmean(weights[indices]))
        negcmean = np.nanmean(c[indices])
        # Positive weights smaller than $\bar{w}$ whose covariance
        # to the location $x_0$ is smaller than $C$ are also set to zero.
        weights[(weights>0)&(weights<negabsmean)&(weights<negcmean)]=0
        # The negative weights are set to zero.
        weights[indices]
    # The remaining positive weights are renormalized uniformly to sum to one.
    weights = weights/np.sum(weights)
        
    # The last element of the solution is the Lagrange multiplier
    lagrange_multiplier = solution[-1]

    return weights
def majority_filter(image, size):
    """
    Apply a majority filter to an thresholded image. This image should have one of three values:
    0: pixels in the original image less than a lower threshold
    1: pixels in the original image greater than an upper threshold
    2: unclassified pixels - these are ignored
    

    Parameters:
    image: numpy array
        The input image.
    size: int
        The size of the filter. Must be odd.

    Returns:
    numpy array
        The filtered image.
    """
    # Define the majority function
    def majority(x):
        center_pixel = x[len(x) // 2]
        output_pixel = center_pixel
        if center_pixel==0:
            if (np.sum(x==1)/len(x))>=0.6:
                output_pixel=5
        elif center_pixel==1:
            if (np.sum(x==0)/len(x))>=0.6:
                output_pixel=-5
        # Return the value that occurs most frequently
        return output_pixel

    # Apply the majority filter
    return generic_filter(image, majority, size=size)

In [None]:
def krig_image(tiff_stack, tiff_int, T0, T1, outer_median, container_median, xc_s, yc_s, T0_kernal, T1_kernal, F_lims, s0_l, s1_r, s0_r, s1_l, sample_set):
    # Process a sample image
    curim = np.array(tiff_stack[tiff_int])
    curim = scale_and_norm(curim, outer_median, container_median) ## ALWAYS REMEMBER TO SCALE

    # calculate the softened indicator values for the sample set
    i_T0, i_T1 = get_threshold_masks_soft(curim,T0,T1,F_lims, s0_l, s1_r, s0_r, s1_l, sample_set)
    curim_init = ((curim>=T0)&(curim<=T1))*2 + (curim<T0)*0 + (curim>T1)*1

    # Get your unknown population
    ui,uj = np.where((curim>=T0)&(curim<=T1))

    #P0 = convolve2d(i_T0, T0_kernal,mode='same')
    #P1 = convolve2d(i_T1, T1_kernal,mode='same')
    
    P0 = convolve(i_T0, T0_kernal)
    P1 = convolve(i_T1, T1_kernal)
    
    Kval = (1-P1)>=P0
    curim_kriged = curim_init.copy()
    curim_kriged[ui,uj] = Kval[ui,uj]
    
    mask_container, mask_outside, mask_core = lp.masks_by_region(curim_kriged, xc_s[tiff_int], yc_s[tiff_int],490)
    voids_y, voids_x = np.where((curim_kriged==0)&mask_core)
    voids_z = voids_y*0-tiff_int
    
    return voids_x,voids_y,voids_z

def krig_image_show(tiff_stack, tiff_int, T0, T1, outer_median, container_median, xc_s, yc_s, T0_kernal, T1_kernal, F_lims, s0_l, s1_r, s0_r, s1_l, sample_set):
    # Process a sample image
    curim = np.array(tiff_stack[tiff_int])
    #curim = scale_and_norm(denoise_tv(curim), outer_median, container_median) ## ALWAYS REMEMBER TO SCALE
    curim = scale_and_norm((curim), outer_median, container_median) ## ALWAYS REMEMBER TO SCALE

    # calculate the softened indicator values for the sample set
    i_T0, i_T1 = get_threshold_masks_soft(curim,T0,T1,F_lims, s0_l, s1_r, s0_r, s1_l, sample_set)
    curim_init = ((curim>=T0)&(curim<=T1))*2 + (curim<T0)*0 + (curim>T1)*1

    # Get your unknown population
    ui,uj = np.where((curim>=T0)&(curim<=T1))

    #P0 = convolve2d(i_T0, T0_kernal,mode='same')
    #P1 = convolve2d(i_T1, T1_kernal,mode='same')
    
    P0 = convolve(i_T0, T0_kernal)
    P1 = convolve(i_T1, T1_kernal)
    KP = np.abs((1-P1)-P0)<0.05
    Kval = (1-P1)>=P0
    curim_kriged = curim_init.copy()
    curim_kriged[ui,uj] = Kval[ui,uj]
    
    mask_container, mask_outside, mask_core = lp.masks_by_region(curim_kriged, xc_s[tiff_int], yc_s[tiff_int],490)
    voids_y, voids_x = np.where((curim_kriged==0)&mask_core)
    voids_z = voids_y*0-tiff_int
    
    fig, ax = plt.subplots(4,1,figsize=(15, 60))
    ax[0].imshow(curim,cmap="gray",vmin=0,vmax=2.5)
    draw_circle(xc_s[tiff_int], yc_s[tiff_int], 490, ax[0])
    ax[1].imshow(curim_init)
    draw_circle(xc_s[tiff_int], yc_s[tiff_int], 490, ax[1])
    ax[2].imshow(curim_kriged)
    draw_circle(xc_s[tiff_int], yc_s[tiff_int], 490, ax[2])
    ax[3].imshow(KP,cmap="gray")
    print(np.min(KP))
    print(np.max(KP))
    draw_circle(xc_s[tiff_int], yc_s[tiff_int], 490, ax[3])
    return voids_x,voids_y,voids_z

def get_voids(stack_int):
    # our kriging window is a radius of 3 around unlabelled pixel
    neighbor_coordinates = get_neighbor_coordinates(3)
    # Threshold selection
    T0 = 0.5
    T1 = 1.075
    print('load tiff stack')
    # Load a tiff stack
    if 'tiff_stack' in locals():
        del tiff_stack
    total_images, tiff_stack = tiff_stack_array(stack_int)
    # Get its median outer and container attenuations for normalization
    print('calculate normalization parameters and get interpolated core centers')
    outer_median, container_median, xc_s, yc_s = get_norm_and_core_centers(tiff_stack)
    # Sample voxel set for building semivariograms (a central subset from 20 sample images from index 0 to 200 or first 7.8 mm).
    sample_set = np.stack([np.array(img) for img in tiff_stack[0:200:20]])
    sample_set = sample_set[:,700:800,700:800]
    sample_set = scale_and_norm(sample_set, outer_median, container_median) ## ALWAYS REMEMBER TO SCALE
    print('calculating softened indicator values')
    # calculate the softened indicator values for the sample set
    i_T0, i_T1, F_lims, s0_l, s1_r, s0_r, s1_l = get_threshold_masks_soft_global(sample_set,T0,T1)
    print('calculating semivariograms')
    # calculate a model for all images in sample_set and average together.
    T0_semivariogram = calculate_average_empirical_semivariogram(i_T0,12)
    T0_params = fit_semivariogram_model(T0_semivariogram, varimodel, initial_guess=[0.0, 1, 10.0])

    # calculate a model for all images in sample_set and average together.
    T1_semivariogram = calculate_average_empirical_semivariogram(i_T1,12)
    T1_params = fit_semivariogram_model(T1_semivariogram, varimodel, initial_guess=[0.0, 1, 10.0])
    coordinates = neighbor_coordinates.copy()
    # Calculate the distances between all pairs of voxels
    distances = squareform(pdist(coordinates)) 

    print('calculating T0 and T1 gaussian semivariogram models')
    # Calculate the semivariogram for these distances
    T0_model = varimodel(distances,*T0_params)
    T1_model = varimodel(distances,*T1_params)
    print('calculating T0 and T1 covariances')
    # Calculate the covariance matrices
    T0_covariance = T0_params[1] - T0_model
    T1_covariance = T1_params[1] - T1_model
    print('calculating kriging weights')
    # Solve for weights
    T0_weights = solve_kriging(T0_covariance)
    T1_weights = solve_kriging(T1_covariance)

    T0_kernal = create_kernel(coordinates[:-1], T0_weights)
    T1_kernal = create_kernel(coordinates[:-1], T1_weights)
    
    voids_x = [];
    voids_y = [];
    voids_z = [];
    
    for tiff_int in range(total_images):
        void_x,void_y,void_z = krig_image(tiff_stack,tiff_int, T0, T1, outer_median, container_median, xc_s, yc_s, T0_kernal, T1_kernal,F_lims, s0_l, s1_r, s0_r, s1_l, sample_set)
        voids_x.append(void_x)
        voids_y.append(void_y)
        voids_z.append(void_z)
        lp.print_progress(tiff_int,total_images)
    del tiff_stack
    voids_x = np.concatenate(voids_x)
    voids_y = np.concatenate(voids_y)
    voids_z = np.concatenate(voids_z)
    return voids_x,voids_y,voids_z
    

In [None]:
voids_x,voids_y,voids_z = get_voids(2)

In [None]:
# Create a Pandas DataFrame from the NumPy arrays
df = pd.DataFrame({'x': voids_x, 'y': voids_y, 'z': voids_z})
# Create a canvas using Datashader
cvs = ds.Canvas(plot_width=(np.max(voids_x)-np.min(voids_x))//2, plot_height=np.abs(np.min(voids_z))//2, x_range=(np.min(voids_x), np.max(voids_x)),
                y_range=(np.min(voids_z), np.max(voids_z)))

# Aggregate the points into a 2D heatmap using the y coordinate as the quantity
agg = cvs.points(df, 'x', 'z', ds.count())

# Convert the aggregated heatmap to a NumPy array
heatmap = tf.shade(agg)

fig, ax = plt.subplots(figsize=(30, 30))
# Plot the heatmap
heatmap_np = np.asarray(np.log1p(agg))
#ax.imshow(heatmap_np, cmap='hot', origin='lower',  extent=[np.min(voids_x), np.max(voids_x),
                            #                            np.min(voids_z), np.max(voids_z)])
# Add colorbar, xlabel, ylabel, and title
cbar = plt.colorbar(ax.imshow(heatmap_np, cmap='hot', origin='lower',extent=[np.min(voids_x), np.max(voids_x),
                                                                               np.min(voids_z), np.max(voids_z)]),
                    ax=ax, label='Voxel Quantity')
ax.set_xlabel('X')
ax.set_ylabel('Z')
ax.set_title('Voxel Quantity Heatmap')

heatmap

In [None]:
## Clear up memory
del voids_x, voids_y, voids_z, df
gc.collect()