In [1]:
import numpy as np
import matplotlib.pyplot as plt

import random
import albumentations as A
from albumentations.pytorch import ToTensorV2

import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


In [None]:
def test(model, dataloaders, loss_fn, acc_fn, random_state=49, epochs=1):
    
    
    torch.manual_seed(random_state)
    np.random.seed(random_state)
    torch.backends.cudnn.benchmark = False
    torch.use_deterministic_algorithms(True)
    start = time.time()                                        #Initialize time to calculate time it takes to train model
    model.to(device)                                               #Move model to GPU     

    model.eval()
    
    counter=0
    test_loss = []                         #Running training and validation loss
    loss_epoch, f1_epoch = [0],[0]
    AP_epoch =[0]
    loss_list = []
    times     = []
    
    for epoch in range(epochs):
        start_epoch = time.time()
        print(f'Epoch {epoch}')
        #print(scheduler.get_last_lr())
    

 ################ MODEL TESTING  #############################

        dataloader = dataloaders['test']         #get the training data

        step = 0
        loss_test = []
        f1=[]
        AP = []
        with torch.no_grad():
            for inputs, labels in dataloader:
                x, y = inputs.to(device), labels.to(device)
                #optimizer.zero_grad()                                   # zero the gradients
                outputs = model(x)                                      #get model output for a given input

                #################Metrics###################
                f1.append(acc_fn(outputs, y).cpu().detach().numpy())
                AP.append(average_precision_score(y.reshape(-1).cpu().detach().numpy(),  outputs.reshape(-1).cpu().detach().numpy()))
                loss_test.append(loss_fn(outputs, y).cpu().detach().numpy())
            ############################################
                print(f'Current step: {step}, AllocMem (Mb): {torch.cuda.memory_allocated()/1024/1024:.3f},  F1: {np.mean(f1):.3f},  AP: {np.mean(AP):.3f}') 

                step+=1
                ##################Calculate Loss, backprop, and update###############
               
                test_loss.append(loss_test[-1])
        loss_epoch.append(np.mean(loss_test))
        f1_epoch.append(np.mean(f1))
        AP_epoch.append(np.mean(AP))
        print()
        print()
        print(f' Loss test: {loss_epoch[-1]:.3f}, F-Score test:{f1_epoch[-1]:.3f}, AP val:{AP_epoch[-1]:.3f} \n') 
        ######################################################################

        epoch+=1
        
    return {
            'f1_epoch':f1_epoch,
            'Epochs': epoch}

In [144]:
class basic_block(nn.Module):
    def __init__(self,in_channels,out_chan, random_state=0):
        super(basic_block,self).__init__()

        out_channels = out_chan//2
        
        torch.manual_seed(random_state)
        self.bn1         = nn.BatchNorm2d(in_channels)
        
        self.conv1x1_1_1 = nn.Conv2d(in_channels, out_channels, 1)
        self.bn2         = nn.BatchNorm2d(out_channels)
        
        self.conv1x1_1_3 = nn.Conv2d(in_channels, out_channels, 1)
        self.bn3         = nn.BatchNorm2d(out_channels)
        
        self.conv3x3_1   = nn.Conv2d(out_channels, out_channels, 3, padding=1)
        self.bn4         = nn.BatchNorm2d(out_channels)
        
        self.conv1x1_1_5 = nn.Conv2d(in_channels, out_channels, 1)
        self.bn5         = nn.BatchNorm2d(out_channels)
        
        self.conv5x5_1   = nn.Conv2d(out_channels,out_channels,5,padding=2)
        self.bn6         = nn.BatchNorm2d(out_channels)
      
        self.conv1x1_1_7 = nn.Conv2d(in_channels, out_channels, 1)
        self.bn7         = nn.BatchNorm2d(out_channels)
        
        self.conv7x7_1   = nn.Conv2d(out_channels,out_channels,7,padding=3)
        self.bn8         = nn.BatchNorm2d(out_channels)
        
        self.bn9         = nn.BatchNorm2d(out_channels*4)
        self.conv1x1_2_1 = nn.Conv2d(out_channels*4, out_channels, 1)
        self.bn10        = nn.BatchNorm2d(out_channels)
        self.conv1x1_2_3 = nn.Conv2d(out_channels*4, out_channels, 1)
        self.bn11        = nn.BatchNorm2d(out_channels)
        self.conv3x3_2   = nn.Conv2d(out_channels, out_channels, 3, padding=1)
        self.bn12        = nn.BatchNorm2d(out_channels)
        self.conv1x1_2_5 = nn.Conv2d(out_channels*4, out_channels, 1)
        self.bn13        = nn.BatchNorm2d(out_channels)
        self.conv5x5_2   = nn.Conv2d(out_channels,out_channels,5,padding=2)
        self.bn14        = nn.BatchNorm2d(out_channels)
        self.conv1x1_2_7 = nn.Conv2d(out_channels*4, out_channels, 1)
        self.bn15        = nn.BatchNorm2d(out_channels)
        self.conv7x7_2   = nn.Conv2d(out_channels,out_channels,7,padding=3)
        self.bn16        = nn.BatchNorm2d(out_channels)
       
    def forward(self, x):
       
        bn1         = self.bn1(x)
        conv1x1_1_3 = self.bn3(self.conv1x1_1_3(bn1))
        conv1x1_1_5 = self.bn4(self.conv1x1_1_5(bn1))
        conv1x1_1_7 = self.bn5(self.conv1x1_1_7(bn1))
        conv1x1_1_1 = F.relu(self.bn2(self.conv1x1_1_1(bn1)))
        conv3x3_1   = F.relu(self.bn6(self.conv3x3_1(conv1x1_1_3)))
        conv5x5_1   = F.relu(self.bn7(self.conv5x5_1(conv1x1_1_5)))
        conv7x7_1   = F.relu(self.bn8(self.conv7x7_1(conv1x1_1_7)))
        cat1        = torch.cat([conv1x1_1_1,conv3x3_1,conv5x5_1,conv7x7_1],dim=1)
       
        bn9         = self.bn9(cat1)
        conv1x1_2_3 = self.bn11(self.conv1x1_2_3(bn9))
        conv1x1_2_5 = self.bn12(self.conv1x1_2_5(bn9))
        conv1x1_2_7 = self.bn13(self.conv1x1_2_7(bn9))
        conv1x1_2_1 = F.relu(self.bn10(self.conv1x1_2_1(bn9)))
        conv3x3_2   = F.relu(self.bn14(self.conv3x3_2(conv1x1_2_3)))
        conv5x5_2   = F.relu(self.bn15(self.conv5x5_2(conv1x1_2_5)))
        conv7x7_2   = F.relu(self.bn16(self.conv7x7_2(conv1x1_2_7)))
        cat2        = torch.cat([conv1x1_2_1,conv3x3_2,conv5x5_2,conv7x7_2],dim=1)

 
        return cat2
       

