In [2]:
!pip install tqdm
!pip install segmentation_models_pytorch
!pip install albumentations
!pip install plantcv

Collecting opencv-python
  Downloading opencv_python-4.5.4.58-cp37-cp37m-manylinux2014_x86_64.whl (60.3 MB)
[K     |████████████████████████████████| 60.3 MB 63 kB/s 
Installing collected packages: opencv-python
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
plantcv 3.13.2 requires opencv-python<4,>=3.4, but you have opencv-python 4.5.4.58 which is incompatible.[0m
Successfully installed opencv-python-4.5.4.58


Collecting opencv-python<4,>=3.4
  Using cached opencv_python-3.4.16.57-cp37-cp37m-manylinux2014_x86_64.whl (58.0 MB)
Installing collected packages: opencv-python
  Attempting uninstall: opencv-python
    Found existing installation: opencv-python 4.5.4.58
    Uninstalling opencv-python-4.5.4.58:
      Successfully uninstalled opencv-python-4.5.4.58
Successfully installed opencv-python-3.4.16.57


In [3]:
import os
import glob
import torch
import cv2 as cv
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
from scipy.misc import electrocardiogram
from torch.utils.tensorboard import SummaryWriter

from torch.utils.data import Dataset
from plantcv import plantcv as pcv
from skimage import img_as_float, morphology
from skimage.color import gray2rgb

In [4]:
cd drive/Shareddrives/KOHI_의료영상1팀/

/content/drive/Shareddrives/KOHI_의료영상1팀


In [5]:
evaluation = 'CheXpert'

In [6]:
len(os.listdir(f'External Validation/{evaluation}/dehazing'))

220

In [7]:
image_dir = f'External Validation/{evaluation}/dehazing'

## Load Image

load image를 할때 cv.imread로 미리 읽어오면 메모리가 부족할 수도 있는 현상이 발생할 수도 있으므로,

imread는 dataloader의 __getitem__에서 해결

In [8]:
input_list = os.listdir(image_dir)

In [9]:
len(input_list)

220

In [10]:
dataset = []


for input in tqdm(input_list):
  image = os.path.join(image_dir, input)

  dataset.append({'name': input[:-4], 'image_path':image})

100%|██████████| 220/220 [00:00<00:00, 131896.35it/s]


### split train/test data

In [11]:
data_test = dataset
print(f'test dataset 개수: {len(data_test)}')

lr = 1e-5
batch_size=8
train_continue= False
augment=False

optim_mode = 'adam'

model_sort = 'efficient'

# ckpt_dir=f'이승아/checkpoint/U-Net/Dice/1e-05(padding)'
# result_dir =f'이승아/result/U-Net/Dice/1e-05(padding)'

ckpt_dir=f'이승아/checkpoint/EfficientNet/V2'
result_dir =f'Evaluate result/{evaluation}/mask'

image_size = 256


test dataset 개수: 220


## Augmentation

In [12]:
import albumentations as A
from torch.utils.data import DataLoader

In [13]:
def _normalization(input):
  input = (input - input.min()) / (input.max() - input.min())
  return input

def _standardization(input):
  input = (input - mean) / std
  return input

def _to_tensor(input, name, h, w):
  input = input.astype('float32')
  input = input.reshape((1, input.shape[0], input.shape[1]))

  data = {'name':name, 'input': torch.from_numpy(input), 'height': h, 'width': w}
  return data

def _random_augment(input):
  h, w  = input.shape
  
  transform = A.Compose([A.HorizontalFlip(p=0.5),
                      A.VerticalFlip(p=0.5),
                      A.RandomCrop(height=int(h*0.8), width=int(w*0.8), p=0.5)
                      ])

  augmented = transform(image=input)

  input = augmented['image']

  return input

## Resize by aspect ratio and Padding Image

In [14]:
def resize_image(input):
  # shape: (height, width, channel)
  # aspect ratio를 고려하여 resize
  if input.shape[1] > input.shape[0]:   # height를 기준으로
    r = image_size / input.shape[1]
    dim = ( image_size, int(input.shape[0] * r))
  else:                                 # width를 기준으로
    r = image_size / input.shape[0]
    dim = (int(input.shape[1] * r), image_size)
    
  resized_input = cv.resize(input, dim, interpolation = cv.INTER_AREA)
  
  return resized_input

def padding_image(input):
  input_size = input.shape
  target_size = (image_size, image_size)

  if input_size[1] < image_size:
    padding_range = int(target_size[1]-input_size[1])
  elif input_size[0] < image_size:
    padding_range = int(target_size[0]-input_size[0])
  else:
    return input

  if padding_range%2 == 0:
    padding_size = (int(padding_range/2), int(padding_range/2))
  else:
    padding_size = (int(padding_range/2), int(padding_range/2)+1)
  
  if input_size[1] < image_size:
    npad= ((0,0),padding_size)
  elif input_size[0] < image_size:
    npad= (padding_size, (0,0))

  padding_input = np.pad(input, npad,'constant', constant_values=(0))

  return padding_input

## DataLoader

In [15]:
# DataLoader

class VertebraeDataset(Dataset):

  def __init__(self, data, augment=False):
    super(VertebraeDataset, self).__init__()

    self.data = data      
    self.augment = augment
    self.image_size = image_size

  def __getitem__(self, index):
    # Read input, label
    name = self.data[index]['name']
    input = cv.imread(self.data[index]['image_path'])
    h, w, _ = input.shape
    try:
      input = cv.cvtColor(input, cv.COLOR_BGR2GRAY)    
    except:
      print(self.data[index]['image_path'])
      exit(-1)

    # # 영상 개선
    clahe = cv.createCLAHE(clipLimit=2.0, tileGridSize=(16, 16))
    input = clahe.apply(input)
    input = input / 255.0
    
    if self.augment:
      input = _random_augment(input)
      
    # Resize and pad Image
    input = resize_image(input)
    input = padding_image(input)

    data = _to_tensor(input, name, h, w)

    return data

  def __len__(self):
    return len(self.data)

In [16]:
def get_data(data, shuffle=True):
    ### Train Dataset 가져오기
    dataset = VertebraeDataset(data, augment=augment)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)  

    return dataset, loader

# pytorch Dataloader
test_dataset, test_loader = get_data(data_test, shuffle=False)

_data = test_dataset.__getitem__(0)


# test_dataset, test_loader = get_data(data_train[:1])

## Parameter 설정


In [17]:
import torch.nn.functional as F
import segmentation_models_pytorch as smp
from torch import nn

In [18]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class DiceLoss(nn.Module):
    """
    Dice score loss function
    """
    def __init__(self):
        super(DiceLoss, self).__init__()
        self.smooth = 1.0

    def forward(self, output, label):
        assert output.size() == label.size()
        output = output[:, 0].contiguous().view(-1)
        label = label[:, 0].contiguous().view(-1)
        intersection = (output * label).sum()
        dsc = (2. * intersection + self.smooth) / (output.sum() + label.sum() + self.smooth)

        return 1. - dsc


class DiceBCELoss(nn.Module):
    def __init__(self, weight=None, size_average=True):
        super(DiceBCELoss, self).__init__()

    def forward(self, inputs, targets, smooth=1):

        # comment out if your model contains a sigmoid or equivalent activation layer
        inputs = torch.sigmoid(inputs)

        # flatten label and prediction tensors
        inputs = inputs.view(-1)
        targets = targets.view(-1)

        intersection = (inputs * targets).sum()
        dice_loss = 1 - (2. * intersection + smooth) /(inputs.sum() + targets.sum() + smooth)
        BCE = F.binary_cross_entropy(inputs, targets, reduction='mean')
        Dice_BCE = BCE + dice_loss

        return Dice_BCE

class Binary_Loss(nn.Module):
    def __init__(self):
        super(Binary_Loss, self).__init__()
        self.criterion = nn.BCEWithLogitsLoss()


    def forward(self, model_output, targets):
        loss = self.criterion(model_output, targets)

       
        return loss

bcedice_loss = DiceBCELoss().cuda()
binary_loss = Binary_Loss().cuda()
dsc_loss = DiceLoss().cuda()

# Model

In [19]:
from segmentation_models_pytorch.unet.model import Unet

In [20]:
from collections import OrderedDict

if model_sort == 'unet':
  class UNet(nn.Module):

      def __init__(self, in_channels=1, out_channels=1, init_features=32):
          super(UNet, self).__init__()

          features = init_features
          self.encoder1 = UNet._block(in_channels, features, name="enc1")
          self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
          self.encoder2 = UNet._block(features, features * 2, name="enc2")
          self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
          self.encoder3 = UNet._block(features * 2, features * 4, name="enc3")
          self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
          self.encoder4 = UNet._block(features * 4, features * 8, name="enc4")
          self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

          self.bottleneck = UNet._block(features * 8, features * 16, name="bottleneck")

          self.upconv4 = nn.ConvTranspose2d(
              features * 16, features * 8, kernel_size=2, stride=2
          )
          self.decoder4 = UNet._block((features * 8) * 2, features * 8, name="dec4")
          self.upconv3 = nn.ConvTranspose2d(
              features * 8, features * 4, kernel_size=2, stride=2
          )
          self.decoder3 = UNet._block((features * 4) * 2, features * 4, name="dec3")
          self.upconv2 = nn.ConvTranspose2d(
              features * 4, features * 2, kernel_size=2, stride=2
          )
          self.decoder2 = UNet._block((features * 2) * 2, features * 2, name="dec2")
          self.upconv1 = nn.ConvTranspose2d(
              features * 2, features, kernel_size=2, stride=2
          )
          self.decoder1 = UNet._block(features * 2, features, name="dec1")

          self.conv = nn.Conv2d(
              in_channels=features, out_channels=out_channels, kernel_size=1
          )

      def forward(self, x):
          enc1 = self.encoder1(x)
          enc2 = self.encoder2(self.pool1(enc1))
          enc3 = self.encoder3(self.pool2(enc2))
          enc4 = self.encoder4(self.pool3(enc3))

          bottleneck = self.bottleneck(self.pool4(enc4))

          dec4 = self.upconv4(bottleneck)
          dec4 = torch.cat((dec4, enc4), dim=1)
          dec4 = self.decoder4(dec4)
          dec3 = self.upconv3(dec4)
          dec3 = torch.cat((dec3, enc3), dim=1)
          dec3 = self.decoder3(dec3)
          dec2 = self.upconv2(dec3)
          dec2 = torch.cat((dec2, enc2), dim=1)
          dec2 = self.decoder2(dec2)
          dec1 = self.upconv1(dec2)
          dec1 = torch.cat((dec1, enc1), dim=1)
          dec1 = self.decoder1(dec1)
          return torch.sigmoid(self.conv(dec1))

      @staticmethod
      def _block(in_channels, features, name):
          return nn.Sequential(
              OrderedDict(
                  [
                      (
                          name + "conv1",
                          nn.Conv2d(
                              in_channels=in_channels,
                              out_channels=features,
                              kernel_size=3,
                              padding=1,
                              bias=False,
                          ),
                      ),
                      (name + "norm1", nn.BatchNorm2d(num_features=features)),
                      (name + "relu1", nn.ReLU(inplace=True)),
                      (
                          name + "conv2",
                          nn.Conv2d(
                              in_channels=features,
                              out_channels=features,
                              kernel_size=3,
                              padding=1,
                              bias=False,
                          ),
                      ),
                      (name + "norm2", nn.BatchNorm2d(num_features=features)),
                      (name + "relu2", nn.ReLU(inplace=True)),
                  ]
              )
          )
  model = UNet().to(device)
  print('unet')
else:
  model = Unet(
    encoder_name="efficientnet-b5",        # choose encoder, e.g. mobilenet_v2 or efficientnet-b7
    encoder_weights=None,     # use `imagenet` pre-trained weights for encoder initialization
    in_channels=1,                  # model input channels (1 for gray-scale images, 3 for RGB, etc.)
    classes=1,).to(device)
  print('efficient')

# Optimizer Adam 설정
if optim_mode == 'adam':
  optim = torch.optim.Adam(model.parameters(), lr=lr)
elif optim_mode == 'sgd':
  optim = torch.optim.SGD(model.parameters(), lr=lr)
elif optim_mode == 'adamW':
  optim = torch.optim.Adam(model.parameters(), lr=lr)
print(optim_mode)

efficient
adam


In [21]:
fn_tonumpy = lambda x: x.to('cpu').detach().numpy().transpose(0, 2, 3, 1)  # Tensor를 numpy로 변환
fn_denorm = lambda x, mean, std: (x * std) + mean  # DeNomarlization
fn_class = lambda x: 1.0 * (x > 0.4)

In [22]:
def load_model(ckpt_dir, model, optim):
    if not os.path.exists(ckpt_dir):
        epoch = 0
        return model

    ckpt_lst = os.listdir(ckpt_dir)
    ckpt_lst.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))
    print(os.path.join(ckpt_dir, ckpt_lst[-1]))

    dict_model = torch.load(os.path.join(ckpt_dir, ckpt_lst[-1]))
    model.load_state_dict(dict_model['model'], strict=False)
    optim.load_state_dict(dict_model['optim'])
    epoch = int(ckpt_lst[-1].split('epoch')[1].split('.pth')[0])
    print("Get saved weights successfully.")

    return model, optim, epoch

