Reference notebook:

https://www.kaggle.com/code/ammarnassanalhajali/uwmgi-unet-keras-inference/notebook


Kaggle link to this notebook:

https://www.kaggle.com/code/ismailazdad/keras-coco-inference2

In [1]:
# List the contents of the pycocotools input directory used for the install below.
! ls ../input/pycocotools

In [2]:
!pip install - q pycocotools - -no-index - -find-links = file: // /kaggle/input/pycocotools/

In [3]:
from keras.models import load_model
from keras.callbacks import Callback, ModelCheckpoint
from keras.losses import binary_crossentropy
from keras.layers.merge import concatenate
from keras.layers.pooling import MaxPooling2D
from tensorflow.keras.metrics import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import *
from tensorflow.keras import *
from tensorflow import keras
import tensorflow as tf
from PIL import Image
from pathlib import Path
from pycocotools.coco import COCO
import matplotlib as mpl
import matplotlib.patches as mpatches
import matplotlib.gridspec as gridspec
import seaborn as sns
import matplotlib.pyplot as plt
from glob import glob
from sklearn.model_selection import train_test_split
import pandas as pd
import itertools
from itertools import groupby
import random
import cv2
from tqdm.notebook import tqdm
from tqdm import tqdm
import os
from skimage import measure
import pycocotools.mask as mask_util
import numpy as np
import json
from pycocotools import mask
import pycocotools
import warnings
warnings.filterwarnings("ignore")
# from google.colab.patches import cv2_imshow

In [4]:
from keras import backend as K
from keras.losses import binary_crossentropy
import tensorflow as tf


def dice_coef(y_true, y_pred, smooth=1):
    """Return the smoothed Dice coefficient computed over all elements of both tensors.

    Both inputs are flattened, so the score is a single scalar for the whole
    batch: ``(2 * sum(t * p) + smooth) / (sum(t) + sum(p) + smooth)``.
    """
    truth_flat, pred_flat = K.flatten(y_true), K.flatten(y_pred)
    overlap = K.sum(truth_flat * pred_flat)
    numerator = 2. * overlap + smooth
    denominator = K.sum(truth_flat) + K.sum(pred_flat) + smooth
    return numerator / denominator


def iou_coef(y_true, y_pred, smooth=1):
    """Return the smoothed intersection-over-union, averaged over axis 0.

    Sums are taken over axes 1, 2 and 3 of each input, giving one IoU value
    per entry along axis 0; the mean of those values is returned.
    """
    reduce_axes = [1, 2, 3]
    overlap = K.sum(K.abs(y_true * y_pred), axis=reduce_axes)
    combined = K.sum(y_true, reduce_axes) + K.sum(y_pred, reduce_axes) - overlap
    per_sample = (overlap + smooth) / (combined + smooth)
    return K.mean(per_sample, axis=0)


def dice_loss(y_true, y_pred):
    """Return ``1 - Dice`` with a fixed smoothing term of 1, over all elements."""
    smooth = 1.
    truth_flat = K.flatten(y_true)
    pred_flat = K.flatten(y_pred)
    overlap_sum = K.sum(truth_flat * pred_flat)
    total = K.sum(truth_flat) + K.sum(pred_flat) + smooth
    dice = (2. * overlap_sum + smooth) / total
    return 1. - dice


def bce_dice_loss(y_true, y_pred):
    """Return binary cross-entropy plus half of the Dice loss.

    ``y_true`` is cast to float32 before both terms are evaluated.
    """
    target = tf.cast(y_true, tf.float32)
    cross_entropy = binary_crossentropy(target, y_pred)
    overlap_penalty = dice_loss(target, y_pred)
    return cross_entropy + 0.5 * overlap_penalty

In [5]:
class FixedDropout(keras.layers.Dropout):
    """Dropout variant whose noise shape may contain ``None`` placeholders.

    Registered in ``custom_objects`` so a saved model that references a
    layer named ``FixedDropout`` can be deserialised.
    """

    def _get_noise_shape(self, inputs):
        """Resolve ``None`` entries of ``noise_shape`` against the runtime shape of ``inputs``."""
        if self.noise_shape is None:
            return None

        runtime_shape = K.shape(inputs)
        return tuple(
            runtime_shape[axis] if dim is None else dim
            for axis, dim in enumerate(self.noise_shape)
        )


