In [None]:
!apt update && apt install -y openslide-tools
!pip install openslide-python
!pip install tiatoolbox
!pip install histomicstk --find-links https://girder.github.io/large_image_wheels
!pip install tissueloc==2.1.0

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%cd '/content/drive/My Drive/2_PHDWork/1_Objective-I/'

/content/drive/My Drive/2_PHDWork/1_Objective-I


In [None]:
from openslide import OpenSlide
# Ostu tissue mask generator.
from tiatoolbox.tools.tissuemask import OtsuTissueMasker
# Histomics tissue mask generator.
from histomicstk.saliency.tissue_detection import get_tissue_mask
# TissueLoc tissue mask generator.
import tissueloc as tl
import os
import matplotlib.pyplot as plt
import numpy as np
import cv2 as cv
import time
import csv
import math

In [None]:
"""
The global variable used for selecting the basic mask generation method.
"""
segmentation_method = 'TissueLoc'

In [None]:
"""
The path and name of the log file to which comparison data is writted.

Note: Set it to appropriate filepath and set the 'enable_log' to True for
      writing comparison data.
"""
log_file_name = './log_data_full_image_new.csv'

"""
Writes the comparison data to a csv file.

Parameters:
     log_data: the comparison data to be written.

Returns:
     None
"""
def log_comparison_data(log_data):
  fields = ['dataset','f_name','scaling','scale','seg_method','tmb_msk_tim',
            'l_width','l_height','h_width', 'h_height','l_cntr_count',
            'h_cntr_count','cntr_trans_time', 'msk_gen_time','mth_msk_gen_time',
            'accuracy','dice_p','jaccard_p', 'dice_n','jaccard_n']

  # Creating a log file if it does not exist.
  if(not os.path.isfile(log_file_name)):
    with open(log_file_name, 'w') as csvfile:
      # creating a csv writer object
      csvwriter = csv.writer(csvfile)
      # writing the fields
      csvwriter.writerow(fields)

  # Writing comparison data to the file.
  try:
    with open(log_file_name,"a") as csvFile:
      writer = csv.DictWriter(csvFile, fieldnames=fields)
      writer.writerow(log_data)
  except IOError:
    print("Writing of Training data failed")

  return

In [None]:
"""
Accepts the image and generates its tissue mask by the proposed method.

Parameters:
     thum_nail: numpy array denoting the image for which mask has to be
                generated.

     method:    the method to be used for generation.

Returns:
     bin_mask:  the tissue mask.
"""
def get_image_tissue_mask(thum_nail,method='Otsu'):
  bin_mask = None

  if(method == "Otsu"):
    t_size = thum_nail.shape
    thum_nail = np.reshape(thum_nail,(1,t_size[0],t_size[1],t_size[2]))
    masker = OtsuTissueMasker()
    masker.fit(thum_nail)
    masks = masker.transform(thum_nail)
    true_index = np.asarray(masks[0] == True).nonzero()
    bin_mask = np.zeros(masks[0].shape,dtype=np.uint8)
    bin_mask[true_index] = 255
    thum_nail = np.reshape(thum_nail,(t_size[0],t_size[1],t_size[2]))

  elif(method == "Histomics"):
    thm_nail = cv.cvtColor(thum_nail,cv.COLOR_RGBA2RGB)
    bin_mask, _ = get_tissue_mask(thm_nail, deconvolve_first=False,
                                  n_thresholding_steps=1, sigma=0., min_size=30)
    true_index = np.asarray(bin_mask > 0).nonzero()
    bin_mask[:,:] = 0
    bin_mask[true_index] = 255
    bin_mask = bin_mask.astype(np.uint8)

  elif(method == 'TissueLoc'):
    thm_nail = cv.cvtColor(thum_nail,cv.COLOR_RGBA2RGB)
    thm_nail = tl.rgb2gray(thm_nail)
    thm_nail = tl.thresh_slide(thm_nail, 0.80, sigma=13)
    thm_nail = tl.fill_tissue_holes(thm_nail)
    thm_nail = tl.remove_small_tissue(thm_nail, 1)
    cnts = tl.find_tissue_cnts(thm_nail)
    bin_mask = np.zeros(thm_nail.shape,dtype=np.uint8)
    bin_mask = cv.fillPoly(bin_mask, cnts, (255))

  else:
    print('Error: No segmentation method specified.')

  return bin_mask

