In [40]:
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import cv2
import pandas as pd
import torch
from torchvision.transforms import functional as T
import torch.nn.functional as F
import torch.nn as nn
from torchvision.transforms import Resize, PILToTensor, ToPILImage, Compose, InterpolationMode
from torchgeometry.losses import one_hot
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import os
from PIL import Image
import random

In [41]:
torch.backends.cudnn.benchmark = True
import wandb

In [42]:
# Check compute device
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cuda device


In [None]:
epochs = 10
train_split = 0.8

In [44]:
class RandomGamma:
    def __init__(self, gamma_range=(0.7, 1.3), p=0.2):
        self.gamma_range = gamma_range
        self.p = p

    def __call__(self, image):
        if random.random() < self.p:
            gamma = random.uniform(*self.gamma_range)
            image = transforms.functional.adjust_gamma(image, gamma)
        return image

In [45]:
class CustomDataset(Dataset):
    def __init__(self, images_path, masks_path, transform):
        super(CustomDataset, self).__init__()
        self.images_list = [os.path.join(images_path, img) for img in sorted(os.listdir(images_path))]
        self.masks_list = [os.path.join(masks_path, mask) for mask in sorted(os.listdir(masks_path))]
        self.transform = transform

    def __getitem__(self, index):
        img_path = self.images_list[index]
        mask_path = self.masks_list[index]

        # Open image and mask
        data = Image.open(img_path).convert("RGB")  # RGB image
        label = Image.open(mask_path).convert("L")  # Single-channel grayscale

        # Apply transformations (includes resizing)
        if self.transform:
            data = self.transform(data)
            label = self.transform(label)  # Transform applied to mask

        # Threshold the mask to binary (or multi-class) and convert to long tensor
        label = label.squeeze(0)  # Remove channel dimension if present
        label = (label > 0.65).long()

        return data, label


    def __len__(self):
        return len(self.images_list)


In [46]:
transforms_pipeline = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize all images and masks to 224x224
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    RandomGamma(gamma_range=(0.7, 1.3), p=0.2),
    transforms.ToTensor()  # Convert to tensor
])

dataset = CustomDataset(
    r'C:\Users\dmin\HUST\20241\DeepLearning\Segmentation\bkai-igh-neopolyp\train\train',
    r'C:\Users\dmin\HUST\20241\DeepLearning\Segmentation\bkai-igh-neopolyp\train_gt\train_gt',
    transforms_pipeline
)

# Split dataset into training and validation sets
train_split = 0.8  # Adjust the split ratio as needed
train_size = int(train_split * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# DataLoaders for training and validation
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True)

# Verify the dataset and loaders
for data, label in train_loader:
    print(f"Image batch shape: {data.shape}")  # Should be (B, C, H, W)
    print(f"Mask batch shape: {label.shape}")  # Should be (B, H, W)
    break



Image batch shape: torch.Size([4, 3, 224, 224])
Mask batch shape: torch.Size([4, 224, 224])


In [47]:
import segmentation_models_pytorch as smp

model = smp.UnetPlusPlus(
    encoder_name="efficientnet-b7",
    encoder_weights="imagenet",
    in_channels=3,
    classes=3
)
model.to(device)
#print(model)


