In [None]:
!pip install torch torchvision
!pip install matplotlib
!pip install opencv-python
!pip install imutils
!pip install scikit-learn
!pip install tqdm
!pip install ternausnet
!pip install gdown

In [None]:
import gdown

url = 'https://drive.google.com/uc?id=12tPof_SUIDlxLP1N2GI0LiHoAHNlXJpt'

output = 'MIDV_2020_dataset.zip'
gdown.download(url, output)

In [None]:
!unzip MIDV_2020_dataset.zip >/dev/null
print('[INFO] The training dataset has been unzipped...')

In [None]:
!mkdir outputs

In [None]:
# Import necessary packages
from torch.utils.data import Dataset
from ternausnet.models import UNet11
from ternausnet.models import UNet16
from torch.nn import BCEWithLogitsLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from torchvision import transforms
from imutils import paths
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
import time
import cv2
import os
import imutils

In [None]:
# Base path of the dataset
DATASET_PATH = os.path.join('MIDV_2020_dataset', 'train')

# Define the path to the images and masks dataset
IMAGE_DATASET_PATH = os.path.join(DATASET_PATH, 'images')
MASK_DATASET_PATH = os.path.join(DATASET_PATH, 'masks')

# Define the test split
TEST_SPLIT = 0.15

# Determine the device to be used for training and evaluation
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Determine if we will be pinning memory during data loading
PIN_MEMORY = True if DEVICE == 'cuda' else False

# Define the number of channels in the input, number of classes,
# and number of levels in the U-Net model
NUM_CHANNELS = 1
NUM_CLASSES = 1
NUM_LEVELS = 3

# HYPERPARAMETERS
# Initialize learning rate, number of epochs, and batch size
INIT_LR = 0.001
NUM_EPOCHS = 15
BATCH_SIZE = 16

# Define the input image dimensions
INPUT_IMAGE_WIDTH = 256
INPUT_IMAGE_HEIGHT = 256

# Define threshold to filter weak predictions
THRESHOLD = 0.5

# Define the path to the base output directory
BASE_OUTPUT = 'outputs'

# Define the path to the testing image paths
TEST_PATHS = os.path.sep.join([BASE_OUTPUT, 'test_paths.txt'])

In [None]:
class SegmentationDataset(Dataset):
    def __init__(self, imagePaths, maskPaths, transforms):
        # Store the image and mask filepaths, and augmentation transforms
        self.imagePaths = imagePaths
        self.maskPaths = maskPaths
        self.transforms = transforms

    def __len__(self):
        # Return the number of total samples contained in the dataset
        return len(self.imagePaths)

    def __getitem__(self, idx):
        # Grab the image path from the current index
        imagePath = self.imagePaths[idx]

        # Load the image from disk, swap its channels from BGR to RGB,
        # and read the associated mask from disk in grayscale mode
        image = cv2.imread(imagePath)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(self.maskPaths[idx], 0)

        # Check to see if we are applying any transformations
        if self.transforms is not None:
            # Apply the transformations to both image and its mask
            image = self.transforms(image)
            mask = self.transforms(mask)

        # Return the image and its mask
        return image, mask

In [None]:
def save_model(model, epoch, H):
    # Plot the training loss
    plt.style.use('ggplot')
    plt.figure()
    plt.plot(H['train_loss'], label='train_loss')
    plt.plot(H['test_loss'], label='test_loss')
    plt.title('Training Loss on Dataset')
    plt.xlabel('Epoch #')
    plt.ylabel('Loss')
    plt.legend(loc='lower left')
    plt.savefig(os.path.sep.join([BASE_OUTPUT, str(epoch) + '_plot.png']))

    # Serialize the model to disk
    torch.save(model, os.path.join(BASE_OUTPUT, str(epoch) + '_unet11_midv_2020.path'))

In [None]:
# Load the image and mask filepaths in a sorted manner
imagePaths = sorted(list(paths.list_images(IMAGE_DATASET_PATH)))
maskPaths = sorted(list(paths.list_images(MASK_DATASET_PATH)))

# Partition the data into training and testing splits using 85% of
# the data for training and the remaining 15% for testing
split = train_test_split(imagePaths, maskPaths, test_size=TEST_SPLIT, random_state=42)

# Unpack the data split
trainImages, testImages = split[:2]
trainMasks, testMasks = split[2:]