def save_model(ckpt_dir, model, optim, epoch):
    if not os.path.exists(ckpt_dir):
        os.makedirs(ckpt_dir)

    torch.save({'model': model.state_dict(), 'optim': optim.state_dict()},
                "./%s/model_epoch%d.pth" % (ckpt_dir, epoch))
    print(f'>> save model_{epoch}.pth')

# Test

In [23]:
def numeric_score(output, label):
    FP = np.float(np.sum((output == 1) & (label == 0)))
    FN = np.float(np.sum((output == 0) & (label == 1)))
    if FP != 0.0 or FN != 0.0:
        pass
    TP = np.float(np.sum((output == 1) & (label == 1)))
    TN = np.float(np.sum((output == 0) & (label == 0)))

    return FP, FN, TP, TN

def get_score(output, label):
    FP, FN, TP, TN = numeric_score(output, label)
    N = FP + FN + TP + TN

    epsilon = 1e-5

    # Recall : TP / TP+FN
    recall = np.divide(TP, TP + FN + epsilon)
    # Precision : TP / TP+FP
    precision = np.divide(TP, (TP+FP+epsilon))

    accuracy = np.divide((TP + TN), N+epsilon)

    # F1 socre = 2 * (A interect B) / |A| + |B| = 2TP / 2TP + FP + FN
    f1_score = 2 * (precision*recall) / (precision + recall + epsilon)
    dice_coeff = 2*TP / (2*TP+FP+FN+epsilon)

    # J(A,B) = | A intersect B | / | A union B |
    jaccard_score = TP / (TP+FN+FP+ epsilon)

    return recall * 100, precision * 100, accuracy * 100, f1_score*100, jaccard_score*100


