In [1]:
import torch
torch.cuda.empty_cache()

In [2]:
import os
import shutil
import pandas as pd
from sklearn.model_selection import GroupShuffleSplit

# Configuration
SOURCE_ROOT = r"C:\NPersonal\Projects\SDP\Prediction Stuff\Dataset\MDVR Spectrogram Dataset"
DEST_ROOT = r"C:\NPersonal\Projects\SDP\Prediction Stuff\Dataset\MDVR"
SPLIT_RATIO = (0.7, 0.15, 0.15)  # Train, Val, Test
SEED = 42

def extract_patient_number(folder_name):
    """Extract base patient number from complex folder names"""
    # Example: "S_ID00_hc_0_0_0" -> "ID00"
    # Example: "R_PD_123_1_2" -> "PD_123"
    parts = folder_name.split('_')
    if parts[0] in ['S', 'R']:
        base_parts = parts[1:-3]  # Remove S/R prefix and numeric suffixes
    else:
        base_parts = parts[:-3]  # For non-S/R prefixed patients
    return '_'.join(base_parts)

def process_dataset():
    # Collect all patients with their base numbers
    patients = []
    
    for class_name in ['HC', 'PD']:
        class_path = os.path.join(SOURCE_ROOT, class_name)
        if not os.path.exists(class_path):
            continue
            
        for folder in os.listdir(class_path):
            folder_path = os.path.join(class_path, folder)
            if os.path.isdir(folder_path):
                base_number = extract_patient_number(folder)
                patients.append({
                    'original_path': folder_path,
                    'class': class_name,
                    'base_number': base_number,
                    'full_name': folder
                })

    # Create DataFrame and group by base number
    df = pd.DataFrame(patients)
    grouped = df.groupby(['base_number', 'class'])

    # Prepare data for splitting
    unique_patients = []
    for (base_num, cls), group in grouped:
        unique_patients.append({
            'base_number': base_num,
            'class': cls,
            'paths': group['original_path'].tolist(),
            'count': len(group)
        })

    # Convert to DataFrame for splitting
    split_df = pd.DataFrame(unique_patients)

    # Split patients into train/val/test
    gss = GroupShuffleSplit(n_splits=1, test_size=SPLIT_RATIO[1]+SPLIT_RATIO[2], random_state=SEED)
    train_idx, temp_idx = next(gss.split(split_df, groups=split_df['base_number']))

    gss_val_test = GroupShuffleSplit(n_splits=1, test_size=SPLIT_RATIO[2]/(SPLIT_RATIO[1]+SPLIT_RATIO[2]), random_state=SEED)
    val_idx, test_idx = next(gss_val_test.split(split_df.iloc[temp_idx], groups=split_df.iloc[temp_idx]['base_number']))

    # Create splits
    splits = {
        'train': split_df.iloc[train_idx],
        'val': split_df.iloc[temp_idx].iloc[val_idx],
        'test': split_df.iloc[temp_idx].iloc[test_idx]
    }

    # Copy files while preserving S/R versions
    for split_name, split_data in splits.items():
        print(f"Processing {split_name} set...")
        for _, patient in split_data.iterrows():
            for version_path in patient['paths']:
                # Create destination path
                dest_folder = os.path.join(
                    DEST_ROOT,
                    split_name,
                    patient['class'],
                    os.path.basename(version_path)  # Keep original folder name
                )
                
                os.makedirs(dest_folder, exist_ok=True)
                
                # Copy all PNG files
                for file in os.listdir(version_path):
                    if file.lower().endswith('.png'):
                        src = os.path.join(version_path, file)
                        dst = os.path.join(dest_folder, file)
                        if not os.path.exists(dst):
                            shutil.copy2(src, dst)

    # Verification
    print("\n📊 Final Verification:")
    for split in ['train', 'val', 'test']:
        split_path = os.path.join(DEST_ROOT, split)
        total = 0
        for root, dirs, files in os.walk(split_path):
            total += len(files)
        print(f"{split.upper()} - {total} images")
        print(f"Sample patient folders: {os.listdir(split_path)[:2]}")

if __name__ == "__main__":
    process_dataset()
    print("\n✅ Dataset organized with strict patient-level splits!")

KeyError: 'base_number'

In [None]:
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Set seeds for reproducibility
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed()

