In [1]:
import openslide
from openslide.deepzoom import DeepZoomGenerator
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import cv2

from lxml import etree

from pathlib import Path   #搞定
from PIL import Image
Image.MAX_IMAGE_PIXELS = None
import os
import gc
import time

ImportError: DLL load failed while importing etree: 找不到指定的程序。

# 函数

In [2]:
def get_slide(slide_path):
    slide = openslide.OpenSlide(slide_path)
    return slide

def AnnotationParser(path):  #提取xml中各个anonotation的坐标信息
    assert Path(path).exists(), "This annotation file does not exist."
    tree = etree.parse(path)
    annotations = tree.xpath("/ASAP_Annotations/Annotations/Annotation")   # 返回的是一个element
    annotation_groups = tree.xpath("/ASAP_Annotations/AnnotationGroups/Group")
    classes = [group.attrib["Name"] for group in annotation_groups]
    def read_mask_coord(cls):  #读取要mask的部分的坐标信息
        for annotation in annotations:
            if annotation.attrib["PartOfGroup"] == cls:   #这里cls为tumor，fat
                contour = []
                for coord in annotation.xpath("Coordinates/Coordinate"):  #坐标的位置信息，在x，y属性以tuple保存在contour
                    x = np.float(coord.attrib["X"])
                    y = np.float(coord.attrib["Y"])
                    contour.append([round(float(x)),round(float(y))])
                #mask_coords[cls].extend(contour)
                mask_coords[cls].append(contour)
    def read_mask_coords(classes):
        for cls in classes:
            read_mask_coord(cls)   #?是不是应该返回mask coords？########
        return mask_coords            
    mask_coords = {} #一个dict，以cls：contour的形式保存坐标信息，其中contuor是一个以tuple为元素的列表
    for cls in classes:
        mask_coords[cls] = []  #初始化各个cls的contuor信息为空列表
    mask_coords = read_mask_coords(classes) #classes为列表，包含各个cls
    return mask_coords,classes
def Annotation(slide,path,save_path=None,rule=False,save=False):
    #wsi_height = slide.wsi_height
    #wsi_width = slide.wsi_width
    
    wsi_width,wsi_height = slide.level_dimensions[0] #第一个level中x，y方向上分别对应的像素数量
    masks = {} #mask的dict，储存不同cls的contour
    contours = {}
    mask_coords, classes = AnnotationParser(path)
    
    def base_mask(cls,wsi_height,wsi_width):
        masks[cls] = np.zeros((wsi_height,wsi_width),dtype=np.uint8)
    def base_masks(wsi_height,wsi_width):
        for cls in classes:
            base_mask(cls,wsi_height,wsi_width)
        return masks
    def main_masks(classes,mask_coords,masks):
        for cls in classes:
            contours = np.array(mask_coords[cls])
            #contours = mask_coords[cls]
            for contour in contours:
                #print(f"cls:{cls},\ncontour:{contour},\ntype:{type(contour)}")
                masks[cls] = cv2.drawContours(masks[cls],[np.int32(contour)],0,True,thickness=cv2.FILLED)
        return masks
   # def make_masks()
    def export_mask(save_path,cls):
        assert Path(save_path).is_dir()
        cv2.imwrite(str(Path(save_path)/"{}.tiff".format(cls)),masks[cls],(cv2.IMWRITE_PXM_BINARY,1))
    def export_masks(save_path):
        for cls in masks.keys():
            export_mask(save_path,cls)
    def exclude_masks(masks,rule,classes):
        #masks_exclude = masks.copy()
        masks_exclude = masks
        for cls in classes:
            for exclude in rule[cls]["excludes"]:
                if exclude in masks:
                    overlap_area = cv2.bitwise_and(masks[cls],masks[exclude])
                    masks_exclude[cls] = cv2.bitwise_xor(masks[cls],overlap_area)
        #masks = masks_exclude
        return masks_exclude
                    
    masks = base_masks(wsi_height,wsi_width)
    masks = main_masks(classes,mask_coords,masks)
    if rule:
        classes = list(set(classes) & set(rule.keys()))
        masks = exclude_masks(masks,rule,classes)
        #include_masks(rule)
        #exclude_masks(rule)
    if save:
        export_masks(save_path)
    if "artifact" not in classes:
        masks["artifact"] = np.zeros((wsi_height,wsi_width),dtype=np.uint8)
    if "mark" not in classes:
        masks["mark"] = np.zeros((wsi_height,wsi_width),dtype=np.uint8)
    return masks 

