In [10]:
import tqdm
import json
import matplotlib.pyplot as plt
import numpy as np
from skimage.draw import polygon
import skimage.io
from pathlib import Path
from loguru import logger
import io3d


In [34]:
# Root folder of the dataset; the raw MHD volumes are searched below it and the
# collected metadata JSON is written into it. Machine-specific absolute path.
dataset_root = Path(r"H:\biomedical\orig\pilsen_pigs_all")

# Get shapes and voxel sizes from the MHD files

In [66]:
# Every raw MHD header found (recursively) below the transplantation_raw folder.
dataset_mhd_path = [*dataset_root.glob("transplantation_raw/**/*.mhd")]

# Read each volume once and keep only its geometry, keyed by the file stem.
dataset_metadata = {}
for fn in tqdm.tqdm(dataset_mhd_path):
    datap = io3d.read(fn)
    dataset_metadata[fn.stem] = {
        "filename": fn.name,
        "shape": datap.data3d.shape,
        "voxelsize_mm": datap.voxelsize_mm,
    }

# Store the collected geometry next to the data so it can be reused later.
with (dataset_root / "dataset_metadata.json").open("w") as f:
    json.dump(dataset_metadata, f)

dataset_metadata

100%|██████████| 56/56 [01:01<00:00,  1.10s/it]


