In [1]:
import os
import cv2
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

import timm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data.sampler import Sampler
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau

import torchmetrics
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping


import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, precision_score, recall_score

import warnings
warnings.filterwarnings(action='ignore')

N_EPOCHS = 1000
BATCH_SIZE = 256
LEARNING_RATE = 0.0005
PAITENCE = 20

IM_HEIGHT = 256
IM_WIDTH = 256

classification_mode = "three_classes"
model_name = "resnet50"

In [2]:
def generate_patch_df(flist, label):
    df = pd.DataFrame({"fpath": flist})
    df['slide_id'] = df['fpath'].map(lambda x: x.split("/")[-1].split(".")[0].split("_")[0])
    df['patient_id'] = df['slide_id'].map(lambda x: x.split("-")[0])
    df['target'] = label

    df = df.loc[:, ["patient_id", "slide_id", "fpath", "target"]]
    
    return df


def define_dataset(positive_df, negative_df, normal_df, sampling_rate=0.2):
    X_train_pos, X_test_pos, y_train_pos, y_test_pos = train_test_split(positive_df['fpath'], positive_df['target'], test_size=sampling_rate, random_state=1234)
    X_train_pos, X_valid_pos, y_train_pos, y_valid_pos = train_test_split(X_train_pos, y_train_pos, test_size=sampling_rate, random_state=1234)

    X_train_neg, X_test_neg, y_train_neg, y_test_neg = train_test_split(negative_df['fpath'], negative_df['target'], test_size=sampling_rate, random_state=1234)
    X_train_neg, X_valid_neg, y_train_neg, y_valid_neg = train_test_split(X_train_neg, y_train_neg, test_size=sampling_rate, random_state=1234)
    
    X_train_normal, X_test_normal, y_train_normal, y_test_normal = train_test_split(normal_df['fpath'], normal_df['target'], test_size=sampling_rate, random_state=1234)
    X_train_normal, X_valid_normal, y_train_normal, y_valid_normal = train_test_split(X_train_normal, y_train_normal, test_size=sampling_rate, random_state=1234)
    
    X_train = np.hstack([X_train_pos, X_train_neg, X_train_normal])
    X_valid = np.hstack([X_valid_pos, X_valid_neg, X_valid_normal])
    X_test = np.hstack([X_test_pos, X_test_neg, X_test_normal])

    y_train = np.hstack([y_train_pos, y_train_neg, y_train_normal])
    y_valid = np.hstack([y_valid_pos, y_valid_neg, y_valid_normal])
    y_test = np.hstack([y_test_pos, y_test_neg, y_test_normal])
    
    return X_train, X_valid, X_test, y_train, y_valid, y_test
    
    
positive_flist = glob.glob("data/LVI_dataset/patch_image_size-300_overlap-0/LVI/*.png")
negative_flist = glob.glob("data/LVI_dataset/patch_image_size-300_overlap-0/Negative/*.png")
normal_flist = glob.glob("data/LVI_dataset/patch_image_size-300_overlap-0/Normal/*.png")

positive_df = generate_patch_df(positive_flist, 1)
negative_df = generate_patch_df(negative_flist, 0)
normal_df = generate_patch_df(normal_flist, 2)

X_train, X_valid, X_test, y_train, y_valid, y_test = define_dataset(positive_df, negative_df, normal_df, sampling_rate=0.2)
print(f"X train: {X_train.shape}\nX valid: {X_valid.shape}\nX test: {X_test.shape}")
print(f"y train: {y_train.shape}\ny valid: {y_valid.shape}\ny test: {y_test.shape}")


X train: (227880,)
X valid: (56971,)
X test: (71215,)
y train: (227880,)
y valid: (56971,)
y test: (71215,)


In [3]:
train_transforms = A.Compose([ 

    A.RandomCrop(width=IM_WIDTH, height=IM_HEIGHT, p=1.0),
    
    A.OneOf([
        A.Transpose(),
        A.HorizontalFlip(),
        A.VerticalFlip()
    ], p=0.5),

#     A.OneOf([
#        A.ElasticTransform(),
#        A.Rotate(25)
#     ], p=0.8),

    A.OneOf([
       A.Blur(),
       A.GaussianBlur(),
       A.GaussNoise(),
       A.MedianBlur()
    ], p=0.2),

    A.OneOf([
       A.ChannelShuffle(),
       A.ColorJitter(),
       A.HueSaturationValue(),
       A.RandomBrightnessContrast()
    ], p=0.5),
    
    A.Normalize(p=1.0),
    ToTensorV2()
])


valid_transforms = A.Compose([ 
    A.Resize(width=IM_WIDTH, height=IM_HEIGHT, p=1.0),
    A.Normalize(p=1.0),
    ToTensorV2()
])


