In [None]:
%matplotlib inline

In [None]:
import os
import sys
from collections import defaultdict, OrderedDict
import json

import xml.etree.ElementTree as ET
from xml.dom import minidom

import numpy as np
import pandas as pd

import openslide
from PIL import Image
from PIL import ImageDraw

py_src_code_dir = '../src/python'
sys.path.insert(0, py_src_code_dir)
from digipath_toolkit import get_sample_selection_mask, get_strided_fence_array
from digipath_toolkit import get_patch_location_array_for_image_level

# root of the data tree and the folder that holds the sample annotation files
data_dir = '../../DigiPath_MLTK_data'
zip_tank = '../../DigiPath_MLTK_data/zipTank/wsi_annotation_sample/'
# annotation xml and class label csv, both located in zip_tank
xml_name = os.path.join(zip_tank, 'e39a8d60a56844d695e9579bce8f0335.xml')
c_lab_id_fn = os.path.join(zip_tank, 'class_label_id.csv')

# image file with the same base name as the annotation xml
im_dir = '../../DigiPath_MLTK_data/RegistrationDevData'
im_file = 'e39a8d60a56844d695e9579bce8f0335.tiff'
image_file_name = os.path.join(im_dir, im_file)

In [None]:
# lower limit applied to the patch height / width after they are divided by thumbnail_divisor
MINIMUM_THUMB_PATCH_DIM = 2

def get_priority_ordered_labels(label_id_priority_fname):
    """ ordered_priority_dict = get_priority_ordered_labels(label_id_priority_fname)
    read the input .csv file into a priority dictionary struct

    Args:
        label_id_priority_fname:    name of a .csv file with Header = Label,ID,Priority

    Returns:
        ordered_priority_dict:      OrderedDict {priority_number: {'label': label_str, 'label_ID': str_number}}
                                    sorted with largest priority number first.
                                    The csv "ID" column is stored as 'label_ID' so that it does not
                                    clash with the "Id" attribute read from the .xml regions.
                                    Empty when the file can not be read or holds no data rows.
                                    When a priority number is repeated the last row using it wins.

    Raises:
        ValueError:                 a data row has a Priority field that is not an integer
    """
    # read the file - best effort: an unreadable file is reported and gives an empty result
    try:
        with open(label_id_priority_fname, 'r') as fh:
            lines = fh.readlines()
    except (OSError, ValueError):
        print('failed opening: ', label_id_priority_fname)
        lines = []

    # collect (priority, {label, label_ID}) tuples from the data rows
    priority_tuples_list = []
    for line in lines:
        ln_list = line.strip().split(',')
        # all three columns are needed (index 2 is read below): skip the header, blank and short rows
        if len(ln_list) > 2 and ln_list[0] != 'Label':
            #       Fix name clash: .xml "Id" vs .csv "ID" - Renaming csv-ID as label_ID
            priority_tuples_list.append((int(ln_list[2]), {'label': ln_list[0], 'label_ID': ln_list[1]}))

    # sort on the priority number only: sorting the whole tuples falls back to comparing the
    # dicts (a TypeError) as soon as two rows share one priority number
    priority_tuples_list.sort(key=lambda priority_tuple: priority_tuple[0], reverse=True)

    return OrderedDict(priority_tuples_list)
    
    
