In [None]:
import os
import shutil
import numpy as np
from tqdm import tqdm
import cv2
from sklearn.model_selection import train_test_split
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.nn.functional as F
from torch.cuda.amp import GradScaler, autocast
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score, confusion_matrix, roc_auc_score

# Function to add Gaussian noise to an image
def add_gaussian_noise(image, mean=0, std=0.1):
    noise = torch.randn_like(image) * std + mean
    noisy_image = torch.clamp(image + noise, 0., 1.)  # Ensure values are in the range [0, 1]
    return noisy_image

# File paths
IMG_PATH = "/kaggle/input/brain-tumor-detection-mri/Brain_Tumor_Detection/"
WORKING_DIR = '/kaggle/working/'

# Image dimension and batch size
DIM = 62
batch_size = 32

# Directories for train, val, and test splits
TRAIN_DIR = os.path.join(WORKING_DIR, 'TRAIN')
VAL_DIR = os.path.join(WORKING_DIR, 'VAL')
TEST_DIR = os.path.join(WORKING_DIR, 'TEST')

# Reset directories
for directory in [TRAIN_DIR, VAL_DIR, TEST_DIR]:
    if os.path.exists(directory):
        shutil.rmtree(directory)
    os.makedirs(directory)
    os.makedirs(os.path.join(directory, 'YES'))
    os.makedirs(os.path.join(directory, 'NO'))

# Gather all image file paths
all_images = []
for CLASS in ['yes', 'no']:
    if CLASS not in ["pred"]:  # Exclude the "pred" folder
        for img_file in os.listdir(os.path.join(IMG_PATH, CLASS)):
            all_images.append((os.path.join(IMG_PATH, CLASS, img_file), CLASS.upper()))

# Separate the images into YES and NO
yes_images = [img for img in all_images if img[1] == 'YES']
no_images = [img for img in all_images if img[1] == 'NO']

# Shuffle images to ensure randomness
np.random.seed(42)
np.random.shuffle(yes_images)
np.random.shuffle(no_images)

# Create imbalanced validation and test sets with 70% 'YES' and 30% 'NO'
val_test_yes = yes_images[:int(0.38 * len(yes_images))]
val_test_no = no_images[:int(0.38 * len(no_images))]

# Split the YES and NO data for validation and test with 70% YES and 30% NO
val_yes = val_test_yes[:int(0.5 * len(val_test_yes))]
val_no = val_test_no[:int(0.5 * len(val_test_no))]

test_yes = val_test_yes[int(0.50 * len(val_test_yes)):]
test_no = val_test_no[int(0.50 * len(val_test_no)):]

# Combine the validation and test sets
val_images = val_yes + val_no
test_images = test_yes + test_no

# Remaining images go to training set
train_images = yes_images[int(0.38 * len(yes_images)):] + no_images[int(0.38 * len(no_images)):]

# Shuffle the final sets to ensure randomness
np.random.shuffle(train_images)
np.random.shuffle(val_images)
np.random.shuffle(test_images)

# Count the number of images in each dataset
print(f"Number of training images: {len(train_images)}")
print(f"Number of validation images: {len(val_images)}")
print(f"Number of test images: {len(test_images)}")

# Copy images to respective directories
def copy_images(image_list, target_dir):
    for img_path, label in image_list:
        shutil.copy(img_path, os.path.join(target_dir, label, os.path.basename(img_path)))

copy_images(train_images, TRAIN_DIR)
copy_images(val_images, VAL_DIR)
copy_images(test_images, TEST_DIR)

# Custom Dataset class that loads images on-the-fly
class MyDataset(Dataset):
    def __init__(self, file_paths, transform=None, add_noise=False):
        self.file_paths = file_paths
        self.transform = transform
        self.add_noise = add_noise

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, index):
        img_path, label = self.file_paths[index]
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (DIM, DIM))
        label = 1 if label == 'YES' else 0
        if self.transform:
            image = self.transform(image)
        if self.add_noise:
            image = add_gaussian_noise(image)
        return image, label

# Data augmentation and normalization for training
trans_train = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Normalization for validation and test
trans_valid = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Datasets (Noise added for validation and test sets)
dataset_train = MyDataset(train_images, transform=trans_train, add_noise=True)
dataset_valid = MyDataset(val_images, transform=trans_valid, add_noise=True)
dataset_test = MyDataset(test_images, transform=trans_valid, add_noise=True)

