In [None]:
# Install the segmentation model zoo and OpenCV into the notebook environment.
# NOTE(review): the import cell also needs `timm` and `torchinfo`, which this
# command does not install — presumably preinstalled on the image; verify.
!pip install segmentation-models-pytorch opencv-python 

In [None]:
import timm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import jaccard_score, precision_score, recall_score, f1_score
from PIL import Image
from tqdm import tqdm
from torchinfo import summary
from torch.optim.lr_scheduler import ReduceLROnPlateau
import cv2
import segmentation_models_pytorch as smp

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

In [None]:
def dice_score(pred, target, epsilon=1e-6):
    """Dice coefficient between a binarized prediction and a target tensor.

    Args:
        pred: Tensor of scores; entries strictly greater than 0.5 become 1.0,
            everything else 0.0.
        target: Tensor multiplied element-wise with the binarized prediction
            (presumably a 0/1 mask of matching shape — confirm with callers).
        epsilon: Smoothing constant added to numerator and denominator, so an
            empty prediction against an empty target evaluates to 1.

    Returns:
        Scalar tensor (2 * overlap + epsilon) / (pred_sum + target_sum + epsilon),
        where the sums run over every element of the inputs.
    """
    binary_pred = torch.gt(pred, 0.5).float()
    overlap = torch.sum(binary_pred * target)
    denominator = binary_pred.sum() + target.sum() + epsilon
    return (2. * overlap + epsilon) / denominator

def iou_score(pred, target, epsilon=1e-6):
    """Intersection-over-union between a binarized prediction and a target tensor.

    Args:
        pred: Tensor of scores; entries strictly greater than 0.5 count as foreground.
        target: Tensor multiplied element-wise with the binarized prediction
            (presumably a 0/1 mask of matching shape — confirm with callers).
        epsilon: Smoothing constant added to numerator and denominator.

    Returns:
        Scalar tensor (intersection + epsilon) / (union + epsilon), computed
        over every element of the inputs.
    """
    binary_pred = torch.gt(pred, 0.5).float()
    overlap = torch.sum(binary_pred * target)
    # |A ∪ B| = |A| + |B| - |A ∩ B|
    combined = binary_pred.sum() + target.sum() - overlap
    return (overlap + epsilon) / (combined + epsilon)

def precision_recall_f1(pred, target, epsilon=1e-6):
    """Precision, recall and F1 of a binarized prediction against a target tensor.

    Args:
        pred: Tensor of scores; entries strictly greater than 0.5 count as positive.
        target: Tensor used as the reference labels (presumably 0/1 values of
            matching shape — confirm with callers).
        epsilon: Added to each denominator only, so a case with no true
            positives evaluates to 0 instead of dividing by zero.

    Returns:
        Tuple ``(precision, recall, f1)`` of scalar tensors, pooled over every
        element of the inputs.
    """
    binary_pred = (pred > 0.5).to(torch.float32)
    predicted_negative = 1 - binary_pred
    actual_negative = 1 - target

    true_pos = (binary_pred * target).sum()
    false_pos = (binary_pred * actual_negative).sum()
    false_neg = (predicted_negative * target).sum()

    precision = true_pos / (true_pos + false_pos + epsilon)
    recall = true_pos / (true_pos + false_neg + epsilon)
    harmonic_mean = 2 * (precision * recall) / (precision + recall + epsilon)

    return precision, recall, harmonic_mean


In [None]:
def calculate_metrics(pred, target):
    """Bundle the segmentation metrics for one prediction/target pair.

    Both arguments are forwarded unchanged to ``dice_score``, ``iou_score`` and
    ``precision_recall_f1``, each of which binarizes ``pred`` at 0.5.

    Returns:
        Dict of Python floats keyed by 'Dice Score', 'IoU Score', 'Precision',
        'Recall' and 'F1-Score'.
    """
    names = ('Dice Score', 'IoU Score', 'Precision', 'Recall', 'F1-Score')
    values = (
        dice_score(pred, target),
        iou_score(pred, target),
        *precision_recall_f1(pred, target),
    )
    # .item() turns each scalar tensor into a plain float.
    return {name: value.item() for name, value in zip(names, values)}


