In [12]:
!pip3 install kornia

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [21]:
import sys
sys.path.append('/content/drive/MyDrive/FILM_DATABASE')
import paired_transforms as pt

In [17]:
import os
import torch
import numpy as np
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import kornia
import matplotlib.pyplot as plt
from torch import nn

In [None]:
folder_path = "/content/drive/MyDrive/FILM_DATABASE"

In [13]:
class AugmentedDataset(Dataset):
    def __init__(self, base_dataset, transform, augment_factor):
        self.base_dataset = base_dataset
        self.transform = transform
        self.augment_factor = augment_factor

    def __getitem__(self, index):
        base_index = index // self.augment_factor
        rh = pt.RandomHorizontalFlip()
        rv = pt.RandomVerticalFlip()
        rr = pt.RandomRotation(360)
        ca = pt.RandomAffine(45)
        cj = pt.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1)
        color_img, error_img = self.base_dataset[base_index]
        color_img, error_img = rh(color_img, error_img)
        color_img, error_img = rv(color_img, error_img)
        color_img, error_img = rr(color_img, error_img)
            #color_img, error_img = ra(color_img, error_img)
        color_img, error_img = cj(color_img, error_img)

        color_img = transforms.ToTensor()(color_img)
        error_img = transforms.ToTensor()(error_img)
        color_img = kornia.color.rgb_to_hsv(color_img)
        return color_img, error_img

    def __len__(self):
        return len(self.base_dataset) * self.augment_factor

In [14]:
class CustomDataset(Dataset):
    def __init__(self, color_dir, error_dir, window_size=256, blackness_threshold=10):
        self.window_size = window_size
        self.blackness_threshold = blackness_threshold
        self.min_augment = 5
        self.max_augment = 20


        # list of filenames
        filenames = os.listdir(color_dir)

        self.color_imgs = []
        self.error_imgs = []

        for filename in filenames:
            color_img = Image.open(os.path.join(color_dir, filename))
            error_img = Image.open(os.path.join(error_dir, filename))
            error_img = error_img.convert("L")

            step_size = self.window_size
            for i in range(0, 512, step_size):
                for j in range(0, 512, step_size):
                    sub_color_img = color_img.crop((i, j, i+step_size, j+step_size))
                    sub_error_img = error_img.crop((i, j, i+step_size, j+step_size))

                    #if np.mean(np.array(sub_color_img)) > self.blackness_threshold:
                    self.color_imgs.append(sub_color_img)
                    self.error_imgs.append(sub_error_img)


    def __getitem__(self, index):
        return self.color_imgs[index], self.error_imgs[index]

    def __len__(self):
        return len(self.color_imgs)



In [16]:
from torch.utils.data import random_split

# Constants
TRAIN_RATIO = 0.8
AUGMENT_FACTOR = 10

# Use the custom dataset
folder_path = "/content/drive/MyDrive/FILM_DATABASE"
color_dir = os.path.join(folder_path, 'color_mask')
error_dir = os.path.join(folder_path, 'error_mask')

# Create the full dataset
full_dataset = CustomDataset(color_dir, error_dir)

# Compute the lengths of train/test subsets
train_len = int(len(full_dataset) * TRAIN_RATIO)
test_len = len(full_dataset) - train_len

# Randomly split the dataset
train_dataset, test_dataset = random_split(full_dataset, [train_len, test_len])

# Define your transformations
transform = transforms.Compose([
    # Add your transformations here
    transforms.ToTensor(),
])

# Create the augmented dataset
augmented_train_dataset = AugmentedDataset(train_dataset, transform, augment_factor=AUGMENT_FACTOR)
augmented_test_dataset = AugmentedDataset(train_dataset, transform, augment_factor=1)
# Create DataLoaders
train_loader = DataLoader(augmented_train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(augmented_test_dataset, batch_size=8, shuffle=False)


In [19]:
print(len(augmented_train_dataset))
print(len(augmented_test_dataset))

2430
243


In [20]:
class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)

class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
        else:
            self.up = nn.ConvTranspose2d(in_channels // 2, in_channels // 2, kernel_size=2, stride=2)

        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)

        diffY = torch.tensor([x2.size()[2] - x1.size()[2]])
        diffX = torch.tensor([x2.size()[3] - x1.size()[3]])

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])

        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)

class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=True):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 512)
        self.up1 = Up(1024, 256, bilinear)
        self.up2 = Up(512, 128, bilinear)
        self.up3 = Up(256, 64, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

# Create an instance of the model
model = UNet(n_channels=3, n_classes=1)

In [18]:
# Run this before retraining (clear Torch GPURAM cache)
with torch.no_grad():
    torch.cuda.empty_cache()

In [None]:
import torch.optim as optim
losses = []

# If a GPU is available, move the model to GPU
model = model.to("cuda")

# Initialize the optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.BCEWithLogitsLoss()

# Number of epochs
n_epochs = 7

# Training loop
for epoch in range(2, n_epochs):
    model.train()  # set the model to training mode
    if epoch == 5:
      new_lr = 0.0003
      for param_group in optimizer.param_groups:
        param_group['lr'] = new_lr

# Update learning rate for all parameter groups


    for i, (inputs, targets) in enumerate(train_loader):
        # If a GPU is available, move data and targets to GPU
        inputs = inputs.to("cuda")
        targets = targets.to("cuda")

        # Clear gradients
        optimizer.zero_grad()

        # Forward propagation
        targets = targets.squeeze(0)
        outputs = model(inputs)
        loss = criterion(outputs, targets.float())

        # Backward propagation
        loss.backward()

        # Update weights
        optimizer.step()
        losses.append(loss.item())
        # Print loss every 10 steps
        if i % 20 == 0:
          print(f"Epoch: {epoch}, Step: {i}, Loss: {loss.item()}")


In [None]:
with torch.no_grad():
    for i, (inputs, targets) in enumerate(test_loader):
        # If a GPU is available, move data and targets to GPU
        inputs = inputs.to("cuda")
        targets = targets.to("cuda")

        # Clear gradients
        optimizer.zero_grad()

        # Forward propagation
        targets = targets.squeeze(0)
        outputs = model(inputs)
        loss = criterion(outputs, targets.float())

        # Print loss every 10 steps
        print(loss.item())

In [None]:
torch.save(model.state_dict(), "/content/drive/MyDrive/tudel_unet.1")