In [24]:
result_dir

'Evaluate result/CheXpert/mask'

In [25]:
import copy

def metric(gt,pred):
    preds = pred.detach().numpy()
    gts = gt.detach().numpy()

    pred = preds.astype(int)  # float data does not support bit_and and bit_or
    gdth = gts.astype(int)  # float data does not support bit_and and bit_or
    fp_array = copy.deepcopy(pred)  # keep pred unchanged
    fn_array = copy.deepcopy(gdth)
    gdth_sum = np.sum(gdth)
    pred_sum = np.sum(pred)
    intersection = gdth & pred
    union = gdth | pred
    intersection_sum = np.count_nonzero(intersection)
    union_sum = np.count_nonzero(union)

    tp_array = intersection

    tmp = pred - gdth
    fp_array[tmp < 1] = 0

    tmp2 = gdth - pred
    fn_array[tmp2 < 1] = 0

    tn_array = np.ones(gdth.shape) - union

    tp, fp, fn, tn = np.sum(tp_array), np.sum(fp_array), np.sum(fn_array), np.sum(tn_array)

    smooth = 0.001
    precision = tp / (pred_sum + smooth)
    recall = tp / (gdth_sum + smooth)

    false_positive_rate = fp / (fp + tn + smooth)
    false_negtive_rate = fn / (fn + tp + smooth)

    jaccard = intersection_sum / (union_sum + smooth)
    dice = 2 * intersection_sum / (gdth_sum + pred_sum + smooth)
    
    epsilon = 1e-5
    f1_score = 2 * (precision*recall) / (precision + recall + epsilon)

    return false_positive_rate,false_negtive_rate,f1_score
    

