# Finálna implementácia zhlukovania a iterovanej dilatácie

**Autor: Bc. Ivan Vykopal**

Finálny notebook obsahujúci zhlukovanie na základe hustoty, iterovanú dilatáciu a aj výslednú konverziu snímok do formy GeoJSON súborov.

In [None]:
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
import glob
import numpy as np
from sklearn.cluster import DBSCAN
import geojson
from shapely.geometry import shape
import os
from geojson import Polygon, FeatureCollection, Feature, dump

In [None]:
# Directory holding the input PNG images; image paths are built as
# f"{img_directory}{name}.png".
img_directory = 'D:/Master Thesis/ANN Imunne cells/'
# Directory that is globbed for the GeoJSON annotation files.
geojson_directory = 'D:/Master Thesis/Data/EMB-IKEM-2022-03-09/QuPath project EMB - anotations/annotations/'
out_dir = 'D:/Master Thesis/Code/Clustering/result2/' # path to the saved masks
# Suffix stripped from a GeoJSON file name to obtain the matching image name.
geojson_suf = '.vsi - 20x.geojson'

# Output directory handed to get_inflammatory_dilation.
out_dir_dilatation ='D:/Master Thesis/Code/Iterative dilation/result/'

In [None]:
def display_sample(display_list):
    """Show the given images side by side in a single row.

    Args:
        display_list: Sequence of images accepted by ``plt.imshow``.
    """
    columns = len(display_list)
    plt.figure(figsize=(18, 18))

    for position, picture in enumerate(display_list, start=1):
        plt.subplot(1, columns, position)
        plt.imshow(picture)
        plt.axis('off')

    plt.show()

In [None]:
def dilate(mask, original_mask):
    """Merge the blobs of ``mask`` into one region and outline it on ``original_mask``.

    ``mask`` is dilated repeatedly with a 10x10 elliptical kernel until its
    external contours have merged into a single one. That contour is then
    drawn in white (thickness 3) onto ``original_mask``, which is modified
    in place.

    Args:
        mask: Single-channel binary image containing the blobs of one cluster.
        original_mask: Image that receives the outline.

    Returns:
        ``original_mask`` with the outline drawn on it. When ``mask`` has no
        contours at all, ``original_mask`` is returned unchanged.
    """
    nuclei, _ = cv.findContours(mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)

    # An empty mask never gains a contour through dilation, so looping until
    # "exactly one contour" would never terminate.
    if len(nuclei) == 0:
        return original_mask

    # The structuring element is loop-invariant; build it once.
    kernel = cv.getStructuringElement(cv.MORPH_ELLIPSE, (10, 10))

    dilated = mask
    while len(nuclei) > 1:
        dilated = cv.dilate(dilated, kernel)
        nuclei, _ = cv.findContours(dilated, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)

    # ``nuclei`` already holds the external contours of the final dilated mask.
    cv.drawContours(original_mask, nuclei, -1, (255, 255, 255), 3)
    return original_mask

In [None]:
def get_nucleus_info(feature):
    """Collect the nucleus staining measurements of a GeoJSON feature.

    Args:
        feature: Mapping with a ``['properties']['measurements']`` list whose
            items are mappings with ``'name'`` and ``'value'`` keys.

    Returns:
        List of the ``'value'`` entries whose ``'name'`` is one of the
        hematoxylin/eosin nucleus attributes, in the order they appear in the
        feature. If the feature lacks the expected structure, the feature is
        printed and the values collected so far (possibly none) are returned.
    """
    attributes = (
        "Nucleus: Hematoxylin OD mean",
        "Nucleus: Hematoxylin OD sum",
        "Nucleus: Hematoxylin OD std dev",
        "Nucleus: Hematoxylin OD max",
        "Nucleus: Hematoxylin OD min",
        "Nucleus: Eosin OD mean",
        "Nucleus: Eosin OD sum",
        "Nucleus: Eosin OD std dev",
        "Nucleus: Eosin OD max",
        "Nucleus: Eosin OD min",
        "Nucleus: Eosin OD range",
    )
    info = []

    try:
        for measurement in feature['properties']['measurements']:
            if measurement['name'] in attributes:
                info.append(measurement['value'])
    except (KeyError, TypeError, IndexError):
        # Best effort: report the malformed feature instead of failing, but do
        # not swallow unrelated errors such as KeyboardInterrupt.
        print(feature)

    return info