# Custom dataset class for MDVR dataset
class MDVRDataset(Dataset):
    def __init__(self, root_dir, split='train', transform=None):
        """
        Args:
            root_dir (string): Directory with all the data.
            split (string): 'train', 'test', or 'val'
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.split = split
        self.transform = transform
        self.classes = ['hc', 'pd']  # healthy control, Parkinson's disease
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        
        self.samples = []
        self._load_data()
        
    def _load_data(self):
        split_dir = os.path.join(self.root_dir, self.split)
        for class_name in self.classes:
            class_idx = self.class_to_idx[class_name]
            class_dir = os.path.join(split_dir, class_name)
            
            # Iterate through patient folders
            for patient_folder in os.listdir(class_dir):
                patient_path = os.path.join(class_dir, patient_folder)
                if os.path.isdir(patient_path):
                    # Iterate through image files in patient folder
                    for img_name in os.listdir(patient_path):
                        if img_name.endswith(('.png', '.jpg', '.jpeg')):
                            img_path = os.path.join(patient_path, img_name)
                            self.samples.append((img_path, class_idx, patient_folder))
        
        print(f"Loaded {len(self.samples)} samples for {self.split} split")
        
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        img_path, class_idx, patient_id = self.samples[idx]
        
        # Load image
        # image = Image.open(img_path).convert('RGB')
        image = Image.open(img_path).convert('L')  # Convert to grayscale

        
        # Apply transforms if any
        if self.transform:
            image = self.transform(image)
        
        return image, class_idx, patient_id


# Augmentation techniques based on the paper

# 1. Frequency Masking
class FrequencyMasking:
    def __init__(self, max_width=30, num_masks=1):
        self.max_width = max_width
        self.num_masks = num_masks
        
    def __call__(self, img):
        img_tensor = transforms.ToTensor()(img)
        _, h, w = img_tensor.shape
        
        for _ in range(self.num_masks):
            f = np.random.randint(0, self.max_width)
            f0 = np.random.randint(0, h - f)
            
            # Apply frequency mask
            img_tensor[:, f0:f0+f, :] = 0
        
        return transforms.ToPILImage()(img_tensor)

# 2. Mixup
class Mixup:
    def __init__(self, alpha=0.2):
        self.alpha = alpha
        
    def __call__(self, batch_x, batch_y):
        """Apply mixup to a batch of images and labels"""
        lam = np.random.beta(self.alpha, self.alpha)
        batch_size = batch_x.size(0)
        index = torch.randperm(batch_size).to(batch_x.device)
        
        mixed_x = lam * batch_x + (1 - lam) * batch_x[index]
        y_a, y_b = batch_y, batch_y[index]
        
        return mixed_x, y_a, y_b, lam

# 3. Mixed Frequency Masking
class MixedFrequencyMasking:
    def __init__(self, max_width=30, num_masks=1):
        self.max_width = max_width
        self.num_masks = num_masks
    
    def __call__(self, batch_x, batch_y):
        """Apply mixed frequency masking to a batch of images and labels"""
        batch_size = batch_x.size(0)
        device = batch_x.device
        _, h, w = batch_x[0].shape
        
        # Create copies for mixing
        mixed_x = batch_x.clone()
        
        # For storing mixing ratios
        mixing_ratios = torch.ones(batch_size, device=device)
        
        # For each image in the batch
        for i in range(batch_size):
            # Select another random image to mix with
            j = (i + torch.randint(1, batch_size, (1,)).item()) % batch_size
            
            # Apply frequency replacements
            total_freq_replaced = 0
            
            for _ in range(self.num_masks):
                # Random frequency mask width
                f = torch.randint(1, self.max_width + 1, (1,)).item()
                # Random starting frequency
                f0 = torch.randint(0, h - f, (1,)).item()
                
                # Replace frequency band in image i with the same band from image j
                mixed_x[i, :, f0:f0+f, :] = batch_x[j, :, f0:f0+f, :]
                
                # Accumulate total frequency replaced
                total_freq_replaced += f
            
            # Calculate mixing ratio based on proportion of frequency replaced
            mixing_ratio = total_freq_replaced / h
            mixing_ratios[i] = 1 - mixing_ratio
            
        # Get indices for mixing labels
        indices = torch.remainder(torch.arange(batch_size) + 1, batch_size).to(device)
        y_a, y_b = batch_y, batch_y[indices]
        
        return mixed_x, y_a, y_b, mixing_ratios

# Custom CNN model
class CustomCNN(nn.Module):
    def __init__(self, num_classes=2):
        super(CustomCNN, self).__init__()
        # Input size: 3x496x200
        
        # Convolutional layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)  # 32x248x100
        
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)  # 64x124x50
        
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)  # 128x62x25
        
        self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)  # 256x31x12
        
        # Calculate flattened size
        self.flat_size = 256 * 31 * 12
        
        # Fully connected layers
        self.fc1 = nn.Linear(self.flat_size, 512)
        self.dropout1 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, 128)
        self.dropout2 = nn.Dropout(0.5)
        self.fc3 = nn.Linear(128, num_classes)
        
    def forward(self, x):
        # Convolutional layers
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool3(x)
        
        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool4(x)
        
        # Flatten
        x = x.view(-1, self.flat_size)
        
        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        x = self.fc3(x)
        
        return x

# Training function with augmentation
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, device='cuda', 
                use_mixup=False, use_mixed_freq_mask=False):
    """
    Training function with support for different augmentation techniques
    """
    model.to(device)
    
    # Initialize mixup and mixed frequency masking if used
    mixup_fn = Mixup(alpha=0.2) if use_mixup else None
    mixed_freq_mask_fn = MixedFrequencyMasking(max_width=30, num_masks=1) if use_mixed_freq_mask else None
    
    best_acc = 0.0
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        
        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode
                
            running_loss = 0.0
            running_corrects = 0
            
            # Iterate over data
            for inputs, labels, _ in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                # Zero the parameter gradients
                optimizer.zero_grad()
                
                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    # Apply augmentation techniques during training
                    if phase == 'train':
                        if use_mixup:
                            inputs, labels_a, labels_b, lam = mixup_fn(inputs, labels)
                            outputs = model(inputs)
                            loss = lam * criterion(outputs, labels_a) + (1 - lam) * criterion(outputs, labels_b)
                        elif use_mixed_freq_mask:
                            inputs, labels_a, labels_b, mixing_ratios = mixed_freq_mask_fn(inputs, labels)
                            outputs = model(inputs)
                            # Apply per-sample mixing ratios
                            batch_loss = 0
                            for i in range(inputs.size(0)):
                                ratio = mixing_ratios[i]
                                batch_loss += ratio * criterion(outputs[i:i+1], labels_a[i:i+1]) + \
                                             (1 - ratio) * criterion(outputs[i:i+1], labels_b[i:i+1])
                            loss = batch_loss / inputs.size(0)
                        else:
                            outputs = model(inputs)
                            loss = criterion(outputs, labels)
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)
                    
                    _, preds = torch.max(outputs, 1)
                    
                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
            
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            
            # Record history
            if phase == 'train':
                history['train_loss'].append(epoch_loss)
                history['train_acc'].append(epoch_acc.item())
            else:
                history['val_loss'].append(epoch_loss)
                history['val_acc'].append(epoch_acc.item())
                
                # Save best model
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), 'best_model.pth')
                    
    print(f'Best val Acc: {best_acc:.4f}')
    
    # Load best model weights
    model.load_state_dict(torch.load('best_model.pth'))
    return model, history

# Function to test the model
def test_model(model, test_loader, criterion, device='cuda'):
    model.to(device)
    model.eval()
    
    running_loss = 0.0
    running_corrects = 0
    
    # For confusion matrix
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels, _ in tqdm(test_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            _, preds = torch.max(outputs, 1)
            
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    test_loss = running_loss / len(test_loader.dataset)
    test_acc = running_corrects.double() / len(test_loader.dataset)
    
    print(f'Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}')
    
    return test_loss, test_acc, all_preds, all_labels

# Main execution function
def run_experiment(data_dir, batch_size=32, num_epochs=50, use_augmentation='mixed_freq_mask'):
    """
    Run the complete experiment
    Args:
        data_dir: Root directory of the MDVR dataset
        batch_size: Batch size for training
        num_epochs: Number of training epochs
        use_augmentation: Type of augmentation to use ('none', 'mixup', 'freq_mask', 'mixed_freq_mask')
    """
    # Define transforms
    # Base transform (no augmentation)
    
    # base_transform = transforms.Compose([
    #     transforms.Resize((496, 200)),
    #     transforms.ToTensor(),
    #     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    # ])

    base_transform = transforms.Compose([
        transforms.Resize((496, 200)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])  # Single channel normalization
    ])
    
    
    # Transform with frequency masking
    # freq_mask_transform = transforms.Compose([
    #     transforms.Resize((496, 200)),
    #     FrequencyMasking(max_width=30, num_masks=1),
    #     transforms.ToTensor(),
    #     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

    # ])

    freq_mask_transform = transforms.Compose([
        transforms.Resize((496, 200)),
        FrequencyMasking(max_width=30, num_masks=1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])  # Single channel normalization
    ])
    
    # Choose the appropriate transform based on augmentation type
    if use_augmentation == 'freq_mask':
        train_transform = freq_mask_transform
    else:
        train_transform = base_transform
    
    # Create datasets
    train_dataset = MDVRDataset(data_dir, split='train', transform=train_transform)
    val_dataset = MDVRDataset(data_dir, split='val', transform=base_transform)
    test_dataset = MDVRDataset(data_dir, split='test', transform=base_transform)
    
    # Create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    
    dataloaders = {
        'train': train_loader,
        'val': val_loader
    }
    
    # Create model, loss function, and optimizer
    model = CustomCNN(num_classes=2)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # Device configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    # Set augmentation flags
    use_mixup = (use_augmentation == 'mixup')
    use_mixed_freq_mask = (use_augmentation == 'mixed_freq_mask')
    
    # Train model
    print(f"Training with augmentation: {use_augmentation}")
    model, history = train_model(
        model, dataloaders, criterion, optimizer, 
        num_epochs=num_epochs, device=device,
        use_mixup=use_mixup, use_mixed_freq_mask=use_mixed_freq_mask
    )
    
    # Test model
    test_loss, test_acc, all_preds, all_labels = test_model(model, test_loader, criterion, device)
    
    # Plot training history
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Training')
    plt.plot(history['val_loss'], label='Validation')
    plt.title('Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(history['train_acc'], label='Training')
    plt.plot(history['val_acc'], label='Validation')
    plt.title('Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    
    plt.tight_layout()
    plt.savefig('training_history.png')
    
    return model, history, (test_loss, test_acc)

# Example usage
# To run the experiment:
model, history, test_results = run_experiment(
    data_dir="C:/NPersonal/Projects/SDP/Prediction Stuff/Dataset/MDVR",
    batch_size=32, 
    num_epochs=50,
    use_augmentation='mixed_freq_mask'  # or 'none', 'mixup', 'freq_mask'
)

In [3]:
import torch
print("Torch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
print("CUDA version:", torch.version.cuda)
print("CuDNN version:", torch.backends.cudnn.version() if torch.cuda.is_available() else "No CUDA")

Torch version: 2.6.0+cu126
CUDA available: True
CUDA version: 12.6
CuDNN version: 90501


In [4]:
import torch
print("CUDA available:", torch.cuda.is_available())
print("Current device:", torch.cuda.current_device())
print("GPU name:", torch.cuda.get_device_name(0))
print("Memory allocated:", torch.cuda.memory_allocated(0))
print("Memory reserved:", torch.cuda.memory_reserved(0))


CUDA available: True
Current device: 0
GPU name: NVIDIA GeForce GTX 1650
Memory allocated: 0
Memory reserved: 0


In [5]:
import os
from PIL import Image

def get_image_channels_pil(image_path):
    image = Image.open(image_path)
    mode = image.mode
    if mode == "L":
        return 1  # Grayscale
    elif mode == "RGB":
        return 3  # RGB
    elif mode == "RGBA":
        return 4  # RGBA
    else:
        return f"Unknown mode: {mode}"

# Specify the directory containing images
image_dir = "/home/nigmu/NPersonal/Projects/SDP/nigmu-parkinsons_disease_prediction/Dataset/Italian Spectrogram Dataset/HC/AGNESE P/B1APGANRET55F170320171104"

# List all files and pick the first image
image_files = [f for f in os.listdir(image_dir) if f.endswith(".png")]

if image_files:
    image_path = os.path.join(image_dir, image_files[0])  # Pick first image
    print("Checking:", image_path)
    print("Number of channels (PIL):", get_image_channels_pil(image_path))
else:
    print("No PNG images found in the directory.")


FileNotFoundError: [Errno 2] No such file or directory: '/home/nigmu/NPersonal/Projects/SDP/nigmu-parkinsons_disease_prediction/Dataset/Italian Spectrogram Dataset/HC/AGNESE P/B1APGANRET55F170320171104'