def show_thumb_mask(mask,size=512):  #将单个mask resize
    #mask = masks[cls]
    height, width = mask.shape
    scale = max(size / height, size / width)
    mask_resized = cv2.resize(mask, dsize=None, fx=scale, fy=scale)
    mask_scaled = mask_resized * 255
    plt.imshow(mask_scaled)
    return mask_scaled

def get_mask_slide(masks):
    tumor_slide = openslide.ImageSlide(Image.fromarray(masks["tumors"]))
    mark_slide = openslide.ImageSlide(Image.fromarray(masks["mark"]))
    arti_slide = openslide.ImageSlide(Image.fromarray(masks["artifact"]))
    return tumor_slide,mark_slide,arti_slide

def get_tiles(slide,tumor_slide,mark_slide,arti_slide,tile_size=512,overlap=False,limit_bounds=False):
    slide_tiles = DeepZoomGenerator(slide,tile_size,overlap=overlap,limit_bounds=limit_bounds)
    tumor_tiles = DeepZoomGenerator(tumor_slide,tile_size,overlap=overlap,limit_bounds=limit_bounds)
    mark_tiles = DeepZoomGenerator(mark_slide,tile_size,overlap=overlap,limit_bounds=limit_bounds)
    arti_tiles = DeepZoomGenerator(arti_slide,tile_size,overlap=overlap,limit_bounds=limit_bounds)
    return slide_tiles,tumor_tiles,mark_tiles,arti_tiles


def remove_arti_and_mask(slide_tile,tumor_tile,mark_tile,arti_tile):
    slide_tile = np.array(slide_tile)
    tumor_tile = np.array(tumor_tile)
    mark_tile = np.where(np.array(mark_tile)==0,1,0)
    arti_tile = np.where(np.array(arti_tile)==0,1,0)
    x = slide_tile.shape
    if not x == tumor_tile.shape:
        tumor_tile = tumor_tile[:x[0],:x[1],:]
    if not mark_tile.shape == x:
        mark_tile = mark_tile[:x[0],:x[1],:]
    if not arti_tile.shape == x:
        arti_tile = arti_tile[:x[0],:x[1],:]
    tile = np.multiply(np.multiply(slide_tile,mark_tile),arti_tile)
    #if tile[np.where(tile==np.array([0,0,0]))].shape!=(0,):
        #tile[np.where(tile==np.array([0,0,0]))]= fill
    tile[np.where(tile==np.array([0,0,0]))] = fill
    tile_masked= np.multiply(tile,tumor_tile)
    tile = Image.fromarray(np.uint8(tile))
    #assert tile.size==(512,512),f"wrong shape:{tile.size}"
    return tile,tile_masked


def filtered(tile):
    tolerance = np.array([230,230,230])
    #tile_1 = tile.copy()
    tile[np.where((tile > tolerance).sum(axis=2) == 3)]=0
    percent = ((tile != np.array([0,0,0])).astype(float).sum(axis=2)!=0).sum()/(tile.shape[0]**2)
    return percent