In [None]:
def get_area(contours):
    """Return the sum of ``cv.contourArea`` over all given contours."""
    return sum(cv.contourArea(outline) for outline in contours)

In [None]:
def iterative_dilation(image, threshold):
    """Dilate the grayscale image until few contours merge per step.

    The image is converted to grayscale and dilated with a 7x7 elliptical
    kernel. After every dilation the number of external contours is compared
    with the count of the previous step; the loop ends once the drop is no
    longer greater than ``threshold`` times the initial contour count.

    Args:
        image: BGR image.
        threshold: Fraction of the initial contour count that must merge in a
            step for the dilation to continue.

    Returns:
        Tuple ``(dilated, area, nuclei_count_all)``: the dilated grayscale
        image, the summed contour area of the undilated image, and the
        number of contours in the undilated image.
    """
    gray = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
    initial_contours, _ = cv.findContours(gray, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)

    total_area = get_area(initial_contours)
    initial_count = len(initial_contours)
    required_drop = initial_count * threshold

    kernel = cv.getStructuringElement(cv.MORPH_ELLIPSE, (7, 7))
    current = gray
    previous_count = initial_count
    merged_in_step = initial_count

    while merged_in_step > required_drop:
        current = cv.dilate(current, kernel)
        blobs, _ = cv.findContours(current, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)
        remaining = len(blobs)
        merged_in_step = previous_count - remaining
        previous_count = remaining

    return current, total_area, initial_count

In [None]:
def pretize_text(annotation_type):
    """Convert an internal annotation class key to its display name.

    Known keys map to fixed display names. Any other key has its underscores
    replaced with spaces and its first character upper-cased.

    Args:
        annotation_type: Class key such as ``'blood_vessels'``.

    Returns:
        The display name as a string; an empty key yields an empty string.
    """
    display_names = {
        'blood_vessels': 'Blood vessels',
        'fatty_tissues': 'Fatty tissue',
        'inflammations': 'Inflammation',
        'endocariums': 'Endocarium',
        'fibrotic_tissues': 'Fibrotic tissue',
        # 'quilties' is the spelling used in create_geojson's default class
        # list; the historical misspelling 'quilities' stays supported.
        'quilties': 'Quilty',
        'quilities': 'Quilty',
        'immune_cells': 'Immune cells',
    }

    known = display_names.get(annotation_type)
    if known is not None:
        return known

    text = annotation_type.replace('_', ' ')
    # Slicing instead of indexing keeps an empty key from raising IndexError.
    return text[:1].upper() + text[1:]

In [None]:
def get_coors(contour):
    """Return the first entry of every contour row as a plain Python list.

    Args:
        contour: Array indexed as ``contour[i, 0]``
            (assumes the OpenCV ``(N, 1, 2)`` contour layout - TODO confirm).

    Returns:
        List with ``contour[i, 0].tolist()`` for every row ``i``.
    """
    return [row[0].tolist() for row in contour]

In [None]:
def fix_polygon(contour):
    """Close a contour by appending a copy of its first point at the end.

    Args:
        contour: Non-empty array of contour points.

    Returns:
        New array with one more row than ``contour``; the last row equals
        the first.
    """
    closing_point = np.asarray(contour[0])[np.newaxis]
    return np.concatenate((contour, closing_point))

In [None]:
def create_properties_template(annotation):
    """Build the ``properties`` dictionary of an annotation feature.

    Args:
        annotation: Annotation class key; it is converted to a display name
            with ``pretize_text``.

    Returns:
        Dictionary with ``"object_type"`` set to ``"annotation"`` and a
        ``"classification"`` entry holding the display name under ``"name"``.
    """
    classification = {"name": pretize_text(annotation)}
    return dict(object_type="annotation", classification=classification)

In [None]:
def get_features(contours, annotation):
    """Turn contours into GeoJSON polygon features of one annotation class.

    Every contour is closed with ``fix_polygon``, converted to a coordinate
    list with ``get_coors`` and wrapped into a ``Feature`` with a single-ring
    ``Polygon`` geometry.

    Args:
        contours: Iterable of contours.
        annotation: Annotation class key stored in the feature properties.

    Returns:
        List of ``Feature`` objects, one per contour.
    """
    return [
        Feature(
            geometry=Polygon([get_coors(fix_polygon(outline))]),
            properties=create_properties_template(annotation),
        )
        for outline in contours
    ]