In [None]:
"""
Reads the thumnail image of the given wsi file, performs tissue segmentation and
converts the mask into boundary coordinates.

Parameters:
     wsifilename: the name of the wsi file with path for which the prediction
                  has to be performed.

     display:     if true displays the processed images.

Returns:
     contour:     the boundary co-ordinates.
     shp:         the row and col count of the thumbnail.
"""
def get_thumbnail_cordinates(wsifilename,display=False):
  wsi = OpenSlide(wsifilename)
  level = wsi.level_count-1
  thum_nail = np.array(wsi.read_region((0,0),level,wsi.level_dimensions[level]))
  shp = (thum_nail.shape[0],thum_nail.shape[1])

  start_time = time.process_time()
  bin_mask = get_image_tissue_mask(thum_nail,method=segmentation_method)
  contours, hierarchy = cv.findContours(bin_mask, cv.RETR_EXTERNAL,
                                        cv.CHAIN_APPROX_NONE)
  end_time = time.process_time()
  elapsed_time = (end_time - start_time)

  if (display):
    print("----------------------WSI Details----------------------------------")
    #print('Name:             ',name)
    print('Level Count:      ',wsi.level_count)
    print('Dimensions:       ',wsi.level_dimensions)
    print('Downscale Factors:',wsi.level_downsamples)
    plt.figure(figsize=(9,6))
    plt.subplot(1,3,1)
    plt.title('Thumbnail')
    plt.imshow(thum_nail[0])
    # plt.subplot(1,5,2)
    # plt.title("Mask")
    # plt.imshow(masks[0])
    plt.subplot(1,3,2)
    plt.title("Binary Mask")
    plt.imshow(bin_mask)
    img = np.zeros(bin_mask.shape,dtype=np.uint8)
    cv.drawContours(img, contours, -1, 255, 3)
    plt.subplot(1,3,3)
    plt.title("Boundary Generated")
    plt.imshow(img)
    plt.show()
    print("-------------------------------------------------------------------")

  return (shp,contours,elapsed_time)

In [None]:
"""
Reads the shape and the scaling factor of high resolution image at a specified
level of the given wsi file.

Parameters:
     wsifilename: the name of the wsi file with path for which the prediction
                  has to be performed.

     level:       the high resolution level from which the details has to be
                  read.

     display:     if true displays the processed images.

Returns:
     hrs_shp:     the row and col count of the high resolution image.
     s_r:         the row scale factor.
     s_c:         the col scale factor.
"""
def get_higrs_image_details(wsifilename,level=1,display=False):
  wsi = OpenSlide(wsifilename)
  thum_level = wsi.level_count-1

  thumb_shp = (wsi.level_dimensions[thum_level][1],
               wsi.level_dimensions[thum_level][0])
  hrs_shp = (wsi.level_dimensions[level][1],wsi.level_dimensions[level][0])

  s_r = hrs_shp[0] / thumb_shp[0]
  s_c = hrs_shp[1] / thumb_shp[1]

  if(display):
    print('High Resolution Image Dim: ',hrs_shp)
    print('Low Resolution Image Dim : ',thumb_shp)
    print('Row Scale Factor         : ',s_r)
    print('Col Scale Factor         : ',s_c)

  return (hrs_shp,s_r,s_c)

