## Import Libraries and set paths

In [None]:
import csv
import glob
import os
import random

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F
import torchvision
from PIL import Image
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm import tqdm

# Root directory of the competition data and its train/test sub-directories
DATA_PATH = '/kaggle/input/vesuvius-challenge-ink-detection'
TRAIN_PATH, TEST_PATH = (
    os.path.join(DATA_PATH, split) for split in ('train', 'test')
)

## Constants

In [None]:
# Notebook-wide constants.
# NOTE(review): PREFIX, BUFFER, Z_START, Z_DIM, TRAINING_STEPS, NUM_INK_PIXELS
# and LEARNING_RATE are not referenced by the visible cells — confirm they are
# still needed.
PREFIX = '/kaggle/input/vesuvius-challenge-ink-detection/train/1/'
BUFFER = 30
Z_START = 27
Z_DIM = 10
TRAINING_STEPS = 30_000
NUM_INK_PIXELS = 100
LEARNING_RATE = 1e-3
BATCH_SIZE = 16
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## Define SubvolumeDataset class

In [None]:
class SubvolumeDataset(Dataset):
    """Dataset that loads a grayscale image with its mask and optional label.

    The mask and label files are located by substituting the
    ``surface_volume`` component of each data path (see the
    ``get_corresponding_*_path`` helpers). ``mask_paths`` is stored but is not
    consulted when loading, and ``label_paths`` only acts as a switch that
    enables label loading.

    NOTE(review): when label loading is disabled the third element of each
    item is ``None``; confirm how such items are meant to be batched by a
    ``DataLoader``.
    """

    def __init__(self, data_paths, mask_paths, label_paths=None, device='cpu'):
        """Store the path lists and the device tensors are moved to.

        Args:
            data_paths: Sequence of image file paths; defines the length.
            mask_paths: Sequence of mask paths (kept for reference only).
            label_paths: Truthy to load labels, falsy/``None`` to skip them.
            device: Device passed to ``Tensor.to`` for every returned tensor.
        """
        self.data_paths = data_paths
        self.mask_paths = mask_paths
        self.label_paths = label_paths
        self.device = device

    def __len__(self):
        """Return the number of data paths."""
        return len(self.data_paths)

    def __getitem__(self, idx):
        """Return ``(data, mask, label)`` tensors for the item at ``idx``.

        ``label`` is ``None`` when no label paths were supplied.

        Raises:
            FileNotFoundError: If any required image is missing or cannot be
                decoded by OpenCV.
        """
        data_path = self.data_paths[idx]

        data = self._read_grayscale(data_path)
        mask = self._read_grayscale(self.get_corresponding_mask_path(data_path))

        label = None
        if self.label_paths:
            label = self._read_grayscale(
                self.get_corresponding_label_path(data_path)
            )

        return (
            torch.from_numpy(data).to(self.device),
            torch.from_numpy(mask).to(self.device),
            torch.from_numpy(label).to(self.device) if label is not None else None,
        )

    @staticmethod
    def _read_grayscale(path):
        """Read ``path`` as a grayscale array, failing loudly if unreadable."""
        image = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
        # cv2.imread signals failure by returning None instead of raising
        if image is None:
            raise FileNotFoundError(f'Could not read image file: {path}')
        return image

    def get_corresponding_mask_path(self, data_path):
        """Return ``data_path`` with 'surface_volume' replaced by 'mask'."""
        # NOTE(review): assumes masks mirror the surface_volume layout — verify
        # against the actual dataset directory structure.
        mask_path = data_path.replace('surface_volume', 'mask')
        return mask_path

    def get_corresponding_label_path(self, data_path):
        """Return ``data_path`` with 'surface_volume' replaced by 'inklabels'."""
        # NOTE(review): assumes labels mirror the surface_volume layout — verify
        # against the actual dataset directory structure.
        label_path = data_path.replace('surface_volume', 'inklabels')
        return label_path

## Initialize Parameters

In [None]:
# Cut-off applied to the model's sigmoid output to obtain a binary prediction
THRESHOLD = 0.5  # Adjust based on your model's output

# Number of training epochs
# NOTE(review): the training cell defines its own `epochs` value — confirm
# which of the two is meant to be authoritative.
EPOCHS = 10  # Adjust as per your requirement

# Number of channels in the images fed to the model
in_channels = 1  # Assuming grayscale images, adjust as per your data

