# Imports

In [3]:
from general_funcs import *
import cv2 as cv
from skimage.color import rgb2gray
from skimage.io import imread
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support

# Watershed

## Functions

In [4]:
def evaluate_watershed_segmentation(gtm_image, binary_mask):
    """
    Score a predicted segmentation mask against its ground truth mask.

    Precision, recall and F1 are computed with scikit-learn on flattened,
    binarised copies of the masks (any value > 0 counts as foreground).
    The remaining metrics are delegated to the ``calculate_*`` helpers,
    which are defined outside this section and receive both masks exactly
    as they were passed in.

    Parameters:
        gtm_image (np.ndarray): Ground truth mask.
        binary_mask (np.ndarray): Predicted mask.

    Returns:
        dict: Scores keyed by "precision", "recall", "f1_score",
        "dice_score", "dice_score_class_1", "jaccard_score", "accuracy"
        and "weighted_accuracy".
    """
    # Flatten and reduce both masks to 0/1 labels for scikit-learn
    y_true = (gtm_image.flatten() > 0).astype(int)
    y_pred = (binary_mask.flatten() > 0).astype(int)

    # The fourth return value (support) is not needed here
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average='binary'
    )

    scores = {"precision": precision, "recall": recall, "f1_score": f1}
    scores["dice_score"] = calculate_dice_score(gtm_image, binary_mask)
    scores["dice_score_class_1"] = calculate_dice_score_for_1_class(gtm_image, binary_mask)
    scores["jaccard_score"] = calculate_jaccard_index(gtm_image, binary_mask)
    scores["accuracy"] = calculate_accuracy(gtm_image, binary_mask)
    scores["weighted_accuracy"] = calculate_weighted_accuracy(gtm_image, binary_mask)
    return scores

def long_watershed(img, imgRGB):
    """
    Segment an image with marker-based watershed, with morphological clean-up.

    Pipeline: Otsu threshold -> morphological opening (noise removal) ->
    dilation (sure background) -> distance transform thresholded at half its
    maximum (sure foreground) -> connected-component markers -> watershed.

    Parameters:
        img: Single-channel image that is Otsu-thresholded (the callers in
            this notebook pass a grayscale conversion of the frame).
        imgRGB: 3-channel image handed to ``cv.watershed``. It is modified
            in place: watershed boundary pixels are painted [255, 0, 0].

    Returns:
        np.ndarray: uint8 predicted mask, 255 where the watershed label is
        greater than 1 and 0 elsewhere.
    """
    structuring_element = np.ones((3, 3), np.uint8)

    # Global binarisation with Otsu's method
    otsu_binary = cv.threshold(img, 0, 255, cv.THRESH_BINARY + cv.THRESH_OTSU)[1]

    # Opening removes small noise; dilating the result gives the sure background
    cleaned = cv.morphologyEx(otsu_binary, cv.MORPH_OPEN, structuring_element, iterations=2)
    background = cv.dilate(cleaned, structuring_element, iterations=3)

    # Sure foreground: pixels at least half the maximum distance from the background
    distances = cv.distanceTransform(cleaned, cv.DIST_L2, 5)
    foreground = cv.threshold(distances, 0.5 * distances.max(), 255, cv.THRESH_BINARY)[1]
    foreground = np.uint8(foreground)

    # Pixels that are neither sure background nor sure foreground
    undecided = cv.subtract(background, foreground)

    # Shift component labels by one so 0 is free to mean "undecided"
    labels = cv.connectedComponents(foreground)[1] + 1
    labels[undecided == 255] = 0

    labels = cv.watershed(imgRGB, labels)

    # Watershed marks boundaries with -1; draw them on the colour image
    imgRGB[labels == -1] = [255, 0, 0]

    # Label 1 is the shifted background component; everything above it is foreground
    binary_mask = np.zeros_like(labels, dtype=np.uint8)
    binary_mask[labels > 1] = 255
    return binary_mask

