In [1]:
import io
import os
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
import torch
from ultralytics import YOLO
from PIL import Image, ImageDraw
import numpy as np
import pandas as pd
from IPython.display import clear_output
import socket
import time
from datetime import datetime
import shutil
from random import randint
import random
import re
import colorsys
import math
from scipy.spatial import Voronoi
import matplotlib.colors as mcolors
import matplotlib.path as mpltPath

In [14]:
# Functions

def sanitize_for_filesystem(input_string, len = 6):
    safe_string = re.sub(r'[^a-zA-Z0-9]', '', input_string)
    return safe_string[:len]

def yoloName_fromPath(path):
    parts = path.split(os.sep)
    if 'weights' in parts:
        index = parts.index('weights')
        if index > 0: return parts[index-1]
    return "UNK"

def save_tile_results(res2, save_path, max_tile_cols = 6, divider_width = 4, drawMode = 4, show=False): #DrawMode 0=Boxes Only, 1=MaskClass, 2=MaskInstance, 3=BoxInstance, 4=Box/Mask by Class
    divider_color = (255, 0, 0)  # Red divider color
    drawBoxes_byClass = drawMode==0 or drawMode==4
    drawMasks_byClass = drawMode==1 or drawMode==4 or drawMode==2
    drawMasks_byInstance = drawMode==2

    def calculate_canvas_size(images, max_cols, img_width, img_height, divider_width):
        canvas_width = (img_width + divider_width) * min(len(images), max_cols) - divider_width
        canvas_height = (img_height * 2 + divider_width) 
        return canvas_width, canvas_height

    def draw_polygons(draw, polygons):
        for i, polygon in enumerate(polygons):
            color = get_color_by_id(i, len(polygons))
            draw.polygon(polygon, fill=color, outline='white')

    image_pairs = []

    for result in res2:
        img_orig = result.orig_img
        img_labeled = result.plot(labels=False, conf=False, boxes=drawBoxes_byClass, masks=drawMasks_byClass)
        img_orig_pil = Image.fromarray(img_orig)
        img_labeled_pil = Image.fromarray(img_labeled)

        if (drawMasks_byInstance):
            polygons = result.masks.xy
            #print(result.path, len(result), len(result.masks), len(result.masks.xy))
            draw_orig = ImageDraw.Draw(img_orig_pil); draw_polygons(draw_orig, polygons)          # Draw polygons on original imag
            #draw_labeled = ImageDraw.Draw(img_labeled_pil); draw_polygons(draw_labeled, polygons)  # Draw polygons on labeled image
        
        image_pairs.append((img_orig_pil, img_labeled_pil))

    if not image_pairs:
        print("No images to display.")
    else:
        img_width, img_height = image_pairs[0][0].size
        canvas_width, canvas_height = calculate_canvas_size(image_pairs, max_tile_cols, img_width, img_height, divider_width)
        canvas = Image.new('RGB', (canvas_width, canvas_height), "white")
        draw = ImageDraw.Draw(canvas)

        for i, (img_orig, img_labeled) in enumerate(image_pairs[:max_tile_cols]):
            col = i % max_tile_cols; row = i // max_tile_cols
            top_left_x = col * (img_width + divider_width); top_left_y = row * (img_height * 2 + divider_width)
            canvas.paste(img_orig, (top_left_x, top_left_y))
            canvas.paste(img_labeled, (top_left_x, top_left_y + img_height + divider_width))

        if show:
            display(canvas)
        if save_path:
            canvas.save(save_path)

def results_toDF(res2, addIDCol = False):
    data = []
    for result in res2:
        boxes = result.boxes
        inst = 0
        for box in boxes:
            x, y, w, h = box.xywh[0].tolist()
            inst += 1
            data.append({
                'Fullpath': result.path,
                'Filename' : os.path.basename(result.path),
                'Instance' : inst,
                'Class': box.cls[0].item(),
                'Conf': box.conf[0].item(),
                'x': x, 'y': y,
                'w': w, 'h': h,
                'xc' : x + w/2,
                'yc' : y + h/2,
                'Circ Area' : (0.858 * w * h)
            })
    df = pd.DataFrame(data)
    if (addIDCol): 
        df['ID'] = df.groupby('path').cumcount()
    return df