In [None]:
def create_geojson(mask, annotation_classes=None):
    """Convert a segmentation mask into a GeoJSON ``FeatureCollection``.

    The mask is cast to ``uint8``. A 3-D mask is treated as one channel per
    annotation class (last axis); any other mask is treated as a single
    class. For every channel the external contours are extracted and turned
    into polygon features.

    Args:
        mask: 2-D mask, or 3-D mask whose last axis indexes the classes.
        annotation_classes: Class keys, one per channel. Defaults to the
            seven-class list below, which only matches a 3-D mask with seven
            channels.

    Returns:
        ``FeatureCollection`` with one feature per detected contour.

    Raises:
        AssertionError: If the number of classes does not match the mask.
    """
    if annotation_classes is None:
        annotation_classes = [
            'blood_vessels',
            'endocariums',
            'fatty_tissues',
            'fibrotic_tissues',
            'immune_cells',
            'inflammations',
            'quilties'
        ]

    mask = np.uint8(mask)

    if len(mask.shape) == 3:
        classes = mask.shape[2]
        assert classes == len(annotation_classes), \
            "number of mask channels must match the number of annotation classes"
        channels = [mask[:, :, c] for c in range(classes)]
    else:
        assert len(annotation_classes) == 1, \
            "a single-channel mask needs exactly one annotation class"
        channels = [mask]

    features = []
    for channel, annotation in zip(channels, annotation_classes):
        contours, _ = cv.findContours(channel, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_NONE)
        # extend() takes the list itself; unpacking it with ``*`` raised
        # TypeError for zero or several features and appended dictionary keys
        # for exactly one.
        features.extend(get_features(contours, annotation))

    return FeatureCollection(features)

## Identifikácia zápalov s využitím zhlukovania založeného na hustote

In [None]:
def get_inflammatory_clustering(image_dir, image_name, geojson_path, output_dir, eps=100, min_samples=20, save=False):
    """Outline clusters of annotated objects found with DBSCAN.

    The image ``{image_dir}{image_name}.png`` is read only to obtain the
    output dimensions. Centroids of the GeoJSON polygons are clustered with
    DBSCAN; for every cluster the member polygons are filled into a mask,
    merged with ``dilate`` and the resulting outline is drawn onto the
    output image.

    Args:
        image_dir: Directory prefix of the reference image (with trailing
            separator).
        image_name: Image file name without the ``.png`` extension.
        geojson_path: Path of the GeoJSON file with the annotated objects.
        output_dir: Directory prefix for the saved result (with trailing
            separator).
        eps: DBSCAN ``eps`` parameter.
        min_samples: DBSCAN ``min_samples`` parameter.
        save: When true, the result is written to
            ``{output_dir}{image_name}.png``.

    Returns:
        The image with the cluster outlines drawn on it.

    Raises:
        FileNotFoundError: If the reference image cannot be read.
    """
    image_path = f"{image_dir}{image_name}.png"
    reference = cv.imread(image_path)
    if reference is None:
        # cv.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")

    height, width = reference.shape[:2]
    image = np.zeros((height, width))

    with open(geojson_path) as f:
        gj = geojson.load(f)

    # The first feature is skipped, as before.
    # NOTE(review): presumably it is a whole-region annotation - confirm.
    polygons = [
        shape(feature['geometry'])
        for feature in gj['features'][1:]
        if feature['properties']['classification']['name'] != 'Region*'
    ]

    # DBSCAN cannot be fitted on an empty sample set; nothing to outline then.
    if polygons:
        centroids = np.array([[p.centroid.x, p.centroid.y] for p in polygons])
        labels = DBSCAN(eps=eps, min_samples=min_samples).fit(centroids).labels_

        for label in np.unique(labels):
            # -1 marks noise. It is filtered explicitly because it is absent
            # when every sample belongs to a cluster, in which case dropping
            # the first unique label would discard a real cluster.
            if label == -1:
                continue

            mask = np.zeros((height, width), dtype=np.uint8)
            for idx in np.where(labels == label)[0]:
                xs, ys = polygons[idx].exterior.coords.xy
                # cv.fillPoly requires 32-bit integer points.
                pts = np.array([[round(x), round(y)] for x, y in zip(xs, ys)], dtype=np.int32)
                cv.fillPoly(mask, [pts], 1)

            image = dilate(mask, image)

    if save:
        # Use the parameter, not a global that happens to exist in the caller.
        cv.imwrite(f"{output_dir}{image_name}.png", image)

    return image