# Custom layer, metrics and loss that load_model needs in order to rebuild the saved model.
model_path = '../input/keras-coco-training/model.h5'
custom_objects = {
    'FixedDropout': FixedDropout,
    'dice_coef': dice_coef,
    'iou_coef': iou_coef,
    'bce_dice_loss': bce_dice_loss,
}

model = load_model(model_path, custom_objects=custom_objects)

In [6]:
# Path to the annotation json that is passed to the COCO API for the test images.
test_path_json = '../input/keras-coco-training/test_json.json'

In [7]:
# Load the test annotations and report how many images / categories they contain.
coco = COCO(test_path_json)
imgIds = coco.getImgIds()
catIDs = coco.getCatIds()
imgDict = coco.loadImgs(imgIds)
print(f"{len(imgIds)} {len(catIDs)}")

In [8]:
def filterDataset(classes=None, json_file=None):
    """Collect the unique images of a COCO annotation file, optionally filtered by class.

    Args:
        classes: iterable of category names. When given, an image is kept if it
            contains at least one of the categories. ``None`` keeps every image.
        json_file: path to the COCO annotation json.

    Returns:
        ``(unique_images, dataset_size, coco)`` where ``unique_images`` is a
        randomly shuffled list of COCO image dicts without duplicates,
        ``dataset_size`` is its length and ``coco`` is the loaded COCO API object.
    """
    # initialize COCO api for instance annotations
    coco = COCO(json_file)

    if classes is not None:
        images = []
        for className in classes:
            # Pass the name inside a list: pycocotools treats a bare string as a
            # sequence and would match category names by substring.
            catIds = coco.getCatIds(catNms=[className])
            if not catIds:
                # Unknown class: getImgIds(catIds=[]) would return *every* image.
                continue
            images += coco.loadImgs(coco.getImgIds(catIds=catIds))
    else:
        images = coco.loadImgs(coco.getImgIds())

    # An image containing several requested classes was collected once per class;
    # keep the first occurrence of each image id (dicts preserve insertion order).
    unique_images = list({image['id']: image for image in images}.values())

    random.shuffle(unique_images)
    return unique_images, len(unique_images), coco

In [9]:
# Inference configuration. An image size of 128 is recommended for better results.
image_size = 128
input_image_size = (image_size, image_size)
batch_size = 8
epochs = 10
classes = ['small_bowel', 'large_bowel', 'stomach']

In [10]:
# Shuffled, de-duplicated test images plus the COCO handle they were read from.
images_test, dataset_size_test, coco_val = filterDataset(
    classes=classes, json_file=test_path_json)