def results_toCSV(res2, save_path):
    df = results_toDF(res2)
    df.to_csv(save_path, index=False)

def polygon_area(coords):
    x = coords[:, 0]; y = coords[:, 1]
    i = np.arange(len(x))
    # 'shoelace' formula
    # return 0.5*np.abs(np.dot(x, np.roll(y, -1)) - np.dot(y, np.roll(x, -1)))
    return np.abs(np.sum(x[i-1]*y[i]-x[i]*y[i-1])*0.5)

def find_parents(classes, image):
    class0_indices = np.where(classes == 0)[0]
    class1_indices = np.where(classes == 1)[0]
    
    class1_centers = image.boxes.xywh[class1_indices]
    class0_centers = image.boxes.xywh[class0_indices]
    
    class1_parents = []
    for class1_coord in class1_centers.numpy():
                # print("the class1 instance: ", class1_coord)
                
                droplet_x = class1_coord[0]
                droplet_y = class1_coord[1]
                # print("droplet x, y coords: ", droplet_x, droplet_y)
                min_distance_from_cells = math.inf
                parent_cell_index = 0
                for i, class0_coord in enumerate(class0_centers.numpy()):
                    # print("the class0 instance: ", class0_coord)
                    
                    cell_x = class0_coord[0]
                    cell_y = class0_coord[1]
                    # print("cell x, y coords: ", cell_x, cell_y)
                    point1 = np.array((droplet_x, droplet_y))
                    point2 = np.array((cell_x, cell_y))
                    dist = np.linalg.norm(point1 - point2)
                    # print("dist from class0 instance: ", dist)
                    if dist < min_distance_from_cells:
                        min_distance_from_cells = dist
                        parent_cell_index = i
                        
                # print("parent cell index: ", parent_cell_index)        
                class1_parents.append(class0_indices[parent_cell_index])
    return class1_parents

def find_first_file(m_folder, m_contains):
    for root, dirs, files in os.walk(m_folder):
        for file in files:
            if m_contains in file:
                return os.path.join(root, file)
    return None

def create_multichannel_array(folder_path, down_x = 1, down_y = 1):
    image_arrays = []
    image_names = []
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.lower().endswith(('.png', '.jpg', '.jpeg','.bmp','.tif')):
                print("loading ", file)
                file_path = os.path.join(root, file)
                t_img = Image.open(file_path).convert('L'); width, height = t_img.size  # Convert to grayscale if not already
                new_width = int(width * down_x); new_height = int(height * down_y)
                t_img = t_img.resize((new_width, new_height), Image.Resampling.BILINEAR) # Resize the image

                t_arr = np.array(t_img)
                if t_arr.ndim == 2:  # Ensure the image is grayscale
                    image_arrays.append(t_arr)
                    image_names.append(file)
    
    if not image_arrays:
        return None  # Or raise an exception if you prefer

    # Stack the arrays along a new axis to create a multi-channel array
    multi_channel_array = np.stack(image_arrays, axis=-1)
    return multi_channel_array, image_names

def get_color_by_id(point_region_id, total_ids):
    hue = point_region_id / total_ids # Scale the hue by the number of unique IDs, wrapping around the hue circle
    saturation = 0.9; value = 0.9  # Keep saturation and value high for bright colors
    rgb = colorsys.hsv_to_rgb(hue, saturation, value)
    return tuple(int(c * 255) for c in rgb) # Convert to 0-255 scale for RGB colors in PIL