def short_watershed(img, imgRGB):
    """
    Segment an image with marker-based watershed, without morphological steps.

    Pipeline: Otsu threshold -> distance transform thresholded at half its
    maximum (sure foreground) -> connected-component markers -> watershed.
    The "unknown" region is the Otsu mask minus the sure foreground.

    Parameters:
        img: Single-channel image that is Otsu-thresholded (the callers in
            this notebook pass a grayscale conversion of the frame).
        imgRGB: 3-channel image handed to ``cv.watershed``. It is modified
            in place: watershed boundary pixels are painted [255, 0, 0].

    Returns:
        np.ndarray: uint8 predicted mask, 255 where the watershed label is
        greater than 1 and 0 elsewhere.
    """
    # Global binarisation with Otsu's method
    otsu_binary = cv.threshold(img, 0, 255, cv.THRESH_BINARY + cv.THRESH_OTSU)[1]

    # Sure foreground: pixels at least half the maximum distance from the background
    distances = cv.distanceTransform(otsu_binary, cv.DIST_L2, 5)
    foreground = cv.threshold(distances, 0.5 * distances.max(), 255, cv.THRESH_BINARY)[1]
    foreground = np.uint8(foreground)

    # Thresholded pixels that are not sure foreground are left for watershed to decide
    undecided = cv.subtract(otsu_binary, foreground)

    # Shift component labels by one so 0 is free to mean "undecided"
    labels = cv.connectedComponents(foreground)[1] + 1
    labels[undecided == 255] = 0

    labels = cv.watershed(imgRGB, labels)

    # Watershed marks boundaries with -1; draw them on the colour image
    imgRGB[labels == -1] = [255, 0, 0]

    # Label 1 is the shifted background component; everything above it is foreground
    binary_mask = np.zeros_like(labels, dtype=np.uint8)
    binary_mask[labels > 1] = 255
    return binary_mask


def print_scores(accuracies):
    """
    Display the per-metric averages of a list of evaluation dictionaries.

    Each metric is averaged over all dictionaries and shown as a percentage
    (multiplied by 100), except a metric named "sse", which is shown as a
    plain average. The result is rendered as a styled pandas table through
    the notebook's ``display`` function.

    Parameters:
        accuracies (list[dict]): One dictionary of metric name -> score per
            evaluated image. The metric names are taken from the first
            dictionary; every dictionary is expected to use the same keys.

    Returns:
        None. If ``accuracies`` is empty a short notice is printed and
        nothing is displayed (previously this raised IndexError).
    """
    if not accuracies:
        print("No scores to display: no images were evaluated.")
        return

    num_of_images = len(accuracies)

    # Sum the scores of each metric over all images
    totals = {f"avg_{metric}": 0 for metric in accuracies[0]}
    for evaluation_dict in accuracies:
        for metric, score in evaluation_dict.items():
            totals[f"avg_{metric}"] += score

    # Turn the totals into averages; everything except "avg_sse" is
    # reported as a percentage
    avg_metrics = {}
    for metric, total_score in totals.items():
        avg_score = total_score / num_of_images
        avg_metrics[metric] = avg_score if metric == "avg_sse" else avg_score * 100

    # One row per metric, rounded to 2 decimal places for clarity
    df = pd.DataFrame(list(avg_metrics.items()), columns=['Metric', 'Value'])
    df['Value'] = df['Value'].round(2)

    styled_df = df.style.set_table_styles([
        # Second child of each row, when it is a data cell: left-aligned
        {'selector': 'td:nth-child(2)', 'props': [('text-align', 'left')]},
        # Data cells in general: right-aligned
        {'selector': 'td', 'props': [('text-align', 'right')]},
        # Header cells: left-aligned
        {'selector': 'th', 'props': [('text-align', 'left')]},
    ])

    # Always render the "Value" column with exactly 2 decimals
    styled_df = styled_df.format({'Value': '{:.2f}'})

    display(styled_df)


## Testing