def get_ordered_priority_label_coords_dict(xml_file_name, label_id_priority_fname):
    """ Usage: 
    priority_dict = get_ordered_priority_label_coords_dict(xml_file_name, label_id_priority_fname)
    scan an annotation xml file line by line and attach every labeled region to its priority entry
    
    Args:
        xml_file_name:              QuPath Annotation convention xml file; it is read as text and the
                                    <Region ...>, <Vertices>, <Vertex .../> and </Vertices> tags are
                                    each expected to start their own line
        label_id_priority_fname:    .csv file name handed to get_priority_ordered_labels
        
    Returns:
        priority_dict:              the dict returned by get_priority_ordered_labels (priority numbers
                                    as ordered keys); the entry of each label that equals the "Text"
                                    attribute of a region with two or more vertices also gets:
                                        Id, Text, Zoom, Analyze:  the region attributes that were found
                                        vertices:                 numpy (n x 2) array [[X, Y], [X, Y],...]
                                    Entries whose label matches no region are returned without a
                                    'vertices' key. If several regions carry the same Text the last
                                    one in the file replaces the values stored for the earlier ones.
                                    The dict is returned unchanged when the xml file has no lines.
                                            
    """
    # region attributes to keep, each mapped to the type its value is converted to
    REGION_KEYS =  {'Id': 'int', 'Text': 'str', 'Zoom': 'float', 'Analyze': 'bool'}
    # turns an attribute value of 0 / 1 into a bool
    INT_BOOL_DICT = {1: True, 0:False}
    
    
    # module call:          get the {priority: {label: l, label_ID: n}}
    ordered_priority_dict = get_priority_ordered_labels(label_id_priority_fname)
    
    # read the file into a list of text lines (an unreadable file raises here)
    with open(xml_file_name, 'r') as fh:
        lines = fh.readlines()
    
    # nothing to parse: report and return the label dict as it is
    # (get_priority_ordered_labels returns an OrderedDict, so the None test is only a safeguard)
    if len(lines) == 0 or ordered_priority_dict is None:
        print('\n\n\t\t\tThrow_A_Pythonic_Conniption_Fit')
        print('\t\t\tFail to read: ',xml_file_name, '\n\n')
        return ordered_priority_dict

    # create the reverse dict  { label:    priority}
    label_ID_Priority_dict = {v['label']: k for k, v in ordered_priority_dict.items()}

    # state of the scan: inside a <Region>, inside its <Vertices>, and what was collected so far
    reg_on = False
    v_on = False
    vertex_list = []
    region_dict = {}
    
    for line in lines:
        # The branches are listed in reverse of the order they fire for one region:
        #       the last "if" finds "<Region " and fills region_dict,
        #       the "if" above it finds "<Vertices" and switches vertex collection on,
        #       the "elif" collects every "<Vertex" line,
        #       the first "if" closes the region when "</Vertices>" is found and
        #       stores it under the priority of its label (reverse dict label_ID_Priority_dict)
        if reg_on == True and v_on == True and line.strip() == '</Vertices>':
            
            # end of region - keep it only if its Text is a known label and 2 or more vertices were read
            if'Text' in region_dict and region_dict['Text'] in label_ID_Priority_dict and len(vertex_list) > 1:
                # priority - .csv reverse dict {label: priority} with label=="Text" found in xml
                priority = label_ID_Priority_dict[region_dict['Text']]
                # copy the parsed region attributes, then the vertices, into the priority entry
                for k in REGION_KEYS.keys():
                    if k in region_dict:
                        ordered_priority_dict[priority][k] = region_dict[k]
                ordered_priority_dict[priority]['vertices'] = np.array(vertex_list)
                
            else:
                # region is dropped: only a diagnostic print, no warning or exception is raised
                print('\n\n\t\t\tThrow_A_Pythonic_Conniption_Fit')
                print('\nlen(vertex_list)', len(vertex_list), '\nregion_dict\n', region_dict, '\n\n')

            # restart region-vertex loop: reset all region-coords loop cycle variables
            reg_on = False
            v_on = False
            vertex_list = []
            region_dict = {}

        elif reg_on == True and v_on == True and '<Vertex' in line.strip()[0:7]:
            # the stripped line starts with "<Vertex": append it to the list of coords
            
            # remove the xml markup and split on empty space, find & insert the X=..., Y=... elements
            vertex_line_list = line.strip().strip('<').strip('>').strip('/').split() # .split(' ')
            xy_dict = {}
            # items that begin with X or Y are read as name="number" pairs
            for v in vertex_line_list:
                if v[0] == 'X':
                    kv_pair = v.split('=')
                    xy_dict['X'] = float(kv_pair[1].strip('"'))

                elif v[0] == 'Y':
                    kv_pair = v.split('=')
                    xy_dict['Y'] = float(kv_pair[1].strip('"'))

            # insert in vertex list as [X, Y] - only when both values were found
            if 'X' in xy_dict and 'Y' in xy_dict:
                vertex_list.append([xy_dict['X'], xy_dict['Y']])

        if reg_on == True and '<Vertices' in line:
            # set to begin parsing Vertex lines
            v_on = True

        if reg_on == False and '<Region ' in line.strip()[0:8]:
            #                       begin parsing the new region
            reg_on = True
            v_on = False        #   (with paranoia)
            region_dict = {}
            
            # parse this line ( <Region ) to find the key-value pairs named in REGION_KEYS
            # the line is split on white space, so an attribute value is cut at its first space
            region_list = line.strip().split()
            for reg_item in region_list:
                if '=' in reg_item:
                    # split into a key-value pair
                    item_list = reg_item.strip().split('=')
                    if len(item_list) == 2:
                        for k in REGION_KEYS:
                            # insert key-value pair if the attribute name starts with a REGION_KEYS key
                            if k in item_list[0][0:len(k)]:
                                if REGION_KEYS[k] == 'int':
                                    region_dict[k] = int(item_list[1].strip('"'))
                                elif REGION_KEYS[k] == 'float':
                                    region_dict[k] = float(item_list[1].strip('"'))
                                elif REGION_KEYS[k] == 'bool':
                                    region_dict[k] = INT_BOOL_DICT[int(item_list[1].strip('"'))]
                                else:
                                    region_dict[k] = item_list[1].strip('"')
    
    return ordered_priority_dict