In [None]:
def extract_metric_lists(metrics_list):
    """Turn a sequence of per-epoch metric dicts into one list per metric.

    Args:
        metrics_list: Iterable of dicts, each containing the keys 'Dice Score',
            'IoU Score', 'Precision', 'Recall' and 'F1-Score'.

    Returns:
        Tuple of five lists ``(dice, iou, precision, recall, f1)``, each in the
        same order as ``metrics_list``.
    """
    metric_keys = ('Dice Score', 'IoU Score', 'Precision', 'Recall', 'F1-Score')
    records = list(metrics_list)
    return tuple([record[key] for record in records] for key in metric_keys)

In [None]:
def plotFunction(num_epochs, losses, dice_scores, iou_scores, precisions, recalls, f1_scores, graph_names):
    """Plot loss and the five segmentation metrics against the epoch number.

    Six line plots fill the first six cells of a 3x3 subplot grid. The figure
    is written to '/kaggle/working/<graph_names>.png' and then displayed.

    Args:
        num_epochs: Number of epochs; the x-axis runs from 1 to ``num_epochs``.
        losses, dice_scores, iou_scores, precisions, recalls, f1_scores:
            Per-epoch series, one value per epoch.
        graph_names: Base file name (without extension) for the saved figure.
    """
    epochs = range(1, num_epochs + 1)

    # One row per panel, in grid order: (series, legend label, y-axis label, colour).
    panels = [
        (losses, 'Loss', "Loss", "blue"),
        (dice_scores, 'dice_scores', "dice_score", "orange"),
        (iou_scores, 'iou_scores', "iou_score", "purple"),
        (precisions, 'precisions', "precisions", "red"),
        (recalls, 'recalls', "recalls", "blue"),
        (f1_scores, 'f1_scores', "f1_scores", "green"),
    ]

    plt.figure(figsize=(16, 10))

    for position, (series, legend_label, y_label, colour) in enumerate(panels, start=1):
        plt.subplot(3, 3, position)
        plt.plot(epochs, series, label=legend_label, color=colour)
        plt.xlabel("Epochs")
        plt.ylabel(y_label)
        plt.title(f"{legend_label} vs. Epochs")
        plt.legend()

    plt.tight_layout()

    # Save the whole figure (all panels) before showing it.
    plt.savefig('/kaggle/working/' + graph_names + '.png')
    plt.show()


In [None]:
import json

def save_checkpoint(
    epoch,
    model,
    optimizer,
    train_metrics,
    val_metrics,
    save_dir,
    save_filename,
    is_best
):
    """
    Append this epoch's metrics to a JSON log and save model/optimizer state.

    Two things are written on every call:
      * ``<save_dir>/<save_filename>_metrics.json`` — a JSON list with one entry
        per call; the existing file is read, extended and rewritten.
      * ``<save_dir>/last_.pt`` — overwritten with the current state.
    When ``is_best`` is truthy the same state is also written to
    ``<save_dir>/best_.pt``.

    Args:
        epoch (int): Epoch number, stored as given (the training loop in this
            notebook passes a 0-based index).
        model (torch.nn.Module): Model whose ``state_dict()`` is saved.
        optimizer (torch.optim.Optimizer): Optimizer whose ``state_dict()`` is saved.
        train_metrics: JSON-serialisable training metrics for this epoch.
        val_metrics: JSON-serialisable validation metrics for this epoch.
        save_dir (str): Directory to write into; created if missing.
        save_filename (str): Prefix of the metrics JSON file name.
        is_best (bool): Whether to also write ``best_.pt``.

    Returns:
        None
    """
    os.makedirs(save_dir, exist_ok=True)

    metrics = {
        'epoch': epoch,
        'train_metrics': train_metrics,
        'val_metrics': val_metrics,
    }

    metrics_file_path = os.path.join(save_dir, f"{save_filename}_metrics.json")

    # Start from the existing log when it is a usable JSON list; an empty,
    # corrupted, or non-list file is replaced by a fresh list instead of
    # crashing the training run on `.append`.
    all_metrics = []
    if os.path.exists(metrics_file_path):
        with open(metrics_file_path, 'r') as f:
            try:
                loaded = json.load(f)
            except json.JSONDecodeError:
                loaded = None
        if isinstance(loaded, list):
            all_metrics = loaded

    all_metrics.append(metrics)

    with open(metrics_file_path, 'w') as f:
        json.dump(all_metrics, f, indent=4)
    print(f"Metrics for epoch {epoch} saved at: {metrics_file_path}")

    # The state dict is saved exactly as the model reports it, so any wrapper
    # around the model is reflected in the parameter names.
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    checkpoint_file_path_last = os.path.join(save_dir, "last_.pt")
    torch.save(checkpoint, checkpoint_file_path_last)

    print(f"Model and optimizer state for the last epoch saved at: {checkpoint_file_path_last}")

    if is_best:
        checkpoint_file_path_best = os.path.join(save_dir, "best_.pt")
        torch.save(checkpoint, checkpoint_file_path_best)

        print(f"Model and optimizer state for the best epoch saved at: {checkpoint_file_path_best}")


