In [2]:
from google.colab import drive
# Mount Google Drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [3]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split, WeightedRandomSampler
from torchvision import transforms, models
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_curve, auc
import seaborn as sns
from google.colab import drive
from PIL import Image
import matplotlib.pyplot as plt
import gradio as gr
from collections import Counter
import optuna

# Mount Google Drive
drive.mount('/content/drive', force_remount=True)

# Define paths
base_path = '/content/drive/MyDrive/XVI/Datas'
metadata_path = '/path/to/Data_Entry_2017.csv'  # Update this path accordingly

# Define new classes from the ChestX-ray8 dataset
classes = [
    'Atelectasis', 'Cardiomegaly', 'Effusion', 'Infiltration', 'Mass', 'Nodule',
    'Pneumonia', 'Pneumothorax', 'Consolidation', 'Edema', 'Emphysema',
    'Fibrosis', 'Pleural_Thickening', 'Hernia'
]

# Load metadata
metadata = pd.read_csv(metadata_path)

# Create a mapping from image index to the label
label_mapping = {row['Image Index']: row['Finding Labels'].split('|') for idx, row in metadata.iterrows()}

# Convert labels to numeric classes
all_labels = set()
for labels in label_mapping.values():
    all_labels.update(labels)
label_to_idx = {label: idx for idx, label in enumerate(sorted(all_labels))}

# Create image paths and labels
image_paths = []
labels = []

for image_name, diseases in label_mapping.items():
    image_path = os.path.join(base_path, image_name)
    if os.path.exists(image_path):
        image_paths.append(image_path)
        label_vector = [0] * len(label_to_idx)
        for disease in diseases:
            if disease in label_to_idx:
                label_vector[label_to_idx[disease]] = 1
        labels.append(label_vector)

# Custom Dataset class
class CustomImageDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert('RGB')
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        if self.transform:
            image = self.transform(image=np.array(image))['image']
        return image, label

# Define transforms with data augmentation using Albumentations
import albumentations as A
from albumentations.pytorch import ToTensorV2

transform = A.Compose([
    A.Resize(150, 150),
    A.HorizontalFlip(),
    A.VerticalFlip(),
    A.RandomRotate90(),
    A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    A.OneOf([
        A.GaussianBlur(),
        A.GaussNoise(),
        A.MotionBlur()
    ], p=0.3),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

# Create the dataset
dataset = CustomImageDataset(image_paths, labels, transform=transform)

# Split the data into training, devset, and test sets
partitions = [0.8, 0.1, 0.1]

train_size = int(partitions[0] * len(dataset))
test_dev_size = len(dataset) - train_size
dev_size = int(partitions[1] * len(dataset))
test_size = test_dev_size - dev_size

train_data, test_dev_data = random_split(dataset, [train_size, test_dev_size])
dev_data, test_data = random_split(test_dev_data, [dev_size, test_size])

# Calculate class weights
class_counts = Counter([label.argmax() for label in labels])
total_samples = sum(class_counts.values())
class_weights = {cls: total_samples / class_counts.get(cls, 1) for cls in range(len(classes))}

# Debugging: Print class weights and class counts
print(f"class_counts: {class_counts}")
print(f"class_weights: {class_weights}")

sample_weights = [class_weights[label.argmax()] for label in labels]

# Create samplers
train_sampler = WeightedRandomSampler(weights=sample_weights[:train_size], num_samples=train_size, replacement=True)
train_loader = DataLoader(train_data, batch_size=32, sampler=train_sampler)
dev_loader = DataLoader(dev_data, batch_size=32, shuffle=False)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

# Using a pre-trained ResNet50 model with additional layers and batch normalization
class FineTunedResNet(nn.Module):
    def __init__(self):
        super(FineTunedResNet, self).__init__()
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)  # Updated for weights

        # Replace the fully connected layer with more layers and batch normalization
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 1024),  # First additional layer
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),  # Second additional layer
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 256),  # Third additional layer
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, len(classes))  # Output layer, Softmax will be applied in the loss function
        )

    def forward(self, x):
        return self.resnet(x)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Get the best hyperparameters from Optuna study
best_params = {'lr': 0.00028877715384763937, 'weight_decay': 0.0010977517310350644}
print('Best hyperparameters:', best_params)

# Train the final model with the best hyperparameters
model = FineTunedResNet().to(device)
class_weights_tensor = torch.tensor([class_weights[i] for i in range(len(classes))], device=device)
criterion = nn.BCEWithLogitsLoss(weight=class_weights_tensor)
optimizer = optim.Adam(model.parameters(), lr=best_params['lr'], weight_decay=best_params['weight_decay'])
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Function to train and validate the model with early stopping
def train_validate_early_stopping(model, train_loader, dev_loader, criterion, optimizer, scheduler, num_epochs, save_path, patience):
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    best_val_acc = 0.0
    best_epoch = 0
    patience_counter = 0

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            preds = (outputs > 0.5).float()
            correct += (preds == labels).sum().item()
            total += labels.numel()

        train_losses.append(running_loss / len(train_loader))
        train_accuracies.append(correct / total)

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for images, labels in dev_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                preds = (outputs > 0.5).float()
                correct += (preds == labels).sum().item()
                total += labels.numel()

        val_losses.append(val_loss / len(dev_loader))
        val_accuracies.append(correct / total)

        # Save the best model based on validation accuracy
        if val_accuracies[-1] > best_val_acc:
            best_val_acc = val_accuracies[-1]
            best_epoch = epoch
            patience_counter = 0
            torch.save(model.state_dict(), save_path)
        else:
            patience_counter += 1

        # Early stopping
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

        # Step the scheduler
        scheduler.step()

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {running_loss/len(train_loader)}, Val Loss: {val_loss/len(dev_loader)}, Train Acc: {train_accuracies[-1]}, Val Acc: {val_accuracies[-1]}')

    print(f"Best Validation Accuracy: {best_val_acc} at epoch


SyntaxError: unterminated string literal (detected at line 235) (<ipython-input-3-f5937326123f>, line 235)