In [1]:
# Import all the packages
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as T
import torch.optim as optim
from torch.utils.data import Dataset
from PIL import Image
import numpy as np
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

In [2]:
# Check the device we are using is GPU or CPU
if torch.cuda.is_available():
  device = torch.device('cuda')
else:
  device = torch.device('cpu')
print(device)

cuda


In [3]:
# Build one of the main components - DoubleConv - for UNet
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.conv(x)

In [4]:
# Build UNet from scrach
class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1, features=[64, 128, 256, 512]):
        super().__init__()
        self.downs = nn.ModuleList()
        self.ups = nn.ModuleList()
        
        # Encoder (downsampling path)
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature
        
        # Bottleneck
        self.bottleneck = DoubleConv(features[-1], features[-1] * 2)
        
        # Decoder (upsampling path)
        for feature in reversed(features):
            self.ups.append(nn.ConvTranspose2d(feature * 2, feature, kernel_size=2, stride=2))
            self.ups.append(DoubleConv(feature * 2, feature))
        
        # Final convolution
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        skip_connections = []

        # Encoder forward pass
        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = F.max_pool2d(x, kernel_size=2, stride=2)
        
        x = self.bottleneck(x)
        skip_connections.reverse()

        # Decoder forward pass
        for i in range(0, len(self.ups), 2):
            x = self.ups[i](x)  # Upsample
            skip_connection = skip_connections[i // 2]  # Get corresponding skip connection
            concat = torch.cat((skip_connection, x), dim=1)  # Concatenate along channel dimension
            x = self.ups[i + 1](concat)  # Apply double convolution

        return self.final_conv(x)


In [6]:
# Create an UNet model object
model = UNet()

toy_data = torch.ones((16, 3, 240, 160))
output = model(toy_data)
print(output.shape)
# Move the model to GPU
model = model.cuda()

torch.Size([16, 1, 240, 160])


In [7]:
# Build CustomDataset for loading data from Google Drive
class CustomDataset(Dataset):
    def __init__(self, image_dir, mask_dir, transform):
        super().__init__()
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.transform = transform
        self.images = os.listdir(image_dir)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        img_path = os.path.join(self.image_dir, self.images[index])
        mask_path = os.path.join(self.mask_dir, self.images[index].replace('.jpg', '_mask.gif'))

        image = np.array(Image.open(img_path))
        mask = np.array(Image.open(mask_path).convert('L'))

        return self.transform(image), self.transform(mask)


In [8]:
# Constants for UNet model training process
BATCH_SIZE = 16
NUM_EPOCHS = 3
IMG_WIDTH = 240
IMG_HEIGHT = 160

In [9]:
# Load data
all_data = CustomDataset(
    'small_train',
    'small_train_masks',
    T.Compose([
        T.ToTensor(),  
        T.Resize((IMG_HEIGHT, IMG_WIDTH))
    ])
)


In [10]:
# Split data into train and val
train_data, val_data = torch.utils.data.random_split(all_data, [0.7, 0.3])

In [11]:
# Create loader for mini-batch gradient descent
from torch.utils.data import DataLoader
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False)

In [12]:
# The loss function for bianry classification
loss_function = nn.BCEWithLogitsLoss()
# Choosing Adam as our optimizer
optimizer = optim.Adam(model.parameters())

In [13]:
def train(model, num_epochs, train_loader, optimizer, print_every=30):
    for epoch in range(num_epochs):
        for count, (x, y) in enumerate(train_loader):
            model.train()
            x = x.to(torch.device("cuda"))
            y = y.to(torch.device("cuda"))
            out = model(x)
            if count % print_every == 0:
                eval(model, val_loader, epoch)
            # out = torch.sigmoid(out)
            loss = loss_function(out, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

In [14]:
def eval(model, val_loader, epoch):
    model.eval()
    num_correct = 0
    num_pixels = 0
    with torch.no_grad():
        for x, y in val_loader:
            x = x.to(device)
            y = y.to(device)
            out_img = model(x)
            probability = torch.sigmoid(out_img)
            predictions = probability>0.5
            num_correct += (predictions==y).sum()
            num_pixels += BATCH_SIZE*IMG_WIDTH*IMG_HEIGHT
    print(f'Epoch[{epoch+1}] Acc: {num_correct/num_pixels}')

In [15]:
train(model, NUM_EPOCHS, train_loader, optimizer)

Epoch[1] Acc: 0.778840184211731
Epoch[1] Acc: 0.8511271476745605
Epoch[1] Acc: 0.8745274543762207
Epoch[2] Acc: 0.8657953143119812
Epoch[2] Acc: 0.8731623291969299
Epoch[2] Acc: 0.8778730034828186
Epoch[3] Acc: 0.8778916001319885
Epoch[3] Acc: 0.8741675615310669
Epoch[3] Acc: 0.8779928684234619