In [None]:
def train_one_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimisation pass over ``dataloader`` and report averaged results.

    Args:
        model: Network to train; switched to train mode here. It is assumed to
            emit raw logits, matching the ``BCEWithLogitsLoss`` criterion this
            notebook uses.
        dataloader: Iterable of ``(images, masks)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, masks)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, avg_metrics)``: the mean of the per-batch losses and
        a dict holding the per-key mean of the per-batch metrics.
    """
    model.train()
    train_metrics = []
    total_loss = 0.0

    for images, masks in tqdm(dataloader, desc="Training", leave=True):
        images, masks = images.to(device), masks.to(device)
        optimizer.zero_grad()

        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # The metric helpers threshold at 0.5, which is only meaningful for
        # probabilities. Thresholding raw logits at 0.5 would silently demand
        # a probability above ~0.62, so convert first (no gradient needed).
        probabilities = torch.sigmoid(outputs.detach())
        metrics = calculate_metrics(probabilities, masks)
        train_metrics.append(metrics)

    avg_loss = total_loss / len(dataloader)
    avg_metrics = {key: sum(d[key] for d in train_metrics) / len(train_metrics) for key in train_metrics[0]}
    return avg_loss, avg_metrics

In [None]:
def validate_one_epoch(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: Network to evaluate; switched to eval mode here. It is assumed to
            emit raw logits, matching the ``BCEWithLogitsLoss`` criterion this
            notebook uses.
        dataloader: Iterable of ``(images, masks)`` batches.
        criterion: Loss called as ``criterion(outputs, masks)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, avg_metrics)``: the mean of the per-batch losses and
        a dict holding the per-key mean of the per-batch metrics.
    """
    model.eval()
    val_metrics = []
    total_loss = 0.0

    with torch.no_grad():
        for images, masks in tqdm(dataloader, desc="Validation", leave=True):
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)
            loss = criterion(outputs, masks)

            total_loss += loss.item()

            # The metric helpers threshold at 0.5, so they need probabilities;
            # thresholding raw logits at 0.5 would under-count foreground.
            metrics = calculate_metrics(torch.sigmoid(outputs), masks)
            val_metrics.append(metrics)

    avg_loss = total_loss / len(dataloader)
    avg_metrics = {key: sum(d[key] for d in val_metrics) / len(val_metrics) for key in val_metrics[0]}
    return avg_loss, avg_metrics