In [33]:
def _normalize_image(image, *, image_cmap=None):
    image = img_as_float(image)
    if image.ndim == 2:
        if image_cmap is None:
            image = gray2rgb(image)
        else:
            image = plt.get_cmap(image_cmap)(image)[..., :3]
    return image

In [34]:
def overlay_skeleton_2d(image, skeleton, *,
                        image_cmap=None, color=(1, 0, 0), alpha=1,
                        dilate=0, axes=None):
    image = _normalize_image(image, image_cmap=image_cmap)
    skeleton = skeleton.astype(bool)
    if dilate > 0:
        selem = morphology.disk(dilate)
        skeleton = morphology.binary_dilation(skeleton, selem)
    image[skeleton] = alpha * np.array(color) + (1 - alpha) * image[skeleton]
    return image

In [28]:
# !pip uninstall -y opencv-python opencv-contrib-python

In [29]:
# !pip install opencv-contrib-python

In [30]:
from cv2.ximgproc import thinning

def remove_horizontal_line(output):
  line_kernel = np.zeros((7, 7),dtype=np.uint8)
  line_kernel[3,...]=1
  x=cv.morphologyEx(output, cv.MORPH_OPEN, line_kernel ,iterations=1)
  _output= output - x

  return _output 

In [39]:
final_result_dir = './Evaluate result/PadChest/line'

