In [None]:
import os
import statistics
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from skimage.color import rgb2gray
from itertools import product
from skimage import measure
from math import sqrt
from sklearn.cluster import KMeans
from scipy import ndimage

In [None]:
def dpm(
    ground_truth: str, prediction: str, 
    size_threshold: int, scale: float, 
    weight_d: float, weight_s: float, weight_a: float):
    '''
    Returns the discrete protein metric (DPM) similarity score of two focal adhesion (FA) images.
    
            Parameters:
                    ground_truth: A string of the path to image containing membrane and ground truth FA
                    prediction: A string of the path to image containing membrane and predicted FA to be compared to ground truth
                    size_threshold: An integer value for size threshold; any FAs with an area less than this value are dropped
                    scale: A float value for image scale (microns/pixel)
                    weight_d: A float value for importance weight to be applied for distribution measurement
                    weight_s: A float value for importance weight to be applied for shape/size measurement
                    weight_a: A float value for importance weight to be applied for angle measurement
            
            Returns:
                    dpm: A numpy float value for the discrete protein metric score
                    d: A numpy float value for the distribution measurement
                    s: A numpy float value for the shape/size measurement
                    a: A numpy float value for the angle measurement
    '''
    
    def euclidean_distance(row1: slice, row2: slice):
        '''
        Returns the euclidean distance between two coordinate points.
        
            Parameters:
                    row1: A slice of a list containing a single coordinate point
                    row2: A slice of a list containing another coordinate point
            
            Returns:
                    sqrt(distance): A float value for the euclidean distance
        '''
        distance = 0.0
        for i in range(len(row1)-1):
            distance += (row1[i] - row2[i])**2
        return sqrt(distance)

    
    def get_neighbors(train: list, test_row: slice, num_neighbors: int):
        '''
        Returns the n nearest neighbors for one coordinate point.
        
            Parameters:
                    train: A list containing a set of coordinate points
                    test_row: A slice of a list containing a single coordinate point
                    num_neighbors: An integer value for the n nearest neighbors to be retreived
            
            Returns:
                    neighbors: A list of coordinate points for the n nearest neighbors
        '''
        distances = list()
        for train_row in train:
            dist = euclidean_distance(test_row, train_row)
            distances.append((train_row, dist))
        distances.sort(key=lambda tup: tup[1])
        neighbors = list()
        for i in range(num_neighbors):
            neighbors.append(distances[i][0])
        return neighbors
    
    
    def fa_processing(img_path: str, size_threshold: int, scale: float):
        '''
        Returns numpy arrays for the binary and segmented FA images from the raw FA images.
        
            Parameters:
                    img_path: A string of the path to the image containing raw FA
                    size_threshold: An integer value for size threshold; any FAs with an area less than this value are dropped
                    scale: A float value for image scale (microns/pixel)
            
            Returns:
                    binary_img: A numpy array of the binary FA image
                    seg_img: A numpy array of the segmented FA image
        '''
        # Read image and extract FA channel into a new empty image
        img = cv2.imread(img_path)
        res = np.zeros((256, 256, 3))
        res[:,:,1] = img[:,:,1]
        # Apply a top hat filter with 3x3 rectangular kernel and place filtered image into a new empty image
        filterSize =(3, 3)
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, filterSize)
        tophat_img = cv2.morphologyEx(res[:,:,1], cv2.MORPH_TOPHAT, kernel)        
        processed_img = res
        processed_img[:,:,1] = tophat_img
        # Binarize filtered FA image
        ret, bw_img = cv2.threshold(processed_img,10,255,cv2.THRESH_BINARY)
        gray = rgb2gray(bw_img)
        gray = gray * 255
        gray = gray.astype(np.uint8)
        ret, out_l = cv2.threshold(gray,10,255,cv2.THRESH_BINARY)
        # Find the contours of each FA
        contours, hier = cv2.findContours(out_l, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        binary_img = np.zeros((256,256))
        # Iterate through each contour
        for cntr in contours:
            # Use bouding rectangle to find width and height of each FA to calculate area
            x,y,w,h = cv2.boundingRect(cntr)
            w = w*scale
            h = h*scale
            area = w*h
            # Write FAs that have an area equal or greater than the threshold into a new empty image
            if area >= size_threshold:
                cv2.drawContours(binary_img, [cntr], 0, (255, 0, 0), -1)
        # Apply laplacian operator to binary FA image to obtain segmented FA image
        kernel_laplace = np.array([np.array([1, 1, 1]), np.array([1, -8, 1]), np.array([1, 1, 1])])
        seg_img = ndimage.convolve(binary_img, kernel_laplace, mode='constant')
        seg_img = seg_img * 255
        ret, seg_img = cv2.threshold(seg_img,10,255,cv2.THRESH_BINARY)
        return binary_img, seg_img
    
    
    def membrane_processing(img_path: str):
        '''
        Returns numpy array for the segmented membrane image from the raw membrane image.
        
            Parameters:
                    img_path: A string of the path to the image containing raw membrane
            
            Returns:
                    seg_img: A numpy array of the segmented membrane image
        '''
        # Read image and extract membrane channel to binarize
        membrane = cv2.imread(img_path)
        ret, bw_img = cv2.threshold(membrane[:,:,2],10,255,cv2.THRESH_BINARY)
        # Place binary membrane channel into a new empty image
        res = np.zeros((256, 256, 3))
        res[:,:,2] = bw_img
        gray = rgb2gray(res)
        # Apply laplacian operator to binary membrane image to obtain segmented membrane image
        kernel_laplace = np.array([np.array([1, 1, 1]), np.array([1, -8, 1]), np.array([1, 1, 1])])
        out_l = ndimage.convolve(gray, kernel_laplace, mode='constant')
        out_l = out_l * 255
        ret, seg_img = cv2.threshold(out_l,10,255,cv2.THRESH_BINARY)
        return seg_img
    
    
    # Calculation of distribution (d) measurement
    
    # Apply membrane processing function to raw ground truth image
    membrane = membrane_processing(ground_truth)
    # Obtain x-y coordinates for membrane outline from segmented membrane image
    pixels = membrane.reshape(-1)
    xs = range(membrane.shape[0])
    ys = range(membrane.shape[1])
    indices = np.array(list(product(xs, ys)))
    index = pd.Series(pixels, name="pixels")
    df = pd.DataFrame({
        "xmem" : indices[:, 0],
        "ymem" : indices[:, 1]
    }, index=index)
    df = df[df.index != 0]
    df = df.reset_index()
    df = df.drop(columns=['pixels'])
    x_values = df['xmem'].values
    x_arr = np.reshape(x_values, (-1, 1))
    y_values = df['ymem'].values
    y_arr = np.reshape(y_values, (-1, 1))
    # Obtain minimum and maximum x and y values
    min_x = min(x_values)
    max_x = max(x_values)
    min_y = min(y_values)
    max_y = max(y_values)
    coordinates = np.hstack((x_arr, y_arr))
    # Extract bounds for membrane outline into a list
    coord_bound = []
    for x in range(min_x, max_x):
        new_coord = []
        for i in range(len(coordinates)):
            if coordinates[i][0] == x:
                new_coord.append(coordinates[i][1])
        y_min = min(new_coord)
        y_max = max(new_coord)
        coord_bound.append([x, y_min, y_max])

    # Apply fa processing function to raw ground truth image using size threshold and scale defined in dpm function
    # and use segmented FA image returned
    fa_truth = fa_processing(ground_truth, size_threshold, scale)[1]
    # Extract x-y coordinates for centroid of each ground truth FA site into a list
    ret,thresh = cv2.threshold(fa_truth,20,255,cv2.THRESH_BINARY_INV)
    labels= measure.label(thresh, background=0)
    bg_label = labels[0,0] 
    labels[labels==bg_label] = 0
    props = measure.regionprops(labels)
    centroids = np.zeros(shape=(len(np.unique(labels)) - 1,2))
    for i,prop in enumerate(props):
        my_centroid = prop.centroid
        centr = [0, 0]
        centr[0] = my_centroid[0]
        centr[1] = my_centroid[1]
        centroids[i,:]= centr
    
    # Train a k-means clustering algorithm with 5 clusters to ground truth centroid coordinate data
    kmeans = KMeans(n_clusters = 5, init = 'k-means++', random_state = 42).fit(centroids)

    # Apply fa processing function to raw prediction image using size threshold and scale defined in dpm function
    # and use segmented FA image returned
    fa_pred = fa_processing(prediction, size_threshold, scale)[1]
    # Extract x-y coordinates for centroid of each predicted FA site into a list
    ret,thresh = cv2.threshold(fa_pred,20,255,cv2.THRESH_BINARY_INV)
    labels= measure.label(thresh, background=0)
    bg_label = labels[0,0] 
    labels[labels==bg_label] = 0
    props = measure.regionprops(labels)
    centroids2 = np.zeros(shape=(len(np.unique(labels)) - 1,2))
    for i,prop in enumerate(props):
        my_centroid = prop.centroid
        centr = [0, 0]
        centr[0] = my_centroid[0]
        centr[1] = my_centroid[1]
        centroids2[i,:]= centr
    # Iterate through all predicted FA centroids and only keep centroids that are inside of membrane outline in a new list
    centroids_pred = []
    for i in range(len(centroids2)):
        for j in range(len(coord_bound)):
            if int(centroids2[i][0]) == coord_bound[j][0] and coord_bound[j][1] <= int(centroids2[i][1]) <= coord_bound[j][2]:
                centroids_pred.append([centroids2[i][0], centroids2[i][1]])
    # Calculate number of dropped FAs and use as penalizing factor
    dropped_number = len(centroids2) - len(centroids_pred)
    if len(centroids2) > 0:
        factor = 1 - (dropped_number/len(centroids2))
        # Use trained k-means clustering algorithm to predict to which cluster each predicted FA belongs to
        pred_clusters = kmeans.predict(centroids_pred)
        # Count number of predicted FAs on each cluster
        unique_pred, counts_pred = np.unique(pred_clusters, return_counts=True)
        # Count number of ground truth FAs on each cluster
        truth_clusters = kmeans.labels_
        unique_truth, counts_truth = np.unique(truth_clusters, return_counts=True)
        # If a cluster has zero FAs, add zero to the counts list
        clusters = []
        if len(counts_truth) != len(counts_pred):
            for j in range(len(counts_truth)-len(counts_pred)):
                counts_pred = np.append(counts_pred, 0)
        # Calculate the ratio of predicted FAs to ground truth FAs on each cluster
        for i in range(len(counts_truth)):
            cluster_ind = (min(counts_truth[i], counts_pred[i]))/(max(counts_truth[i], counts_pred[i]))
            clusters.append(cluster_ind)
        # Calculate d measurement by taking average of ratios for each cluster and multiply by penalizing factor
        d = factor*(statistics.mean(clusters))
    elif len(centroids2) == 0 and len(centroids) == 0:
        d = 1
    else:
        d = 0
    
    # Calculation of shape/size (s) and angle (a) measurement
    
    # Apply fa processing function to raw prediction image using size threshold and scale defined in dpm function
    # and use binary FA image returned
    segmented_fa = fa_processing(prediction, size_threshold, scale)[0]
    segmented_fa_gray = rgb2gray(segmented_fa)
    gray = segmented_fa_gray*255
    gray = gray.astype(np.uint8)
    # Find contours for each predicted FA site
    ret, out_l = cv2.threshold(gray,0,255,cv2.THRESH_BINARY)
    contours, hier = cv2.findContours(out_l, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    pred_img = np.zeros((256,256))
    rect_coord = []
    rect_dim = []
    rect_angle = []
    # Iterate through all predicted FA sites
    for cntr in contours:
        # Use bouding rectangle to obtain dimensions and coordinates of each FA site
        x,y,w,h = cv2.boundingRect(cntr)
        rect_coord.append([x,y])
        rect_dim.append([w,h])
        # Draw rectangles in a new image
        cv2.rectangle(pred_img, (x, y), (x+w, y+h), (255, 0, 0), -1)
        # Use minimum area rectangle to obtain angle of orientation of each FA site
        center, dim, angle = cv2.minAreaRect(cntr)
        rect_angle.append(angle)
    
    # Repeat same procedure as above to obtain dimensions, coordinates and angle of each ground truth FA site
    segmented_fa_truth = fa_processing(ground_truth, size_threshold, scale)[0]
    segmented_fa_truth_gray = rgb2gray(segmented_fa_truth)
    gray = segmented_fa_truth_gray*255
    gray = gray.astype(np.uint8)
    ret, out_l = cv2.threshold(gray,0,255,cv2.THRESH_BINARY)
    contours, hier = cv2.findContours(out_l, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    new_img = np.zeros((256,256))
    rect_coord_truth = []
    rect_dim_truth = []
    rect_angle_truth = []
    for cntr in contours:
        x,y,w,h = cv2.boundingRect(cntr)
        rect_coord_truth.append([x,y])
        rect_dim_truth.append([w,h])
        cv2.rectangle(new_img, (x, y), (x+w, y+h), (255, 0, 0), -1)
        center, dim, angle = cv2.minAreaRect(cntr)
        rect_angle_truth.append(angle)
    
    # Create new image to match location of FA sites in ground truth and prediction
    ground_truth_centered_img = np.zeros((256,256))
    angle_scores = []
    # Iterate through all predicted FA sites
    for i in range(len(rect_coord)):
        # Obtain centroid coordinates for FA site
        x_center = rect_coord[i][0] + rect_dim[i][0]//2
        y_center = rect_coord[i][1] + rect_dim[i][1]//2
        # Obtain nearest neighbor in the ground truth FA sites to the predicted FA site
        neighbors = get_neighbors(rect_coord_truth, rect_coord[i], 1)
        for neighbor in neighbors:
            # Extract nearest neighbor position in the ground truth FA sites list
            for j in range(len(rect_coord_truth)):
                if neighbor[0] == rect_coord_truth[j][0] and neighbor[1] == rect_coord_truth[j][1]:
                    # Obtain dimensions and centroid coordinates for nearest neighbor ground truth FA site
                    w = rect_dim_truth[j][0]
                    h = rect_dim_truth[j][1]
                    x = x_center - w//2
                    y = y_center - h//2
                    # Write new image with ground truth FA sites located at same position as predicted FA sites
                    cv2.rectangle(ground_truth_centered_img, (x, y), (x+w, y+h), (255, 0, 0), -1)
                    # Obtain angle score by normalizing angle values to a range of 0-90 degrees
                    if rect_angle[i] > 90:
                        angle_pred = 180 - rect_angle[i]
                    elif -90 <= rect_angle[i] < 0:
                        angle_pred = abs(rect_angle[i])
                    elif rect_angle[i] < -90:
                        angle_pred = 180 - abs(rect_angle[i])
                    else:
                        angle_pred = rect_angle[i]
                    if rect_angle_truth[j] > 90:
                        angle_truth = 180 - rect_angle_truth[j]
                    elif -90 <= rect_angle_truth[j] < 0:
                        angle_truth = abs(rect_angle_truth[j])
                    elif rect_angle_truth[j] < -90:
                        angle_truth = 180 - abs(rect_angle_truth[j])
                    else:
                        angle_truth = rect_angle_truth[j]
                    # Calculate deviation between predicted and ground truth FA site angle
                    angle_dev = abs(angle_pred - angle_truth)
                    angle_ind = 1 - (angle_dev/90)
                    angle_scores.append(angle_ind)
    
    # Compare ground truth centered image to predicted image
    y_true = ground_truth_centered_img.flatten()
    y_true = y_true/255
    y_pred = pred_img.flatten()
    y_pred = y_pred/255
    # Compute true positives, false negatives, and false positives
    tp = 0
    fn = 0
    fp = 0
    for k in range(len(y_true)):
        if y_true[k]==1 and y_pred[k]==1:
            tp += 1
        elif y_true[k]==1 and y_pred[k]==0:
            fn += 1
        elif y_true[k]==0 and y_pred[k]==1:
            fp += 1
    # Calculate precision and recall
    precision = tp/(tp+fp)
    recall = tp/(tp+fn)
    # Calculate s measurement using F1 score
    s = (2*precision*recall)/(precision+recall)
    # Calculate a measurement by taking averge of angle scores across all FA sites
    a = statistics.mean(angle_scores)

    # Calculate dpm score using defined weights and obtained d, s, and a measurements
    dpm = (weight_d*d)+(weight_s*s)+(weight_a*a)
    
    return dpm, d, s, a

In [None]:
dpm(ground_truth='unprocessed_ground_truth.png', 
    prediction='unprocessed_prediction.png', 
    size_threshold=1, 
    scale=0.7, 
    weight_d=0.33, 
    weight_s=0.33, 
    weight_a=0.33)