# Dataloaders
loader_train = DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=True, num_workers=2)
loader_valid = DataLoader(dataset=dataset_valid, batch_size=batch_size // 2, shuffle=False, num_workers=2)
loader_test = DataLoader(dataset=dataset_test, batch_size=batch_size // 2, shuffle=False, num_workers=2)

# Define the image dimension and hyperparameters
num_epochs = 300
batch_size = 52
accumulation_steps = 4
learning_rate = 0.0008

# Define the model and layers
class FuzzyAtrousConv(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, dilation=2, padding=1):
        super(FuzzyAtrousConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, dilation=dilation, padding=padding)
        self.bn = nn.BatchNorm2d(out_channels)

    def fuzzy_membership(self, x):
        high_membership = torch.sigmoid(x)
        low_membership = torch.sigmoid(-x)
        return high_membership, low_membership

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        high_membership, low_membership = self.fuzzy_membership(x)
        x = high_membership * x + low_membership * (1 - x)
        x = F.relu(x)
        return x

class TOFU(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(TOFU, self).__init__()
        self.conv1 = FuzzyAtrousConv(in_channels, 16, kernel_size=3, dilation=2, padding=2)
        self.conv2 = FuzzyAtrousConv(16, 32, kernel_size=3, dilation=2, padding=2)
        self.conv3 = FuzzyAtrousConv(32 + 16, 48, kernel_size=3, dilation=2, padding=2)
        self.compress = nn.Conv2d(48 + 32 + 16, out_channels, kernel_size=1)
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, inputs):
        x1 = self.conv1(inputs)
        x2 = self.conv2(x1)
        x3 = self.conv3(torch.cat([x1, x2], dim=1))
        x4 = F.relu(self.compress(torch.cat([x1, x2, x3], dim=1)))
        x4 = self.bn(x4)
        return x4

class MOFU(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(MOFU, self).__init__()
        self.conv1 = FuzzyAtrousConv(in_channels, 16, kernel_size=3, dilation=2, padding=2)
        self.conv2 = FuzzyAtrousConv(16, 32, kernel_size=3, dilation=2, padding=2)
        self.conv3 = FuzzyAtrousConv(32, out_channels, kernel_size=3, dilation=2, padding=2)
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, inputs):
        x = self.conv1(inputs)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.bn(x)
        return x

class Net(nn.Module):
    def __init__(self, num_classes=1):
        super(Net, self).__init__()
        self.initial_conv = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.initial_bn = nn.BatchNorm2d(32)
        self.tofu1 = TOFU(in_channels=32, out_channels=64)
        self.tofu2 = TOFU(in_channels=64, out_channels=128)
        self.mofu = MOFU(128, 256)
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(256, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = F.relu(self.initial_bn(self.initial_conv(x)))
        x = self.tofu1(x)
        x = self.tofu2(x)
        x = self.mofu(x)
        x = self.global_avg_pool(x).reshape(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, p=0.2, training=self.training)
        x = self.fc2(x)
        return x

# Initialize and set up PyTorch model, loss, optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Net(num_classes=1).to(device)

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=20, verbose=True)
scaler = GradScaler()

# Training and evaluation functions with mixed precision and gradient accumulation
def train_one_epoch(epoch, model, criterion, optimizer, dataloader, scaler, accumulation_steps):
    model.train()
    running_loss = 0.0
    correct_preds = 0
    total_samples = 0
    
    optimizer.zero_grad()
    
    for i, (images, labels) in enumerate(dataloader):
        images, labels = images.to(device), labels.to(device).float().unsqueeze(1)
        
        with autocast():
            outputs = model(images)
            loss = criterion(outputs, labels) / accumulation_steps
        
        scaler.scale(loss).backward()
        
        if (i + 1) % accumulation_steps == 0:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
        
        running_loss += loss.item() * images.size(0) * accumulation_steps
        predicted = torch.round(torch.sigmoid(outputs))
        correct_preds += (predicted == labels).sum().item()
        total_samples += labels.size(0)
        
    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_accuracy = correct_preds / total_samples
    
    print(f'Epoch [{epoch}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')
    return epoch_loss, epoch_accuracy

def evaluate(model, criterion, dataloader):
    model.eval()
    running_loss = 0.0
    correct_preds = 0
    total_samples = 0
    all_labels = []
    all_preds = []
    all_outputs = []
    
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device).float().unsqueeze(1)
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            running_loss += loss.item() * images.size(0)
            predicted = torch.round(torch.sigmoid(outputs))
            correct_preds += (predicted == labels).sum().item()
            total_samples += labels.size(0)
            
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())
            all_outputs.extend(outputs.cpu().numpy())
    
    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_accuracy = correct_preds / total_samples
    epoch_auc = roc_auc_score(all_labels, all_outputs)
    
    return epoch_loss, epoch_accuracy, epoch_auc, all_labels, all_preds
os.makedirs('savedmodels', exist_ok=True)
# Main training loop
num_epochs = 300
patience = 20
best_val_loss = float('inf')
best_epoch = 0
train_losses, val_losses, train_accuracies, val_accuracies, val_aucs = [], [], [], [], []

for epoch in range(num_epochs):
    train_loss, train_accuracy = train_one_epoch(epoch + 1, model, criterion, optimizer, loader_train, scaler, accumulation_steps)
    val_loss, val_accuracy, val_auc, val_labels, val_preds = evaluate(model, criterion, loader_valid)
    
    print(f'Validation Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}, AUC: {val_auc:.4f}')
    
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracies.append(train_accuracy)
    val_accuracies.append(val_accuracy)
    val_aucs.append(val_auc)
    
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_epoch = epoch + 1
        torch.save(model.state_dict(), f'savedmodels/best_model_epoch{best_epoch}.h5')
    
    if epoch - best_epoch >= patience:
        print("Early stopping triggered")
        break
    
    scheduler.step(val_loss)

# Final evaluation on test set
test_loss, test_accuracy, test_auc, test_labels, test_preds = evaluate(model, criterion, loader_test)
print(f'Test Loss: {test_loss:.4f}, Accuracy: {test_accuracy:.4f}, AUC: {test_auc:.4f}')

# Calculate metrics
precision = precision_score(test_labels, test_preds)
recall = recall_score(test_labels, test_preds)
f1 = f1_score(test_labels, test_preds)
kappa = cohen_kappa_score(test_labels, test_preds)

print(f'Test Precision: {precision:.4f}')
print(f'Test Recall: {recall:.4f}')
print(f'Test F1 Score: {f1:.4f}')
print(f'Test Cohen Kappa: {kappa:.4f}')

# Plot confusion matrix
cm = confusion_matrix(test_labels, test_preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['NO', 'YES'], yticklabels=['NO', 'YES'])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# Plot losses and accuracies
epochs_range = range(1, len(train_losses) + 1)
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, train_losses, label='Train Loss')
plt.plot(epochs_range, val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.title('Loss Over Epochs')