def get_vor_boundaries(boxes, ImgIfDesired = None):
    points= []; vor_verts = {}
    for idx in range(len(boxes)): points.append((boxes[idx][0], boxes[idx][1]))
    vor_verts_list = []
    try:
        vor = Voronoi(points)
    except:
        return vor_verts_list
    for point_region_id, region_id in enumerate(vor.point_region): #this is needed to preserve the order
        if (-1 not in vor.regions[region_id]):
            region_vertices = vor.vertices[vor.regions[region_id]]
            vor_verts[point_region_id] = region_vertices.tolist()

    default_triangle_height = 2; default_triangle_base_length = 4
    for idx, point in enumerate(points):
        if idx in vor_verts:  # Voronoi region exists
            vor_verts_list.append(vor_verts[idx])
        else:  # Create default triangle for missing regions
            bl_vertex = (point[0] - default_triangle_base_length / 2, point[1])
            br_vertex = (point[0] + default_triangle_base_length / 2, point[1])
            top_vertex = (point[0], point[1] + default_triangle_height)  
            vor_verts_list.append([bl_vertex, br_vertex, top_vertex])
    
    if (ImgIfDesired):
        drawV = ImageDraw.Draw(ImgIfDesired)
        r = 2  # radius of the points
        for point_region_id, point in enumerate(points):
            outline_color = get_color_by_id(point_region_id, len(points))
            left_up_point = (point[0] - r, point[1] - r)
            right_down_point = (point[0] + r, point[1] + r)
            if vor_verts.get(point_region_id) and len(vor_verts[point_region_id]) > 0:
                polygon_vertices_tuples = [tuple(vertex) for vertex in vor_verts[point_region_id]]
                drawV.polygon(polygon_vertices_tuples, width=3, outline=outline_color)
            drawV.ellipse([left_up_point, right_down_point], fill=outline_color)

        display(imgV)
    return vor_verts_list

def find_mask_intensities(img_data, image_array, file_name, shift_x = 0, shift_y = 0, include_headers = True, meta_name = "NA", tile_name = "NA", max_bb_area = 99999):
    sto = io.StringIO()
    sth = ''; d = '\t'

    def bstr_h(sth1):
        nonlocal sth
        sth += sth1

    def bstr_m(st1):
        sto.write(st1)

    def bstr_m_start():
        nonlocal sth, sto
        st = sth + '\r' + sto.getvalue()
        sto.close()
        sto = io.StringIO()
        sto.write(st)

    def get_mask(vertices):
        polygon_path = mpltPath.Path(vertices) # Create a path object from the vertices
        inside_polygon = polygon_path.contains_points(class_points)
        mask = inside_polygon.reshape(xx.shape) # Reshape the mask back to the image shape
        return mask

    width =image_array.shape[1]; height = image_array.shape[0]; channels = image_array.shape[2]
    boxes = img_data.boxes.cpu()
    img_box_centers = boxes.xywh 
    img_mask_coords = None if img_data.masks is None else img_data.masks.xy
    img_vor_coords = get_vor_boundaries(img_box_centers)

    first = include_headers; masks = {}
    print("width =",width,"height =",height,"chs =",channels,"boxes =",len(img_box_centers),"vor =",len(img_vor_coords))
    xx, yy = np.meshgrid(np.arange(width),np.arange(height)) # Create a mesh grid of coordinate values
    x_flat = xx.flatten(); y_flat = yy.flatten()
    class_points = np.vstack((x_flat, y_flat)).T # Create a list of (x, y) points from the flattened grid
    for idx in range(len(img_box_centers)):
        if (idx % 250 == 0): print("Measuring Intensities",idx)
        bbox_xywh = img_box_centers[idx]
        bbox_corners = [[bbox_xywh[0] - bbox_xywh[2], bbox_xywh[1] + bbox_xywh[3]],[bbox_xywh[0] + bbox_xywh[2], bbox_xywh[1] + bbox_xywh[3]] ,[bbox_xywh[0] + bbox_xywh[2], bbox_xywh[1] - bbox_xywh[3]], [bbox_xywh[0] - bbox_xywh[2], bbox_xywh[1] - bbox_xywh[3]]]
        vor_corners = img_vor_coords[idx] if img_vor_coords else None
        polys = { "box": bbox_corners, "poly": img_mask_coords,  **({"vor": vor_corners} if vor_corners else {}) }
        masks = {key: get_mask(value) for key, value in polys.items() if value}

        if (first): bstr_h('FileName' + d + 'MetaName' + d + 'TileName' + d + 'ObjectID' + d + 'Class'                      + d + 'Confidence'                  + d + 'cx' + d + 'cy' + d)
        bstr_m(             file_name + d + meta_name  + d +  tile_name + d + str(idx)   + d + str(boxes[idx].cls.item()) + d + str(boxes[idx].conf.item()) + d + str(bbox_xywh[0].item() + shift_x) + d + str(bbox_xywh[1].item() + shift_y))

        # Look at each mask for each channel
        for c in range(channels):
            cs = str(c)
            for key in masks:
                selected_pixels = image_array[:, :, c][masks[key]]
                area = len(selected_pixels)
                if (first and c==0): bstr_h(key + ' AreaP' + d)
                if (c==0): bstr_m(               str(area) + d)

                sum = np.sum(selected_pixels)
                avg = np.average(selected_pixels)
                std = np.std(selected_pixels)
                if (first): bstr_h(key + ' Total Intensity wv' + cs + d + key + ' Avg Intensity wv' + cs + d + key + ' Std Intensity wv' + cs + d)
                bstr_m(                    str(sum)                 + d + str(avg)                       + d + str(std)                       + d)

        if (first): bstr_m_start(); first = False
        bstr_m('\r')
    return sto.getvalue()