In [None]:
# Run the density-based clustering for every GeoJSON annotation file that has
# a matching PNG image in img_directory.
files = glob.glob(f"{geojson_directory}*")

for file in files:
    # Normalise Windows separators, then strip the directory prefix and the
    # GeoJSON suffix to obtain the bare image name.
    file_name = file.replace('\\', '/').replace(geojson_directory, '').replace(geojson_suf, '')
    print(file_name)
    # Annotation files without a corresponding image are skipped.
    if os.path.exists(f"{img_directory}{file_name}.png"):
        image = get_inflammatory_clustering(img_directory, file_name, file, out_dir, save=True)

## Identifikácia založená na iteratívnej dilatácii

In [None]:
def get_inflammatory_dilation(image_dir, image_name, output_dir, threshold=0.01, nuclei_threshold=15, save=False, dilation_threshold=0.025):
    """Outline large, nucleus-rich regions found by iterative dilation.

    The image is dilated with ``iterative_dilation``. A contour of the
    dilated image is kept when its area exceeds ``threshold`` times the
    summed contour area of the undilated image and the first channel of the
    original image, cropped to the contour's bounding rectangle, contains
    more than ``nuclei_threshold`` external contours. The kept contours are
    drawn in white onto the image.

    Args:
        image_dir: Directory prefix of the input image (with trailing
            separator).
        image_name: Image file name without the ``.png`` extension.
        output_dir: Directory prefix for the saved result (with trailing
            separator).
        threshold: Minimum contour area as a fraction of the total area.
        nuclei_threshold: A region must contain more contours than this.
        save: When true, the result is written to
            ``{output_dir}{image_name}.png``.
        dilation_threshold: Threshold forwarded to ``iterative_dilation``.

    Returns:
        The input image with the selected contours drawn on it.

    Raises:
        FileNotFoundError: If the input image cannot be read.
    """
    image_path = f"{image_dir}{image_name}.png"
    image = cv.imread(image_path)
    if image is None:
        # cv.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {image_path}")

    # The contour count is not needed here; the previously computed average
    # nucleus area was unused and divided by zero on images without contours.
    dilated, area, _ = iterative_dilation(image, dilation_threshold)

    contours, _ = cv.findContours(dilated, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)
    min_area = area * threshold

    selected = []
    for contour in contours:
        if cv.contourArea(contour) <= min_area:
            continue

        x, y, w, h = cv.boundingRect(contour)
        # Count the contours inside the bounding rectangle of the region.
        nuclei, _ = cv.findContours(image[y:y + h, x:x + w, 0], cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)

        if len(nuclei) > nuclei_threshold:
            selected.append(contour)

    new_image = cv.drawContours(image, selected, -1, (255, 255, 255), 3)

    if save:
        # Use the parameter, not a global that happens to exist in the caller.
        cv.imwrite(f"{output_dir}{image_name}.png", new_image)

    return new_image

In [None]:
# Run the iterative-dilation identification for every PNG image in
# img_directory. Only *.png files are globbed because the image name is
# derived by stripping that extension.
files = glob.glob(f"{img_directory}*.png")

for file in files:
    # Normalise Windows separators, then strip the directory prefix and the
    # extension to obtain the bare image name. `img_directory` replaces the
    # previously referenced, undefined name `directory`.
    file_name = file.replace('\\', '/').replace(img_directory, '').replace('.png', '')
    print(file_name)
    get_inflammatory_dilation(img_directory, file_name, out_dir_dilatation, save=True)

## Konverzia na geojson

In [None]:
# Convert every saved PNG mask in out_dir into a GeoJSON file placed next to it.
files = glob.glob(f"{out_dir}*.png")

for file in files:
    # Normalise Windows separators, then strip the directory prefix and the
    # extension to obtain the bare file name.
    file_name = file.replace('\\', '/').replace(out_dir, '').replace('.png', '')
    print(file_name)
    image = cv.imread(f"{file}")
    
    # Only the first channel of the loaded image is used as the mask; it is
    # exported as the single class 'inflammations'.
    geojson_file = create_geojson(image[:,:,0], ['inflammations'])
    with open(f'{out_dir}{file_name}.geojson', 'w') as f:
        dump(geojson_file, f)