## Define the paths to the training and test data

In [None]:
# Sorted file lists for each split: image slices, label volumes, ink-pixel CSVs.
# NOTE(review): confirm that the 'label_volume' and 'ink_pixels' directories
# exist in the dataset; a pattern with no matches yields an empty list.
train_data_paths, train_label_paths, train_ink_pixels_paths = (
    sorted(glob.glob(os.path.join(TRAIN_PATH, pattern)))
    for pattern in ('*/surface_volume/*.tif', '*/label_volume/*.tif', '*/ink_pixels/*.csv')
)

test_data_paths, test_label_paths, test_ink_pixels_paths = (
    sorted(glob.glob(os.path.join(TEST_PATH, pattern)))
    for pattern in ('*/surface_volume/*.tif', '*/label_volume/*.tif', '*/ink_pixels/*.csv')
)

## Dataset Preparation

In [None]:
# NOTE(review): vol_dims is kept for reference, but SubvolumeDataset has no
# matching constructor parameter, so it is no longer passed to it.
vol_dims = (512, 512, 512)  # Replace with the actual dimensions

# SubvolumeDataset accepts (data_paths, mask_paths, label_paths, device).
# Keywords make the mapping explicit; the first three lists keep the positions
# they had in the original call.
# NOTE(review): confirm that the label/ink-pixel lists are the intended values
# for mask_paths/label_paths.
train_dataset = SubvolumeDataset(
    data_paths=train_data_paths,
    mask_paths=train_label_paths,
    label_paths=train_ink_pixels_paths,
    device=DEVICE,
)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

test_dataset = SubvolumeDataset(
    data_paths=test_data_paths,
    mask_paths=test_label_paths,
    label_paths=test_ink_pixels_paths,
    device=DEVICE,
)
# Later cells evaluate on `test_dataloader`; no shuffling keeps the order stable
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Model Definition