def Predict_OnPartsOfImage(model, original_image_name, full_image_arr_predict, full_image_arr_measure = None, save_path = None, new_w = 256, new_h = 256, overlap_amount = 0, fill_edges = False, include_headers = True, meta_name = "NA", testMode = False, maxdets = 6666, minconf = 0.25, max_bb_area = 99999):
    def get_piece(t_arr, x, y):
        piece = t_arr[y:min(y + new_h, t_arr.shape[0]), x:min(x + new_w, t_arr.shape[1])] # Calculate the dimensions of the piece
        
        # normalize the tile that is going to be predicted on
        max_tile_value = np.max(piece)
        print(f"max tile value: {max_tile_value}")
        if max_tile_value > 0:
            piece_normalized = (piece / max_tile_value * 255).astype(np.uint8)
        else:
            piece_normalized = piece.astype(np.uint8)
        return piece_normalized

    save_tiles = True 
    t_arr = full_image_arr_predict
    first = include_headers
    st = io.StringIO()
    for y in range(0, t_arr.shape[0], new_h - overlap_amount):
        for x in range(0, t_arr.shape[1], new_w - overlap_amount):
            piece_pred = get_piece(t_arr, x, y)
            piece_meas = get_piece(full_image_arr_measure, x, y) if (full_image_arr_measure is not None) else piece_pred
            tilename = str(x) + "," + str(y); print("Region:",tilename)
            predictions = model.predict(piece_pred, show=False, max_det=maxdets, conf=minconf, half=True) # predict on normalized tile

            directory_path = r"S:\Phys\FIV925 XSection\Datasets\Human Lung\Infer\Prediction Tiles"
            if not os.path.exists(directory_path):
                os.makedirs(directory_path)
            save_tile_name = os.path.join(directory_path, f"{tilename}.png")
            if save_tiles:
                save_tile_results(predictions, save_tile_name, show=False)
                
            # predictions[0] seems like it's a YOLO Results object
            st.write(find_mask_intensities(predictions[0], piece_meas, original_image_name, x, y, first, meta_name, tilename, max_bb_area))
            if (testMode and not first): break # This will give us exactly two regions
            first = False
        if (testMode and not first): break

    strRet = st.getvalue()
    if (save_path is not None):
        with open(save_path, 'a') as file: file.write(strRet)
        st.close()
    print("Done with File")
    return strRet