In [None]:
def train_and_validate(model, train_loader, val_loader, optimizer, criterion, epochs, save_dir, save_filename):
    """Train for ``epochs`` epochs, checkpoint after each one, then plot the curves.

    After every epoch the metrics and model state are saved through
    ``save_checkpoint``; the epoch is flagged as best when its validation
    'Dice Score' is greater than or equal to the best seen so far. The running
    best is kept in the module-level ``global_val_dice``, which is reset to 0.0
    on every call. Training and validation curves are plotted at the end.

    Note: the device is read from the module-level ``device`` variable rather
    than passed as an argument.

    Returns:
        None
    """
    global global_val_dice
    global_val_dice = 0.0

    train_losses, train_metrics_list = [], []
    val_losses, val_metrics_list = [], []

    os.makedirs(save_dir, exist_ok=True)

    for epoch in range(epochs):
        epoch_banner = f"Epoch {epoch+1}/{epochs}"
        print("Active and training is going on...")
        print(epoch_banner)

        train_loss, train_metrics = train_one_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_metrics = validate_one_epoch(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_metrics_list.append(train_metrics)
        val_metrics_list.append(val_metrics)

        print(epoch_banner)
        print(f"Train Loss: {train_loss:.4f}, Metrics: {train_metrics}")
        print(f"Val Loss: {val_loss:.4f}, Metrics: {val_metrics}\n")

        # Ties count as an improvement, so the latest equally good epoch wins.
        is_best = val_metrics["Dice Score"] >= global_val_dice
        save_checkpoint(epoch, model, optimizer, train_metrics, val_metrics, save_dir, save_filename, is_best)
        if is_best:
            global_val_dice = val_metrics["Dice Score"]

    train_series = extract_metric_lists(train_metrics_list)
    val_series = extract_metric_lists(val_metrics_list)

    plotFunction(epochs, train_losses, *train_series, "Training graphs")
    plotFunction(epochs, val_losses, *val_series, "validation graphs")

In [None]:
# Paired image/mask dataset for segmentation: item i is (image i, mask i).
class CustomDataset(Dataset):
    """Loads an image and its segmentation mask from parallel lists of paths.

    Args:
        image_paths: Sequence of image file paths.
        mask_paths: Sequence of mask file paths; entry ``i`` must be the mask
            belonging to ``image_paths[i]``.
        transform: Optional callable applied to the PIL image.
        mask_transform: Optional callable applied to the PIL mask.
    """

    def __init__(self, image_paths, mask_paths, transform=None, mask_transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform
        self.mask_transform = mask_transform

    def __len__(self):
        """Number of samples, taken from the image path list."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, mask)`` for position ``idx`` after the optional transforms."""
        image = Image.open(self.image_paths[idx])

        # Bring every non-RGB image (grayscale, palette, RGBA, ...) to three
        # channels so a 3-channel normalisation/model always applies.
        if image.mode != 'RGB':
            image = image.convert('RGB')
        if self.transform:
            image = self.transform(image)

        mask = Image.open(self.mask_paths[idx])

        # Use the transform given to THIS dataset, not a notebook-level
        # variable that happens to share the name.
        if self.mask_transform:
            mask = self.mask_transform(mask)

        return image, mask

In [None]:
# Image pipeline for training.
# No random geometric augmentation here: images and masks go through separate
# pipelines, so a random flip applied to the image alone would leave the mask
# un-flipped and train the model on misaligned pairs. Flips/rotations must be
# applied jointly to image and mask (e.g. inside the dataset) if reintroduced.
transform_train = transforms.Compose([
    transforms.Resize((640, 640)),  # Resize to 640x640
    transforms.ToTensor(),  # PIL image -> float tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Per-channel normalisation
])

# Image pipeline for validation and testing.
transform_val = transforms.Compose([
    transforms.Resize((640, 640)),  # Resize to 640x640
    transforms.ToTensor(),  # PIL image -> float tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Per-channel normalisation
])

# Mask pipeline. Nearest-neighbour resampling keeps the original label values;
# an interpolating resize would blur mask edges into intermediate values.
mask_transform = transforms.Compose([
    transforms.Resize((640, 640), interpolation=transforms.InterpolationMode.NEAREST),
    transforms.ToTensor()  # PIL mask -> float tensor
])


In [None]:
import os

# Directory listing of the raw images and masks.
path = '/kaggle/input/figsharedataset/images/'
Images = os.listdir('/kaggle/input/figsharedataset/images/')
Masks = os.listdir('/kaggle/input/figsharedataset/masks/')

# Count the images whose PIL mode is anything other than RGB. Each file is
# opened in a context manager so its handle is released straight away instead
# of staying open for every image in the dataset.
count = 0
for image_name in Images:
    with Image.open(os.path.join(path, image_name)) as opened_image:
        if opened_image.mode != 'RGB':
            count += 1

print(f"total number of images in Images dataset = {len(Images)}")
print(f"total number of gray scale images in Images dataset = {count}")

In [None]:
# Inspect the PIL mode of the first listed image.
image = Image.open(os.path.join(path, Images[0]))
print("mode of image is = {}".format(image.mode))

In [None]:
base_image_path = '/kaggle/input/figsharedataset/images'
base_masks_path = '/kaggle/input/figsharedataset/masks'

# os.listdir returns entries in arbitrary order, and the two directories are
# paired purely by position below, so both listings are sorted first.
# NOTE(review): this assumes an image and its mask sort to the same position
# (e.g. identical file names) — confirm against the dataset layout.
Images = sorted(os.listdir(base_image_path))
Masks = sorted(os.listdir(base_masks_path))

if len(Images) != len(Masks):
    raise ValueError(
        f"Found {len(Images)} images but {len(Masks)} masks; they cannot be paired by position."
    )

image_paths = [os.path.join(base_image_path, image_name) for image_name in Images]
mask_paths = [os.path.join(base_masks_path, mask_name) for mask_name in Masks]

In [None]:
from PIL import Image
# Read the first image with OpenCV and print the shape of the resulting array.
# NOTE(review): cv2.imread presumably returns None for an unreadable path, in
# which case `.shape` would raise — confirm if this cell is kept.
sample = cv2.imread(image_paths[0])
print(sample.shape)

In [None]:
from sklearn.model_selection import train_test_split

# Step 1: hold out 20% of the (image, mask) pairs as the test set.
(
    images_train_paths,
    images_test_paths,
    masks_train_paths,
    masks_test_paths,
) = train_test_split(
    image_paths,
    mask_paths,
    test_size=0.20,
    shuffle=True,
    random_state=104,
)

# Step 2: carve 10% of the remaining pairs out as the validation set
# (overall roughly 72% train / 8% validation / 20% test).
(
    images_train_paths,
    images_val_paths,
    masks_train_paths,
    masks_val_paths,
) = train_test_split(
    images_train_paths,
    masks_train_paths,
    test_size=0.10,
    shuffle=True,
    random_state=104,
)

In [None]:

# Build a U-Net with a ResNet-34 encoder initialised from ImageNet weights.
# No `activation` is passed, so the training code treats the output as raw
# logits (it pairs the model with BCEWithLogitsLoss and applies sigmoid itself).
model = smp.Unet(
    encoder_name="resnet34",  # Backbone model
    encoder_weights="imagenet",  # Use pretrained weights
    in_channels=3,  # 3-channel RGB input
    classes=1 # Binary segmentation
    # activation=None,  # Use sigmoid during inference
)


# Wrap the model in DataParallel and move it to the selected device.
model = nn.DataParallel(model)
model.to(device)

# model.load_state_dict(checkpoint['model_state_dict'])
# Print a layer-by-layer summary for a single 3x640x640 input.
summary(model, input_size=(1, 3, 640, 640), col_names=["input_size", "output_size", "num_params"])

In [None]:
# Loss: binary cross-entropy computed on logits (the sigmoid is folded into
# the loss, so the model output must NOT already be passed through a sigmoid).
# Optimizer: Adam over all model parameters with a small weight decay.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)

# Disabled: scheduler that would halve the learning rate after 3 successive
# epochs without improvement in the monitored quantity (validation loss).
# scheduler = ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)


In [None]:
# Directory that receives the checkpoints and the per-epoch metrics JSON.
save_dir = '/kaggle/working/segmentation'
# exist_ok=True makes this a no-op when the directory is already there, with
# no separate existence check (same idiom the rest of the notebook uses).
os.makedirs(save_dir, exist_ok=True)

# Prefix of the metrics file written by save_checkpoint.
save_filename = "checkpoints"

In [None]:
# Training set: training image pipeline, shuffled every epoch.
train_dataset = CustomDataset(images_train_paths, masks_train_paths, transform=transform_train, mask_transform=mask_transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=4)

# Validation set: validation image pipeline, fixed order.
val_dataset = CustomDataset(images_val_paths, masks_val_paths, transform=transform_val, mask_transform=mask_transform)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=4)

In [None]:
# Train for 50 epochs; checkpoints, metrics and the curve plots are written as
# side effects of train_and_validate.
epochs = 50
train_and_validate(model, train_loader, val_loader, optimizer, criterion, epochs, save_dir=save_dir, save_filename=save_filename)

# Testing

In [None]:
import random 

def evaluate_model(model, dataloader, criterion, device):
    """Compute averaged metrics on ``dataloader`` and collect a few example predictions.

    Args:
        model: Network to evaluate; switched to eval mode here. Its outputs are
            treated as raw logits and passed through a sigmoid.
        dataloader: Iterable of ``(images, masks)`` batches.
        criterion: Unused; kept so existing calls keep working.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_metrics, sample_images, sample_masks, sample_preds)``.
        ``avg_metrics`` is the per-key mean of the per-batch metrics. The three
        lists hold CPU tensors for one randomly chosen sample from each of the
        first batches, up to five samples; ``sample_preds`` contains
        probabilities, not thresholded masks.
    """
    model.eval()
    test_metrics = []
    sample_images, sample_masks, sample_preds = [], [], []

    with torch.no_grad():
        for images, masks in tqdm(dataloader, desc="Testing", leave=True):
            images, masks = images.to(device), masks.to(device)

            # Convert logits to probabilities once: the metric helpers
            # threshold at 0.5, which is only correct on probabilities.
            probabilities = torch.sigmoid(model(images))

            metrics = calculate_metrics(probabilities, masks)
            test_metrics.append(metrics)

            # Keep one random sample per batch until five are collected.
            if len(sample_images) < 5:
                idx = random.randint(0, images.shape[0] - 1)
                sample_images.append(images[idx].cpu())
                sample_masks.append(masks[idx].cpu())
                sample_preds.append(probabilities[idx].cpu())

    avg_metrics = {key: sum(d[key] for d in test_metrics) / len(test_metrics) for key in test_metrics[0]}
    return avg_metrics, sample_images, sample_masks, sample_preds
                      

In [None]:
# Test set: same image pipeline as validation, fixed order.
test_dataset = CustomDataset(images_test_paths, masks_test_paths, transform=transform_val, mask_transform=mask_transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=4)

In [None]:
# Evaluate on the test set and keep the example predictions for plotting.
test_metrics, sample_images, sample_masks, sample_preds = evaluate_model(model, test_loader, criterion, device)

# Wrap the single result in a list so the per-epoch extraction helper can be reused.
test_metrics_list = [test_metrics]
test_dice, test_iou, test_precision, test_recall, test_f1 = extract_metric_lists(test_metrics_list)

In [None]:
# Report each test metric as a percentage with four decimals.
print("Test Metrics :- ")
for _label, _scores in (
    ("Dice score", test_dice),
    ("IOU score", test_iou),
    ("precision", test_precision),
    ("recall", test_recall),
    ("f1-score", test_f1),
):
    print(f"{_label} = {_scores[0]*100 : .4f}")

In [None]:
import random

def visualize_predictions(sample_images, sample_masks, sample_preds, segmentation_images, num_images=5,
                          mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Show image / ground-truth mask / predicted mask triplets and save the figure.

    Args:
        sample_images: List of CHW image tensors.
        sample_masks: List of mask tensors, parallel to ``sample_images``.
        sample_preds: List of prediction tensors, binarized here at 0.5.
        segmentation_images: Base file name (without extension); the figure is
            written to '/kaggle/working/<segmentation_images>.png'.
        num_images: Maximum number of triplets to draw; capped by the number of
            samples available.
        mean, std: Per-channel statistics used to undo the input normalisation
            before display. The defaults match the values in this notebook's
            image transforms; pass ``None`` for either to display the tensors
            as they are.
    """
    num_samples = min(len(sample_images), num_images)
    indices = random.sample(range(len(sample_images)), num_samples)

    plt.figure(figsize=(12, 4 * num_samples))

    for i, idx in enumerate(indices):
        image = sample_images[idx]
        if mean is not None and std is not None:
            # Undo (x - mean) / std so the pixels land back in [0, 1]; without
            # this, imshow clips the normalised values and distorts the image.
            channel_mean = torch.as_tensor(mean, dtype=image.dtype).view(-1, 1, 1)
            channel_std = torch.as_tensor(std, dtype=image.dtype).view(-1, 1, 1)
            image = (image * channel_std + channel_mean).clamp(0.0, 1.0)

        plt.subplot(num_samples, 3, 3*i + 1)
        plt.imshow(image.permute(1, 2, 0).numpy())
        plt.title("Original Image")
        plt.axis("off")

        plt.subplot(num_samples, 3, 3*i + 2)
        plt.imshow(sample_masks[idx].numpy().squeeze(), cmap="gray")
        plt.title("Ground Truth Mask")
        plt.axis("off")

        plt.subplot(num_samples, 3, 3*i + 3)
        predicted_mask = (sample_preds[idx] > 0.5).numpy().squeeze()  # Thresholding
        plt.imshow(predicted_mask, cmap="gray")
        plt.title("Predicted Mask")
        plt.axis("off")

    plt.tight_layout()

    plt.savefig('/kaggle/working/' + segmentation_images + '.png')  # Save as PNG file
    plt.show()


In [None]:
# Visualize some predictions. Up to 10 are requested, but the function draws
# at most as many as were collected (evaluate_model keeps no more than 5).
num_images = 10
image_filename = "segmentation_images"
visualize_predictions(sample_images, sample_masks, sample_preds, image_filename, num_images)

# saving model

In [None]:
# Start from a clean upload directory: delete any previous copy (force deletion)
!rm -rf /kaggle/working/brainTumourSegmentation_model

# Create the destination directory
!mkdir /kaggle/working/brainTumourSegmentation_model

# Move the checkpoint folder INTO the new directory; it ends up at
# /kaggle/working/brainTumourSegmentation_model/segmentation
!mv /kaggle/working/segmentation /kaggle/working/brainTumourSegmentation_model


In [None]:
# Copy the Kaggle API credentials from the attached input dataset into
# ~/.kaggle and make the file readable/writable by the owner only.
!mkdir -p ~/.kaggle
!cp /kaggle/input/kaggle-json/kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
import json

# Read the Kaggle account name from the credentials file copied to ~/.kaggle.
with open(os.path.expanduser("~/.kaggle/kaggle.json"), "r") as f:
    kaggle_config = json.load(f)

kaggle_username = kaggle_config["username"]

# Build the dataset id "<username>/<slug>": the slug is the title lower-cased
# with underscores replaced by hyphens.
dataset_title = "brainTumourSegmentation_model"
dataset_slug = dataset_title.replace("_", "-").lower()  # Ensure underscores are replaced
dataset_id = f"{kaggle_username}/{dataset_slug}"

In [None]:
import json

# Re-reads the same credentials file as the previous cell; the values are
# identical, this only lets the cell define kaggle_config/kaggle_username itself.
with open(os.path.expanduser("~/.kaggle/kaggle.json"), "r") as f:
    kaggle_config = json.load(f)

kaggle_username = kaggle_config["username"]

# Metadata for the model dataset; title and id come from the previous cell.
dataset_metadata = {
    "title": dataset_title,
    "id": dataset_id,  # Use the cleaned slug here
    "licenses": [{"name": "CC0-1.0"}]
}

In [None]:
# Write the metadata as dataset-metadata.json inside the upload directory.
with open('/kaggle/working/brainTumourSegmentation_model/dataset-metadata.json', 'w') as f:
    json.dump(dataset_metadata, f)
    

In [None]:
# Create the Kaggle dataset from the staged model directory.
# NOTE(review): `--dir-mode tar` presumably uploads sub-directories (here the
# `segmentation` folder) as tar archives — confirm against the Kaggle CLI docs.
!kaggle datasets create -p /kaggle/working/brainTumourSegmentation_model --dir-mode tar

# saving graphs

In [None]:
import os
import shutil
import json

# Folder that will hold the training/validation curve images for upload.
target_dir = "/kaggle/working/brainTumourSegmentation_graphs"
os.makedirs(target_dir, exist_ok=True)

# Folder the plotting function saved its figures into.
source_dir = "/kaggle/working/"

# Move every file whose name ends in "graphs.png" into the target folder.
for file_name in filter(lambda name: name.endswith("graphs.png"), os.listdir(source_dir)):
    shutil.move(os.path.join(source_dir, file_name), os.path.join(target_dir, file_name))

print(f"All PNG files moved to {target_dir}")


In [None]:

# Take the Kaggle username and API key from the credentials loaded earlier
# from kaggle.json. `.get` is used so that a missing entry reaches the explicit
# check below instead of failing first with a bare KeyError.
kaggle_username = kaggle_config.get("username")
kaggle_key = kaggle_config.get("key")

# Stop early with a clear message when either credential is missing or empty.
if not kaggle_username or not kaggle_key:
    raise ValueError("kaggle.json must contain non-empty 'username' and 'key' entries.")

# Build the dataset id "<username>/<slug>" (slug: lower-case, hyphens for underscores).
dataset_title = "brainTumourSegmentation_graphs"
dataset_slug = dataset_title.replace("_", "-").lower()  # Replace underscores with hyphens
dataset_id = f"{kaggle_username}/{dataset_slug}"

# Create the metadata
dataset_metadata = {
    "title": dataset_title,
    "id": dataset_id,
    "licenses": [{"name": "CC0-1.0"}]  # Specify the license for the dataset
}

# Print the metadata to verify
print(json.dumps(dataset_metadata, indent=4))

# Ensure the directory exists before saving the metadata file
os.makedirs("/kaggle/working/brainTumourSegmentation_graphs", exist_ok=True)

# Save the metadata file
metadata_path = '/kaggle/working/brainTumourSegmentation_graphs/dataset-metadata.json'
with open(metadata_path, 'w') as f:
    json.dump(dataset_metadata, f)

print(f"Metadata saved to {metadata_path}")


In [None]:
# Create the Kaggle dataset from the staged graphs directory.
!kaggle datasets create -p /kaggle/working/brainTumourSegmentation_graphs

# saving segmentation example images

In [None]:
import os
import shutil
import json

# Folder that will hold the example segmentation images for upload.
target_dir = "/kaggle/working/brainTumourSegmentation_images"
os.makedirs(target_dir, exist_ok=True)

# Folder the figures were saved into.
source_dir = "/kaggle/working/"

# Move every remaining entry whose name ends in ".png" into the target folder.
for file_name in filter(lambda name: name.endswith(".png"), os.listdir(source_dir)):
    shutil.move(os.path.join(source_dir, file_name), os.path.join(target_dir, file_name))

print(f"All PNG files moved to {target_dir}")


In [None]:

# Take the Kaggle username and API key from the credentials loaded earlier
# from kaggle.json. `.get` is used so that a missing entry reaches the explicit
# check below instead of failing first with a bare KeyError.
kaggle_username = kaggle_config.get("username")
kaggle_key = kaggle_config.get("key")

# Stop early with a clear message when either credential is missing or empty.
if not kaggle_username or not kaggle_key:
    raise ValueError("kaggle.json must contain non-empty 'username' and 'key' entries.")

# Build the dataset id "<username>/<slug>" (slug: lower-case, hyphens for underscores).
dataset_title = "brainTumourSegmentation_images"
dataset_slug = dataset_title.replace("_", "-").lower()  # Replace underscores with hyphens
dataset_id = f"{kaggle_username}/{dataset_slug}"

# Create the metadata
dataset_metadata = {
    "title": dataset_title,
    "id": dataset_id,
    "licenses": [{"name": "CC0-1.0"}]  # Specify the license for the dataset
}

# Print the metadata to verify
print(json.dumps(dataset_metadata, indent=4))

# Ensure the directory exists before saving the metadata file
os.makedirs("/kaggle/working/brainTumourSegmentation_images", exist_ok=True)

# Save the metadata file
metadata_path = '/kaggle/working/brainTumourSegmentation_images/dataset-metadata.json'
with open(metadata_path, 'w') as f:
    json.dump(dataset_metadata, f)

print(f"Metadata saved to {metadata_path}")


In [None]:
# Create the Kaggle dataset from the staged example-images directory.
!kaggle datasets create -p /kaggle/working/brainTumourSegmentation_images