def post_processing(output):
  
  # # Opening Image
  kernel = np.ones((9, 9), np.uint8)
  result_opening = cv.morphologyEx(output, cv.MORPH_OPEN, kernel)
  
  # Thining Image
  result_thinning = thinning((result_opening*255).astype('uint8'))
  # Remove horizontal_line
  result_thinning = remove_horizontal_line(result_thinning)

  # # Skeletonizing Image
  # result_skeletonizing = (skeletonize(result_opening)*255).astype('uint8')

  return result_opening, result_thinning

def save_predictedMask(input, output, name, origin_height, origin_width):
    for idx in range(input.shape[0]):
        file_name = name[idx]
        _input = input[idx].astype('uint8').squeeze()

        result_opening, result_thinning = post_processing(output[idx].squeeze())
        # pruned_skeleton, segmented_img, segment_objects = pcv.morphology.prune(skel_img=result_skeletonizing, size=300)

        overlay_image = overlay_skeleton_2d(_input, result_thinning, dilate=1)
        
        cv.imwrite(os.path.join('./Evaluate result/CheXpert/mask', file_name+'.png'), result_opening*255)
        cv.imwrite(os.path.join('./Evaluate result/CheXpert/line/256', file_name+'.png'), result_thinning)
        
        h = origin_height[idx].item()
        w = origin_width[idx].item()
        dim = (w, h)
        resized_result_thinning = cv.resize(result_thinning, dim, interpolation = cv.INTER_NEAREST)
        cv.imwrite(os.path.join('./Evaluate result/CheXpert/line/origin_size', file_name+'.png'), resized_result_thinning)
        
        fig, ax = plt.subplots()
        plt.axis('off'), plt.xticks([]), plt.yticks([])
        plt.tight_layout()
        plt.imshow(overlay_image)
        plt.savefig(os.path.join('./Evaluate result/CheXpert/overlay', file_name+'.png'),bbox_inches='tight')
        plt.close()

In [40]:
st_epoch=0

print(result_dir)
model, optim, st_epoch = load_model(ckpt_dir=ckpt_dir, model=model, optim=optim)

with torch.no_grad():
    model.eval()

    loss_arr = []
    total_recall = []
    total_precision = []
    total_f1_score = []
    total_jaccard = []

    for iter, data in enumerate(tqdm(test_loader), 1):
        name = data['name']
        input = data['input'].to(device)

        origin_height = data['height']
        origin_width = data['width']
        
        output = model(input)

        # for metrics
        logit = torch.sigmoid(output)
        output = logit.clone()
        output[output>0.5] = 1
        output[output<=0.5] = 0

        input = fn_tonumpy(input*255)
        output = fn_tonumpy(output)
        
        save_predictedMask(input, output, name, origin_height, origin_width)

Evaluate result/CheXpert/mask
이승아/checkpoint/EfficientNet/V2/model_epoch300.pth
Get saved weights successfully.


100%|██████████| 28/28 [20:04<00:00, 43.00s/it]


In [None]:
from google.colab import drive
drive.mount('/content/drive')