In [1]:
import os
import json
import nibabel as nib
import numpy as np
from tqdm.notebook import tqdm
import torch
from torch import nn
import random
import torchvision.transforms as T
!pip install torchmetrics
from torchmetrics import Dice

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/data/data

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/data/data


In [3]:
torch.manual_seed(0)
random.seed(0)
np.random.seed(0)

In [4]:
class Dataset:
    
    def __init__(self, transform_x=None):
        self.X = torch.Tensor([])
        self.Y = torch.Tensor([])
        self.transform_x = transform_x

        path = ''
        path_images = os.path.join(path, 'images')
        path_labels = os.path.join(path, 'labels')
        with open('training_data.json', 'r') as f:
            dict_training = json.load(f)

        for entry in tqdm(dict_training):
            image = nib.load(os.path.join(path_images, entry['image'][:-3]))
            label = nib.load(os.path.join(path_labels, entry['label'][:-3]))
            tmp = image.get_fdata().transpose((2, 0, 1))
            tmp2 = label.get_fdata().transpose((2, 0, 1))
            self.X = torch.cat((self.X, torch.Tensor(tmp)), dim=0)
            self.Y = torch.cat((self.Y,  torch.Tensor(tmp2)), dim=0)

        self.X = self.X[:,None,::]
        self.Y = self.Y[:,None,::]

    def __len__(self):
        return len(self.X)  
    
    def __getitem__(self, idx):
        if self.transform_x is None:
          return self.X[idx], self.Y[idx]
        return self.transform_x(self.X[idx]), self.Y[idx]

class FinalPrediction:
    def __init__(self):
        self.X_test = torch.Tensor([])
        self.data = []
        self.y = []
        path = ''
        path_images = os.path.join(path, 'images')
        path_labels = os.path.join(path, 'labels')
        with open('testing_data.json', 'r') as f:
            dict_testing = json.load(f)

        images = []
        self.n_channels = []
        for entry in tqdm(dict_testing):
            image = nib.load(os.path.join(path_images, entry['image'][:-3]))
            tmp = image.get_fdata().transpose((2, 0, 1))
            self.data.append( torch.Tensor(tmp)[:,None,::])


    def __len__(self):
        return len(self.X_test)  
    
    def __getitem__(self, idx):
        return self.X_test[idx], None

    def predict(self, model, transform):
      result = []
      model.eval()
      with torch.no_grad():
        for d in self.data:
          d = transform(d.to(device))
          pred = model(d).round()    
          result.append(pred.cpu().numpy().squeeze().transpose((1, 2, 0)))
      return result

In [5]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

In [6]:
def dice_coef(y_true, y_pred, smooth=1):
  intersection = torch.sum(y_true * y_pred)
  union = torch.sum(y_true) + torch.sum(y_pred)
  dice = torch.mean((2. * intersection + smooth)/(union + smooth), axis=0)
  return dice

class DiceLoss(nn.Module):
  def __init__(self):
      super().__init__()

  def forward(self, y_pred, y_true):
    return 1 - dice_coef(y_pred, y_true)


In [7]:
class Segmentator(nn.Module):
  def Block(self, inp, out):
    return nn.Sequential(
        nn.BatchNorm2d(inp),
        nn.Conv2d(inp, out, kernel_size=(3,3), padding=1),
        nn.LeakyReLU()
  )
  def __init__(self):
    super().__init__()
    self.nn = nn.Sequential(
        self.Block(1, 64),
        nn.MaxPool2d((2, 2), (2, 2)),
        self.Block(64, 128),
        nn.MaxPool2d((2, 2), (2, 2)),
        self.Block(128, 256),
        nn.MaxPool2d((2, 2), (2, 2)),
        self.Block(256, 512),
        nn.MaxPool2d((2, 2), (2, 2)),
        self.Block(512, 1024),
        self.Block(1024, 512),
        nn.Upsample(scale_factor=2),
        self.Block(512, 256),
        nn.Upsample(scale_factor=2),
        self.Block(256, 128),
        nn.Upsample(scale_factor=2),
        self.Block(128, 64),
        nn.Upsample(scale_factor=2),
        self.Block(64, 1),
        nn.Sigmoid()

    )


  def forward(self, x):
    return self.nn(x)
    

In [8]:
batch_size = 18
d = Dataset()
train_size = int(1 * len(d))
test_size = len(d) - train_size
train, test = torch.utils.data.random_split(d, [train_size, test_size],generator=torch.Generator().manual_seed(42))
dataloader_train = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True)

mu = 0.
sigma = 0.
for x, y in dataloader_train:
    mu += x.mean()
    sigma += x.std()

mu /= len(dataloader_train.dataset)
sigma /= len(dataloader_train.dataset)
standardize = T.Normalize(mean=mu, std=sigma)
transform_x = T.Compose([standardize])

d = Dataset(transform_x)
train, test = torch.utils.data.random_split(d, [train_size, test_size], generator=torch.Generator().manual_seed(42))

dataloader_train = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True)

  0%|          | 0/40 [00:00<?, ?it/s]

  0%|          | 0/40 [00:00<?, ?it/s]

In [None]:
model = torch.hub.load('mateuszbuda/brain-segmentation-pytorch', 'unet',
    in_channels=1, out_channels=1, init_features=32, pretrained=False).to(device) #we can use unet or our custom segmentator
#model.load_state_dict(torch.load('model-final2'))

epochs = 200
lr=1e-3
optim = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-3)
criterion = DiceLoss()

for i in range(epochs):
  l = []
  for x, y in dataloader_train:
    x = x.to(device)
    y = y.to(device)
    if x.shape[0]==batch_size:
      optim.zero_grad()
      model.train()
      pred = model(x)
      loss = criterion(pred, y)
      l.append(loss.mean().item())
      loss.backward()
      torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
      optim.step()
  print('Train : dice loss: ',np.array(l).mean())
  torch.save(model.state_dict(), 'model-final2')

Using cache found in /root/.cache/torch/hub/mateuszbuda_brain-segmentation-pytorch_master


Train : dice loss:  0.9904671462027581
Train : dice loss:  0.9792845766622942
Train : dice loss:  0.8917394142884475
Train : dice loss:  0.66346281177395
Train : dice loss:  0.5579704563696306


In [None]:
def rle_encoding(x):
    dots = np.where(x.T.flatten() == 1)[0]
    #print(x.shape, x.T.shape)
    run_lengths = []
    prev = -2
    for b in dots:
        if (b > prev + 1):
            run_lengths.extend((b + 1, 0))
        run_lengths[-1] += 1
        prev = b
    return [str(item) for item in run_lengths]


final_pred = FinalPrediction()


labels_predicted = final_pred.predict(model, transform_x)
      
with open(f'testing_data.json', 'r') as f:
            dict_testing = json.load(f)

with open(f'submissionF!_1.csv', "wt") as sb:
    submission_writer = csv.writer(sb, delimiter=',')
    submission_writer.writerow(["Id", "Predicted"])

    
    for k_i, patient_i in tqdm(zip(dict_testing, labels_predicted)):
        submission_writer.writerow([
                f"{k_i['image'][:-7]}",
                " ".join(rle_encoding(patient_i))
            ])

Using cache found in /root/.cache/torch/hub/mateuszbuda_brain-segmentation-pytorch_master


  0%|          | 0/10 [00:00<?, ?it/s]

0it [00:00, ?it/s]