class UNET_multiscale2(nn.Module):
    def __init__(self, in_channels=3, out_channels= 1, init_features=32, random_state=0):
        super(UNET_multiscale2, self).__nit__()
        torch.manual_seed(random_state)
        features = init_features
        self.layer1 =  basic_block(in_channels,features)
        self.down1  = nn.Conv2d(features*2,features,2,stride=2)
       
        self.layer2 = basic_block(features,features) 
        self.down2  = nn.Conv2d(features*2,features*2,2,stride=2)

        self.layer3 = basic_block(features*2,features*2) 
        self.down3  = nn.Conv2d(features*4,features*4,2,stride=2)

        self.layer4 = basic_block(features*4,features*4)
        self.down4  = nn.Conv2d(features*8,features*8,2,stride=2)
       
        self.bottleneck = basic_block(features*8,features*8)
        self.bn6     = nn.BatchNorm2d(features*8*2)
        self.up1     = nn.ConvTranspose2d(features*16, features*8, 2, stride=2)
             
        self.layer6  = basic_block(features*16,features*4)      
        self.bn7     = nn.BatchNorm2d(features*4*2)
        self.up2     = nn.ConvTranspose2d(features*8, features*4, 2, stride=2)

        self.layer7  = basic_block(features*8,features*2) 
        self.bn8     = nn.BatchNorm2d(features*2*2)
        self.up3     = nn.ConvTranspose2d(features*4, features*2, 2, stride=2)   
       
        self.layer8  = basic_block(features*4,features) 
        self.bn9     = nn.BatchNorm2d(features*2)
        self.up4     = nn.ConvTranspose2d(features*2, features*2, 2, stride=2)
       
        self.layer9  = basic_block(features*4,features)
        self.out     = nn.Conv2d(features*2, 1, 1)
       
    def forward(self, x):
    
        layer1 = self.layer1(x)
        down1  = F.relu(self.down1(layer1))

        layer2 = self.layer2(down1) 
        down2  = F.relu(self.down2(layer2))
       
        layer3 = self.layer3(down2) 
        down3  = F.relu(self.down3(layer3))
       
        layer4 = self.layer4(down3) 
        down4  = F.relu(self.down4(layer4))
       
        bottleneck = self.bottleneck(down4)
        up1     = F.relu(self.up1(self.bn6(bottleneck), output_size=layer4.size()))

        merge1  = torch.cat([up1, layer4], dim=1)      
        layer6  = self.layer6(merge1)
        up2        = F.relu(self.up2(self.bn7(layer6), output_size=layer3.size()))

        merge2     = torch.cat([up2, layer3], dim=1)
        layer7     = self.layer7(merge2)
        up3        = F.relu(self.up3(self.bn8(layer7), output_size=layer2.size()))
        merge3     = torch.cat([up3, layer2], dim=1)
        layer8  = self.layer8(merge3)
        up4        = F.relu(self.up4(self.bn9(layer8), output_size=layer1.size()))
       
        merge4     = torch.cat([up4, layer1], dim=1)
        layer9  = self.layer9(merge4)                    
        out        = torch.sigmoid(self.out(layer9))
        return out

In [145]:
model = torch.load("best_epoch.pth", map_location=torch.device('cpu'))