# Write the testing image paths to disk so that we can use them
# when evaluating/testing our model
print("[INFO] Saving testing image paths...")
f = open(TEST_PATHS, 'w')
f.write('\n'.join(testImages))
f.close()

# Define transformations
transforms = transforms.Compose([transforms.ToPILImage(),
                                 transforms.Resize((INPUT_IMAGE_HEIGHT,
                                                    INPUT_IMAGE_WIDTH)),
                                 transforms.ToTensor()])

# Create the train and test datasets
trainDS = SegmentationDataset(imagePaths=trainImages, maskPaths=trainMasks, transforms=transforms)
testDS = SegmentationDataset(imagePaths=testImages, maskPaths=testMasks, transforms=transforms)
print(f"[INFO] Found {len(trainDS)} examples in the training set...")
print(f"[INFO] Found {len(testDS)} examples in the test set...")

# Create the training and test data loaders
trainLoader = DataLoader(trainDS, shuffle=True,
                         batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
                         num_workers=os.cpu_count())
testLoader = DataLoader(testDS, shuffle=False,
                        batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
                        num_workers=os.cpu_count())

# Initialize the U-Net11 model with pre-trained VGG11 encoders
unet = UNet11(pretrained=True).to(DEVICE)

# Initialize the U-Net16 model with pre-trained VGG16 encoders
# unet = UNet16(pretrained=True).to(DEVICE)

# Initialize the loss function and optimizer
lossFunc = BCEWithLogitsLoss()
opt = Adam(unet.parameters(), lr=INIT_LR)

# Calculate steps per epoch for training and test set
trainSteps = len(trainDS) // BATCH_SIZE
testSteps = len(testDS) // BATCH_SIZE

# Initialize a dictionary to store training history
H = {'train_loss': [], 'test_loss': []}

# Define the current best test loss
bestTestLoss = 0.015

# Loop over epochs
print("[INFO] Training the network...")
startTime = time.time()
for e in tqdm(range(NUM_EPOCHS)):
    # Set the model in training mode
    unet.train()
    
    # Initialize the total training and validation loss
    totalTrainLoss = 0
    totalTestLoss = 0

    # Loop over the training set
    for (i, (x, y)) in enumerate(trainLoader):
        print('Epoch ', e + 1, ': ', i + 1, '/', len(trainLoader), sep='')
        # Send the input to the device
        (x, y) = (x.to(DEVICE), y.to(DEVICE))

        # Perform a forward pass and calculate the training loss
        pred = unet(x)
        loss = lossFunc(pred, y)

        # First, zero out any previously accumulated gradients, then
        # perform backpropagation, and then update model parameters
        opt.zero_grad()
        loss.backward()
        opt.step()

        # Add the loss to the total training loss so far
        totalTrainLoss += loss

    # Switch off autograd
    with torch.no_grad():
        # Set the model in evaluation mode
        unet.eval()

        # Loop over the validation set
        for (x, y) in testLoader:
            # Send the input to the device
            (x, y) = (x.to(DEVICE), y.to(DEVICE))

            # Make the predictions and calculate the validation loss
            pred = unet(x)
            totalTestLoss += lossFunc(pred, y)

        # Calculate the average training and validation loss
        avgTrainLoss = totalTrainLoss / trainSteps
        avgTestLoss = totalTestLoss / testSteps

        # Update our training history
        H['train_loss'].append(avgTrainLoss.cpu().detach().numpy())
        H['test_loss'].append(avgTestLoss.cpu().detach().numpy())

        # Print the model training and validation information
        print("[INFO] EPOCH: {}/{}".format(e + 1, NUM_EPOCHS))
        print("Train loss: {:.6f}, Test loss: {:.4f}".format(avgTrainLoss, avgTestLoss))
    
        # Save the current model if the test loss is better
        if avgTestLoss < bestTestLoss:
            print('[INFO] Test loss has improved, saving current model...')
            bestTestLoss = avgTestLoss
            save_model(unet, e + 1, H)

# Display the total time needed to perform the training
endTime = time.time()
print("[INFO] Total time taken to train the model: {:.2f}s".format(endTime - startTime))

# Save the final model
save_model(unet, -1, H)

In [None]:
# Clear output directory
outputPath = '/kaggle/working/outputs'
files = os.listdir(outputPath)

for f in files:
    os.remove(outputPath + '/' + f)