In [1]:
import numpy as np
import os
from torch.utils.data import Dataset, DataLoader
import torch
from PIL import Image
import matplotlib.pyplot as plt
from albumentations.pytorch import ToTensorV2
import albumentations as A



In [2]:
import torch.nn as nn
from torch.optim import Adam
from tqdm import tqdm
import cv2
from torchvision import transforms

In [3]:
DATA_DIR = './data/color_coded'

x_train_dir = os.path.join(DATA_DIR, 'train')
y_train_dir = os.path.join(DATA_DIR, 'train_masks')

x_valid_dir = os.path.join(DATA_DIR, 'val')
y_valid_dir = os.path.join(DATA_DIR, 'val_masks')

x_test_dir = os.path.join(DATA_DIR, 'test')
y_test_dir = os.path.join(DATA_DIR, 'test_masks')

# Dataset

In [11]:
from torch.utils.data import Dataset
from torchvision.transforms.functional import to_tensor
from PIL import Image
import os

# class EggplantDataset(Dataset):
#     def __init__(self, x_dir, y_dir, transform=None):
#         self.x_dir = x_dir  # Directory containing the images
#         self.y_dir = y_dir  # Directory containing the corresponding masks
#         self.transform = transform

#         # Get the list of filenames for images and masks
#         self.image_filenames = [os.path.join(self.x_dir, filename) for filename in os.listdir(self.x_dir)]
#         self.mask_filenames = [os.path.join(self.y_dir, filename) for filename in os.listdir(self.y_dir)]

#     def __len__(self):
#         return len(self.image_filenames)

#     def __getitem__(self, index):
#         # Load the image and mask at the given index
#         image = Image.open(self.image_filenames[index])
#         mask = Image.open(self.mask_filenames[index])

#         # Apply transformations if provided
#         if self.transform is not None:
#             augmented = self.transform(image=image, mask=mask)
#             image = augmented['image']
#             mask = augmented['mask']

#         # Convert the PIL images to tensors
#         image = to_tensor(image)
#         mask = to_tensor(mask)

#         # As the mask contains multiple channels, reduce it to a single channel
#         mask = torch.max(mask, dim=0)[0]

#         return image, mask


class EggplantDataset(Dataset):
    def __init__(self, x_dir, y_dir, transform=None):
        self.x_dir = x_dir  # Directory containing the images
        self.y_dir = y_dir  # Directory containing the corresponding masks

        # Get the list of filenames for images and masks
        self.image_filenames = [os.path.join(self.x_dir, filename) for filename in os.listdir(self.x_dir)]
        self.mask_filenames = [os.path.join(self.y_dir, filename) for filename in os.listdir(self.y_dir)]

        # Define a dictionary to map color codes to class indices
        self.color_to_class = {
            (0, 0, 0): 0,    # Background
            (0, 0, 255): 1,  # Calyx
            (0, 255, 0): 2,  # Label
            (255, 0, 0): 3,  # Fruit
        }

        # Add the Albumentations transform
        self.transform = transform 

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, index):
        # Load the image and mask at the given index
        image = Image.open(self.image_filenames[index])
        mask = Image.open(self.mask_filenames[index])

        # Convert the color-coded mask to class indices
        mask = np.array(mask)
        mask_class_indices = np.zeros(mask.shape[:2], dtype=np.uint8)
        for color, class_index in self.color_to_class.items():
            class_mask = np.all(mask == color, axis=-1)
            mask_class_indices[class_mask] = class_index

        # Convert PIL images to numpy arrays
        image_np = np.array(image)
        mask_np = mask_class_indices

        # Apply the Albumentations transformation
        augmented = self.transform(image=image_np, mask=mask_np)
        image_np = augmented['image']
        mask_np = augmented['mask']

        # Convert numpy arrays to PyTorch tensors
        image = torch.tensor(image_np, dtype=torch.float32).permute(2, 0, 1)
        mask_class_indices = torch.tensor(mask_np, dtype=torch.long)

        return image, mask_class_indices


