# Pytorch Lightining

In [1]:
import pytorch_lightning as pl
from sklearn.model_selection import train_test_split
import os
import numpy as np
from monai.transforms import (
    AsChannelFirst,
    Compose,
    RandFlip,
    RandRotate,
    RandZoom,
    Resize
)
from monai.data import DataLoader
import torch

def labeling(data) :
    label = []
    for path in data : 
        label.append(os.path.basename(os.path.dirname(path)))
    return label

train_transforms = Compose(
    [   AsChannelFirst(2),
        Resize((256,256)),
        RandRotate(range_x=np.pi / 12, prob=0.5, keep_size=True),
        RandFlip(spatial_axis=0, prob=0.5),
        RandZoom(min_zoom=0.9, max_zoom=1.1, prob=0.5),
    ]
)

val_transforms = Compose(
    [   AsChannelFirst(2),
        Resize((256,256)),
            ]
)
    

class Sequence_Dataset(torch.utils.data.Dataset) : 
    def __init__(self,image_files,label, transforms): 
        self.image_files = image_files
        self.transforms = transforms
        self.label = label
    def __len__(self):
        return len(self.image_files)
    
    
    def __getitem__(self,index) :
        img = np.load(self.image_files[index])
        return self.transforms(img), self.label[index]

class DataModule(pl.LightningDataModule) : 
    
    def __init__(self, data_dir : list , batch_size : int ) : 
        
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.__ = None
        
    def prepare_data(self) :
        y = labeling(X)
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X, y, test_size=0.2,shuffle=True ,random_state=42,stratify=y)
        self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(X_train, y_train, test_size=0.2,shuffle=True ,random_state=42,stratify=y_train)
    
    def setup(self,stage : str) : 
        
        if stage == "fit" : 
            self.train_ds =  Sequence_Dataset(self.X_train,self.y_train,train_transforms)
            self.val_ds = Sequence_Dataset(self.X_val,self.y_val,val_transforms)
            
        if stage == "test" : 
            self.test_ds = Sequence_Dataset(self.X_test,self.y_test,val_transforms)
            
    def train_dataloader(self) :
        
        return DataLoader(self.train_ds, batch_size  = self.batch_size, shuffle=True, num_workers=8 , pin_memory=True)

    def val_dataloader(self) :
        
        return DataLoader(self.val_ds, batch_size  = self.batch_size, shuffle=True, num_workers=8 , pin_memory=True)

    def test_dataloader(self) :
        
        return DataLoader(self.test_ds, batch_size  = self.batch_size, shuffle=True, num_workers=8 , pin_memory=True)



  from neptune.version import version as neptune_client_version
  from neptune import new as neptune


In [2]:
from monai.utils import set_determinism
import os
set_determinism(seed=0)
T1  = ["/home/jovyan/Verdicchio/ONWAY_DATA/Dataset/T1/" + x for x in os.listdir("/home/jovyan/Verdicchio/ONWAY_DATA/Dataset/T1/")]
T2  = ["/home/jovyan/Verdicchio/ONWAY_DATA/Dataset/T2/" + x for x in os.listdir("/home/jovyan/Verdicchio/ONWAY_DATA/Dataset/T2/")]
FLAIR  = ["/home/jovyan/Verdicchio/ONWAY_DATA/Dataset/FLAIR/" + x for x in os.listdir("/home/jovyan/Verdicchio/ONWAY_DATA/Dataset/FLAIR/")]
OTHER  = ["/home/jovyan/Verdicchio/ONWAY_DATA/Dataset/OTHER/" + x for x in os.listdir("/home/jovyan/Verdicchio/ONWAY_DATA/Dataset/OTHER/")]


X = T1 + T2 + FLAIR +OTHER 
y = list(np.zeros(len(T1),dtype = int)) + list(np.ones(len(T2),dtype = int)) + list(2*np.ones(len(FLAIR),dtype = int)) + list(3*np.ones(len(OTHER),dtype = int))
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,shuffle=True ,random_state=42,stratify=y)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2,shuffle=True ,random_state=42,stratify=y_train)

dm = DataModule(X,1)
dm.prepare_data()
dm.setup(stage ="fit")
A = next(iter(dm.train_dataloader()))



## Model Definition