def extract_patches(levels):
    for level in levels:
        print(f'processing ---level {level}')
        tiledir = Path(tile_path)/str(level)
        if not Path(tiledir).exists():
            os.makedirs(tiledir)
            assert slide_tiles.level_tiles[level] == mark_tiles.level_tiles[level]
        cols,rows = slide_tiles.level_tiles[level]
        for row in range(rows):
            for col in range(cols):
                tilename = os.path.join(tiledir,'%d_%d.%s'%(col,row,"tiff"))
                if not Path(tilename).exists():
                    slide_tile = slide_tiles.get_tile(level,(col,row))
                    #if slide_tile.size == (512,512):
                    tumor_tile = tumor_tiles.get_tile(level,(col,row))
                    mark_tile = mark_tiles.get_tile(level,(col,row))
                    arti_tile = arti_tiles.get_tile(level,(col,row))
                    tile,tile_masked = remove_arti_and_mask(slide_tile,tumor_tile,mark_tile,arti_tile)

                   # tile_masked = np.multiply(slide_tile,mark_tile)
                    percent = filtered(tile_masked)
                    if percent >= 0.5:
                        tile.save(tilename)
                    else:
                        pass
                
        print("Done!")
    print("All levels processed!!")

# 全局变量


In [3]:
tile_size=512
overlap=0
limit_bounds = True
rule = {"tumors":{"excludes":["blood","artifact","mask"]}}  #需要进行自定义


# 运行

## 切割单张WSI
- 更改slide_path
- 更改patch_path


In [4]:
slide_path = r"F:\lab\data\bladder cancer\TCGA_KIRC\TCGA-AK-3425\TCGA-AK-3425-01Z-00-DX1.39FD616C-3050-41E1-BF80-D2069E492FDE.svs"
xml_path = str(Path(slide_path).with_suffix(".xml"))
patch_path = r"F:\test"
levels=[14]

In [5]:
case_name = Path(slide_path).parent.name
case_path = Path(patch_path)/Path(slide_path).parts[2]/case_name  # 把2改成4
tile_path = Path(case_path)/Path(slide_path).stem.split(".")[0]
slide = get_slide(slide_path)   #返回 openslide.Openslide
masks = Annotation(slide,path=str(xml_path))
print(f"masks groups includes :{list(masks.keys())}")
tumor_slide,mark_slide,arti_slide = get_mask_slide(masks)
slide_tiles,tumor_tiles,mark_tiles,arti_tiles = get_tiles(slide,tumor_slide,mark_slide,arti_slide,tile_size=512,overlap=False,limit_bounds=False)
fill = int(np.array(slide_tiles.get_tile(16,(0,0))).mean())
print(f"fill_blank_value:{fill}")
extract_patches(levels)

  contours = np.array(mask_coords[cls])


masks groups includes :['tumors', 'blood', 'artifact', 'mark']
fill_blank_value:243
processing ---level 14
Done!
All levels processed!!


## 切割完整全部WSI
- **更改levels，确保levels包含13，14，15，16** 
- 更改slide_source为存放TCGA_cases的位置
- 更改patch_path

In [None]:
levels = [13,14,15,16]
slide_source = r"D:\cases"
patch_path = r"E:\TCGA_patch"
svs_paths = list(Path(slide_source).rglob("*.svs"))


In [None]:
extracted_case = []
un_extracted_case = []
for i,svs in enumerate(svs_paths):
    start = time.time()
    totol_num = len(svs_paths)
    print(f"processing  {i+1}/{totol_num}:------{svs}")
    xml_path = str(Path(svs).with_suffix(".xml"))
    case_name = Path(svs).parent.name
    case_path = Path(patch_path)/Path(svs).parts[2]/case_name
    tile_path = Path(case_path)/Path(svs).stem.split(".")[0]
    slide = get_slide(str(svs))
    masks = Annotation(slide,path=str(xml_path))
    print(f"masks groups includes :{list(masks.keys())}")
    tumor_slide,mark_slide,arti_slide = get_mask_slide(masks)
    slide_tiles,tumor_tiles,mark_tiles,arti_tiles = get_tiles(slide,tumor_slide,mark_slide,arti_slide,tile_size=512,overlap=False,limit_bounds=False)
    fill = int(np.array(slide_tiles.get_tile(16,(0,0))).mean())
    print(f"fill_blank_value:{fill}")
    del slide
    gc.collect()
    try:
        extract_patches(levels)
        extracted_case.append(svs)
    except:
        un_extracted_case.append(svs)
        continue
    end = time.time()
    print(f"Time consumed : {(end-start)/60} min")