In [12]:
from torchvision import transforms

# Define transformations
transform = transforms.Compose([
    transforms.Resize((256, 512)),  # Resize the images to (256, 512)
    transforms.ToTensor(),
])

# Create train, validation, and test datasets
train_dataset = EggplantDataset(x_train_dir, y_train_dir, transform=transform)
val_dataset = EggplantDataset(x_valid_dir, y_valid_dir, transform=transform)
test_dataset = EggplantDataset(x_test_dir, y_test_dir, transform=transform)



In [13]:
# import random
# import matplotlib.pyplot as plt

# # Create an instance of the EggplantDataset
# dataset = EggplantDataset(x_train_dir, y_train_dir, transform=None)  # No transformations for visualization

# # Define the number of random samples to visualize
# num_samples_to_visualize = 5

# # Get random indices for the samples
# random_indices = random.sample(range(len(dataset)), num_samples_to_visualize)

# # Plot the random images and masks
# fig, axes = plt.subplots(num_samples_to_visualize, 2, figsize=(10, 5 * num_samples_to_visualize))

# for i, idx in enumerate(random_indices):
#     image, mask = dataset[idx]

#     # Convert the mask to a NumPy array for visualization
#     mask = mask.numpy()

#     # Plot the image
#     axes[i, 0].imshow(image.permute(1, 2, 0))  # Convert tensor back to (H, W, C) format
#     axes[i, 0].axis('off')

#     # Plot the mask
#     axes[i, 1].imshow(mask, cmap='gray')  # Use a grayscale colormap for masks
#     axes[i, 1].axis('off')

# plt.tight_layout()
# plt.show()


In [14]:
import random
import matplotlib.pyplot as plt
from PIL import Image

# Create an instance of the EggplantDataset
dataset = EggplantDataset(x_train_dir, y_train_dir, transform=None)  # No transformations for visualization

# Define the number of random samples to visualize
num_samples_to_visualize = 5

# Get random indices for the samples
random_indices = random.sample(range(len(dataset)), num_samples_to_visualize)

# Plot the random images and masks
# fig, axes = plt.subplots(num_samples_to_visualize, 2, figsize=(10, 5 * num_samples_to_visualize))

# for i, idx in enumerate(random_indices):
#     image, mask_class_indices = dataset[idx]
    
#     # Get the corresponding mask filename
#     mask_filename = dataset.mask_filenames[idx]

#     # Open the mask image as a PIL.Image object
#     mask_image = Image.open(mask_filename)

#     # Plot the image
#     axes[i, 0].imshow(image.permute(1, 2, 0))  # Convert tensor back to (H, W, C) format
#     axes[i, 0].axis('off')

#     # Plot the mask
#     axes[i, 1].imshow(mask_image)
#     axes[i, 1].set_title("Mask")
#     axes[i, 1].axis('off')

# plt.tight_layout()
# plt.show()


In [15]:
# class LyftUdacity(Dataset):
#     def __init__(self,img_dir,transform = None):
#         self.transforms = transform
#         image_paths = [i+'/CameraRGB' for i in img_dir]
#         seg_paths = [i+'/CameraSeg' for i in img_dir]
#         self.images,self.masks = [],[]
#         for i in image_paths:
#             imgs = os.listdir(i)
#             self.images.extend([i+'/'+img for img in imgs])
#         for i in seg_paths:
#             masks = os.listdir(i)
#             self.masks.extend([i+'/'+mask for mask in masks])
#     def __len__(self):
#         return len(self.images)
#     def __getitem__(self,index):
#         img = np.array(Image.open(self.images[index]))
#         mask = np.array(Image.open(self.masks[index]))
#         if self.transforms is not None:
#             aug = self.transforms(image=img,mask=mask)
#             img = aug['image']
#             mask = aug['mask']
#             mask = torch.max(mask,dim=2)[0]
#         return img,mask

