In [1]:
import os
import gc
from glob import glob
import warnings
import random
from collections import defaultdict
import copy
import math
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import font_manager, rc
import itertools
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score, roc_curve
from tqdm.notebook import tqdm
from PIL import Image
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms.functional import to_pil_image
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import wandb
import timm

# Notebook-wide runtime setup; these statements take effect for every later cell.

# Suppress all Python warnings for the rest of the session.
warnings.filterwarnings(action='ignore')
# Turn off Pillow's maximum-pixel-count guard so very large images can be opened.
Image.MAX_IMAGE_PIXELS = None
# Default torch device: CUDA when available, otherwise CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Drop unreachable Python objects and release cached CUDA memory before starting.
gc.collect()
torch.cuda.empty_cache()
# os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3,4'
# Plot styling: larger default font; axes.unicode_minus=False makes matplotlib
# draw minus signs with the ASCII hyphen instead of the Unicode minus glyph.
matplotlib.rcParams.update({'font.size': 14})
plt.rcParams['axes.unicode_minus'] = False

In [2]:
def seed_everything(random_seed: int):
    """Seed every random number generator this notebook touches.

    Seeds Python's `random`, NumPy's global generator and PyTorch (CPU, the
    current CUDA device and all CUDA devices), writes the seed to the
    PYTHONHASHSEED environment variable, and switches cuDNN to deterministic
    mode with benchmarking disabled.

    Args:
        random_seed: integer seed applied to all generators.
    """
    # Python-level sources of randomness.
    os.environ['PYTHONHASHSEED'] = str(random_seed)
    random.seed(random_seed)
    np.random.seed(random_seed)

    # PyTorch: CPU generator, current CUDA device, then every CUDA device (multi-GPU).
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(random_seed)

    # Prefer repeatable cuDNN kernels over autotuned ones.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

In [3]:
##### HYPER PARAMETERS
# Notebook-wide constants; later cells read these module-level names directly.

SEED = 0  # seed handed to seed_everything()
CNT = 0  # NOTE(review): not read by any cell visible in this notebook — presumably a leftover; confirm before removing
IMG_SIZE = (150, 150) # (width, height) of the padded crop produced by add_padding()
DROP_RATE = 0.5  # passed as drop_rate when the timm backbone is created
BATCH_SIZE = 1  # batch size of the test DataLoader
# NOTE(review): the five values below are not referenced by any visible cell —
# presumably carried over from the training notebook; confirm before removing.
EPOCH = 1000
PATIENCE = 20
LEARNING_RATE = 5e-4
WEIGHT_DECAY = 5e-5
PRINT_EVERY = 10

GPU_IDX = 4  # CUDA device index the model and input batches are moved to

In [4]:
# Seed all random number generators before any data or model code runs.
seed_everything(SEED)

In [5]:
def add_padding(img):
    """Fit `img` into IMG_SIZE without distorting it, padding the remainder with black.

    The image is scaled so that it fills the target along one axis while
    keeping its aspect ratio, then a constant (0, 0, 0) border is added on the
    other axis, split as evenly as possible (the extra pixel, if any, goes to
    the right / bottom). When the aspect ratios already match, the image is
    simply resized.

    Args:
        img: array with shape (height, width, channels).

    Returns:
        The resized and padded image as returned by OpenCV.
    """
    src_h, src_w, _ = img.shape
    dst_w, dst_h = IMG_SIZE
    black = (0, 0, 0)

    src_ratio = src_w / src_h
    dst_ratio = dst_w / dst_h

    # Same aspect ratio: a plain resize already fills the target.
    if src_ratio == dst_ratio:
        return cv2.resize(img, IMG_SIZE)

    if src_ratio < dst_ratio:
        # Relatively taller than the target: fill the height, pad left/right.
        scaled_w = math.ceil(src_w / (src_h / dst_h))
        resized = cv2.resize(img, (scaled_w, dst_h))
        pad_total = dst_w - scaled_w
        pad_left = pad_total // 2
        pad_right = pad_total - pad_left
        return cv2.copyMakeBorder(resized, 0, 0, pad_left, pad_right, cv2.BORDER_CONSTANT, value=black)

    # Relatively wider than the target: fill the width, pad top/bottom.
    scaled_h = math.ceil(src_h / (src_w / dst_w))
    resized = cv2.resize(img, (dst_w, scaled_h))
    pad_total = dst_h - scaled_h
    pad_top = pad_total // 2
    pad_bottom = pad_total - pad_top
    return cv2.copyMakeBorder(resized, pad_top, pad_bottom, 0, 0, cv2.BORDER_CONSTANT, value=black)

