In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Check Pytorch installation
import torch, torchvision
print(torch.__version__, torch.cuda.is_available())

2.3.0+cu121 True


In [None]:
# Required imports definition (put all imports you need in this cell)
import os
import numpy as np
from zipfile import ZipFile
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import scipy.io
from scipy import ndimage
from skimage import exposure, filters, morphology
from scipy.ndimage import gaussian_filter, median_filter
from skimage.morphology import square
import skimage.io as io
import cv2
import shutil
import numpy as np

In [None]:
# Include the name of your group
group_name = 'gruppo_FB14'

In [None]:
#DEFINITION OF PATHS
submission_folder ='...'
print(f'submission folder: {submission_folder}')

checkpoint_path = os.path.join(submission_folder, 'final_checkpoint.pth')

test_img_folder = '...'

results_folder = os.path.join(submission_folder, f'results_{group_name}')

if not os.path.exists(results_folder):
    os.makedirs(results_folder)

submission folder: /content/drive/MyDrive/submission


In [None]:
import torch
import torch.nn as nn

class UNet(nn.Module):
    def __init__(self, input_channels=3, out_classes=1):
        super(UNet, self).__init__()

        # Contracting path
        self.enc1 = self.conv_block(input_channels,32)
        self.enc2 = self.conv_block(32, 64)
        self.enc3 = self.conv_block(64, 128)
        self.enc4 = self.conv_block(128, 256)

        self.pool = nn.MaxPool2d(kernel_size=2)

        # Lowest resolution
        self.bottleneck = self.conv_block(256, 512)
        # Expansive path
        self.upconv4=nn.ConvTranspose2d(512,256, kernel_size=2, stride=2)
        self.dec4 = self.conv_block(512, 256)
        self.upconv3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec3 = self.conv_block(384, 128)
        self.upconv2 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec2 = self.conv_block(256, 64)
        self.upconv1 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.dec1 = self.conv_block(160, 32)

        self.out = nn.Conv2d(32, out_classes, kernel_size=1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),

            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )


    def forward(self, x):
        # Contracting path
        x1 = self.enc1(x)

        x2 = self.enc2(self.pool(x1))

        x3 = self.enc3(self.pool(x2))

        x4 = self.enc4(self.pool(x3))

        # Lowest resolution
        x5 = self.bottleneck(self.pool(x4))
        x5=self.upconv4(x5)
        # Expansive path, repeat upconv and concatenation as needed
        x = torch.cat([x4, x5], dim=1)
        x = self.dec4(x)
        x = self.upconv3(x)

        x21=self.upconv3(x4)
        x = torch.cat([x3, x, x21], dim=1)
        x = self.dec3(x)
        x = self.upconv2(x)

        x11=self.upconv2(x3)
        x12=self.upconv2(x21)
        x = torch.cat([x2, x, x11,x12], dim=1)
        x = self.dec2(x)
        x = self.upconv1(x)

        x01=self.upconv1(x2)
        x02=self.upconv1(x11)
        x03=self.upconv1(x12)
        x = torch.cat([x1, x, x01,x02,x03], dim=1)
        x = self.dec1(x)

        x = self.out(x)
        return x

# Example usage for batch processing
H = 128
W = 128
C = 3
batch_size = 4  # Example batch size
X_batch = torch.randn((batch_size, C, H, W))
model=UNet()
output_batch = model(X_batch)

# Print the shape of the output to check dimensions
print("Output batch shape:", output_batch.shape)


In [None]:
# Load the config file and print to see if it's correct
model = UNet()
model.load_state_dict(torch.load(checkpoint_path)['model_state_dict'])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
def kernel_gauss(size, sigma):
  v = np.linspace(-(size-1)/2,(size-1)/2,size)
  x, y = np.meshgrid(v,v)
  h = np.exp(-((x**2 + y**2)/(2.0*sigma**2)))
  h = h/h.sum()
  return h

In [None]:
def pre_processing(image):
  image = cv2.resize(image, (1200,800), interpolation=cv2.INTER_LINEAR)
  image=image.astype(np.float32)/65535

  image = ndimage.median_filter(image,size=(5,5))
  h = kernel_gauss(3,1)

  image=ndimage.correlate(image,h)
  image=exposure.equalize_adapthist(image)

  image = cv2.resize(image, (512,512), interpolation=cv2.INTER_LINEAR)

  image = np.expand_dims(image, axis=0)
  image = np.expand_dims(image, axis=0)
  image=np.repeat(image, repeats=3, axis=1)

  image = torch.from_numpy(image)

  return image

In [None]:
def  inference_model(model,device,image):
  model.eval()
  threshold_value=0.5
  with torch.no_grad():
     image=image.to(device)
     output = model(image)
     predicted_mask = (output.cpu().squeeze() > threshold_value).float()
  return predicted_mask

In [None]:
from scipy.ndimage import binary_opening
from skimage.morphology import disk

# Creazione di un elemento strutturale di forma circolare (disco) con raggio 5
disk_radius = 5
structuring_element = disk(disk_radius)

In [None]:
for volume in os.listdir(test_img_folder):

  if volume.endswith('.mat'):
    mat_data = scipy.io.loadmat(os.path.join(test_img_folder,volume))
    [nome_volume,tail]=os.path.splitext(volume)

    # Access the images from the loaded data
    images = mat_data['V']
    [A,B,C]=images.shape
    predicted_volume= np.zeros([C,A,B])

    # Loop over the images
    for i in tqdm(range(0,698)):
          image = images[:,:,i]
          ###############################
          ####### Pre-Processing ########
          ###############################

          image_preproc = pre_processing(image)

          ###############################
          #######    Inference   ########
          ###############################

          pred_label = inference_model(model,device,image_preproc)
          pred_label_np = pred_label.cpu().numpy()
          pred_label_np= cv2.resize(pred_label_np, (B,A), interpolation=cv2.INTER_LINEAR)

          ###############################
          ####### Post-Processing #######
          ###############################
          pred_label_np= binary_opening(pred_label_np, structure=structuring_element)

          # Convert the NumPy array to a PIL image

          pred_label_pil = Image.fromarray((pred_label_np * 255).astype(np.uint8))

          ###############################
          #######   Save results  #######
          ###############################
          predicted_volume[i,:,:] = (pred_label_np * 255).astype(np.uint8)

          img_name=nome_volume+'_'+str(i+1)+'.png'

          if not os.path.exists(os.path.join(results_folder, nome_volume)):
            os.makedirs(os.path.join(results_folder, nome_volume))

          pred_label_pil.save(os.path.join(results_folder, nome_volume,img_name))

100%|██████████| 698/698 [08:16<00:00,  1.41it/s]