In [16]:
# data_dir = ['../input/lyft-udacity-challenge/data'+i+'/data'+i for i in ['A','B','C','D','E']]

In [17]:
# def get_images(image_dir,transform = None,batch_size=1,shuffle=True,pin_memory=True):
#     data = LyftUdacity(image_dir,transform = t1)
#     train_size = int(0.8 * data.__len__())
#     test_size = data.__len__() - train_size
#     train_dataset, test_dataset = torch.utils.data.random_split(data, [train_size, test_size])
#     train_batch = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle, pin_memory=pin_memory)
#     test_batch = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=shuffle, pin_memory=pin_memory)
#     return train_batch,test_batch


In [18]:


def get_images(x_train_dir, y_train_dir, transform=None, batch_size=1, shuffle=True, pin_memory=True):
    # Create an instance of the EggplantDataset
    dataset = EggplantDataset(x_train_dir, y_train_dir, transform=transform)

    # Split the dataset into train and test sets
    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

    # Create data loaders for train and test sets
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle, pin_memory=pin_memory)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=shuffle, pin_memory=pin_memory)

    return train_loader, test_loader


## Transforms

In [19]:
# t1 = A.Compose([
#     A.Resize(160,240),
#     A.augmentations.transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
#     ToTensorV2()
# ])
# 375x1000

t1 = A.Compose([
    A.Resize(512, 256, interpolation=cv2.INTER_NEAREST),  # Resize with "nearest" interpolation
    A.augmentations.transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ToTensorV2()
])


In [20]:
# x_train_dir = r".\data\color_coded\train"
# y_train_dir = r".\data\color_coded\train_masks"

train_loader, test_loader = get_images(x_train_dir, y_train_dir, transform=t1, batch_size=1)


In [21]:
# train_batch,test_batch = get_images(data_dir,transform =t1,batch_size=4)

In [22]:
import random
import matplotlib.pyplot as plt

# Create train and test data loaders
# train_loader, test_loader = get_images(x_train_dir, y_train_dir, transform=None, batch_size=1)

# # Get a batch of data from the train_loader
# for img_batch, mask_batch in train_loader:
#     # Loop through each image and mask in the batch
#     for i in range(len(img_batch)):
#         img = img_batch[i]
#         mask = mask_batch[i]

#         # Convert the image tensor to a NumPy array and permute dimensions
#         img_np = np.transpose(img.numpy(), (1, 2, 0))

#         # Convert the mask tensor to a NumPy array
#         mask_np = mask.numpy()

#         # Plot the image and mask side by side
#         fig, ax = plt.subplots(1, 2, figsize=(12, 6))
#         ax[0].imshow(img_np)
#         ax[0].set_title("Image")
#         ax[0].axis("off")
#         ax[1].imshow(mask_np)
#         ax[1].set_title("Mask")
#         ax[1].axis("off")

#         plt.show()

#     break  # Only visualize the first batch


# Architecture

In [23]:
# pip install torchsummary 

In [24]:
class encoding_block(nn.Module):
    def __init__(self,in_channels, out_channels):
        super(encoding_block,self).__init__()
        model = []
        model.append(nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False))
        model.append(nn.BatchNorm2d(out_channels))
        model.append(nn.ReLU(inplace=True))
        model.append(nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False))
        model.append(nn.BatchNorm2d(out_channels))
        model.append(nn.ReLU(inplace=True))
        self.conv = nn.Sequential(*model)
    def forward(self, x):
        return self.conv(x)    