def get_preprocessing():
    """Build the albumentations pipeline applied to each crop before inference.

    The pipeline normalises the image with fixed per-channel mean / std values
    (presumably the usual ImageNet statistics — confirm against the training
    setup) and then applies ToTensorV2.

    Returns:
        An `A.Compose` object; call it as `pipeline(image=img)['image']`.
    """
    normalize = A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    to_tensor = ToTensorV2()
    return A.Compose([normalize, to_tensor])

In [6]:
class PolypDataset(Dataset):
    """Dataset yielding one bounding-box crop per image, padded via add_padding().

    Bounding boxes are looked up by image path in the notebook-level DataFrame
    `df` (columns `img_path`, `xmin`, `xmax`, `ymin`, `ymax`), so `df` must be
    defined before any item is requested.

    Args:
        file_list: image paths; each must appear in `df['img_path']`.
        label_list: labels aligned with `file_list`.
        preprocessing: when truthy, every crop is additionally passed through
            the pipeline returned by get_preprocessing().
    """

    def __init__(self, file_list, label_list, preprocessing=False):
        self.file_list = file_list
        self.label_list = label_list
        self.preprocessing = get_preprocessing() if preprocessing else None

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return `(image, label)` for sample `idx`.

        Raises:
            KeyError: no bounding-box row exists for the image path.
            FileNotFoundError: the image file is missing or unreadable.
            ValueError: the bounding box selects an empty region.
        """
        img_path = self.file_list[idx]

        boxes = df.loc[df['img_path'] == img_path, ['xmin', 'xmax', 'ymin', 'ymax']].values
        if len(boxes) == 0:
            # Deliberately not IndexError: raised from __getitem__ it would be
            # read as "end of sequence" by plain `for item in dataset` loops.
            raise KeyError(f'no bounding box found for image: {img_path}')
        # The first matching row is used. Cast to int so float-typed CSV columns
        # still produce valid slice bounds.
        xmin, xmax, ymin, ymax = (int(v) for v in boxes[0])

        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise FileNotFoundError(f'could not read image: {img_path}')

        img = img[ymin:ymax, xmin:xmax, :]
        if img.size == 0:
            raise ValueError(
                f'empty crop for {img_path}: box (xmin={xmin}, xmax={xmax}, ymin={ymin}, ymax={ymax})'
            )

        # OpenCV loads BGR; convert to RGB before padding / normalisation.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = add_padding(img)
        if self.preprocessing:
            img = self.preprocessing(image=img)['image']

        return img, self.label_list[idx]

In [7]:
# timm ResNet-50 whose head is sized to emit 1024 values (num_classes=1024);
# ResNet_fc deep-copies it and adds the final classification layer on top.
# NOTE(review): pretrained=True fetches pretrained weights, which appear to be
# overwritten by the load_state_dict call in the extraction cell — confirm
# whether pretrained=False would suffice here.
backbone = timm.create_model('resnet50', pretrained=True, num_classes=1024, drop_rate=DROP_RATE)
class ResNet_fc(nn.Module):
    """Backbone followed by a single linear classification layer.

    Args:
        _backbone: module producing 1024 features per sample; it is deep-copied,
            so the object passed in is never modified by this model.
        num_classes: number of output logits.
    """

    def __init__(self, _backbone, num_classes=2):
        super().__init__()
        # Keep a private copy so several models can be built from one template.
        self.backbone = copy.deepcopy(_backbone)
        # 1024 must match the width of the backbone's output.
        self.classifier = nn.Linear(1024, num_classes)

    def forward(self, x):
        """Return the classifier logits for input batch `x`."""
        return self.classifier(self.backbone(x))
# Names of the layers whose activations are exported; each name is also used as
# the name of an output sub-directory.
# NOTE(review): the extraction cell attaches its hooks to activation modules
# (backbone.act1 and the last block's act3 of each stage), not to the conv
# layers these names suggest — confirm the names are only meant as labels.
target_layers = ['conv1', 'layer1.2.conv3', 'layer2.3.conv3', 'layer3.5.conv3', 'layer4.2.conv3', 'global_pool', 'fc', 'classifier']

In [10]:
# Local output root with one sub-directory per exported layer.
# NOTE(review): the extraction cell writes its .npy files under a different,
# absolute directory — confirm whether SAVE_PATH is still needed.
SAVE_PATH = 'dlfeature_path'
model_path = 'model_path_pth'  # NOTE(review): not referenced by any visible cell

# makedirs(exist_ok=True) is idempotent and creates missing parents, so the
# cell can be re-run safely and has no check-then-create race.
os.makedirs(SAVE_PATH, exist_ok=True)
for name in target_layers:
    dir_path = f'{SAVE_PATH}/{name}/'
    os.makedirs(dir_path, exist_ok=True)

In [11]:
# ---------------------------------------------------------------------------
# Feature-map export: for every fold checkpoint, run the test set through the
# network once and save the activation of each layer in `target_layers` as one
# .npy file per image.
# ---------------------------------------------------------------------------

# NOTE(review): `fold` was never defined in this notebook. The saved progress
# output shows a 5-step outer loop, so five folds are assumed here — confirm
# whether the checkpoints are numbered 0-4 or 1-5.
FOLDS = range(5)
WEIGHT_PATH = '/workspace/data4/changwoo/Colonoscopy/weights/EndoAI/SRgen_CutMix_0.001/fold_{fold}.pth'
FEATURE_DIR = '/workspace/data4/changwoo/Colonoscopy/XAI_test/feature_map/fold_{fold}/{layer}'


def get_hooked_module(net, layer_name):
    """Return the sub-module of a ResNet_fc whose output is exported as `layer_name`.

    The stage entries point at the activation module that ends the stage's last
    block (and `conv1` at `backbone.act1`), i.e. the exported tensors are
    post-activation outputs rather than raw convolution outputs.

    Raises:
        KeyError: `layer_name` is not one of the supported names.
    """
    body = net.backbone
    hooked = {
        'conv1': body.act1,
        'layer1.2.conv3': body.layer1[-1].act3,
        'layer2.3.conv3': body.layer2[-1].act3,
        'layer3.5.conv3': body.layer3[-1].act3,
        'layer4.2.conv3': body.layer4[-1].act3,
        'global_pool': body.global_pool,
        'fc': body.fc,
        'classifier': net.classifier,
    }
    return hooked[layer_name]


def export_feature_maps(net, dataloader, img_paths, layer_names, out_dirs, run_device):
    """Run `dataloader` through `net` once and save every requested activation.

    All hooks are registered up front, so a single forward pass per batch
    yields every layer's output. Hooks are removed again on exit, even on error,
    so repeated calls do not pile up hooks on the shared model.

    Args:
        net: ResNet_fc instance, already on `run_device`.
        dataloader: non-shuffled loader whose sample order matches `img_paths`.
        img_paths: image paths, used only to derive the output file names.
        layer_names: names accepted by get_hooked_module().
        out_dirs: mapping layer name -> existing directory for its .npy files.
        run_device: device the input batches are moved to.
    """
    captured = {}

    def make_hook(layer_name):
        def hook(_module, _inputs, output):
            captured[layer_name] = output
        return hook

    handles = []
    net.eval()
    try:
        for layer_name in layer_names:
            handles.append(get_hooked_module(net, layer_name).register_forward_hook(make_hook(layer_name)))

        sample_idx = 0
        with torch.no_grad():
            for data, _label in tqdm(dataloader, leave=False):
                net(data.to(run_device))
                batch_len = data.shape[0]
                for layer_name in layer_names:
                    feats = captured[layer_name].detach().cpu().numpy()
                    # One file per sample, so any batch size maps to the right path.
                    for offset in range(batch_len):
                        data_path = img_paths[sample_idx + offset]
                        # NOTE(review): the name is everything before the first '.',
                        # so images sharing that prefix overwrite each other.
                        fname = data_path.split('/')[-1].split('.')[0]
                        np.save(f'{out_dirs[layer_name]}/{fname}.npy', feats[offset])
                sample_idx += batch_len
    finally:
        for handle in handles:
            handle.remove()


# `df` must keep this name: PolypDataset reads bounding boxes from it.
df = pd.read_csv('/workspace/data4/changwoo/Colonoscopy/XAI_test/XAI_test.csv')
img_list = df['img_path'].tolist()
label_list = df['label'].tolist()
test_data = PolypDataset(img_list, label_list, preprocessing=True)
# shuffle=False keeps the loader order aligned with img_list.
test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

# Use the configured GPU when the machine has it; otherwise fall back to the
# notebook's default device instead of failing on an invalid device index.
if torch.cuda.is_available() and GPU_IDX < torch.cuda.device_count():
    run_device = torch.device(f'cuda:{GPU_IDX}')
else:
    run_device = device

model = ResNet_fc(backbone, num_classes=2).to(run_device)
model.eval()

for fold in tqdm(FOLDS, desc='fold'):
    model.load_state_dict(torch.load(WEIGHT_PATH.format(fold=fold), map_location='cpu'))
    out_dirs = {name: FEATURE_DIR.format(fold=fold, layer=name) for name in target_layers}
    for out_dir in out_dirs.values():
        os.makedirs(out_dir, exist_ok=True)
    export_feature_maps(model, test_dataloader, img_list, target_layers, out_dirs, run_device)

  0%|          | 0/5 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]

  0%|          | 0/8 [00:00<?, ?it/s]