<a href="https://colab.research.google.com/github/sajidcsecu/radioGenomic/blob/main/NewUnetTest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from NewUnetTrain import UnetTrain
import os
import torch
from torch.utils.data import Dataset, DataLoader
from NewUnetDataPreparation import SliceDataset
from NewUnet import UNet
import tqdm
import time
from operator import add
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score
import torchinfo
from torchinfo import summary
import cv2



class UnetTest:

    def calculate_metrics(self, y_true, y_pred):
        """ Ground truth """
        y_true = y_true.cpu().numpy()
        y_true = y_true > 0.5
        y_true = y_true.astype(np.uint8)
        y_true = y_true.reshape(-1)

        """ Prediction """
        y_pred = y_pred.cpu().numpy()
        y_pred = y_pred > 0.5
        y_pred = y_pred.astype(np.uint8)
        y_pred = y_pred.reshape(-1)

        score_jaccard = jaccard_score(y_true, y_pred)
        score_f1 = f1_score(y_true, y_pred)
        score_recall = recall_score(y_true, y_pred)
        score_precision = precision_score(y_true, y_pred)
        score_acc = accuracy_score(y_true, y_pred)

        return [score_jaccard, score_f1, score_recall, score_precision, score_acc]


    def save_result(self,image,org_mask,predicted_mask,id):
        predicted_mask = predicted_mask.cpu().numpy()  ## (1, 512, 512)
        predicted_mask = np.squeeze(predicted_mask, axis=0)  ## (512, 512)
        predicted_mask = predicted_mask > 0.5
        predicted_mask = np.array(predicted_mask, dtype=np.float32) * 255

        org_mask = org_mask.cpu().numpy()
        org_mask = np.squeeze(org_mask, axis=0) *255

        image = image.cpu().numpy()
        image = np.squeeze(image, axis=0) * 255


        # print("Shape for predicted Mask : ", y_pred.shape)
        # print("Shape for predicted Mask : ", y_pred)

        # print("Shape for predicted Mask : ", y_pred)
        line = np.ones((512, 10)) * 128
        #
        cat_images = np.concatenate(
            [image, line, org_mask, line, predicted_mask], axis=1
        )
        cv2.imwrite(f"F:\\Idiot Developer\\radioGenomic\\Segementation\\data\\results\\{id}.png", cat_images)


    def test(self, model, loader, device):
        metrics_score = [0.0, 0.0, 0.0, 0.0, 0.0]
        time_taken = []
        model.eval()
        with torch.no_grad():
            for x, y in loader:
                x = x.to(device, dtype=torch.float32)
                y = y.to(device, dtype=torch.float32)
                start_time = time.time()
                y_pred = model(x)
                y_pred = torch.sigmoid(y_pred)
                total_time = time.time() - start_time
                time_taken.append(total_time)
                score = self.calculate_metrics(y, y_pred)
                metrics_score = list(map(add, metrics_score, score))
                for idx in range(x.size(dim=0)):
                    self.save_result(x[idx,:,:,:],y[idx,:,:,:],y_pred[idx,:,:,:],idx)

                # print("Devices for Image : ",x.get_device())
                # print("Devices for Mask : ", y.get_device())
                # print("Devices for predicted Mask : ", y_pred.get_device())
                # print("Shape for Image : ",x.dtype)
                # print("Shape for Mask : ", y.dtype)
                # print("Shape for predicted Mask : ", y_pred[0])
        # print("Matrics Score : ",metrics_score)
        # print("Times : ", time_taken)
        return metrics_score,time_taken





if __name__ == "__main__":
    utrain = UnetTrain()
    """ Seeding """
    utrain.seeding(42)

    """ paths """
    results_path = "F:\\Idiot Developer\\radioGenomic\\Segementation\\data\\results"
    path = "F:\\Idiot Developer\\radioGenomic\\Segementation"
    data_path = os.path.join(path, "data", "full data")
    checkpoint_path = os.path.join(path, "files", "checkpoint.pth")

    """ Load dataset """
    batch_size = 2
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = utrain.load_paths(data_path)
    train_dataset = SliceDataset(train_x, train_y)
    valid_dataset = SliceDataset(valid_x, valid_y)
    test_dataset = SliceDataset(test_x, test_y)
    """ Train data Loader"""
    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=os.cpu_count()
    )
    """ Train data Loader"""
    valid_loader = DataLoader(
        dataset=valid_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=os.cpu_count()
    )

    """ Test data Loader"""
    test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=os.cpu_count()
    )

    # Make device agnostic code
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = UNet(1, 1)
    model = model.to(device)
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    # summary(model,input_size=[1, 1, 256, 256])
    utest = UnetTest()
    utest.test(model,test_loader,device)

    #
    #     """ Saving masks """
    #     ori_mask = mask_parse(mask)
    #     pred_y = mask_parse(pred_y)
    #     line = np.ones((size[1], 10, 3)) * 128
    #
    #     cat_images = np.concatenate(
    #         [image, line, ori_mask, line, pred_y * 255], axis=1
    #     )
    #     cv2.imwrite(f"results/{name}.png", cat_images)
    #
    # jaccard = metrics_score[0]/len(test_x)
    # f1 = metrics_score[1]/len(test_x)
    # recall = metrics_score[2]/len(test_x)
    # precision = metrics_score[3]/len(test_x)
    # acc = metrics_score[4]/len(test_x)
    # print(f"Jaccard: {jaccard:1.4f} - F1: {f1:1.4f} - Recall: {recall:1.4f} - Precision: {precision:1.4f} - Acc: {acc:1.4f}")
    #
    # fps = 1/np.mean(time_taken)
    # print("FPS: ", fps)