In [25]:
class unet_model(nn.Module):
    def __init__(self,out_channels=23,features=[64, 128, 256, 512]):
        super(unet_model,self).__init__()
        self.pool = nn.MaxPool2d(kernel_size=(2,2),stride=(2,2))
        self.conv1 = encoding_block(3,features[0])
        self.conv2 = encoding_block(features[0],features[1])
        self.conv3 = encoding_block(features[1],features[2])
        self.conv4 = encoding_block(features[2],features[3])
        self.conv5 = encoding_block(features[3]*2,features[3])
        self.conv6 = encoding_block(features[3],features[2])
        self.conv7 = encoding_block(features[2],features[1])
        self.conv8 = encoding_block(features[1],features[0])        
        self.tconv1 = nn.ConvTranspose2d(features[-1]*2, features[-1], kernel_size=2, stride=2)
        self.tconv2 = nn.ConvTranspose2d(features[-1], features[-2], kernel_size=2, stride=2)
        self.tconv3 = nn.ConvTranspose2d(features[-2], features[-3], kernel_size=2, stride=2)
        self.tconv4 = nn.ConvTranspose2d(features[-3], features[-4], kernel_size=2, stride=2)        
        self.bottleneck = encoding_block(features[3],features[3]*2)
        self.final_layer = nn.Conv2d(features[0],out_channels,kernel_size=1)
    def forward(self,x):
        skip_connections = []
        x = self.conv1(x)
        skip_connections.append(x)
        x = self.pool(x)
        x = self.conv2(x)
        skip_connections.append(x)
        x = self.pool(x)
        x = self.conv3(x)
        skip_connections.append(x)
        x = self.pool(x)
        x = self.conv4(x)
        skip_connections.append(x)
        x = self.pool(x)
        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]
        x = self.tconv1(x)
        x = torch.cat((skip_connections[0], x), dim=1)
        x = self.conv5(x)
        x = self.tconv2(x)
        x = torch.cat((skip_connections[1], x), dim=1)
        x = self.conv6(x)
        x = self.tconv3(x)
        x = torch.cat((skip_connections[2], x), dim=1)
        x = self.conv7(x)        
        x = self.tconv4(x)
        x = torch.cat((skip_connections[3], x), dim=1)
        x = self.conv8(x)
        x = self.final_layer(x)
        return x

In [26]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

In [27]:
model = unet_model(out_channels=4, features=[64, 128, 256, 512]).to(DEVICE)