{'Tx017D_Art': {'filename': 'Tx017D_Art.mhd',
  'shape': (1224, 512, 512),
  'voxelsize_mm': [0.7999999999999545, 0.49391796875, 0.49391796875]},
 'Tx017D_Ven': {'filename': 'Tx017D_Ven.mhd',
  'shape': (213, 512, 512),
  'voxelsize_mm': [0.8000000000001819, 0.61840625, 0.61840625]},
 'Tx018D_Art': {'filename': 'Tx018D_Art.mhd',
  'shape': (284, 512, 512),
  'voxelsize_mm': [0.7999999999999545, 0.65259765625, 0.65259765625]},
 'Tx018D_Ven': {'filename': 'Tx018D_Ven.mhd',
  'shape': (882, 512, 512),
  'voxelsize_mm': [0.7999999999999545, 0.65626171875, 0.65626171875]},
 'Tx019D_Ven': {'filename': 'Tx019D_Ven.mhd',
  'shape': (872, 512, 512),
  'voxelsize_mm': [0.7999999999999545, 0.630796875, 0.630796875]},
 'Tx021D_Ven': {'filename': 'Tx021D_Ven.mhd',
  'shape': (1018, 512, 512),
  'voxelsize_mm': [0.8000000000001819, 0.6418828125, 0.6418828125]},
 'Tx022D_Ven': {'filename': 'Tx022D_Ven.mhd',
  'shape': (969, 512, 512),
  'voxelsize_mm': [0.7999999999999545, 0.642640625, 0.642640625]},

In [67]:
class AnnotationTOmask:
    """Small COCO-style annotation index with polygon-to-mask conversion.

    The annotation file is a JSON document with the top-level keys
    ``"images"``, ``"annotations"`` and ``"categories"``. After construction
    the following lookup tables are available:

    * ``anns``      -- annotation id -> annotation record
    * ``imgToAnns`` -- image id -> list of annotation records (only images
      that have at least one annotation appear as keys)
    * ``catToImgs`` -- category id -> list of image ids (one entry per
      annotation, so an image id may repeat)
    * ``imgs``      -- image id -> image record
    * ``cats``      -- category id -> category record
    * ``dataset``   -- the parsed JSON document itself
    """

    def __init__(self, annotation_file):
        """Load ``annotation_file`` and build the lookup tables.

        :param annotation_file: path of the COCO JSON file
        """
        # load dataset
        print("loading annotations ...")
        # Context manager so the handle is closed even when parsing fails.
        with open(annotation_file, "r") as f:
            dataset = json.load(f)
        print("annotations loaded!")

        # creating index
        print("creating index...")
        anns = {}
        imgToAnns = {}
        for ann in dataset["annotations"]:
            imgToAnns.setdefault(ann["image_id"], []).append(ann)
            anns[ann["id"]] = ann

        imgs = {img["id"]: img for img in dataset["images"]}
        cats = {cat["id"]: cat for cat in dataset["categories"]}

        # Every declared category gets a key, even when nothing is annotated with it.
        catToImgs = {cat["id"]: [] for cat in dataset["categories"]}
        for ann in dataset["annotations"]:
            catToImgs[ann["category_id"]].append(ann["image_id"])

        print(f"index created! {len(imgs)} images, {len(anns)} annotations, {len(cats)} categories")

        # create class members
        self.anns = anns
        self.imgToAnns = imgToAnns
        self.catToImgs = catToImgs
        self.imgs = imgs
        self.cats = cats
        self.dataset = dataset

    def getAnnIds(self, imgIds, catIds):
        """
        Get annotation ids, optionally restricted to images and categories.

        :param imgIds: one image id or a list of them; an empty list means
            "all images"
        :param catIds: one category id or a list of them; an empty list means
            "all categories"
        :return: list of annotation ids
        """
        imgIds = list(imgIds) if isinstance(imgIds, (list, tuple)) else [imgIds]
        catIds = list(catIds) if isinstance(catIds, (list, tuple)) else [catIds]

        if len(imgIds) != 0:
            # Flat comprehension instead of sum(list_of_lists, []), which is quadratic.
            anns = [
                ann
                for imgId in imgIds
                for ann in self.imgToAnns.get(imgId, [])
            ]
        else:
            anns = self.dataset["annotations"]

        if len(catIds) != 0:
            anns = [ann for ann in anns if ann["category_id"] in catIds]

        return [ann["id"] for ann in anns]

    def getCatIds(self, catNms=[], catIds=[]):
        """
        Get category ids matching the given names and/or ids.

        :param catNms: category names to keep; empty means no name filter
        :param catIds: category ids to keep; empty means no id filter
        :return: list of category ids passing both filters
        """
        cats = self.dataset["categories"]
        if len(catNms) != 0:
            cats = [cat for cat in cats if cat["name"] in catNms]
        if len(catIds) != 0:
            cats = [cat for cat in cats if cat["id"] in catIds]
        return [cat["id"] for cat in cats]

    def getImgIds(self, imgIds=[], catIds=[]):
        """
        Return the image ids that are in ``imgIds`` and carry all given categories.

        When ``imgIds`` is empty the result is seeded from the first category.
        With both arguments empty the result is an empty list.

        :param imgIds: image ids to start from
        :param catIds: category ids every returned image must have
        :return: list of image ids (order not guaranteed)
        """
        ids = set(imgIds)
        for i, catId in enumerate(catIds):
            # Seed only from the first category. Re-seeding whenever the
            # running intersection became empty would return images that
            # do not carry all requested categories.
            if i == 0 and len(ids) == 0:
                ids = set(self.catToImgs[catId])
            else:
                ids &= set(self.catToImgs[catId])
        return list(ids)

    def loadAnns(self, ids=[]):
        """
        Return the annotation records for the given annotation id(s).

        :param ids: a single annotation id, or a list/tuple of ids
        :return: list of annotation records
        :raises KeyError: if an id is unknown
        """
        if isinstance(ids, (list, tuple)):
            return [self.anns[ann_id] for ann_id in ids]
        # Any other value is treated as one id instead of silently returning None.
        return [self.anns[ids]]

    def loadImgs(self, ids):
        """
        Return the image records for the given image id(s).

        :param ids: a single image id, or a list/tuple of ids
        :return: list of image records
        :raises KeyError: if an id is unknown
        """
        if isinstance(ids, (list, tuple)):
            return [self.imgs[img_id] for img_id in ids]
        return [self.imgs[ids]]

    def getSeg(self, anns):
        """
        Collect the segmentation entries of all given annotation records.

        :param anns: list of annotation records
        :return: flat list of segmentation entries, or ``0`` (after printing a
            notice) when ``anns`` is empty
        """
        if len(anns) == 0:
            print("no annotations found")
            return 0

        return [seg for ann in anns for seg in ann["segmentation"]]

    def segToMask(self, S, h, w):
        """
        Convert polygon segmentation to binary mask.

        :param S: list of polygons, each a flat ``[x0, y0, x1, y1, ...]`` list;
            the ``0`` returned by :meth:`getSeg` for "no annotations" (or
            ``None``) is accepted and yields an empty mask
        :param h: target mask height (number of rows)
        :param w: target mask width (number of columns)
        :return: float array of shape ``(h, w)`` with 1 inside the polygons
        """
        M = np.zeros((h, w))
        if S is None or isinstance(S, int):
            # Sentinel for "nothing to draw".
            return M
        for s in S:
            # Odd positions are y (rows), even positions are x (columns).
            # shape= clips the polygon to the mask, so vertices outside the
            # image can neither raise IndexError nor wrap around.
            rr, cc = polygon(np.array(s[1::2]), np.array(s[0::2]), shape=M.shape)
            M[rr, cc] = 1
        return M


def coco_to_mask(
    coco_filename,
    output_dir,
    organ,
    voxelsize_mm=None,
    output_type="png",
    show=False,
    name_prefix="",
):
    """
    Write one 2D mask image per annotated slice for a single label.

    Every image listed in the COCO file first gets an all-zero mask; the
    images that carry the requested label are then overwritten with a mask
    that is 255 inside the annotated polygons.

    :param coco_filename: full path of the COCO JSON file
    :param output_dir: root output directory; masks are written to
        ``output_dir / organ / (name_prefix + file_name + "." + output_type)``
    :param organ: name of the category (label) to turn into masks
    :param voxelsize_mm: currently unused
    :param output_type: file extension of the written masks
    :param show: if True, display every labelled mask with matplotlib
    :param name_prefix: string prepended to the image ``file_name``
    :return: None
    """
    import datetime

    output_dir = Path(output_dir)

    def _mask_path(img_record):
        # file_name is used as is, so a sub-directory inside it is preserved.
        return output_dir / organ / (
            name_prefix + img_record["file_name"] + "." + output_type
        )

    t0 = datetime.datetime.now()
    cv_an = AnnotationTOmask(coco_filename)
    t1 = datetime.datetime.now()
    all_img_ids = cv_an.getImgIds()
    t2 = datetime.datetime.now()
    logger.debug(f"{t1 - t0}, {t2 - t1}")
    # loguru formats with str.format(); without a placeholder the value is dropped.
    logger.debug("getImgIds {}", all_img_ids)

    # extract the region we want to mask (e.g. Right Kidney, Liver)
    catIds = cv_an.getCatIds(catNms=[organ])
    imgIds = cv_an.getImgIds(catIds=catIds)
    if len(imgIds) == 0:
        logger.warning(f"Label '{organ}' not found.")
    output_dir.mkdir(parents=True, exist_ok=True)

    # TODO allow storing into one file

    # create empty files
    for img_record in cv_an.dataset["images"]:
        # numpy images are indexed (row, column) == (height, width)
        empty_mask = np.zeros(
            (img_record["height"], img_record["width"]), dtype=np.uint8
        )
        image_path = _mask_path(img_record)
        image_path.parent.mkdir(parents=True, exist_ok=True)
        skimage.io.imsave(image_path, empty_mask, check_contrast=False)

    # rewrite images with label
    for im in tqdm.tqdm(imgIds):
        img_record = cv_an.loadImgs(im)[0]
        anns_ids = cv_an.getAnnIds(imgIds=img_record["id"], catIds=catIds)
        anns = cv_an.loadAnns(anns_ids)
        S = cv_an.getSeg(anns)
        # segToMask takes (S, h, w): height first, then width.
        M = cv_an.segToMask(S, img_record["height"], img_record["width"]) * 255
        if show:
            plt.figure()
            plt.imshow(M)
            plt.show()
            plt.close()
        if not M.any():
            logger.warning(f"No data found in annotation {img_record['file_name']}")
        # Written once, with the same writer as the empty masks, so every
        # file in the output folder has the same single-channel format.
        skimage.io.imsave(_mask_path(img_record), np.uint8(M), check_contrast=False)

    logger.debug("Done")

In [68]:
# json_path = Path(r"H:\biomedical\orig\pilsen_pigs_all\transplantation_annotation\annotations_\pilsen_pigs_coco_from_cvat/instances_default.json")
# # json_path = Path(r"H:\biomedical\orig\pilsen_pigs_all\transplantation_annotation\annotations_\tx028_coco\annotations\instances_default.json")
# assert json_path.exists()

In [69]:

# file_path = json_path
# organ="Left Kidney"
# 
# 
# import datetime
# 
# t0 = datetime.datetime.now()
# cv_an = AnnotationTOmask(file_path)
# t1 = datetime.datetime.now()
# cv_an1 = cv_an.getImgIds()
# t2 = datetime.datetime.now()
# logger.debug(f"{t1 - t0}, {t2 - t1}")
# logger.debug("getImgIds", cv_an1)
# # extract the region we want to mask ( Right Kidny,  Liver)
# 
# # catIds = cv_an.getCatIds(catNms=['cell'])
# catIds = cv_an.getCatIds(catNms=[organ])
# # catIds = cv_an.getCatIds(catNms=['Liver'])
# # catIds = cv_an.getCatIds(catNms=['Left Kidney'])
# 
# logger.debug("catIds", catIds)
# imgIds = cv_an.getImgIds(catIds=catIds)
# if len(imgIds) == 0:
#     logger.warning(f"Label '{organ}' not found.")


In [70]:

# Annotation export and output folders, derived from dataset_root so that
# relocating the dataset only requires changing that one path.
coco_path = dataset_root / "transplantation_annotation" / "annotations_" / "pilsen_pigs_coco_from_cvat" / "instances_default.json"
output_path_png = dataset_root / "transplantation_annotation" / "masks_from_annotations" / "2024-07_png"
output_path_mhd = dataset_root / "transplantation_annotation" / "masks_from_annotations" / "2024-07_mhd"


In [71]:
# Build the annotation index once; the following cells inspect it interactively.
cv_an = AnnotationTOmask(coco_path)

loading annotations ...
annotations loaded!
creating index...
index created! 17016 images, 6396 annotations, 9 categories


In [72]:
# Show the first image record to see which fields the export provides.
# len(cv_an.dataset["images"])
cv_an.dataset["images"][0]


{'id': 1,
 'width': 512,
 'height': 512,
 'file_name': 'Tx040D_Ven/Tx040D_Vensoubor_01123.jpg',
 'license': 0,
 'flickr_url': '',
 'coco_url': '',
 'date_captured': 0}

In [73]:
# List the category records (id and name) contained in the export.
cv_an.dataset["categories"]

[{'id': 1, 'name': 'Heart', 'supercategory': ''},
 {'id': 2, 'name': 'Thoracic Cavity', 'supercategory': ''},
 {'id': 3, 'name': 'Left Kidney', 'supercategory': ''},
 {'id': 4, 'name': 'Right Kidney', 'supercategory': ''},
 {'id': 5, 'name': 'Liver', 'supercategory': ''},
 {'id': 6, 'name': 'Portal Vein', 'supercategory': ''},
 {'id': 7, 'name': 'Vena Cava', 'supercategory': ''},
 {'id': 8, 'name': 'Aorta', 'supercategory': ''},
 {'id': 9, 'name': 'Portal Vein Entry Point', 'supercategory': ''}]

In [74]:

class SaveMasksFromCVAT:
    """Turn per-slice COCO polygon annotations into one 3D mask volume per CT and organ.

    Image records are grouped by the parent directory name of their
    ``file_name`` (the CT name). For every organ and CT a ``uint8`` volume is
    written to
    ``output_path_mhd / <ct_name> / <ct_name>_<organ_slug>.mhd`` together with
    a ``.jpg`` projection preview, where ``organ_slug`` is the lower-cased
    organ name with spaces replaced by underscores.
    """

    def __init__(self, coco_path, output_path_mhd, dataset_metadata):
        """
        :param coco_path: path of the COCO JSON annotation file
        :param output_path_mhd: root directory for the written volumes
        :param dataset_metadata: mapping CT name -> dict with ``"shape"`` and
            ``"voxelsize_mm"`` of the original volume
        """
        self.coco_path = coco_path
        self.output_path_mhd = output_path_mhd
        self.dataset_metadata = dataset_metadata
        cv_an = AnnotationTOmask(coco_path)
        # sort images by file_name
        cv_an.dataset["images"] = sorted(cv_an.dataset["images"], key=lambda x: x["file_name"])

        # Per-CT image size and slice count taken from the annotation file;
        # used as a fallback when a CT is missing in dataset_metadata.
        dataset_metadata_cvat = {}
        for img in cv_an.dataset["images"]:
            ct_name = Path(img["file_name"]).parent.name
            record = dataset_metadata_cvat.setdefault(
                ct_name,
                dict(width=img["width"], height=img["height"], count=0),
            )
            record["count"] += 1

        self.cv_an = cv_an
        self.dataset_metadata_cvat = dataset_metadata_cvat

    def get_list_of_organs(self):
        """Return the names of all categories in the annotation file."""
        return [element["name"] for element in self.cv_an.dataset["categories"]]

    def create_empty_mask_and_name(self, imgId:int):
        """
        Allocate an all-zero volume for the CT that image ``imgId`` belongs to.

        :param imgId: id of any image of the CT
        :return: tuple ``(mask3d, ct_name, voxelsize_mm)``; shape and voxel
            size come from ``dataset_metadata`` when the CT is listed there,
            otherwise the shape is derived from the annotation file and the
            voxel size is set to ``[1., 1., 1.]``
        """
        # Use the instance's own index, not a notebook-level global.
        img_record = self.cv_an.loadImgs(imgId)[0]
        ct_name = Path(img_record["file_name"]).parent.name
        if ct_name in self.dataset_metadata:
            shape = self.dataset_metadata[ct_name]["shape"]
            voxelsize_mm = self.dataset_metadata[ct_name]["voxelsize_mm"]
        else:
            logger.warning(f"CT {ct_name} not found in dataset_metadata, voxelsize set to [1., 1., 1.]")
            cvat_record = self.dataset_metadata_cvat[ct_name]
            # (slices, rows, columns) == (count, height, width)
            shape = [cvat_record["count"], cvat_record["height"], cvat_record["width"]]
            voxelsize_mm = [1., 1., 1.]

        mask3d = np.zeros(shape, dtype=np.uint8)

        return mask3d, ct_name, voxelsize_mm

    def _save_mask3d(self, mask3d, ct_name, organ, voxelsize_mm):
        """Write one finished volume as MHD plus a normalized sum-projection JPG."""
        organ_slug = organ.lower().replace(" ", "_")
        ct_dir = Path(self.output_path_mhd) / ct_name
        ct_dir.mkdir(parents=True, exist_ok=True)
        mhd_path = ct_dir / (ct_name + "_" + organ_slug + ".mhd")
        jpg_path = ct_dir / (ct_name + "_" + organ_slug + ".jpg")

        io3d.write(mask3d, mhd_path, metadata={"voxelsize_mm": voxelsize_mm})

        # save projection to jpg
        projection = mask3d.astype(float).sum(axis=0)
        if projection.max() > 0:
            projection = projection / projection.max()
        projection = (projection * 255.).astype(np.uint8)
        skimage.io.imsave(jpg_path, projection, check_contrast=False)

    def save_masks_for_organ(self, organ:str):
        """
        Build and write the mask volumes of one organ for every CT that has it annotated.

        :param organ: category name; a warning is logged and nothing is
            written when no image carries this label
        """
        cv_an = self.cv_an
        catIds = cv_an.getCatIds(catNms=[organ])
        imgIds = cv_an.getImgIds(catIds=catIds)
        # Sorting by file_name keeps all slices of one CT adjacent.
        imgIds = sorted(imgIds, key=lambda x: cv_an.loadImgs(x)[0]["file_name"])

        if len(imgIds) == 0:
            logger.warning(f"Label '{organ}' not found.")
            return

        mask3d, ct_name, voxelsize_mm = self.create_empty_mask_and_name(imgIds[0])

        for im in tqdm.tqdm(imgIds):
            img_record = cv_an.loadImgs(im)[0]
            ct_name_i = Path(img_record["file_name"]).parent.name
            if ct_name_i != ct_name:
                # A new CT starts: write the finished volume and allocate the next one.
                self._save_mask3d(mask3d, ct_name, organ, voxelsize_mm)
                mask3d, ct_name, voxelsize_mm = self.create_empty_mask_and_name(im)

            anns_ids = cv_an.getAnnIds(imgIds=img_record["id"], catIds=catIds)
            anns = cv_an.loadAnns(anns_ids)
            S = cv_an.getSeg(anns)
            # segToMask takes (S, h, w): height first, then width.
            mask2d = cv_an.segToMask(S, img_record["height"], img_record["width"]) * 255
            mask2d = mask2d.astype(np.uint8)

            filestem = Path(img_record["file_name"]).stem
            # get last number from filename. filename is i.e. Tx028D_Vensoubor_00566
            # NOTE(review): the number is used directly as a 0-based slice index - confirm.
            slice_id = int(filestem.split("_")[-1])
            mask3d[slice_id] = mask2d

        # The loop writes a volume only when the next CT begins, so the last
        # CT is still pending here and must be written explicitly.
        self._save_mask3d(mask3d, ct_name, organ, voxelsize_mm)

    def save_masks(self, organs=None):
        """
        Write the mask volumes for several organs.

        :param organs: iterable of category names; ``None`` means every
            category found in the annotation file
        """
        if organs is None:
            organs = self.get_list_of_organs()

        for organ in organs:
            self.save_masks_for_organ(organ)

In [75]:
# Build the converter from the annotation file and the volume metadata collected
# above, then export the masks for every category (save_masks defaults to all).
svmasks = SaveMasksFromCVAT(coco_path, output_path_mhd, dataset_metadata)
svmasks.save_masks()

loading annotations ...
annotations loaded!
creating index...
index created! 17016 images, 6396 annotations, 9 categories


100%|██████████| 983/983 [00:14<00:00, 65.71it/s] 
100%|██████████| 306/306 [00:05<00:00, 53.39it/s] 
100%|██████████| 1010/1010 [00:11<00:00, 85.17it/s]
100%|██████████| 1006/1006 [00:11<00:00, 83.94it/s]
100%|██████████| 1673/1673 [00:22<00:00, 73.51it/s] 
100%|██████████| 107/107 [00:00<00:00, 732.88it/s]
100%|██████████| 366/366 [00:00<00:00, 750.00it/s]