In [5]:
# Ground truth mask directories: every entry of Img/ whose name ends in
# "gtmasks", ordered by the number in the 4th "_"-separated field
# (e.g. wire_images_video_3_gtmasks -> 3).
gtmasks_dirs = sorted(
    (dname for dname in os.listdir('Img') if dname.endswith('gtmasks')),
    key=lambda dname: int(dname.split('_')[3]),
)
# os.listdir returns entries in arbitrary order. The file lists are paired
# positionally with the image file lists later on, so sort them to make that
# pairing deterministic.
gtmasks_dict = {dname: sorted(os.listdir(f'Img/{dname}')) for dname in gtmasks_dirs}
gtmasks_dict.keys()

dict_keys(['wire_images_video_1_gtmasks', 'wire_images_video_2_gtmasks', 'wire_images_video_3_gtmasks', 'wire_images_video_4_gtmasks', 'wire_images_video_5_gtmasks', 'wire_images_video_6_gtmasks', 'wire_images_video_7_gtmasks', 'wire_images_video_8_gtmasks'])

In [6]:
# Image directories: every entry of Img/ whose name ends in a digit, ordered
# by the whole trailing number (not just the last character, so that e.g.
# "..._10" sorts after "..._9" instead of as 0).
image_dirs = sorted(
    (dname for dname in os.listdir('Img') if dname[-1].isdigit()),
    key=lambda dname: int(dname[len(dname.rstrip('0123456789')):]),
)
# os.listdir returns entries in arbitrary order. The file lists are paired
# positionally with the ground truth mask file lists later on, so sort them
# to make that pairing deterministic.
images_dict = {dname: sorted(os.listdir(f'Img/{dname}')) for dname in image_dirs}
images_dict