In [28]:
from torchsummary import summary
summary(model, (3, 256, 256))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 256, 256]           1,728
       BatchNorm2d-2         [-1, 64, 256, 256]             128
              ReLU-3         [-1, 64, 256, 256]               0
            Conv2d-4         [-1, 64, 256, 256]          36,864
       BatchNorm2d-5         [-1, 64, 256, 256]             128
              ReLU-6         [-1, 64, 256, 256]               0
    encoding_block-7         [-1, 64, 256, 256]               0
         MaxPool2d-8         [-1, 64, 128, 128]               0
            Conv2d-9        [-1, 128, 128, 128]          73,728
      BatchNorm2d-10        [-1, 128, 128, 128]             256
             ReLU-11        [-1, 128, 128, 128]               0
           Conv2d-12        [-1, 128, 128, 128]         147,456
      BatchNorm2d-13        [-1, 128, 128, 128]             256
             ReLU-14        [-1, 128, 1

In [29]:
LEARNING_RATE = 1e-4
num_epochs = 10

In [30]:
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
scaler = torch.cuda.amp.GradScaler()



# Training

In [31]:
# move the appropriate model to device
model.to(DEVICE)

for epoch in range(num_epochs):
    loop = tqdm(enumerate(train_loader),total=len(train_loader))
    for batch_idx, (data, targets) in loop:
        data = data.to(DEVICE)
        targets = targets.to(DEVICE)
        targets = targets.type(torch.long)

        # backward
        optimizer.zero_grad()
        
        # forward
        with torch.cuda.amp.autocast():
            predictions = model(data)
            loss = loss_fn(predictions, targets)
        
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # update tqdm loop
        loop.set_postfix(loss=loss.item())

  image = torch.tensor(image_np, dtype=torch.float32).permute(2, 0, 1)
  mask_class_indices = torch.tensor(mask_np, dtype=torch.long)
  0%|          | 0/133 [00:02<?, ?it/s]


RuntimeError: Given groups=1, weight of size [64, 3, 3, 3], expected input[1, 256, 3, 512] to have 3 channels, but got 256 channels instead

In [None]:
from torch.cuda.amp import autocast

In [None]:
# accumulation_steps = 4 # accumulate gradients over 4 steps

# for epoch in range(num_epochs):
#     loop = tqdm(enumerate(train_loader), total=len(train_loader))
#     total_loss = 0.0

# for batch_idx, (data, targets) in loop:
#     data = data.to(DEVICE)
#     targets = targets.to(DEVICE)
#     targets = targets.type(torch.long)

#     # clear gradients from previous iteration
#     optimizer.zero_grad()

#     # Forward pass and loss calculation using autocast for mixed precision training
#     with autocast():
#         predictions = model(data)
#         loss = loss_fn(predictions, targets)

#         scaler.scale(loss).backward()

#         # Perform optimization after accumulation streps
#         if (batch_idx + 1) % accumulation_steps == 0:
#             scaler.step(optimizer)
#             scaler.update()
#             optimizer.zero_grad()

#         total_loss += loss.item()
        
# #         # update tqdm loop
#         loop.set_postfix(loss=loss.item())

# Metrics

In [None]:
def check_accuracy(loader, model):
    num_correct = 0
    num_pixels = 0
    dice_score = 0
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(DEVICE)
            y = y.to(DEVICE)
            softmax = nn.Softmax(dim=1)
            preds = torch.argmax(softmax(model(x)),axis=1)
            num_correct += (preds == y).sum()
            num_pixels += torch.numel(preds)
            dice_score += (2 * (preds * y).sum()) / ((preds + y).sum() + 1e-8)

    print(f"Got {num_correct}/{num_pixels} with acc {num_correct/num_pixels*100:.2f}")
    print(f"Dice score: {dice_score/len(loader)}")
    model.train()

In [None]:
check_accuracy(train_batch, model)

In [None]:
check_accuracy(test_batch, model)

In [None]:
for x,y in test_batch:
    x = x.to(DEVICE)
    fig , ax =  plt.subplots(3, 3, figsize=(18, 18))
    softmax = nn.Softmax(dim=1)
    preds = torch.argmax(softmax(model(x)),axis=1).to('cpu')
    img1 = np.transpose(np.array(x[0,:,:,:].to('cpu')),(1,2,0))
    preds1 = np.array(preds[0,:,:])
    mask1 = np.array(y[0,:,:])
    img2 = np.transpose(np.array(x[1,:,:,:].to('cpu')),(1,2,0))
    preds2 = np.array(preds[1,:,:])
    mask2 = np.array(y[1,:,:])
    img3 = np.transpose(np.array(x[2,:,:,:].to('cpu')),(1,2,0))
    preds3 = np.array(preds[2,:,:])
    mask3 = np.array(y[2,:,:])
    ax[0,0].set_title('Image')
    ax[0,1].set_title('Prediction')
    ax[0,2].set_title('Mask')
    ax[1,0].set_title('Image')
    ax[1,1].set_title('Prediction')
    ax[1,2].set_title('Mask')
    ax[2,0].set_title('Image')
    ax[2,1].set_title('Prediction')
    ax[2,2].set_title('Mask')
    ax[0][0].axis("off")
    ax[1][0].axis("off")
    ax[2][0].axis("off")
    ax[0][1].axis("off")
    ax[1][1].axis("off")
    ax[2][1].axis("off")
    ax[0][2].axis("off")
    ax[1][2].axis("off")
    ax[2][2].axis("off")
    ax[0][0].imshow(img1)
    ax[0][1].imshow(preds1)
    ax[0][2].imshow(mask1)
    ax[1][0].imshow(img2)
    ax[1][1].imshow(preds2)
    ax[1][2].imshow(mask2)
    ax[2][0].imshow(img3)
    ax[2][1].imshow(preds3)
    ax[2][2].imshow(mask3)   
    break