#Note max_bb_area is not implemented, it would skip predictions on really large BoundingBoxes, that are probably abberant
def work_on_folder(model, SubFolder, PredContains, dim_x, dim_y, down_x = 1, down_y = 1, max_bb_area = 99999, min_conf = 0.2, testMode = False, IncludeHeaders = True, save_path = None, maxdet = 6000):
    file_pred = find_first_file(SubFolder, PredContains)

    if not file_pred:
        print(f"No valid files found in {SubFolder} that contain '{PredContains}'. Skipping this folder.")
        return "", []
    
    try:
        img = Image.open(file_pred).convert('RGB'); width, height = img.size
        new_width = int(width * down_x); new_height = int(height * down_y)
        img = img.resize((new_width, new_height), Image.Resampling.BILINEAR) # Resize the image
        print("Resized image from", width, height, " to", new_width, new_height)
        pred_arr = np.array(img) # Convert to array
        
        meas_arr, names = create_multichannel_array(SubFolder, down_x, down_y)
        st = Predict_OnPartsOfImage(model, file_pred, pred_arr, meas_arr, None, dim_x, dim_y, 0, False, IncludeHeaders, SubFolder, testMode, maxdet, min_conf, max_bb_area)
        if save_path is not None and st: 
            with open(save_path, 'a') as file: file.write(st)
        print("Done with Files")
        return st, names
    except Exception as e:
        print(f"Failed to process folder {SubFolder}: {e}")
        return "", []

def clear_path(mPathClear):
    if os.path.exists(mPathClear):
        for filename in os.listdir(mPathClear):
            file_path = os.path.join(mPathClear, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.remove(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                print(f"Failed to delete {file_path}. Reason: {e}")
    else:
        print(f"The specified path does not exist: {mPathClear}")


In [15]:
# Juanro Lung
model_path = r"S:\Phys\FIV925 XSection\Datasets\Brain\01c\YO 432 0515 Snow\map75=0641477  pt65 idx=2 ep=4 btch=16 rnd=7684896\weights\best.pt"
m_folder = r"S:\Phys\FIV925 XSection\Datasets\Human Lung\Infer"
m_contains = "DAPI"
res_append = "2"
dim_x, dim_y = 384, 384
down_x, down_y = 0.5, 0.5
max_area = 1000
min_conf = 0.5
testMode = False
Image.MAX_IMAGE_PIXELS = None #Turn off the limit

In [18]:
model = YOLO(model_path)
first_level_subfolders = next(os.walk(m_folder))[1]  # Get first level of folders only
First = True
stio = io.StringIO()
namedict = {}

for subfolder in first_level_subfolders:
    print(subfolder,"---------------------------------")
    subfolder_path = os.path.join(m_folder, subfolder)
    st, names = work_on_folder(model, subfolder_path, m_contains, dim_x, dim_y, down_x, down_y, max_area, min_conf, testMode, First)
    if stio is not None:
        stio.write(st)
        namedict[subfolder] = names
    First = False
    if (testMode): break
 
# Save out the main data
save_path = os.path.join(m_folder, "Res00"+res_append+".txt")
strRet = stio.getvalue(); stio.close()
with open(save_path, 'a') as file: file.write(strRet)

DAPI ---------------------------------
Resized image from 21234 24097  to 10617 12048
loading  DAPI_Juanru_Lung_DAPI.tif
max tile value: 255
max tile value: 255
Region: 0,0

0: 448x448 11 1s, 21.5ms
Speed: 3.0ms preprocess, 21.5ms inference, 2.0ms postprocess per image at shape (1, 3, 448, 448)
width = 384 height = 384 chs = 1 boxes = 11 vor = 11
Measuring Intensities 0
max tile value: 255
max tile value: 255
Region: 384,0

0: 448x448 10 1s, 19.0ms
Speed: 1.5ms preprocess, 19.0ms inference, 1.0ms postprocess per image at shape (1, 3, 448, 448)
width = 384 height = 384 chs = 1 boxes = 10 vor = 10
Measuring Intensities 0
max tile value: 242
max tile value: 242
Region: 768,0

0: 448x448 5 1s, 18.6ms
Speed: 1.0ms preprocess, 18.6ms inference, 2.0ms postprocess per image at shape (1, 3, 448, 448)
width = 384 height = 384 chs = 1 boxes = 5 vor = 5
Measuring Intensities 0
max tile value: 255
max tile value: 255
Region: 1152,0

0: 448x448 7 1s, 19.0ms
Speed: 2.0ms preprocess, 19.0ms inference,