In [3]:
import pytorch_lightning as pl
from torch.optim import AdamW
from torch.optim.lr_scheduler import StepLR
class Model(pl.LightningModule):
    def __init__(self, model,metric,loss_function):
        super().__init__()
        self.model = model
        self.loss_function = loss_function
        self.metric = metric
        
    def forward(self, x) : 
        self.model(x)
    
    def prepare_batch(self,batch) : 
        return  batch[0].to(device), torch.tensor(batch[1]).to(device)
    
    def infer_batch(self, batch):
        x, y = self.prepare_batch(batch)
        y_hat = self.model(x)
        return y_hat, y

    
    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=1e-3, weight_decay=1e-5)
        return optimizer, StepLR(optimizer, gamma = 0.1, step_size=25,verbose = True, last_epoch = 150)

    def training_step(self, batch, batch_idx):
        outputs,labels = self.infer_batch(batch)
        loss = self.loss_function(output, labels)
        self.log("train_loss", loss.item(), on_step=True, on_epoch=True, prog_bar=True, logger=True)
        
    def training_epoch_end(self, outputs):
        avg_loss = torch.stack([x["loss"] for x in outputs]).mean()
        self.epoch_loss_values.append(avg_loss.detach().cpu().numpy())

    def validation_step(self, batch, batch_idx):
        outputs,labels = self.infer_batch(batch)
        loss = self.loss_function(outputs, labels)
        y_onehot = [y_trans(i) for i in decollate_batch(y, detach=False)]
        labels = [y_pred_trans(i) for i in decollate_batch(y_pred)]
        self.metric(y_pred=outputs, y=labels)
        return {"val_loss": loss, "val_number": len(outputs)}
    


# Training Phase

In [4]:
from torch.nn import CrossEntropyLoss
from torchvision.models import resnet50
from sklearn.utils.class_weight import compute_class_weight
from monai.metrics import ROCAUCMetric
from lightning.pytorch.callbacks import LearningRateMonitor, ModelCheckpoint

device = torch.device("cuda" if torch.cuda.is_available()  else "cpu")
model = resnet50(weights= None, num_classes=4,progress=True).to(device)
class_weight = torch.tensor(compute_class_weight("balanced",classes = np.unique(y_train),y=y_train ))
loss_function = CrossEntropyLoss(weight= class_weight.float().to(device))
metric = ROCAUCMetric
net = Model(model,metric,loss_function)

In [5]:
def train_model(net, save_name,CHECKPOINT_PATH, **kwargs):
    """
    Inputs:
        net - Model Is used to look up the class in "model_dict"
        save_name (optional) - If specified, this name will be used for creating the checkpoint and logging directory.
    """
    
    # Create a PyTorch Lightning trainer with the generation callback
    trainer = pl.Trainer(
        default_root_dir=os.path.join(CHECKPOINT_PATH, save_name),  # Where to save models
        accelerator="auto",
        devices=1,
        # How many epochs to train for if no patience is set
        max_epochs=180,
        callbacks=[
            ModelCheckpoint(
                save_weights_only=True, mode="max", monitor="val_acc"
            ),  # Save the best checkpoint based on the maximum val_acc recorded. Saves only weights and not optimizer
            LearningRateMonitor("epoch"),
        ],  # Log learning rate every epoch
    )  # In case your notebook crashes due to the progress bar, consider increasing the refresh rate
    trainer.logger._log_graph = True  # If True, we plot the computation graph in tensorboard
    trainer.logger._default_hp_metric = None  # Optional logging argument that we don't need
    # Check whether pretrained model exists. If yes, load it and skip training
    pretrained_filename = os.path.join(CHECKPOINT_PATH, save_name + ".ckpt")
    if os.path.isfile(pretrained_filename):
        print(f"Found pretrained model at {pretrained_filename}, loading...")
        # Automatically loads the model with the saved hyperparameters
        model = net.load_from_checkpoint(pretrained_filename)
    else:
        pl.seed_everything(42)  # To be reproducable
        model = net
        trainer.fit(model, train_loader, val_loader)
        model = net.load_from_checkpoint(
            trainer.checkpoint_callback.best_model_path
        )  # Load best checkpoint after training

    # Test best model on validation and test set
    val_result = trainer.test(model, dataloaders=val_loader, verbose=False)
    test_result = trainer.test(model, dataloaders=test_loader, verbose=False)
    result = {"test": test_result[0]["test_acc"], "val": val_result[0]["test_acc"]}

    return model, result

In [6]:
model,result = train_model(net,"modello1",CHECKPOINT_PATH = "../checkpoint/")



ValueError: Expected a parent