#   get_select_bounds_from_mask
def get_select_bounds_from_mask(mask_mat, xy):
    """ Usage: _start_, _stop_ = get_select_bounds_from_mask(mask_mat, xy='x')
    find the first and last index along one direction of mask_mat that holds unmasked values

    Args:
        mask_mat:           2d numpy binary array
        xy:                 character 'x' or 'y':
                                'x' sums along axis 1 - one total for every index of axis 0
                                'y' sums along axis 0 - one total for every index of axis 1
                            NOTE(review): confirm this naming against the orientation of the
                            masks the callers pass in.

    Returns:
        _start_:            first index with a total greater than zero; 0 when there is none
        _stop_:             last index with a total greater than zero (that index itself);
                            the number of totals when no index, or only a single index,
                            has a total greater than zero

    Raises:
        ValueError:         xy is neither 'x' nor 'y'
    """
    # translate input variables
    if xy == 'x':
        axis = 1
    elif xy == 'y':
        axis = 0
    else:
        raise ValueError("xy must be 'x' or 'y', got %r" % (xy,))

    # totals along the chosen axis and the (sorted) indices where the total is positive
    sum_of_axis = mask_mat.sum(axis=axis)
    positive_idx = np.flatnonzero(sum_of_axis > 0)

    # defaults cover the whole range - used when nothing (or just one index) is unmasked
    _start_ = 0
    _stop_ = sum_of_axis.shape[0]

    if positive_idx.size > 0:
        _start_ = int(positive_idx[0])
        # the stop is only narrowed when a second unmasked index follows the first one
        if positive_idx[-1] > _start_:
            _stop_ = int(positive_idx[-1])

    return _start_, _stop_

def get_region_mask(region_coords, thumbnail_divisor, thumbnail_size):
    """ mask_im = get_region_mask(region_coords, thumbnail_divisor, thumbnail_size)
    fabricate a numpy array image mask for thumbnail size with region coords (vertices)

    Args:
        region_coords:      numpy (n x 2) array of region vertices
        thumbnail_divisor:  number the vertices are divided by before truncation to integers
        thumbnail_size:     two element size; the mask is built from a zero array of shape
                            (thumbnail_size[1], thumbnail_size[0])

    Returns:
        mask_im:            boolean numpy array, True where the filled polygon was drawn
    """
    # scale the region coords with the thumbnail_divisor as type int
    # (builtin int: the np.int alias was removed from NumPy in release 1.24)
    xy_list = (region_coords / thumbnail_divisor).astype(int).tolist()

    # NOTE(review): the two columns of every vertex are swapped before drawing, while Pillow
    # reads each polygon tuple as (x, y) - confirm the column order of region_coords.
    xy_list = [(p[1], p[0]) for p in xy_list]

    # blank 8 bit image to draw on
    img = Image.fromarray(np.zeros((thumbnail_size[1], thumbnail_size[0])).astype(np.uint8))

    # make it a Pillow Draw and draw the filled polygon from the list of tuples
    draw = ImageDraw.Draw(img)
    draw.polygon(xy_list, fill="white")

    # create the logical mask for patch selection in the return variable
    return np.array(img) > 0