In [None]:
"""
Transform the contour points of the low resolution image to high resolutio based
on the developed equation.

Parameters:
     transform_request: the details required for the transformation equation.

Returns:
     transform_points:  the transformed points.

     elapsed_time:      the time is seconds required for generating the new
                        points
"""
def transform_contour_points(transform_request):
  tot = len(transform_request)
  transform_points = []

  for i in range(tot):
    start_time = time.process_time()
    item = transform_request[i]
    s_cent = item[0]  #The centroid of the low resolution image.
    l_cent = item[1]  #The centroid of the high resolution image.
    cot_points = item[2]
    s_r = item[3][0]  #The row downscale factor.
    s_c = item[3][1]  #The col downscale factor.
    new_points = []

    for pnts in cot_points:
      # Step-1: Converting the points to float for performing mathematical operation.
      wrk_pnts = pnts.astype(np.float32)

      # Step-2: Shifting the points to align centroid with the origin.
      wrk_pnts[:,:,0] = wrk_pnts[:,:,0] - s_cent[0] #Row Co-ordinates.
      wrk_pnts[:,:,1] = wrk_pnts[:,:,1] - s_cent[1] #Col Co-ordinates.

      # Step-3: Determining the negative points for use in the future.
      xa_posi_index = np.asarray(wrk_pnts[:,:,0] < 0).nonzero()
      ya_posi_index = np.asarray(wrk_pnts[:,:,1] < 0).nonzero()

      # Step-4: Squaring element-wise and multiplying it with the square of the scale.
      wrk_pnts = np.square(wrk_pnts)
      wrk_pnts[:,:,0] = wrk_pnts[:,:,0] * s_r * s_r
      wrk_pnts[:,:,1] = wrk_pnts[:,:,1] * s_c * s_c

      #Step-5: Taking square root
      wrk_pnts = np.sqrt(wrk_pnts)

      #Step-6: Re-assigning negative sign
      wrk_pnts[xa_posi_index[0],xa_posi_index[1],0] = wrk_pnts[xa_posi_index[0],xa_posi_index[1],0] * -1
      wrk_pnts[ya_posi_index[0],ya_posi_index[1],1] = wrk_pnts[ya_posi_index[0],ya_posi_index[1],1] * -1

      #Step-7: Re-shifting
      wrk_pnts[:,:,0] = wrk_pnts[:,:,0] + l_cent[0]
      wrk_pnts[:,:,1] = wrk_pnts[:,:,1] + l_cent[1]

      #Step-8: Finding the floor
      wrk_pnts = (np.floor(wrk_pnts)).astype(np.int32)

      new_points.append(wrk_pnts)

    transform_points.append(tuple(new_points))
    end_time = time.process_time()
    elapsed_time = (end_time - start_time)
  return (transform_points,elapsed_time)