In [None]:
class SegmentationModel(nn.Module):
    """Convolutional encoder/decoder producing one logit per input pixel.

    Five 3x3 convolutions widen the channels (64 -> 1024) and four stride-2
    transposed convolutions narrow them again before a 1x1 output layer.
    A 2x2 max-pool precedes conv2..conv5 so that the four 2x upsampling steps
    bring the output back to the input resolution; without the pooling the
    output would be 16x larger than the input and could not be compared with
    a target of the input's size. Pooling adds no parameters, so the
    ``state_dict`` layout is unchanged.

    Args:
        in_channels: Number of channels of the input images.
    """

    def __init__(self, in_channels=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(512, 1024, kernel_size=3, padding=1)
        self.upconv1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.upconv2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.upconv3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.upconv4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.final_conv = nn.Conv2d(64, 1, kernel_size=1)

    def forward(self, x):
        """Return logits with the same spatial size as ``x``.

        Args:
            x: Float tensor shaped ``[batch, channels, height, width]`` or an
                unbatched ``[channels, height, width]``. Height and width must
                be at least 16 to survive the four pooling steps.

        Returns:
            Logits shaped ``[batch, 1, height, width]`` (or ``[1, height,
            width]`` for unbatched input).
        """
        unbatched = x.dim() == 3
        if unbatched:
            x = x.unsqueeze(0)
        input_size = x.shape[-2:]

        # Encoder: halve the resolution before each of conv2..conv5
        x1 = F.relu(self.conv1(x))
        x2 = F.relu(self.conv2(F.max_pool2d(x1, 2)))
        x3 = F.relu(self.conv3(F.max_pool2d(x2, 2)))
        x4 = F.relu(self.conv4(F.max_pool2d(x3, 2)))
        x5 = F.relu(self.conv5(F.max_pool2d(x4, 2)))

        # Decoder: each transposed convolution doubles the resolution
        x6 = F.relu(self.upconv1(x5))
        x7 = F.relu(self.upconv2(x6))
        x8 = F.relu(self.upconv3(x7))
        x9 = F.relu(self.upconv4(x8))

        output = self.final_conv(x9)

        # Sizes that are not multiples of 16 lose pixels to pooling; resize back
        if output.shape[-2:] != input_size:
            output = F.interpolate(
                output, size=input_size, mode='bilinear', align_corners=False
            )

        return output.squeeze(0) if unbatched else output

## Evaluation Metrics

In [None]:
def dice_coefficient(y_true, y_pred):
    """Return the smoothed Dice coefficient between two tensors.

    Computed as ``(2 * sum(y_true * y_pred) + s) / (sum(y_true) + sum(y_pred)
    + s)`` with ``s = 1e-5``, so two all-zero inputs score 1.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor with the same number of elements.

    Returns:
        A scalar tensor holding the Dice coefficient.
    """
    smooth = 1e-5
    # reshape (unlike view) also handles non-contiguous inputs such as
    # transposed or sliced tensors
    y_true_f = y_true.reshape(-1)
    y_pred_f = y_pred.reshape(-1)
    intersection = (y_true_f * y_pred_f).sum()
    return (2. * intersection + smooth) / (y_true_f.sum() + y_pred_f.sum() + smooth)

## Saving the Model

## Loading the Model

In [None]:
# Set up the device for GPU usage
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define the model
model = SegmentationModel(in_channels).to(device)

# Define the loss function (takes raw logits and float targets in [0, 1])
criterion = nn.BCEWithLogitsLoss()

# Define the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Define the number of epochs
epochs = 10

# Start the training loop
for epoch in range(epochs):
    print(f'Starting epoch {epoch + 1}/{epochs}')
    print('-' * 10)

    train_loss = 0.0
    samples_seen = 0

    # Set the model to training mode
    model.train()

    # Iterate over the training data
    for batch in train_dataloader:
        # Dataset items may carry more than two elements; only the image and
        # its target (first two elements) are used for training.
        images, masks = batch[0], batch[1]

        # Convolutions need float input with an explicit channel axis
        images = images.to(device).float()
        if images.dim() == 3:
            images = images.unsqueeze(1)

        # Clear the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)

        # Binarize the target and give it the same shape as the logits.
        # NOTE(review): assumes any non-zero target pixel is positive — confirm.
        masks = (masks.to(device) > 0).float().reshape(outputs.shape)

        # Calculate the loss
        loss = criterion(outputs, masks)

        # Backward pass
        loss.backward()

        # Update the weights
        optimizer.step()

        # Accumulate a per-sample sum so the epoch average is exact even when
        # the last batch is smaller
        batch_count = images.size(0)
        train_loss += loss.item() * batch_count
        samples_seen += batch_count

    # Print the mean per-sample loss for this epoch
    print(f'Loss: {train_loss / max(samples_seen, 1):.4f}')

print('Training complete.')

# Save the trained model
torch.save(model.state_dict(), 'segmentation_model.pt')

# Reload the saved weights into a fresh model; the following cells use it as
# `loaded_model`
loaded_model = SegmentationModel(in_channels).to(device)
loaded_model.load_state_dict(
    torch.load('segmentation_model.pt', map_location=device)
)
loaded_model.eval()

## Prediction

In [None]:
def predict_image(model, image):
    """Run ``image`` through ``model`` and return a boolean segmentation.

    The logits are squashed with a sigmoid and compared against the
    module-level ``THRESHOLD``; no gradients are recorded.
    """
    with torch.no_grad():
        probabilities = torch.sigmoid(model(image))
    return probabilities > THRESHOLD

## Visualization

In [None]:
def visualize_prediction(image, prediction):
    """Show an image and its predicted segmentation side by side."""
    plt.figure(figsize=(12, 6))

    panels = (
        ('Original Image', image),
        ('Predicted Segmentation', prediction),
    )
    # One grayscale subplot per panel, left to right
    for position, (title, data) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.imshow(data.squeeze(), cmap='gray')
        plt.title(title)

    plt.show()

## Test on a Single Image

In [None]:
# Take the first sample of the test dataset (index 0, not a random draw)
image, label, _ = test_dataset[0]

# The model's convolutions need a float tensor shaped
# (batch, channels, height, width); add the missing leading axes
model_input = image.to(DEVICE).float()
while model_input.dim() < 4:
    model_input = model_input.unsqueeze(0)

# Make a prediction
prediction = predict_image(loaded_model, model_input)

# Move the image and prediction to cpu for visualization
image = image.cpu()
prediction = prediction.cpu()

# Visualize the original image and the prediction
visualize_prediction(image, prediction)

## Evaluate on Test Set

In [None]:
def evaluate_model(model, dataloader):
    """Return the pixel accuracy of ``model`` over ``dataloader``.

    The model emits a single logit per pixel, so predictions come from
    thresholding the sigmoid output with ``THRESHOLD`` (an argmax over the
    one-element channel axis would always be 0). Accuracy is the fraction of
    pixels whose prediction matches the binarized label.

    Args:
        model: Segmentation model returning one logit per pixel.
        dataloader: Iterable of batches whose first two elements are the
            images and the labels.

    Returns:
        Accuracy in ``[0, 1]``; ``0.0`` when the dataloader yields no pixels.
    """
    model.eval()
    total = 0
    correct = 0
    with torch.no_grad():
        for batch in dataloader:
            images, labels = batch[0], batch[1]

            # Convolutions need float input with an explicit channel axis
            images = images.to(DEVICE).float()
            if images.dim() == 3:
                images = images.unsqueeze(1)
            # NOTE(review): assumes any non-zero label pixel is positive
            labels = labels.to(DEVICE) > 0

            outputs = model(images)
            predicted = (torch.sigmoid(outputs) > THRESHOLD).reshape(labels.shape)
            # Count pixels on both sides so the ratio stays within [0, 1]
            total += labels.numel()
            correct += (predicted == labels).sum().item()
    return correct / total if total else 0.0

# Calculate accuracy on test set
test_accuracy = evaluate_model(loaded_model, test_dataloader)
print(f'Test accuracy: {test_accuracy}')

## Save Predictions for Further Analysis

In [None]:
def save_predictions(model, dataloader, output_dir):
    """Save the model's binary predictions for every batch as image files.

    Args:
        model: Segmentation model returning one logit per pixel.
        dataloader: Iterable of ``(images, _, paths)`` batches. Each
            prediction is written under the base name of its path.
            NOTE(review): the third batch element must hold file paths —
            confirm the dataset provides them.
        output_dir: Directory for the output files; created if missing.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(output_dir, exist_ok=True)

    model.eval()
    with torch.no_grad():
        for images, _, paths in dataloader:
            # Convolutions need float input with an explicit channel axis
            images = images.to(DEVICE).float()
            if images.dim() == 3:
                images = images.unsqueeze(1)
            outputs = model(images)
            # save_image scales float tensors in [0, 1]; boolean masks are
            # converted first
            preds = (torch.sigmoid(outputs) > THRESHOLD).float()
            for pred, path in zip(preds, paths):
                output_path = os.path.join(output_dir, os.path.basename(path))
                torchvision.utils.save_image(pred, output_path)

# Save predictions on test set
save_predictions(loaded_model, test_dataloader, 'predictions/')

## Visualize Predictions

In [None]:
def visualize_predictions(dataset, model, num_samples=5):
    """Plot input, ground truth and prediction for a few random samples.

    Args:
        dataset: Indexable dataset whose items start with ``(image, label)``.
        model: Segmentation model returning one logit per pixel.
        num_samples: Number of samples to draw; capped at ``len(dataset)``.
    """
    model.eval()
    # random.sample raises if asked for more items than are available
    sample_count = min(num_samples, len(dataset))
    samples = random.sample(range(len(dataset)), sample_count)
    with torch.no_grad():
        for i in samples:
            image, label, _ = dataset[i]

            # The model needs a float tensor shaped (batch, channels, H, W)
            model_input = image.to(DEVICE).float()
            while model_input.dim() < 4:
                model_input = model_input.unsqueeze(0)
            output = model(model_input)
            pred = torch.sigmoid(output) > THRESHOLD

            # Grayscale images plot as 2-D arrays; multi-channel images are
            # moved from channels-first to channels-last for imshow
            picture = image.cpu().numpy().squeeze()
            if picture.ndim == 3:
                picture = picture.transpose(1, 2, 0)

            fig, axs = plt.subplots(1, 3, figsize=(15, 5))
            axs[0].imshow(picture, cmap='gray')
            axs[0].set_title('Input Image')
            if label is not None:
                axs[1].imshow(label.cpu().numpy().squeeze(), cmap='gray')
            axs[1].set_title('Ground Truth')
            axs[2].imshow(pred.cpu().numpy().squeeze(), cmap='gray')
            axs[2].set_title('Predicted Segmentation')
            for ax in axs:
                ax.axis('off')
            plt.show()

# Visualize predictions on test set
visualize_predictions(test_dataset, loaded_model)

## Save Model

In [None]:
# Persist the weights of the evaluated model under the final checkpoint name
torch.save(obj=loaded_model.state_dict(), f='segmentation_model_final.pt')

## Define Evaluation Metric

In [None]:
def fbeta_score(y_true, y_pred, beta=0.5):
    """Compute the F-beta score, a weighted harmonic mean of precision and recall.

    Both inputs are converted to float32 first, so boolean or integer masks
    are accepted (``1 - mask`` is not defined for boolean tensors).

    Args:
        y_true: Ground-truth mask with values in {0, 1}.
        y_pred: Predicted mask with values in {0, 1}, same shape as ``y_true``.
        beta: Weight of recall relative to precision; values below 1 favour
            precision.

    Returns:
        A scalar float32 tensor holding the score (0 when there are no true
        positives).
    """
    y_true = y_true.to(torch.float32)
    y_pred = y_pred.to(torch.float32)

    tp = (y_true * y_pred).sum()
    fp = ((1 - y_true) * y_pred).sum()
    fn = (y_true * (1 - y_pred)).sum()

    # The small epsilon keeps the ratios finite when a denominator is zero
    precision = tp / (tp + fp + 1e-12)
    recall = tp / (tp + fn + 1e-12)

    return (1 + beta**2) * (precision * recall) / (beta**2 * precision + recall + 1e-12)

## Evaluate Model

In [None]:
# NOTE(review): this definition replaces the accuracy-based evaluate_model
# defined in an earlier cell — confirm the name reuse is intended.
def evaluate_model(model, dataset):
    """Evaluate model on a dataset using the mean per-sample F0.5 score.

    Args:
        model: Segmentation model returning one logit per pixel.
        dataset: Iterable, sized dataset of ``(image, label, _)`` items.

    Returns:
        Mean F0.5 score as a float; ``0.0`` for an empty dataset.
    """
    model.eval()
    total_fbeta = 0.0
    with torch.no_grad():
        for image, label, _ in tqdm(dataset):
            # The model needs a float tensor shaped (batch, channels, H, W)
            image = image.to(DEVICE).float()
            while image.dim() < 4:
                image = image.unsqueeze(0)
            output = model(image)
            # Float masks keep the metric's arithmetic well-defined
            pred = (torch.sigmoid(output) > THRESHOLD).float()
            # NOTE(review): assumes any non-zero label pixel is positive
            label = (label.to(DEVICE) > 0).float().reshape(pred.shape)
            total_fbeta += fbeta_score(label, pred).item()
    sample_count = len(dataset)
    return total_fbeta / sample_count if sample_count else 0.0

# Evaluate model on test set
test_fbeta = evaluate_model(loaded_model, test_dataset)

# Print test F0.5 score
print(f'Test F0.5 score: {test_fbeta:.4f}')

## Create Submission

In [None]:
def rle_encode(img):
    """Run-length encode a binary image as 'start length start length ...'.

    The image is flattened in row-major order; starts are 1-based positions
    in the flattened array. An image without foreground yields ''.
    """
    # Pad both ends with a zero so runs touching either border still produce
    # a rising and a falling edge
    padded = np.concatenate([[0], img.flatten(), [0]])
    # 1-based positions where the value changes: run starts at even slots,
    # positions just past each run's end at odd slots
    edges = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    # Turn each end position into a run length
    edges[1::2] -= edges[::2]
    return ' '.join(map(str, edges))

def create_submission(model, dataset, submission_path='submission.csv'):
    """Write a CSV with one run-length-encoded prediction per dataset item.

    Args:
        model: Segmentation model returning one logit per pixel.
        dataset: Iterable of ``(image, _, id_)`` items.
            NOTE(review): the third element is written to the 'Id' column —
            confirm the dataset provides the identifiers expected there.
        submission_path: Destination of the CSV file.
    """
    model.eval()
    # newline='' lets the csv module control line endings, as its docs require
    with open(submission_path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['Id', 'Predicted'])
        with torch.no_grad():
            for image, _, id_ in tqdm(dataset):
                # The model needs a float tensor shaped (batch, channels, H, W)
                image = image.to(DEVICE).float()
                while image.dim() < 4:
                    image = image.unsqueeze(0)
                output = model(image)
                pred = torch.sigmoid(output) > THRESHOLD
                pred_rle = rle_encode(pred.cpu().numpy())
                writer.writerow([id_, pred_rle])

# Create submission file
create_submission(loaded_model, test_dataset)