In [None]:
# data root and results folder
data_dir = '../../DigiPath_MLTK_data'
output_dir = '../../DigiPath_MLTK_data/annotation_test/results'
# exist_ok: the folder is created only when missing, without a separate isdir test
os.makedirs(output_dir, exist_ok=True)

# input files: whole slide image, label-priority csv (test version) and annotation xml
wsi_file = os.path.join(data_dir, 'RegistrationDevData/e39a8d60a56844d695e9579bce8f0335.tiff')
# class_label_id.csv >> class_label_id_test.csv
csv_file = os.path.join(data_dir, 'wsi_annotation_sample/class_label_id_test.csv')
xml_file = os.path.join(data_dir, 'wsi_annotation_sample/e39a8d60a56844d695e9579bce8f0335.xml')

# every tunable value of the run is collected in this one dict
run_parameters = {'method': 'annotations_to_dir',
                  'output_dir': output_dir,
                  'wsi_filename': wsi_file,
                  'csv_file_name': csv_file,
                  'xml_file_name': xml_file,
                  'thumbnail_divisor': 56,
                  'patch_stride_fraction': 1.0,
                  'image_level': 0,
                  'patch_height': 224,
                  'patch_width': 224,
                  'threshold': 0,
                  'patch_select_method': 'threshold_rgb2lab',
                  'rgb2lab_threshold': 80}

#                                                            define the return variable
labeled_masks_dict = defaultdict(dict)

# assign local names
wsi_filename = run_parameters['wsi_filename']
csv_file_name = run_parameters['csv_file_name']
xml_file_name = run_parameters['xml_file_name']
patch_select_method = run_parameters['patch_select_method']

image_level = run_parameters['image_level']

# Stride will not scale unless thumbnail_divisor is made of factors of patch_height & patch_width
thumbnail_divisor = run_parameters['thumbnail_divisor']