{'wire_images_video_1': ['wire1_ultrasound_watertank.png',
  'wire2_ultrasound_watertank.png',
  'wire3_ultrasound_watertank.png',
  'wire4_ultrasound_watertank.png',
  'wire5_ultrasound_watertank.png',
  'wire6_ultrasound_watertank.png'],
 'wire_images_video_2': ['wire10_ultrasound_watertank.png',
  'wire11_ultrasound_watertank.png',
  'wire12_ultrasound_watertank.png',
  'wire13_ultrasound_watertank.png',
  'wire14_ultrasound_watertank.png',
  'wire15_ultrasound_watertank.png',
  'wire16_ultrasound_watertank.png',
  'wire17_ultrasound_watertank.png',
  'wire18_ultrasound_watertank.png',
  'wire19_ultrasound_watertank.png',
  'wire20_ultrasound_watertank.png',
  'wire21_ultrasound_watertank.png',
  'wire22_ultrasound_watertank.png',
  'wire23_ultrasound_watertank.png',
  'wire7_ultrasound_watertank.png'],
 'wire_images_video_3': ['wire24_ultrasound_watertank.png',
  'wire25_ultrasound_watertank.png',
  'wire26_ultrasound_watertank.png',
  'wire27_ultrasound_watertank.png',
  'wire28_u

In [18]:
# Long Watershed
# Run long_watershed on every image, score it against its ground truth mask
# and show the averaged metrics.

total_images = 0
accuracies = []
# NOTE: zip stops at the shorter input, so a directory or file without a
# counterpart at the same position is silently left out.
for image_dir, gtmask_dir in zip(images_dict, gtmasks_dict):
    for image_name, gtmask_name in zip(images_dict[image_dir], gtmasks_dict[gtmask_dir]):
        im_path = get_image_path(image_dir, image_name)
        gtm_path = get_image_path(gtmask_dir, gtmask_name)

        img = cv.imread(im_path)
        gtm_image = cv.imread(gtm_path, cv.IMREAD_GRAYSCALE)
        # cv.imread reports failure by returning None instead of raising;
        # fail here with the offending path rather than on the next line
        if img is None:
            raise OSError(f"Could not read image: {im_path}")
        if gtm_image is None:
            raise OSError(f"Could not read ground truth mask: {gtm_path}")

        # Zero the top 176 rows of both the image and its mask
        # NOTE(review): presumably a region that should not be segmented - confirm
        img[0:176, :, :] = 0
        gtm_image[0:176, :] = 0

        # Create RGB image (used by the watershed step)
        imgRGB = cv.cvtColor(img, cv.COLOR_BGR2RGB)
        # Create gray scale image (used for thresholding)
        img = cv.cvtColor(img, cv.COLOR_BGR2GRAY)

        binary_mask = long_watershed(img, imgRGB)

        evaluation_dict = evaluate_watershed_segmentation(gtm_image, binary_mask)
        accuracies.append(evaluation_dict)
        total_images += 1

        # Uncomment to inspect individual results
        # display_image(gtm_image, "gtm")
        # display_image(binary_mask, "prediction")

print_scores(accuracies)
print(f"average over {total_images} images")

Unnamed: 0,Metric,Value
0,avg_precision,96.52
1,avg_recall,41.11
2,avg_f1_score,56.68
3,avg_dice_score,99.07
4,avg_dice_score_class_1,56.68
5,avg_jaccard_score,40.52
6,avg_accuracy,99.07
7,avg_weighted_accuracy,2.06


average over 89 images


In [None]:
# Short Watershed
# Run short_watershed on every image, score it against its ground truth mask
# and show the averaged metrics.

total_images = 0
accuracies = []
# NOTE: zip stops at the shorter input, so a directory or file without a
# counterpart at the same position is silently left out.
for image_dir, gtmask_dir in zip(images_dict, gtmasks_dict):
    for image_name, gtmask_name in zip(images_dict[image_dir], gtmasks_dict[gtmask_dir]):
        im_path = get_image_path(image_dir, image_name)
        gtm_path = get_image_path(gtmask_dir, gtmask_name)

        img = cv.imread(im_path)
        gtm_image = cv.imread(gtm_path, cv.IMREAD_GRAYSCALE)
        # cv.imread reports failure by returning None instead of raising;
        # fail here with the offending path rather than on the next line
        if img is None:
            raise OSError(f"Could not read image: {im_path}")
        if gtm_image is None:
            raise OSError(f"Could not read ground truth mask: {gtm_path}")

        # Zero the top 176 rows of both the image and its mask
        # NOTE(review): presumably a region that should not be segmented - confirm
        img[0:176, :, :] = 0
        gtm_image[0:176, :] = 0

        # Create RGB image (used by the watershed step)
        imgRGB = cv.cvtColor(img, cv.COLOR_BGR2RGB)
        # Create gray scale image (used for thresholding)
        img = cv.cvtColor(img, cv.COLOR_BGR2GRAY)

        binary_mask = short_watershed(img, imgRGB)

        evaluation_dict = evaluate_watershed_segmentation(gtm_image, binary_mask)
        accuracies.append(evaluation_dict)
        total_images += 1

        # Uncomment to inspect individual results
        # display_image(gtm_image, "gtm")
        # display_image(binary_mask, "prediction")

print_scores(accuracies)
print(f"average over {total_images} images")

Unnamed: 0,Metric,Value
0,avg_precision,96.08
1,avg_recall,36.94
2,avg_f1_score,52.67
3,avg_dice_score,99.02
4,avg_dice_score_class_1,52.67
5,avg_jaccard_score,36.43
6,avg_accuracy,99.02
7,avg_weighted_accuracy,2.01


average over 0 images


In [8]:
# Yen (binary thresholding method) Watershed

from skimage import filters

def apply_thresholding_method(image):
    """
    Binarise an image with Yen's threshold.

    Parameters:
        image (np.ndarray): Image passed to ``skimage.filters.threshold_yen``.

    Returns:
        Boolean mask that is True where a pixel is strictly greater than
        the Yen threshold of ``image``.
    """
    return image > filters.threshold_yen(image)

def yen_watershed(img, imgRGB):
    """
    Segment an image with marker-based watershed seeded from a Yen threshold.

    Pipeline: Yen threshold -> distance transform thresholded at half its
    maximum (sure foreground) -> connected-component markers -> watershed.
    The "unknown" region is the Yen mask minus the sure foreground.

    Parameters:
        img: Image that is binarised by ``apply_thresholding_method`` (the
            callers in this notebook pass a grayscale conversion of the frame).
        imgRGB: 3-channel image handed to ``cv.watershed``. It is modified
            in place: watershed boundary pixels are painted [255, 0, 0].

    Returns:
        np.ndarray: uint8 predicted mask, 255 where the watershed label is
        greater than 1 and 0 elsewhere.
    """
    # Boolean Yen mask scaled to the 0/255 uint8 form OpenCV expects
    yen_binary = (apply_thresholding_method(img) * 255).astype(np.uint8)

    # Sure foreground: pixels at least half the maximum distance from the background
    distances = cv.distanceTransform(yen_binary, cv.DIST_L2, 5)
    foreground = cv.threshold(distances, 0.5 * distances.max(), 255, cv.THRESH_BINARY)[1]
    foreground = np.uint8(foreground)

    # Thresholded pixels that are not sure foreground are left for watershed to decide
    undecided = cv.subtract(yen_binary, foreground)

    # Shift component labels by one so 0 is free to mean "undecided"
    labels = cv.connectedComponents(foreground)[1] + 1
    labels[undecided == 255] = 0

    labels = cv.watershed(imgRGB, labels)

    # Watershed marks boundaries with -1; draw them on the colour image
    imgRGB[labels == -1] = [255, 0, 0]

    # Label 1 is the shifted background component; everything above it is foreground
    binary_mask = np.zeros_like(labels, dtype=np.uint8)
    binary_mask[labels > 1] = 255
    return binary_mask



# Run yen_watershed on every image, score it against its ground truth mask
# and show the averaged metrics.
total_images = 0
accuracies = []
# NOTE: zip stops at the shorter input, so a directory or file without a
# counterpart at the same position is silently left out.
for image_dir, gtmask_dir in zip(images_dict, gtmasks_dict):
    for image_name, gtmask_name in zip(images_dict[image_dir], gtmasks_dict[gtmask_dir]):
        im_path = get_image_path(image_dir, image_name)
        gtm_path = get_image_path(gtmask_dir, gtmask_name)

        img = cv.imread(im_path)
        gtm_image = cv.imread(gtm_path, cv.IMREAD_GRAYSCALE)
        # cv.imread reports failure by returning None instead of raising;
        # fail here with the offending path rather than on the next line
        if img is None:
            raise OSError(f"Could not read image: {im_path}")
        if gtm_image is None:
            raise OSError(f"Could not read ground truth mask: {gtm_path}")

        # Zero the top 176 rows of both the image and its mask
        # NOTE(review): presumably a region that should not be segmented - confirm
        img[0:176, :, :] = 0
        gtm_image[0:176, :] = 0

        # Create RGB image (used by the watershed step)
        imgRGB = cv.cvtColor(img, cv.COLOR_BGR2RGB)
        # Create gray scale image (used for thresholding)
        img = cv.cvtColor(img, cv.COLOR_BGR2GRAY)

        binary_mask = yen_watershed(img, imgRGB)

        evaluation_dict = evaluate_watershed_segmentation(gtm_image, binary_mask)
        accuracies.append(evaluation_dict)
        total_images += 1

        # Uncomment to inspect individual results
        # display_image(gtm_image, "gtm")
        # display_image(binary_mask, "prediction")

print_scores(accuracies)
print(f"average over {total_images} images")

Unnamed: 0,Metric,Value
0,avg_precision,94.28
1,avg_recall,50.95
2,avg_f1_score,64.85
3,avg_dice_score,99.27
4,avg_dice_score_class_1,64.85
5,avg_jaccard_score,50.24
6,avg_accuracy,99.27
7,avg_weighted_accuracy,2.25


average over 89 images