In [None]:
"""
Performs a comparison of the transformed cordinates with the expected one and
displays the different images.

Parameters:
     wsifilename: the name of the wsi file with path for which the prediction
                  has to be performed.

     old_points:  the small resolution image contour cordinates.

     new_points:  the transformed cordinates generated with respect to the high
                  resolution image.

     log_data:    the dictionary to which the comparison data are to be
                  appended.

     level:       the level of the image on which the prediction has to be
                  performed.

     scaling:     if this is set to 'manual', the thumbnail image is considered
                  as the high resolution image for prediction and for predicting
                  it downscaled by a scale factore given as scale parameter.

     scale:       the downscale factor that is used when scaling = 'manual'.

Returns:
     log_data:    the measurement data generated as part of the comaprison.
"""
def compare_new_points(wsifilename,old_points,new_points,method_time,log_data,
                       level=1,scaling='auto', scale=0.5):
  wsi = OpenSlide(wsifilename)
  thum_level = wsi.level_count-1
  if(scaling == "manual"):
    hig_res = np.array(wsi.read_region((0,0),thum_level,
                                       wsi.level_dimensions[thum_level]))
    thum_nail = cv.resize(hig_res,None,fx=scale,fy=scale)
    thumb_shp = (thum_nail.shape[0],thum_nail.shape[1])
    hrs_shp = (wsi.level_dimensions[thum_level][1],
               wsi.level_dimensions[thum_level][0])

  else:
    thumb_shp = (wsi.level_dimensions[thum_level][1],
                 wsi.level_dimensions[thum_level][0])
    hrs_shp = (wsi.level_dimensions[level][1],wsi.level_dimensions[level][0])

    thum_nail = np.array(wsi.read_region((0,0),thum_level,
                                         wsi.level_dimensions[thum_level]))
    hig_res = np.array(wsi.read_region((0,0),level,
                                       wsi.level_dimensions[level]))

  start_time = time.process_time()
  bin_mask = get_image_tissue_mask(hig_res,method=segmentation_method)
  end_time = time.process_time()
  elapsed_time = (end_time - start_time)

  s_img2 = np.zeros(hrs_shp,dtype=np.uint8)
  start_time = time.process_time()
  s_img2 = cv.fillPoly(s_img2, new_points, (255))
  end_time = time.process_time()
  mask_gen_time = (end_time - start_time)

  accuracy,dice_p,jaccard_p,dice_n,jaccard_n = measure_accuracy(bin_mask,s_img2)

  log_data['l_width'] = thumb_shp[1]
  log_data['l_height'] = thumb_shp[0]
  log_data['h_width'] = hrs_shp[1]
  log_data['h_height'] = hrs_shp[0]
  log_data['l_cntr_count'] = len(old_points)
  log_data['h_cntr_count'] = len(new_points)
  log_data['cntr_trans_time'] = (method_time * 10**3)
  log_data['msk_gen_time'] = (mask_gen_time * 10**3)
  log_data['mth_msk_gen_time'] = (elapsed_time * 10**3)
  log_data['accuracy'] = accuracy
  log_data['dice_p'] = dice_p
  log_data['jaccard_p'] = jaccard_p
  log_data['dice_n'] = dice_n
  log_data['jaccard_n'] = jaccard_n

  print("----------------------Transformation Details-----------------------")
  print("Low Resolution Dimension            : ",thumb_shp)
  print("High Resolution Dimension           : ",hrs_shp)
  print("Low resolution Image Contour Length : ",len(old_points))
  print("Transform Contour Length            : ",len(new_points))
  print("Time Taken by "+segmentation_method+" Segmentation     : ",
                                            elapsed_time * 10**3,"ms")
  print("Time Taken for contour generation   : ",method_time * 10**3,"ms")
  print("Time Taken for mask generation      : ",mask_gen_time * 10**3,"ms")
  print("Reconstruction accuarcy             : ",accuracy)
  print("Dice Coefficients (Tissue Pixels)   : ",dice_p)
  print("Jaccard's Coefficients (Tisse)      : ",jaccard_p)
  print("Dice Coefficients (Non-Tissue)      : ",dice_n)
  print("Jaccard's Coefficients (Non-Tisse)  : ",jaccard_n)

  plt.figure(figsize=(16,6))
  plt.subplot(1,4,1)
  plt.title("Original Large Image.")
  plt.imshow(hig_res)
  plt.subplot(1,4,2)
  plt.title("Original Thumbnail Image.")
  plt.imshow(thum_nail)
  plt.subplot(1,4,3)
  plt.title("Tissue Mask (" + segmentation_method +" Method).")
  plt.imshow(bin_mask)
  plt.subplot(1,4,4)
  plt.title("Tissue Mask (Proposed Method).")
  plt.imshow(s_img2)
  plt.show()
  print("-------------------------------------------------------------------")

  return log_data

