In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os
from PIL import Image
import numpy as np
from tqdm import tqdm

class BrainTumorDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.classes = ['glioma', 'meningioma', 'notumor', 'pituitary']
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        
        self.images = []
        self.labels = []
        
        # Load all image paths and labels
        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            for img_name in os.listdir(class_dir):
                self.images.append(os.path.join(class_dir, img_name))
                self.labels.append(self.class_to_idx[class_name])
    
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        img_path = self.images[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label


In [18]:

class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, dropout=0.1):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.ff = nn.Sequential(
            nn.Linear(embed_dim, embed_dim * 4),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(embed_dim * 4, embed_dim)
        )
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        attended, _ = self.attention(x, x, x)
        x = self.norm1(x + self.dropout(attended))
        x = self.norm2(x + self.dropout(self.ff(x)))
        return x

In [19]:
class BrainTumorClassifier(nn.Module):
    def __init__(self, num_classes=4):
        super().__init__()
        
        # CNN Feature Extraction
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        
        self.feature_size = 128 * (512 // 8) * (512 // 8)
        self.embed_dim = 256
        
        self.projection = nn.Linear(128, self.embed_dim)
        
        self.pos_embedding = nn.Parameter(torch.randn(1, (512 // 8) * (512 // 8), self.embed_dim))
        
        self.transformer_blocks = nn.ModuleList([
            TransformerBlock(self.embed_dim, num_heads=8)
            for _ in range(2)
        ])
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(self.embed_dim, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
        
    def forward(self, x):
        batch_size = x.size(0)
        x = self.cnn(x)
        x = x.view(batch_size, 128, -1).permute(0, 2, 1)
        x = self.projection(x)
        
        x = x + self.pos_embedding
        
        for block in self.transformer_blocks:
            x = block(x)
        x = x.mean(dim=1)
        x = self.classifier(x)
        return x

In [20]:
def train_model(model, train_loader, val_loader, device, num_epochs=50):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.1)
    
    best_val_loss = float('inf')
    best_model_state = None
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        train_correct = 0
        train_total = 0
        
        for inputs, labels in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}'):
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()
        
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()
        
        train_loss = train_loss / len(train_loader)
        train_acc = 100. * train_correct / train_total
        val_loss = val_loss / len(val_loader)
        val_acc = 100. * val_correct / val_total
        
        print(f'Epoch: {epoch+1}')
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%')
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')
        
        scheduler.step(val_loss)
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict().copy()
    
    model.load_state_dict(best_model_state)
    return model

In [None]:
# def main():
#     device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
#     transform = transforms.Compose([
#         transforms.Resize((512, 512)),
#         transforms.RandomHorizontalFlip(),
#         transforms.RandomRotation(10),
#         transforms.ToTensor(),
#         transforms.Normalize(mean=[0.485, 0.456, 0.406], 
#                            std=[0.229, 0.224, 0.225])
#     ])
    
#     train_dataset = BrainTumorDataset(
#         root_dir='/kaggle/input/brain-tumor-mri-dataset/Training',
#         transform=transform
#     )
    
#     test_dataset = BrainTumorDataset(
#         root_dir='/kaggle/input/brain-tumor-mri-dataset/Testing',
#         transform=transforms.Compose([
#             transforms.Resize((512, 512)),
#             transforms.ToTensor(),
#             transforms.Normalize(mean=[0.485, 0.456, 0.406], 
#                                std=[0.229, 0.224, 0.225])
#         ])
#     )
    
#     train_loader = DataLoader(
#         train_dataset, 
#         batch_size=32,
#         shuffle=True,
#         num_workers=4
#     )
    
#     test_loader = DataLoader(
#         test_dataset,
#         batch_size=32,
#         shuffle=False,
#         num_workers=4
#     )
    
#     model = BrainTumorClassifier().to(device)
    
#     model = train_model(model, train_loader, test_loader, device)
    
#     torch.save(model.state_dict(), 'brain_tumor_classifier.pth')

In [23]:
def main(N=10):  # N is the number of images per class
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    print(f"Loading {N} images per class")
    
    transform = transforms.Compose([
        transforms.Resize((512, 512)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                           std=[0.229, 0.224, 0.225])
    ])
    
    # Create custom sampler class to limit images
    class LimitedDataset(Dataset):
        def __init__(self, root_dir, transform=None, n_per_class=None):
            self.root_dir = root_dir
            self.transform = transform
            self.classes = ['glioma', 'meningioma', 'notumor', 'pituitary']
            self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
            
            self.images = []
            self.labels = []
            
            # Load limited number of images per class
            for class_name in self.classes:
                class_dir = os.path.join(root_dir, class_name)
                files = sorted([f for f in os.listdir(class_dir) 
                              if f.endswith(('.jpg', '.jpeg', '.png'))])
                
                # Limit files if n_per_class is specified
                if n_per_class is not None:
                    files = files[:n_per_class]
                
                for img_name in files:
                    self.images.append(os.path.join(class_dir, img_name))
                    self.labels.append(self.class_to_idx[class_name])
                    
            print(f"Loaded total {len(self.images)} images from {root_dir}")
            
        def __len__(self):
            return len(self.images)
        
        def __getitem__(self, idx):
            img_path = self.images[idx]
            image = Image.open(img_path).convert('RGB')
            label = self.labels[idx]
            
            if self.transform:
                image = self.transform(image)
            
            return image, label
    
    # Create datasets with limited images
    train_dataset = LimitedDataset(
        root_dir='/kaggle/input/brain-tumor-mri-dataset/Training',
        transform=transform,
        n_per_class=N
    )
    
    test_dataset = LimitedDataset(
        root_dir='/kaggle/input/brain-tumor-mri-dataset/Testing',
        transform=transforms.Compose([
            transforms.Resize((512, 512)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ]),
        n_per_class=N//5  # Using 20% of N for testing
    )
    
    batch_size = min(32, N)  # Ensure batch size isn't larger than dataset
    
    train_loader = DataLoader(
        train_dataset, 
        batch_size=batch_size,
        shuffle=True,
        num_workers=4
    )
    
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=4
    )
    
    model = BrainTumorClassifier().to(device)
    
    model = train_model(model, train_loader, test_loader, device)
    
    torch.save(model.state_dict(), f'brain_tumor_classifier_{N}samples.pth')

In [None]:
main()

Using device: cuda
Loading 10 images per class
Loaded total 40 images from /kaggle/input/brain-tumor-mri-dataset/Training
Loaded total 8 images from /kaggle/input/brain-tumor-mri-dataset/Testing


Epoch 1/50: 100%|██████████| 4/4 [00:00<00:00,  4.04it/s]


Epoch: 1
Train Loss: 1.5721, Train Acc: 32.50%
Val Loss: 1.4560, Val Acc: 25.00%


Epoch 2/50: 100%|██████████| 4/4 [00:00<00:00,  4.10it/s]


Epoch: 2
Train Loss: 1.5355, Train Acc: 30.00%
Val Loss: 1.3979, Val Acc: 25.00%


Epoch 3/50: 100%|██████████| 4/4 [00:00<00:00,  4.14it/s]


Epoch: 3
Train Loss: 1.5650, Train Acc: 20.00%
Val Loss: 1.4307, Val Acc: 25.00%


Epoch 4/50: 100%|██████████| 4/4 [00:00<00:00,  4.22it/s]


Epoch: 4
Train Loss: 1.4670, Train Acc: 22.50%
Val Loss: 1.4237, Val Acc: 25.00%


Epoch 5/50: 100%|██████████| 4/4 [00:00<00:00,  4.13it/s]


Epoch: 5
Train Loss: 1.5244, Train Acc: 20.00%
Val Loss: 1.4198, Val Acc: 25.00%


Epoch 6/50:   0%|          | 0/4 [00:00<?, ?it/s]