UnetPlusPlus(
  (encoder): EfficientNetEncoder(
    (_conv_stem): Conv2dStaticSamePadding(
      3, 64, kernel_size=(3, 3), stride=(2, 2), bias=False
      (static_padding): ZeroPad2d((0, 1, 0, 1))
    )
    (_bn0): BatchNorm2d(64, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
    (_blocks): ModuleList(
      (0): MBConvBlock(
        (_depthwise_conv): Conv2dStaticSamePadding(
          64, 64, kernel_size=(3, 3), stride=[1, 1], groups=64, bias=False
          (static_padding): ZeroPad2d((1, 1, 1, 1))
        )
        (_bn1): BatchNorm2d(64, eps=0.001, momentum=0.010000000000000009, affine=True, track_running_stats=True)
        (_se_reduce): Conv2dStaticSamePadding(
          64, 16, kernel_size=(1, 1), stride=(1, 1)
          (static_padding): Identity()
        )
        (_se_expand): Conv2dStaticSamePadding(
          16, 64, kernel_size=(1, 1), stride=(1, 1)
          (static_padding): Identity()
        )
        (_project_conv): Conv2dStaticS

In [48]:
class DiceLoss(nn.Module):
    def __init__(self, weights):
        super(DiceLoss, self).__init__()
        self.eps: float = 1e-6
        self.weights: torch.Tensor = weights
    def forward(self, input: torch.Tensor, target: torch.Tensor):
        # cross entropy loss
        celoss = nn.CrossEntropyLoss(self.weights)(input, target)
        
        # compute softmax over the classes axis
        input_soft = F.softmax(input, dim=1)

        # create the labels one hot tensor
        target_one_hot = one_hot(target, num_classes=input.shape[1],
                                 device=input.device, dtype=input.dtype)

        # compute the actual dice score
        dims = (2, 3)
        intersection = torch.sum(input_soft * target_one_hot, dims)
        cardinality = torch.sum(input_soft + target_one_hot, dims)

        dice_score = 2. * intersection / (cardinality + self.eps)
        
        dice_score = torch.sum(dice_score * self.weights, dim=1)
        
        return torch.mean(1. - dice_score) + celoss


In [49]:
weights = torch.Tensor([[0.4, 0.55, 0.05]]).cuda()
criterion = DiceLoss(weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)
train_loss_array = []
test_loss_array = []
best_val_loss = 9999999
wandb.login(
    key = "957a50802d92e6812ca0422419a9d2fb3ebdd174",
)
wandb.init(
    project = "BKAI_segmentation_graph"
)

for epoch in range(epochs):
    model.train()
    train_loss = 0.0
    for images, masks in train_loader:  # images: (N, C, H, W), masks: (N, 1, H, W)
        images, masks = images.to(device), masks.to(device)
        outputs = model(images)
        # Remove the extra channel from the masks
        loss = criterion(outputs, masks.squeeze(1).long())  # Squeeze to shape (N, H, W)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    
    model.eval()
    test_loss = 0.0
    correct = 0
    total_samples = 0
    with torch.no_grad():
        for i, (data, targets) in enumerate(val_loader):
            data, targets = data.to(device), targets.to(device)
            outputs = model(data)
            _, pred = torch.max(outputs, 1)
            
            loss = criterion(outputs, targets.long())
            test_loss += loss.item()
    if test_loss < best_val_loss:
        best_val_loss = test_loss
        checkpoint = { 
            'epoch': epoch,
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'train_loss':train_loss,
            'val_loss': test_loss,
        }
        save_path = f'model.pth'
        torch.save(checkpoint, save_path)
    train_loss_array.append(train_loss/len(train_loader))
    test_loss_array.append(test_loss/len(val_loader))

    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss/len(train_loader):.4f}, Validation Loss: {test_loss/len(val_loader):.4f}")
    wandb.log({
        "Epoch": epoch + 1,
        "Train Loss": train_loss / len(train_loader),
        "Validation Loss": test_loss / len(val_loader),
    })



Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4, 3, 224, 224])
Mask shape: torch.Size([4, 224, 224])
Output shape: torch.Size([4,

In [None]:
for i, (data, label) in enumerate(val_loader):
     img = data
     mask = label
     break

In [None]:
fig, arr = plt.subplots(4, 3, figsize=(16, 12))
arr[0][0].set_title('Image')
arr[0][1].set_title('Segmentation')
arr[0][2].set_title('Predict')

model.eval()
with torch.no_grad():
     predict = model(img.to(device))

for i in range(4):

     arr[i][0].imshow((img*255)[i].cpu().numpy().transpose(1, 2, 0))
    
     arr[i][1].imshow(F.one_hot(mask[i]).float())
    
     arr[i][2].imshow(F.one_hot(torch.argmax(predict[i], dim = 0).cpu()).float())

**Submission**

In [None]:
class TestDataset(Dataset):
    def __init__(self, images_path, transform):
        super(TestDataset, self).__init__()
        
        # Generate full paths for all images in the directory
        self.images_list = [os.path.join(images_path, img) for img in sorted(os.listdir(images_path))]
        self.transform = transform
        
    def __getitem__(self, index):
        # Load image
        img_path = self.images_list[index]
        data = Image.open(img_path).convert("RGB")  # Ensure RGB for consistency
        
        # Save original dimensions for later
        h, w = data.size
        
        # Apply transformations
        if self.transform:
            data = self.transform(data)
        
        # Normalize the image (if not done during transform)
        data = data / 255.0
        
        return data, img_path, h, w
    
    def __len__(self):
        return len(self.images_list)

In [None]:
test_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to a fixed size
    transforms.ToTensor()  # Convert to PyTorch tensor
])



test_images_path = r'C:\Users\dmin\HUST\20241\DeepLearning\Segmentation\bkai-igh-neopolyp\test'

# Initialize dataset and dataloader
test_dataset = TestDataset(test_images_path, test_transforms)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Verify the dataset
for data, img_path, h, w in test_loader:
    print(f"Batch data shape: {data.shape}")
    print(f"Paths: {img_path}")
    print(f"Original dimensions: {list(zip(h, w))}")
    break


In [None]:
for i, (data, path, h, w) in enumerate(test_loader):
    img = data
    break

In [None]:
fig, arr = plt.subplots(4, 2, figsize=(16, 12))
arr[0][0].set_title('Image')
arr[0][1].set_title('Predict')

model.eval()
with torch.no_grad():
    predict = model(img.to(device))

for i in range(4):
    arr[i][0].imshow((img*255)[i].cpu().numpy().transpose(1, 2, 0))
    arr[i][1].imshow(F.one_hot(torch.argmax(predict[i], 0).cpu()).float())

In [None]:
model.eval()
if not os.path.isdir("/kaggle/working/predicted_masks"):
    os.mkdir("/kaggle/working/predicted_masks")
for _, (img, path, H, W) in enumerate(test_loader):
    
    with torch.no_grad():
        predicted_mask = model(img.to(device))
    for i in range(len(a)):
        image_id = path[i].split('/')[-1].split('.')[0]
        filename = image_id + ".png"
        mask2img = Resize((H[i].item(), W[i].item()), interpolation=InterpolationMode.NEAREST)(ToPILImage()(F.one_hot(torch.argmax(predicted_mask[i], 0)).permute(2, 0, 1).float()))
        mask2img.save(os.path.join("/kaggle/working/predicted_masks/", filename))

In [None]:
def rle_to_string(runs):
    return ' '.join(str(x) for x in runs)

def rle_encode_one_mask(mask):
    pixels = mask.flatten()
    pixels[pixels > 0] = 255
    use_padding = False
    if pixels[0] or pixels[-1]:
        use_padding = True
        pixel_padded = np.zeros([len(pixels) + 2], dtype=pixels.dtype)
        pixel_padded[1:-1] = pixels
        pixels = pixel_padded
    
    rle = np.where(pixels[1:] != pixels[:-1])[0] + 2
    if use_padding:
        rle = rle - 1
    rle[1::2] = rle[1::2] - rle[:-1:2]
    return rle_to_string(rle)

def mask2string(dir):
    strings = []
    ids = []
    ws, hs = [[] for i in range(2)]
    for image_id in os.listdir(dir):
        id = image_id.split('.')[0]
        path = os.path.join(dir, image_id)
        print(path)
        img = cv2.imread(path)[:,:,::-1]
        h, w = img.shape[0], img.shape[1]
        for channel in range(2):
            ws.append(w)
            hs.append(h)
            ids.append(f'{id}_{channel}')
            string = rle_encode_one_mask(img[:,:,channel])
            strings.append(string)
    r = {
        'ids': ids,
        'strings': strings,
    }
    return r


MASK_DIR_PATH = '/kaggle/working/predicted_masks' # change this to the path to your output mask folder
dir = MASK_DIR_PATH
res = mask2string(dir)
df = pd.DataFrame(columns=['Id', 'Expected'])
df['Id'] = res['ids']
df['Expected'] = res['strings']
df.to_csv(r'output.csv', index=False)