"""
Performs a pixel to pixel matching between the given masks and returns the
accuracy, Dice's and Jaccard's coefficient.

Parameters:
     org:     the original labels.

     img2:    the predicted labels.

Returns:
     accuracy:    the percentage of pixels that matches.

     dice_p:      the Dice's coefficients for tissue pixels.

     jaccard_p:   the Jaccard's coefficients for tissue pixels.

     dice_n:      the Dice's coefficients for non-tissue pixels.

     jaccard_n:   the Jaccard's coefficients for non-tissue pixels.

"""
def measure_accuracy(org,pred):
  # Converting the values into 0 and 1.
  i1 = org // 255
  i2 = pred // 255

  # Calculating the TP, TN, FP and FN
  TP = np.sum((i1 * i2))
  TN = np.sum((1 - i1) * (1 - i2))
  FP = np.sum((((i2 - i1) + 1)//2))
  FN = np.sum((((i1 - i2) + 1)//2))

  accuracy = round(((TP + TN)/(TP + TN + FP + FN)),2) * 100
  dice_p = round(((2 * TP)/((2 * TP) + FP + FN)),2)
  jaccard_p = round((TP /(TP + FP + FN)),2)
  dice_n = round(((2 * TN)/((2 * TN) + FP + FN)),2)
  jaccard_n = round((TN /(TN + FP + FN)),2)

  if(math.isnan(accuracy)):
    accuracy = 0
  if(math.isnan(dice_p)):
    dice_p = 0
  if(math.isnan(jaccard_p)):
    jaccard_p = 0
  if(math.isnan(dice_n)):
    dice_n = 0
  if(math.isnan(jaccard_n)):
    jaccard_n = 0

  return (accuracy,dice_p,jaccard_p,dice_n,jaccard_n)

In [None]:
"""
Performs the generation of low resolution images if the scaling option is
selected as 'manual'. It reads the thumbnail image and perform its scaling by a
factor of specified in 'scale'.

Parameters:
     wsifilename: the name of the wsi file with path for which the prediction
                  has to be performed.

     scale:       the downscale factor that is used when scaling = 'manual'.

 Returns:
     tshape:      the size (rows and cols) of the downscale image.
     contours:    the contour cordinates of the downscale image.
     h_shape:     the size (rows and cols) of the thumbnail (high resolution)
                  image.
     s_r:         the row scaling factor.
     s_c:         the col scaling factor.

"""
def generate_manual_lowres_image(wsifilename,scale):
  wsi = OpenSlide(wsifilename)
  level = wsi.level_count-1
  thum_nail = np.array(wsi.read_region((0,0),level,wsi.level_dimensions[level]))

  s_img = cv.resize(thum_nail,None,fx=scale,fy=scale)
  start_time = time.process_time()
  bin_mask = get_image_tissue_mask(s_img,method=segmentation_method)
  contours, hierarchy = cv.findContours(bin_mask, cv.RETR_EXTERNAL,
                                        cv.CHAIN_APPROX_NONE)
  end_time = time.process_time()
  elapsed_time = (end_time - start_time)

  t_shape = (s_img.shape[0],s_img.shape[1])
  h_shape = (thum_nail.shape[0],thum_nail.shape[1])
  s_r = h_shape[0]/t_shape[0]
  s_c = h_shape[1]/t_shape[1]

  return (t_shape,contours,h_shape,s_r,s_c,elapsed_time)

In [None]:
"""
Performs the co-ordinate prediction of the tissue area from the thumbnail
image of the given level image.

Parameters:
     wsifilename: the name of the wsi file with path for which the prediction
                  has to be performed.

     level:       the level of the image on which the prediction has to be
                  performed.

     scaling:     if this is set to 'manual', the thumbnail image is considered
                  as the high resolution image for prediction and for predicting
                  it downscaled by a scale factore given as scale parameter.

     scale:       the downscale factor that is used when scaling = 'manual'.

 Returns:
    thumb_nail_cor: the contour co-ordinates for the low resolution image.

    new_points[0]:  the contour co-ordinates for the high resolution image.

    method_time:    the time taken by the proposed method to create the points.
"""
def predict_cordinates(wsifilename,level=1,scaling='auto',scale=0.5):
  trans_req = []
  if(scaling == "manual"):
    t_shape, thumb_nail_cor, h_shape, s_r, s_c,tmb_msk_tim = generate_manual_lowres_image(
                                                             wsifilename, scale)
  else:
    t_shape, thumb_nail_cor,tmb_msk_tim = get_thumbnail_cordinates(wsifilename,
                                                                  display=False)
    h_shape, s_r, s_c = get_higrs_image_details(wsifilename,level=level,
                                                display=False)
  l_cent = (h_shape[0]/2,h_shape[1]/2)
  s_cent = (t_shape[0]/2,t_shape[1]/2)
  sample = [s_cent,l_cent,thumb_nail_cor,(s_r,s_c)]
  trans_req.append(sample)
  new_points,method_time = transform_contour_points(trans_req)

  return (thumb_nail_cor,new_points[0],method_time,tmb_msk_tim)


In [None]:
"""
The entry level function to be invoked to evaluate the working of the proposed
method.

Parameters:
     path:    the wsi file name or the directory path containing the wsi files
              on which the method has to be applied.

     dataset: the identifier to recognize the dataset used in loging data.

     level:   the level of the image on which the prediction has to be performed.

     scaling: if this is set to 'manual', the thumbnail image is considered as
              the high resolution image for prediction and for predicting it
              downscaled by a scale factore given as scale parameter.

     scale:   the downscale factor that is used when scaling = 'manual'.

     enable_log: the flag variable for controlling data log. Set it to True to
                 log comparison data

 Returns:
     None

 Functioning:
    Step-1 : If a director path is passed, the files within it is read
             one-by-one for processing. Ensure that the directory contains only
             wsi files.

    Step-2 : The method 'predict_cordinates' is invoked on each read wsi file
             which; based on the parameters passed generates the contour points
             for a selected low and high resolution images.

    Step-3 : Once the contour points are available, the method
             'compare_new_points' is invoked to compare the performance of the
             proposed method.
"""
def main_fun(path,dataset,level = 1,scaling = "auto",scale = 0.5,
             enable_log = True):

  if(os.path.isdir(path)):
    files_names = os.listdir(path)

    for file in files_names:
      print("\tProcessing WSI File: ",file)
      wsiname = path + file
      log_data = {}
      log_data['dataset'] = dataset
      log_data['f_name'] = file
      log_data['scaling'] = scaling
      log_data['scale'] = scale
      log_data['seg_method'] = segmentation_method
      low_res_cor,new_points,method_time,tmb_msk_tim = predict_cordinates(wsiname,
                                                              level=level,
                                                              scaling=scaling,
                                                              scale=scale)
      log_data['tmb_msk_tim'] = (tmb_msk_tim * 10**3)
      log_data = compare_new_points(wsiname,low_res_cor,new_points,method_time,
                                    log_data,level=level,scaling=scaling,
                                    scale=scale)

      if(enable_log):
        log_comparison_data(log_data)

  else:
    print("\tProcessing WSI File: ",path)
    name = os.path.basename(path)
    log_data = {}
    log_data['dataset'] = dataset
    log_data['f_name'] = name
    log_data['scaling'] = scaling
    log_data['scale'] = scale
    log_data['seg_method'] = segmentation_method
    low_res_cor,new_points,method_time,tmb_msk_tim = predict_cordinates(path,
                                                    level=level,scaling=scaling,
                                                    scale=scale)
    log_data['tmb_msk_tim'] = (tmb_msk_tim * 10**3)
    log_data = compare_new_points(path,low_res_cor,new_points,method_time,
                                  log_data,level=level,scaling=scaling,
                                  scale=scale)

    if(enable_log):
        log_comparison_data(log_data)

  return

main_fun("./0_Dataset/2_ICIAR2018/",'ICIAR2018',level = 1,scaling = "auto")

# main_fun("./0_Dataset/1_CPTAC-CM/",'Variants',level = 1,scaling = "manual",
#          enable_log = False)