In [11]:
class DataGeneratorFromCocoJson(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that builds image batches (and per-class masks) from a COCO json.

    Args:
        batch_size: number of samples per batch.
        subset: ``"train"`` makes ``__getitem__`` return ``(X, y)``; any other
            value makes it return ``X`` only.
        image_list: list of COCO image dicts; ``'file_name'`` is read from disk
            and ``'id'`` is used to look up annotations.
        classes: category names; a mask channel is produced for each category
            id returned by ``getCatIds`` for these names.
        input_image_size: ``(width, height)`` handed to ``cv2.resize``. The
            notebook only uses square sizes.
        annFile: path to the COCO annotation json.
        shuffle: when true, the sample order is permuted in ``on_epoch_end``
            (which is also called once from ``__init__``).
    """

    def __init__(self, batch_size=batch_size, subset="train", image_list=[], classes=[], input_image_size=(128, 128), annFile='', shuffle=False):

        super().__init__()
        self.subset = subset
        self.batch_size = batch_size
        self.indexes = np.arange(len(image_list))
        self.image_list = image_list
        self.classes = classes
        self.input_image_size = tuple(input_image_size)
        self.dataset_size = len(image_list)
        self.coco = COCO(annFile)
        self.catIds = self.coco.getCatIds(catNms=self.classes)
        # Use this generator's own category ids instead of the notebook-level
        # `catIDs` global, so the class does not depend on another cell's state.
        self.cats = self.coco.loadCats(self.catIds)
        self.imgIds = self.coco.getImgIds()
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches; a trailing partial batch is dropped."""
        return len(self.image_list) // self.batch_size

    def on_epoch_end(self):
        """Permute the sample order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _array_shape(self):
        """(rows, cols) of arrays produced by ``cv2.resize(..., self.input_image_size)``."""
        width, height = self.input_image_size
        return (height, width)

    def getClassName(self, classID, cats):
        """Return the ``'name'`` of the category dict in ``cats`` whose ``'id'`` is ``classID``, else ``None``."""
        for cat in cats:
            if cat['id'] == classID:
                return cat['name']
        return None

    def getNormalMask(self, image_id, catIds):
        """Build one resized uint8 mask for ``image_id`` restricted to ``catIds``.

        Each annotation is painted with ``self.classes.index(name) + 1`` and the
        annotations are merged with an element-wise maximum.
        """
        annIds = self.coco.getAnnIds(image_id, catIds=catIds, iscrowd=None)
        anns = self.coco.loadAnns(annIds)
        cats = self.coco.loadCats(catIds)
        train_mask = np.zeros(self._array_shape(), dtype=np.uint8)
        for ann in anns:
            className = self.getClassName(ann['category_id'], cats)
            pixel_value = self.classes.index(className) + 1
            # Nearest-neighbour keeps label values intact; linear interpolation
            # would blend them into other label values along the edges.
            new_mask = cv2.resize(self.coco.annToMask(ann) * pixel_value,
                                  self.input_image_size,
                                  interpolation=cv2.INTER_NEAREST)
            train_mask = np.maximum(new_mask, train_mask)
        return train_mask

    def getLevelsMask(self, image_id):
        """Return a list with one mask per entry of ``self.catIds``, in that order."""
        return [self.getNormalMask(image_id, cat_id) for cat_id in self.catIds]

    def getImage(self, file_path):
        """Read, resize and scale an image by 1/255; single-channel input is replicated to 3 channels.

        Raises:
            FileNotFoundError: if ``cv2.imread`` cannot read ``file_path``.
        """
        train_img = cv2.imread(file_path, cv2.IMREAD_ANYDEPTH)
        if train_img is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"cannot read image: {file_path}")
        train_img = cv2.resize(train_img, self.input_image_size)
        train_img = train_img.astype(np.float32) / 255.
        if len(train_img.shape) == 3 and train_img.shape[2] == 3:
            return train_img
        return np.stack((train_img,) * 3, axis=-1)

    def get_image_Infos_by_path_id(self, node):
        """Return the image dict whose ``'file_name'`` equals ``node``, or ``None`` if absent."""
        for img_info in self.image_list:
            if img_info['file_name'] == node:
                return img_info
        return None

    def __getitem__(self, index):
        """Return batch ``index``: ``(X, y)`` for the ``"train"`` subset, otherwise ``X``."""
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        rows, cols = self._array_shape()
        X = np.empty((len(indexes), rows, cols, 3))
        y = np.empty((len(indexes), rows, cols, len(self.catIds)))

        for i, value in enumerate(indexes):
            img_info = self.image_list[value]
            X[i, ] = self.getImage(img_info['file_name'])
            # Channel k holds the mask of the k-th category id; indexing by
            # position avoids out-of-range errors when ids do not start at 0.
            for channel, class_mask in enumerate(self.getLevelsMask(img_info['id'])):
                y[i, :, :, channel] = class_mask

        if self.subset == 'train':
            return X, y
        return X

In [12]:
# "train" subset so that batches come back as (images, masks) for the visual check below.
test_generator_class = DataGeneratorFromCocoJson(
    batch_size=batch_size,
    subset="train",
    image_list=images_test,
    classes=classes,
    input_image_size=input_image_size,
    annFile=test_path_json,
    shuffle=False,
)

In [13]:
# Visual sanity check: one batch of images with ground-truth and predicted masks.
img_s, mask_s = test_generator_class[2]
preds = model.predict(img_s)

# Probability above which a pixel is drawn as belonging to a class.
# Applied to every channel; without it, almost every pixel of a channel is
# non-zero and the overlay covers the whole image.
PRED_THRESHOLD = 0.8

colors = ['yellow', 'green', 'red']
labels = ["Small Bowel", "Large Bowel", "Stomach"]
patches = [mpatches.Patch(color=color, label=label)
           for color, label in zip(colors, labels)]
# One single-colour colormap per class channel.
class_cmaps = [mpl.colors.ListedColormap([color]) for color in colors]

n_rows = len(img_s)
fig = plt.figure(figsize=(10, 25))
gs = gridspec.GridSpec(nrows=n_rows, ncols=3)

for i in range(n_rows):
    # The generator already scales images; imshow normalises the grey levels itself.
    gray_img = img_s[i][:, :, 0]
    true_mask = mask_s[i]
    pred_mask = (preds[i] > PRED_THRESHOLD).astype(np.float32)

    ax0 = fig.add_subplot(gs[i, 0])
    ax1 = fig.add_subplot(gs[i, 1])
    ax2 = fig.add_subplot(gs[i, 2])
    for ax in (ax0, ax1, ax2):
        ax.imshow(gray_img, cmap='gray')
        ax.set_axis_off()

    # Overlay each class on top of the grey image; zero pixels are masked out.
    for channel, cmap in enumerate(class_cmaps):
        true_channel = true_mask[:, :, channel]
        pred_channel = pred_mask[:, :, channel]
        ax1.imshow(np.ma.masked_where(true_channel == 0, true_channel),
                   cmap=cmap, alpha=1)
        ax2.imshow(np.ma.masked_where(pred_channel == 0, pred_channel),
                   cmap=cmap, alpha=1)

    if i == 0:
        # Column titles and the legend are only needed on the first row.
        ax0.set_title("Image", fontsize=15, weight='bold', y=1.02)
        ax1.set_title("Mask", fontsize=15, weight='bold', y=1.02)
        ax2.set_title("predicted Mask", fontsize=15, weight='bold', y=1.02)
        ax2.legend(handles=patches, bbox_to_anchor=(1.1, 0.65), loc=2, borderaxespad=0.4, fontsize=14,
                   title='Mask Labels', title_fontsize=14, edgecolor="black",  facecolor='#c5c6c7')

In [14]:
# Run-length encoding
def rle_encode(img):
    '''
    Encode a mask as a run-length string.

    img: numpy array, 1 - mask, 0 - background
    Returns "start length start length ..." where starts are 1-based
    positions in the flattened (row-major) array.
    '''
    # Pad with zeros so runs touching either end still produce a boundary.
    padded = np.concatenate([[0], img.flatten(), [0]])
    boundaries = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    # Odd slots hold run ends; turn them into run lengths.
    boundaries[1::2] -= boundaries[::2]

    return ' '.join(map(str, boundaries))

In [15]:
# One row per test image, in the same order as images_test, with two empty placeholder columns.
submission_df = pd.DataFrame.from_records(images_test)
submission_df.insert(loc=5, column='prediction', value=np.nan)
submission_df.insert(loc=3, column='class_name', value=np.nan)
submission_df.head()

In [16]:
import gc

# shuffle must stay False: the predictions are matched to submission_df rows by
# position further down, so the generator has to yield images in images_test order.
test_generator_class = DataGeneratorFromCocoJson(
    1, "test", images_test, classes, input_image_size, test_path_json, shuffle=False)
gc.collect()
LOGITS = model.predict(test_generator_class, verbose=1)
gc.collect()

In [17]:
# Run-length encode every predicted channel at the original image resolution.
# Channel order of the model output: 0 = small bowel, 1 = large bowel, 2 = stomach.
sbs, lbs, sts = [], [], []
for index, row in tqdm(submission_df.iterrows(), total=submission_df.shape[0]):
    # cv2.resize expects dsize as (width, height), not (height, width).
    dsize = (int(row["width"]), int(row["height"]))
    for channel, rle_list in enumerate((sbs, lbs, sts)):
        pred_arr = np.round(cv2.resize(
            LOGITS[index, :, :, channel], dsize, interpolation=cv2.INTER_NEAREST)).astype('uint8')
        rle_list.append(rle_encode(pred_arr))
del LOGITS
gc.collect()

In [18]:
# Expand every image into three submission rows (one per class).
# NOTE: `classes` is rebound here from the class-name config list to the per-row labels.
ids, classes, rles = [], [], []
for index, row in tqdm(submission_df.iterrows(), total=submission_df.shape[0]):
    per_class = zip(('small_bowel', 'large_bowel', 'stomach'),
                    (sbs[index], lbs[index], sts[index]))
    for class_label, encoded in per_class:
        ids.append(row['file_id'])
        classes.append(class_label)
        rles.append(encoded)

In [19]:
# Assemble the long-format submission table and write it to disk.
submission_df = pd.DataFrame({
    'id': ids,
    'class': classes,
    'predicted': rles,
})
submission_df.to_csv("submission.csv", index=False)

In [20]:
# Preview up to 10 random rows; capping at the frame length avoids an error on small test sets.
submission_df.sample(min(10, len(submission_df)))