class LVIDataset(Dataset):
    def __init__(self, X, y, transforms):
        self.X = X
        self.y = y
        self.transforms = transforms
        
    def __len__(self):
        return self.X.shape[0]
    
    def __getitem__(self, idx):
        image  = cv2.imread(self.X[idx])
        target = self.y[idx]

        augmented = self.transforms(image=image)
        image = augmented['image']
        
        return image, target

    
train_dataset = LVIDataset(X_train, y_train, transforms=train_transforms)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, num_workers=12, pin_memory=True, shuffle=True)

valid_dataset = LVIDataset(X_valid, y_valid, transforms=valid_transforms)
valid_dataloader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, num_workers=12, pin_memory=True, shuffle=True)

test_dataset = LVIDataset(X_test, y_test, transforms=valid_transforms)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, num_workers=12, pin_memory=True, shuffle=False)

In [4]:
class ImageClassifier(pl.LightningModule):
    def __init__(self, model_name, learning_rate, num_classes=3):
        super(ImageClassifier, self).__init__()
        self.model = timm.create_model(model_name, num_classes=num_classes, pretrained=True)
        self.learning_rate = learning_rate
        
        self.train_accuracy = torchmetrics.Accuracy()
        self.valid_accuracy = torchmetrics.Accuracy()
        self.test_accuracy = torchmetrics.Accuracy()
    
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.model(x)
        loss = F.cross_entropy(y_hat, y)
        self.log("train_loss", loss)
        self.log("train_accuracy", self.train_accuracy(y_hat, y), on_step=True, on_epoch=True, prog_bar=True, logger=True)

        return loss
    
        
    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.model(x)
        loss = F.cross_entropy(y_hat, y)
        self.log("valid_loss", loss)
        self.log("valid_accuracy", self.valid_accuracy(y_hat, y), on_step=False, on_epoch=True, prog_bar=True, logger=True)

    
    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.model(x)
        loss = F.cross_entropy(y_hat, y)
        self.log("test_loss", loss)
        self.log("test_accuracy", self.test_accuracy(y_hat, y), on_step=False, on_epoch=True, prog_bar=True, logger=True)
          
        
    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learning_rate)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5)
        
        return {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "valid_loss"}

    
def define_callbacks(patience):
    return EarlyStopping('valid_loss', patience=patience)

In [None]:
classifer = ImageClassifier(model_name, LEARNING_RATE)
callbacks = define_callbacks(PAITENCE)
# trainer = pl.Trainer(accelerator="cpu", num_processes=1, max_epochs=1, enable_progress_bar=True)
trainer = pl.Trainer(gpus=1, max_epochs=N_EPOCHS, enable_progress_bar=True, callbacks=callbacks)
trainer.fit(classifer, train_dataloader, valid_dataloader)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name           | Type     | Params
--------------------------------------------
0 | model          | ResNet   | 23.5 M
1 | train_accuracy | Accuracy | 0     
2 | valid_accuracy | Accuracy | 0     
3 | test_accuracy  | Accuracy | 0     
--------------------------------------------
23.5 M    Trainable params
0         Non-trainable params
23.5 M    Total params
94.057    Total estimated model params size (MB)


Validation sanity check: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

In [None]:
torch.save(classifer.model.state_dict(), f"./weights/{classification_mode}_{model_name}_patch-{IM_WIDTH}.pt")

In [1]:
def prediction(model, test_dataloader, weights_path=None):
    X = np.array(())
    y = np.array(())
    
    if weights_path:
        print("load pretrained weights")
        model.load_state_dict(torch.load(weights_path))
        
    model.to("cuda")

    for batch in tqdm(test_dataloader):
        pred = model(batch[0].to("cuda"))
        pred = pred.clone().detach().to("cpu")
        pred = np.argmax(pred.numpy(), axis=1)

        true = batch[1].numpy()

        X = np.hstack([X, pred])
        y = np.hstack([y, true])
    
    return X, y

    
def evaluation_metrics_log(X, y):
    accuracy = accuracy_score(X, y)
    f1_score = f1_score(X, y)
    precision = precision_score(X, y)
    recall = recall_score(X, y)
    confusion_mat = confusion_matrix(X, y)
    
    print("Evaluation results")
    print(f"accuracy: {np.round(accuracy, 4)} f1: {np.round(f1_score, 4)} precision: {np.round(precision, 4)} recall: {np.round(recall, 4)}")
    print()
    print(confusion_mat)

    
X, y = prediction(classifier.model, test_dataloader, f"./weights/{classification_mode}_{model_name}_patch-{IM_WIDTH}.pt")
evaluation_metrics(X, y)

NameError: name 'np' is not defined