# patch size at full scale and at thumbnail scale (never below MINIMUM_THUMB_PATCH_DIM)
patch_height = run_parameters['patch_height']
patch_width = run_parameters['patch_width']
thumb_patch_height = max(MINIMUM_THUMB_PATCH_DIM, patch_height // thumbnail_divisor)
thumb_patch_width = max(MINIMUM_THUMB_PATCH_DIM, patch_width // thumbnail_divisor)

threshold = run_parameters['threshold']

# stride fraction is optional in run_parameters, 1.0 when not given
patch_stride = run_parameters.get('patch_stride_fraction', 1.0)

#                                                            image dimensions, downsamples
os_im_obj = openslide.OpenSlide(wsi_filename)
try:
    image_dimensions = os_im_obj.dimensions
    obj_downsample = os_im_obj.level_downsamples[image_level]
    thumbnail_size = (image_dimensions[0] // thumbnail_divisor, image_dimensions[1] // thumbnail_divisor)
    small_im = os_im_obj.get_thumbnail(thumbnail_size)
finally:
    # release the slide handle even when reading the thumbnail fails
    os_im_obj.close()

# NOTE(review): run_parameters=None is passed, so 'rgb2lab_threshold' set above is not handed
# to the toolkit function - confirm that its default is the value wanted here.
mask_im = get_sample_selection_mask(small_im, patch_select_method, run_parameters=None)

# report the sizes in play ('\t' restored in front of mask_im.shape - '\m' is not an escape)
print('image_dimensions:', image_dimensions, '\tthumbnail_divisor:', thumbnail_divisor)
print('thumbnail_size', thumbnail_size, '\tsmall_im.size', small_im.size,
      '\tmask_im.shape', mask_im.shape)
print('image_level', run_parameters['image_level'], '\tobj_downsample', obj_downsample,
      'patch_height: %i, patch_width: %i'%(patch_height, patch_width))

# priority ordered {priority: {label, label_ID, (region attributes), (vertices)}}
priority_dict = get_ordered_priority_label_coords_dict(xml_file_name, csv_file_name)
for p, p_dict in priority_dict.items():

    label = p_dict['label']

    # a csv label with no matching region in the xml has no 'vertices' entry: report and skip it
    # (reading p_dict['vertices'] would otherwise stop the whole cell with a KeyError)
    if 'vertices' not in p_dict:
        print('no region found for label: %s (priority %s)'%(label, p))
        continue

    #                                       <0><0>~~ (else off by one) thumbnail_size=small_im.size ~~<0><0>
    this_mask = get_region_mask(p_dict['vertices'], thumbnail_divisor, thumbnail_size=small_im.size)

    # NOTE(review): mask_im is re-assigned on every pass, so each later (lower priority) region is
    # intersected with all the regions before it as well as with the selection mask - confirm intended.
    mask_im = np.logical_and(this_mask, mask_im)
    if mask_im.sum() > 0:
        patch_location_array = []
        #                               May not need return these     -     Dev Nest Feathering
        p_dict['mask_im'] = mask_im
        p_dict['mask_individual_im'] = this_mask

        #                               patches search - limit to positive mask area
        thumb_row_start, thumb_row_stop = get_select_bounds_from_mask(mask_im, 'y')
        row_start, row_stop = thumb_row_start * thumbnail_divisor, thumb_row_stop * thumbnail_divisor

        thumb_col_start, thumb_col_stop = get_select_bounds_from_mask(mask_im, 'x')
        col_start, col_stop = thumb_col_start * thumbnail_divisor, thumb_col_stop * thumbnail_divisor

        #                               array step scales to image-level downsample
        scale_patch_height = patch_height
        rows_fence_array = get_strided_fence_array(scale_patch_height, patch_stride, row_start, row_stop)

        scale_patch_width = patch_width
        cols_fence_array = get_strided_fence_array(scale_patch_width, patch_stride, col_start, col_stop)

        #                               May not need return these     -     Dev Nest Feathering
        p_dict['rows_fence_array'] = rows_fence_array[:,0]
        p_dict['cols_fence_array'] = cols_fence_array[:,0]

        #                                       zip & rescale Fence Arrays to mask image size
        #                   iterator for rows:  (top_row, bottom_row, full_scale_row_number)
        it_rows = zip(rows_fence_array[:, 0] // thumbnail_divisor,
                      rows_fence_array[:, 1] // thumbnail_divisor,
                      rows_fence_array[:, 0])

        #                   loop variables for iterator for cols
        lft_cols = cols_fence_array[:, 0] // thumbnail_divisor
        rgt_cols = cols_fence_array[:, 1] // thumbnail_divisor
        cols_array = cols_fence_array[:, 0]

        for tmb_row_top, tmb_row_bot, row_n in it_rows:
            #               iterator for cols:  (left_column, right_column, full_scale_column_number)
            #               rebuilt for every row because a zip object can be walked only once
            it_cols = zip(lft_cols, rgt_cols, cols_array)

            for tmb_col_lft, tmb_col_rgt, col_n in it_cols:

                # select this patch if the selected patch pixels add to more than threshold (default = 0)
                if (mask_im[tmb_row_top:tmb_row_bot, tmb_col_lft:tmb_col_rgt]).sum() > threshold:

                    #       add the image level scale row and column of the upper left corner to the list
                    patch_location_array.append((col_n, row_n))

        # only labels with a non-empty combined mask are added to the result
        p_dict['patch_location_array'] = patch_location_array
        labeled_masks_dict[p] = p_dict

# keys left out of the summary print
skip_list = ['vertices', 'Id', 'label_ID', 'Text', 'Zoom', 'Analyze', 'patch_location_array']
for p, d in labeled_masks_dict.items():
    print('\nPriority: ', p)
    for k, v in d.items():
        if k in skip_list:
            continue

        if not isinstance(v, np.ndarray):
            # plain values print as they are
            print('%20s: %s'%(k, v))
        elif v.ndim == 2:
            # 2d arrays: shape plus the range of the first and of the second column
            print('%20s'%(k), v.shape,
                  'x = (min=%0.2f, max=%0.2f) '%(v[:,0].min(), v[:,0].max()),
                  'y = (min=%0.2f, max=%0.2f) '%(v[:,1].min(), v[:,1].max()))
        else:
            # any other array: shape plus the overall range
            print('%20s'%(k), v.shape, '(min=%0.2f, max=%0.2f) '%(v.min(), v.max()))

In [None]:
# repeat the size report ('\t' restored in front of mask_im.shape - '\m' is not an escape)
print('image_dimensions', image_dimensions, '\tthumbnail_divisor', thumbnail_divisor)
print('thumbnail_size', thumbnail_size, '\tsmall_im.size', small_im.size,
      '\tmask_im.shape', mask_im.shape)
print('image_level', run_parameters['image_level'], '\tobj_downsample', obj_downsample,
      'patch_height: %i, patch_width: %i'%(patch_height, patch_width))

# pick one priority entry and show its two masks as black / white images
p = 7
d = labeled_masks_dict[p]

# combined mask stored for this priority
im_here = Image.fromarray((d['mask_im'].astype(np.uint8) * 255))
display(im_here)

# mask of the annotated region alone
im_indiv_here = Image.fromarray((d['mask_individual_im'].astype(np.uint8) * 255))
display(im_indiv_here)

In [None]:
# full scale image width and height - used to print positions as a fraction of the image
w, h = image_dimensions[0], image_dimensions[1]

# keys left out of the print below
skip_list_here = ['vertices', 'Id', 'label_ID', 'Text', 'Zoom', 'Analyze', 'mask_im', 'mask_individual_im',
                  'patch_location_array']
# scalar keys that are printed relative to the image height (row) or width (col)
start_stop_list = ['row_start', 'row_stop', 'col_start', 'col_stop']

p = 6
d = labeled_masks_dict[p]
for k, v in d.items():
    if k in skip_list_here:
        continue

    if isinstance(v, np.ndarray):
        if v.ndim == 2:
            # 2d arrays: shape plus the range of the first and of the second column
            print('%20s'%(k), v.shape,
                  'x = (min=%0.3f, max=%0.3f) '%(v[:,0].min(), v[:,0].max()),
                  'y = (min=%0.3f, max=%0.3f) '%(v[:,1].min(), v[:,1].max()))
        elif k == 'rows_fence_array':
            print('%20s'%(k), v.shape, '(min=%0.2f, max=%0.2f) '%(v.min() / h, v.max() / h))
        elif k == 'cols_fence_array':
            print('%20s'%(k), v.shape, '(min=%0.2f, max=%0.2f) '%(v.min() / w, v.max() / w))
        # any other array is not printed
    elif k in start_stop_list and 'row' in k:
        print('%20s: %s'%(k, v / h))
    elif k in start_stop_list and 'col' in k:
        print('%20s: %s'%(k, v / w))
    else:
        print('%20s: %s'%(k, v))

print()

# combined mask stored for this priority, shown as a black / white image
im_here = Image.fromarray((d['mask_im'].astype(np.uint8) * 255))
display(im_here)

# mask of the annotated region alone
im_indiv_here = Image.fromarray((d['mask_individual_im'].astype(np.uint8) * 255))
display(im_indiv_here)


## Step One: get the priority-labeled vertices dictionary from the input files
### Show usage of *get_ordered_priority_label_coords_dict(xml_file, label_priority_file)*

In [None]:
# annotation xml and the test label-priority csv, both below data_dir
xml_file_name = os.path.join(data_dir, 'wsi_annotation_sample/e39a8d60a56844d695e9579bce8f0335.xml')
label_id_priority_fname = os.path.join(data_dir, 'wsi_annotation_sample/class_label_id_test.csv')


priority_dict = get_ordered_priority_label_coords_dict(xml_file_name, label_id_priority_fname)
print('\npriority_dict\n')
if len(priority_dict) == 0:
    print('de Nada')
else:
    # one output line for every priority: the number followed by its formatted fields
    for k, v in priority_dict.items():
        parts = ['%i '%(k)]
        for nm, vl in v.items():
            if nm == 'vertices':
                # only the number of vertices is shown, not the coordinates
                parts.append('\tvertices: %4i'%(len(vl)))
            elif nm == 'Id':
                parts.append('%3s: %s'%(nm,vl))
            elif nm in ['label', 'Text']:
                parts.append('%6s: %9s'%(nm,vl))
            else:
                parts.append('%9s: %s'%(nm,vl))
        s = ''.join(parts)
        print(s)

```text
# %load ../../DigiPath_MLTK_data/wsi_annotation_sample/class_label_id_test.csv
Label,ID,Priority
null,0,4
fat,1,1
lymph,2,2
Region,3,3
malignant,4,5
offset,5,7
ink,6,6
normal,7,0
```