In [None]:
# List the contents of the Kaggle dataset directory used throughout this notebook.
!ls /kaggle/input/chest-xray-14-dataset

In [None]:
pip install torch torchvision transformers huggingface_hub

In [None]:
import glob
import pandas as pd
import os
from PIL import Image
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from skimage import exposure 

# Load the CSV file
data = pd.read_csv('/kaggle/input/chest-xray-14-dataset/Data_Entry_2017.csv')
# Print the number of rows and columns in the DataFrame
print(f"Number of rows: {data.shape[0]}")
print(f"Number of columns: {data.shape[1]}")


# Load train and test image filenames (one filename per line)
with open('/kaggle/input/chest-xray-14-dataset/train_val_list.txt') as f:
    train_files = set(f.read().splitlines())

with open('/kaggle/input/chest-xray-14-dataset/test_list.txt') as f:
    test_files = set(f.read().splitlines())

# Split the DataFrame into train and test sets based on filenames.
# .copy() gives each split its own frame so the label column can be rewritten
# below without assigning onto a filtered slice of `data`.
train_data = data[data['Image Index'].isin(train_files)].copy()
test_data = data[data['Image Index'].isin(test_files)].copy()

# Process multi-labels: "A|B|C" strings become lists of label names
train_data['Finding Labels'] = train_data['Finding Labels'].str.split('|')
test_data['Finding Labels'] = test_data['Finding Labels'].str.split('|')

# Fit the binarizer on the training labels only, then reuse it for the test labels
mlb = MultiLabelBinarizer()
train_labels = mlb.fit_transform(train_data['Finding Labels'])
test_labels = mlb.transform(test_data['Finding Labels'])
classes = mlb.classes_

# Collect all image paths from the images_*/images directories, keyed by file name.
# (The pattern has no "**", so glob's recursive flag would have no effect.)
all_image_paths = {
    os.path.basename(x): x
    for x in glob.glob('/kaggle/input/chest-xray-14-dataset/images_*/images/*.png')
}

def lung_focused_preprocessing(image_path):
    """Apply contrast enhancement, smoothing and intensity rescaling to an X-ray.

    Args:
        image_path: Path of the image file to read.

    Returns:
        A uint8 NumPy array holding the processed single-channel image.

    Raises:
        OSError: If OpenCV cannot read an image from ``image_path``.
    """
    # Load the grayscale image; cv2.imread signals failure by returning None
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if image is None:
        raise OSError(f"Could not read image file: {image_path}")

    # Step 1: Apply CLAHE (Contrast Limited Adaptive Histogram Equalization)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    contrast_enhanced = clahe.apply(image)

    # Step 2: Smooth with a 5x5 Gaussian blur
    blurred = cv2.GaussianBlur(contrast_enhanced, (5, 5), 0)

    # Step 3: Stretch intensities from the (50, 200) window onto (0, 255), then cast to uint8
    normalized = exposure.rescale_intensity(
        blurred, in_range=(50, 200), out_range=(0, 255)
    ).astype(np.uint8)

    return normalized

class ChestXRayDataset(Dataset):
    """Dataset pairing preprocessed X-ray images with their label vectors.

    ``df`` supplies file names through its 'Image Index' column, ``labels[idx]``
    is the target for row ``idx`` and ``image_paths`` maps file name -> path.
    """

    def __init__(self, df, labels, image_paths, transform=None):
        self.transform = transform
        self.image_paths = image_paths
        self.labels = labels
        # Re-index from 0 so __getitem__ can address rows by position
        self.df = df.reset_index(drop=True)

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        file_name = self.df.loc[idx, 'Image Index']
        path_on_disk = self.image_paths.get(file_name)
        if path_on_disk is None:
            raise FileNotFoundError(f"Image {file_name} not found in specified directories.")

        # Preprocess, then wrap the array as an RGB PIL image for the transforms
        rgb_image = Image.fromarray(lung_focused_preprocessing(path_on_disk)).convert('RGB')
        sample = self.transform(rgb_image) if self.transform else rgb_image

        target = torch.tensor(self.labels[idx], dtype=torch.float32)
        return sample, target

# Transform pipeline applied to the preprocessed PIL image
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # every image ends up 224x224
    transforms.ToTensor(),
])

# Positive count per class in the training labels, shown as a table and as a dict
class_counts = np.sum(train_labels, axis=0)
print(pd.DataFrame({'Class': classes, 'Count': class_counts}))

label_counts = np.sum(train_labels, axis=0)
print(dict(zip(classes, label_counts)))

# Rows whose label vector is all zeros
no_labels = np.sum(train_labels.sum(axis=1) == 0)
print(f"Samples with no labels: {no_labels}")


# Hold out 10% of the training rows for validation (fixed seed)
train_df, val_df, train_labels_split, val_labels_split = train_test_split(
    train_data,
    train_labels,
    test_size=0.1,
    random_state=42,
)

# Datasets share the path lookup and the transform
train_dataset = ChestXRayDataset(train_df, train_labels_split, all_image_paths, transform)
val_dataset = ChestXRayDataset(val_df, val_labels_split, all_image_paths, transform)
test_dataset = ChestXRayDataset(test_data, test_labels, all_image_paths, transform)

# Only the training loader shuffles
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

print(f"Training set size: {len(train_dataset)}")
print(f"Validation set size: {len(val_dataset)}")
print(f"Test set size: {len(test_dataset)}")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import roc_auc_score
from timm import create_model
from tqdm import tqdm
import torch.nn.functional as F
import os
import zipfile
import shutil

# Define constants
# Epochs each trial trains for
num_epochs = 1
# Learning rate handed to the Adam optimizer in run_trials
learning_rate = 0.001
# NOTE(review): batch_size is not read anywhere in this cell — confirm whether it is still needed
batch_size = 32
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
checkpoint_path = '/kaggle/working/model_trial_checkpoint.pth'  # Path to save the model checkpoint
zip_checkpoint_path = '/kaggle/working/model_trial_checkpoint.zip'  # Path to save the zip file

# Initialize model
def initialize_model(pretrained=False):
    """Create the 'swinv2_cr_small_224' model with a flattened-feature linear head.

    Args:
        pretrained: Forwarded to ``create_model``.

    Returns:
        The model, moved to ``device``, whose forward flattens the backbone
        features and feeds them to a linear layer with ``len(classes)`` outputs.
    """
    net = create_model('swinv2_cr_small_224', pretrained=pretrained)

    def forward(self, x):
        # Backbone features -> one flat vector per sample -> linear head
        feats = self.forward_features(x)
        return self.head(feats.reshape(feats.size(0), -1))

    # Bind the function to this instance so calling the model uses it
    net.forward = forward.__get__(net)

    # Head input is in_features * 7 * 7 values per sample
    # (assumes the backbone yields 7x7 positions for the inputs used here — TODO confirm)
    flat_dim = net.head.in_features * 7 * 7
    net.head = nn.Linear(flat_dim, len(classes))

    return net.to(device)

# Loss function only (the optimizer is created per trial inside run_trials)
criterion = nn.BCEWithLogitsLoss()  # For multi-label classification

def save_checkpoint(model, optimizer, trial_num, path=checkpoint_path):
    """Write the trial index plus model/optimizer state to ``path`` and zip it.

    The archive is always written to the module-level ``zip_checkpoint_path``.
    """
    state = {
        'trial': trial_num,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    torch.save(state, path)
    print(f"Checkpoint saved at trial {trial_num + 1}")

    # Store the checkpoint in the archive under its bare file name
    with zipfile.ZipFile(zip_checkpoint_path, 'w') as archive:
        archive.write(path, arcname=os.path.basename(path))
    print(f"Checkpoint zipped at {zip_checkpoint_path}")

def load_checkpoint(model, optimizer, path=checkpoint_path):
    """Restore model and optimizer state from ``path`` if the file exists.

    Args:
        model: Module whose ``load_state_dict`` receives the saved weights.
        optimizer: Optimizer whose ``load_state_dict`` receives the saved state.
        path: Checkpoint file to read.

    Returns:
        The index of the next trial to run (saved trial + 1), or 0 when no
        checkpoint file is present.
    """
    if not os.path.isfile(path):
        print("No checkpoint found. Starting trials from scratch.")
        return 0

    # map_location remaps stored tensors onto the current device, so a checkpoint
    # written on a GPU session can still be loaded when only the CPU is available.
    checkpoint = torch.load(path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_trial = checkpoint['trial'] + 1
    print(f"Resuming training from trial {start_trial}")
    return start_trial

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train ``model`` for ``num_epochs`` passes, printing the mean batch loss per epoch."""
    model.train()
    for epoch_idx in range(num_epochs):
        epoch_loss = 0.0
        for batch_images, batch_targets in tqdm(train_loader):
            batch_images = batch_images.to(device)
            batch_targets = batch_targets.to(device)

            optimizer.zero_grad()
            logits = model(batch_images)
            # Collapse everything after the batch dimension
            logits = logits.view(logits.size(0), -1)

            batch_loss = criterion(logits, batch_targets.float())
            batch_loss.backward()
            optimizer.step()
            epoch_loss += batch_loss.item()

        print(f'Epoch [{epoch_idx+1}/{num_epochs}], Loss: {epoch_loss/len(train_loader):.4f}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """Return the macro-averaged ROC AUC of the model's sigmoid outputs on ``test_loader``."""
    model.eval()
    label_batches, prob_batches = [], []

    with torch.no_grad():
        for batch_images, batch_targets in test_loader:
            logits = model(batch_images.to(device))
            logits = logits.view(logits.size(0), -1)  # flatten after the batch dim
            label_batches.append(batch_targets.numpy())
            prob_batches.append(torch.sigmoid(logits).cpu().numpy())

    # Stack per-batch arrays into one array per side
    y_true = np.vstack(label_batches)
    y_score = np.vstack(prob_batches)

    return roc_auc_score(y_true, y_score, average='macro')

# Run independent trials for both training approaches
def run_trials(trials=10):
    """Train and evaluate up to ``trials`` independent, freshly initialised models.

    A checkpoint left by an earlier run is consulted once, only to learn which
    trial index to resume from. Its weights are NOT loaded into the new models:
    doing so per trial (as before) made every trial continue from the previous
    trial's trained weights instead of starting from scratch.

    Args:
        trials: Total number of trials, counting any already completed.

    Returns:
        List of test-set macro AUC values for the trials executed in this call.
    """
    auc_scores_scratch = []

    # Work out the resume point once, before any model is built
    start_trial = 0
    if os.path.isfile(checkpoint_path):
        # Only the 'trial' entry is needed; load onto the CPU to avoid touching the GPU
        saved = torch.load(checkpoint_path, map_location='cpu')
        start_trial = saved['trial'] + 1
        del saved
        print(f"Resuming training from trial {start_trial}")
    else:
        print("No checkpoint found. Starting trials from scratch.")

    for trial_num in range(start_trial, trials):
        # Training from scratch: new weights and new optimizer state every trial
        model_scratch = initialize_model(pretrained=False)
        optimizer_scratch = optim.Adam(model_scratch.parameters(), lr=learning_rate)

        train_model(model_scratch, train_loader, criterion, optimizer_scratch, num_epochs)
        auc_scratch = evaluate_model(model_scratch, test_loader)
        auc_scores_scratch.append(auc_scratch)

        # Save checkpoint after each trial so an interrupted run can skip finished trials
        save_checkpoint(model_scratch, optimizer_scratch, trial_num, checkpoint_path)

    return auc_scores_scratch

# Execute the trials and print the AUC values returned by run_trials
auc_scratch = run_trials()
print("AUC Scores (Training from Scratch):", auc_scratch)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from internimage import internimage_t
from sklearn.metrics import roc_auc_score
from timm import create_model
from tqdm import tqdm
import torch.nn.functional as F
import os
import zipfile
import shutil

# Define constants
num_epochs = 1
learning_rate = 0.001
batch_size = 32
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Checkpoint files are specific to this cell's architecture: reusing the file
# written by a cell with a different model makes load_state_dict fail on resume.
checkpoint_path = '/kaggle/working/convnext_tiny_trial_checkpoint.pth'  # Path to save the model checkpoint
zip_checkpoint_path = '/kaggle/working/convnext_tiny_trial_checkpoint.zip'  # Path to save the zip file

# Initialize ConvNeXt model
def initialize_model(pretrained=False):
    """Create a 'convnext_tiny' model whose head consumes the flattened feature map.

    The custom forward flattens the output of ``forward_features`` before the
    head, so the head's input width must equal that flattened length rather
    than the original head's ``in_features``. The width is measured with one
    probe forward pass on a 224x224 input.

    Args:
        pretrained: Forwarded to ``create_model``.

    Returns:
        The model on ``device`` with a linear head of ``len(classes)`` outputs.
    """
    # Load ConvNeXt model
    model = create_model('convnext_tiny', pretrained=pretrained)

    # Custom forward function to flatten features
    def forward(self, x):
        x = self.forward_features(x)
        x = x.reshape(x.size(0), -1)  # Flatten to [batch_size, num_features]
        return self.head(x)

    model.forward = forward.__get__(model)  # Bind custom forward to the model instance

    # Measure the flattened feature length with a dummy batch; run the probe in
    # eval mode and restore the previous mode afterwards.
    was_training = model.training
    model.eval()
    with torch.no_grad():
        probe = model.forward_features(torch.zeros(1, 3, 224, 224))
    model.train(was_training)
    num_features = probe.reshape(1, -1).size(1)

    # Replace the head to match the flattened features and the number of classes
    model.head = nn.Linear(num_features, len(classes))

    return model.to(device)

# Loss function only (the optimizer is created per trial inside run_trials)
criterion = nn.BCEWithLogitsLoss()  # For multi-label classification

def save_checkpoint(model, optimizer, trial_num, path=checkpoint_path):
    """Write the trial index plus model/optimizer state to ``path`` and zip it.

    The archive is always written to the module-level ``zip_checkpoint_path``.
    """
    state = {
        'trial': trial_num,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    torch.save(state, path)
    print(f"Checkpoint saved at trial {trial_num + 1}")

    # Store the checkpoint in the archive under its bare file name
    with zipfile.ZipFile(zip_checkpoint_path, 'w') as archive:
        archive.write(path, arcname=os.path.basename(path))
    print(f"Checkpoint zipped at {zip_checkpoint_path}")

def load_checkpoint(model, optimizer, path=checkpoint_path):
    """Restore model and optimizer state from ``path`` if the file exists.

    Args:
        model: Module whose ``load_state_dict`` receives the saved weights.
        optimizer: Optimizer whose ``load_state_dict`` receives the saved state.
        path: Checkpoint file to read.

    Returns:
        The index of the next trial to run (saved trial + 1), or 0 when no
        checkpoint file is present.
    """
    if not os.path.isfile(path):
        print("No checkpoint found. Starting trials from scratch.")
        return 0

    # map_location remaps stored tensors onto the current device, so a checkpoint
    # written on a GPU session can still be loaded when only the CPU is available.
    checkpoint = torch.load(path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_trial = checkpoint['trial'] + 1
    print(f"Resuming training from trial {start_trial}")
    return start_trial

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train ``model`` for ``num_epochs`` passes, printing the mean batch loss per epoch."""
    model.train()
    for epoch_idx in range(num_epochs):
        epoch_loss = 0.0
        for batch_images, batch_targets in tqdm(train_loader):
            batch_images = batch_images.to(device)
            batch_targets = batch_targets.to(device)

            optimizer.zero_grad()
            logits = model(batch_images)
            # Collapse everything after the batch dimension
            logits = logits.view(logits.size(0), -1)

            batch_loss = criterion(logits, batch_targets.float())
            batch_loss.backward()
            optimizer.step()
            epoch_loss += batch_loss.item()

        print(f'Epoch [{epoch_idx+1}/{num_epochs}], Loss: {epoch_loss/len(train_loader):.4f}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """Return the macro-averaged ROC AUC of the model's sigmoid outputs on ``test_loader``."""
    model.eval()
    label_batches, prob_batches = [], []

    with torch.no_grad():
        for batch_images, batch_targets in test_loader:
            logits = model(batch_images.to(device))
            logits = logits.view(logits.size(0), -1)  # flatten after the batch dim
            label_batches.append(batch_targets.numpy())
            prob_batches.append(torch.sigmoid(logits).cpu().numpy())

    # Stack per-batch arrays into one array per side
    y_true = np.vstack(label_batches)
    y_score = np.vstack(prob_batches)

    return roc_auc_score(y_true, y_score, average='macro')

# Run independent trials for both training approaches
def run_trials(trials=10):
    """Train and evaluate up to ``trials`` independent, freshly initialised models.

    A checkpoint left by an earlier run is consulted once, only to learn which
    trial index to resume from. Its weights are NOT loaded into the new models:
    doing so per trial (as before) made every trial continue from the previous
    trial's trained weights instead of starting from scratch.

    Args:
        trials: Total number of trials, counting any already completed.

    Returns:
        List of test-set macro AUC values for the trials executed in this call.
    """
    auc_scores_scratch = []

    # Work out the resume point once, before any model is built
    start_trial = 0
    if os.path.isfile(checkpoint_path):
        # Only the 'trial' entry is needed; load onto the CPU to avoid touching the GPU
        saved = torch.load(checkpoint_path, map_location='cpu')
        start_trial = saved['trial'] + 1
        del saved
        print(f"Resuming training from trial {start_trial}")
    else:
        print("No checkpoint found. Starting trials from scratch.")

    for trial_num in range(start_trial, trials):
        # Training from scratch: new weights and new optimizer state every trial
        model_scratch = initialize_model(pretrained=False)
        optimizer_scratch = optim.Adam(model_scratch.parameters(), lr=learning_rate)

        train_model(model_scratch, train_loader, criterion, optimizer_scratch, num_epochs)
        auc_scratch = evaluate_model(model_scratch, test_loader)
        auc_scores_scratch.append(auc_scratch)

        # Save checkpoint after each trial so an interrupted run can skip finished trials
        save_checkpoint(model_scratch, optimizer_scratch, trial_num, checkpoint_path)

    return auc_scores_scratch

# Execute the trials and print the AUC values returned by run_trials
auc_scratch = run_trials()
print("AUC Scores (Training from Scratch):", auc_scratch)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import roc_auc_score
from timm import create_model
from tqdm import tqdm
import torch.nn.functional as F
import os
import zipfile
import shutil

# Define constants
num_epochs = 1
learning_rate = 0.001
batch_size = 32
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Checkpoint files are specific to this cell's model: reusing the file written
# by a cell with a different architecture/head makes load_state_dict fail on resume.
checkpoint_path = '/kaggle/working/convnext_tiny_search_trial_checkpoint.pth'  # Path to save the model checkpoint
zip_checkpoint_path = '/kaggle/working/convnext_tiny_search_trial_checkpoint.zip'  # Path to save the zip file

# Initialize ConvNeXt model
def initialize_model(pretrained=False):
    """Create a 'convnext_tiny' model whose head consumes the flattened feature map.

    The head's input width is measured with one probe forward pass on a
    224x224 input instead of being hard-coded, so it stays correct if the
    backbone or input size changes.

    Args:
        pretrained: Forwarded to ``create_model``.

    Returns:
        The model on ``device`` with a linear head of ``len(classes)`` outputs.
    """
    # Load ConvNeXt model
    model = create_model('convnext_tiny', pretrained=pretrained)

    # Custom forward function to flatten features
    def forward(self, x):
        x = self.forward_features(x)
        x = x.reshape(x.size(0), -1)  # Flatten to [batch_size, num_features]
        return self.head(x)

    model.forward = forward.__get__(model)  # Bind custom forward to the model instance

    # Measure the flattened feature length with a dummy batch; run the probe in
    # eval mode and restore the previous mode afterwards.
    was_training = model.training
    model.eval()
    with torch.no_grad():
        probe = model.forward_features(torch.zeros(1, 3, 224, 224))
    model.train(was_training)
    num_features = probe.reshape(1, -1).size(1)

    # Replace the head to match the flattened features and the number of classes
    model.head = nn.Linear(num_features, len(classes))

    return model.to(device)

# Loss function only (optimizers are created per trial inside the run_* functions)
criterion = nn.BCEWithLogitsLoss()  # For multi-label classification

def save_checkpoint(model, optimizer, trial_num, path=checkpoint_path):
    """Write the trial index plus model/optimizer state to ``path`` and zip it.

    The archive is always written to the module-level ``zip_checkpoint_path``.
    """
    state = {
        'trial': trial_num,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    torch.save(state, path)
    print(f"Checkpoint saved at trial {trial_num + 1}")

    # Store the checkpoint in the archive under its bare file name
    with zipfile.ZipFile(zip_checkpoint_path, 'w') as archive:
        archive.write(path, arcname=os.path.basename(path))
    print(f"Checkpoint zipped at {zip_checkpoint_path}")

def load_checkpoint(model, optimizer, path=checkpoint_path):
    """Restore model and optimizer state from ``path`` if the file exists.

    Args:
        model: Module whose ``load_state_dict`` receives the saved weights.
        optimizer: Optimizer whose ``load_state_dict`` receives the saved state.
        path: Checkpoint file to read.

    Returns:
        The index of the next trial to run (saved trial + 1), or 0 when no
        checkpoint file is present.
    """
    if not os.path.isfile(path):
        print("No checkpoint found. Starting trials from scratch.")
        return 0

    # map_location remaps stored tensors onto the current device, so a checkpoint
    # written on a GPU session can still be loaded when only the CPU is available.
    checkpoint = torch.load(path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_trial = checkpoint['trial'] + 1
    print(f"Resuming training from trial {start_trial}")
    return start_trial

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train ``model`` for ``num_epochs`` passes, printing the mean batch loss per epoch."""
    model.train()
    for epoch_idx in range(num_epochs):
        epoch_loss = 0.0
        for batch_images, batch_targets in tqdm(train_loader):
            batch_images = batch_images.to(device)
            batch_targets = batch_targets.to(device)

            optimizer.zero_grad()
            logits = model(batch_images)
            # Collapse everything after the batch dimension
            logits = logits.view(logits.size(0), -1)

            batch_loss = criterion(logits, batch_targets.float())
            batch_loss.backward()
            optimizer.step()
            epoch_loss += batch_loss.item()

        print(f'Epoch [{epoch_idx+1}/{num_epochs}], Loss: {epoch_loss/len(train_loader):.4f}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """Return the macro-averaged ROC AUC of the model's sigmoid outputs on ``test_loader``."""
    model.eval()
    label_batches, prob_batches = [], []

    with torch.no_grad():
        for batch_images, batch_targets in test_loader:
            logits = model(batch_images.to(device))
            logits = logits.view(logits.size(0), -1)  # flatten after the batch dim
            label_batches.append(batch_targets.numpy())
            prob_batches.append(torch.sigmoid(logits).cpu().numpy())

    # Stack per-batch arrays into one array per side
    y_true = np.vstack(label_batches)
    y_score = np.vstack(prob_batches)

    return roc_auc_score(y_true, y_score, average='macro')

# Run independent trials for both training approaches
def run_trials(trials=10):
    """Train and evaluate up to ``trials`` independent, freshly initialised models.

    A checkpoint left by an earlier run is consulted once, only to learn which
    trial index to resume from. Its weights are NOT loaded into the new models:
    doing so per trial (as before) made every trial continue from the previous
    trial's trained weights instead of starting from scratch.

    Args:
        trials: Total number of trials, counting any already completed.

    Returns:
        List of test-set macro AUC values for the trials executed in this call.
    """
    auc_scores_scratch = []

    # Work out the resume point once, before any model is built
    start_trial = 0
    if os.path.isfile(checkpoint_path):
        # Only the 'trial' entry is needed; load onto the CPU to avoid touching the GPU
        saved = torch.load(checkpoint_path, map_location='cpu')
        start_trial = saved['trial'] + 1
        del saved
        print(f"Resuming training from trial {start_trial}")
    else:
        print("No checkpoint found. Starting trials from scratch.")

    for trial_num in range(start_trial, trials):
        # Training from scratch: new weights and new optimizer state every trial
        model_scratch = initialize_model(pretrained=False)
        optimizer_scratch = optim.Adam(model_scratch.parameters(), lr=learning_rate)

        train_model(model_scratch, train_loader, criterion, optimizer_scratch, num_epochs)
        auc_scratch = evaluate_model(model_scratch, test_loader)
        auc_scores_scratch.append(auc_scratch)

        # Save checkpoint after each trial so an interrupted run can skip finished trials
        save_checkpoint(model_scratch, optimizer_scratch, trial_num, checkpoint_path)

    return auc_scores_scratch

def train_model_with_validation(model, train_loader, val_loader, criterion, optimizer, num_epochs):
    """Train the model and report validation AUC after each epoch.

    Args:
        model: Module to train.
        train_loader: Iterable of (images, labels) training batches.
        val_loader: Loader passed to ``evaluate_model`` after every epoch.
        criterion: Loss applied to the flattened outputs and float labels.
        optimizer: Optimizer stepped once per batch.
        num_epochs: Number of passes over ``train_loader``.
    """
    for epoch in range(num_epochs):
        # evaluate_model() switches the model to eval mode, so train mode has to
        # be re-entered at the start of every epoch, not just once up front.
        model.train()
        running_loss = 0.0
        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            outputs = outputs.view(outputs.size(0), -1)  # Flatten to [batch_size, num_classes]

            loss = criterion(outputs, labels.float())
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        # Print training loss
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

        # Validate model
        val_auc = evaluate_model(model, val_loader)
        print(f"Validation AUC after Epoch {epoch+1}: {val_auc:.4f}")

def _build_optimizer(optimizer_type, parameters, learning_rate):
    """Create the optimizer named by ``optimizer_type`` ('Adam', 'SGD' or 'RMSprop')."""
    if optimizer_type == 'Adam':
        return optim.Adam(parameters, lr=learning_rate)
    if optimizer_type == 'SGD':
        return optim.SGD(parameters, lr=learning_rate, momentum=0.9)
    if optimizer_type == 'RMSprop':
        return optim.RMSprop(parameters, lr=learning_rate)
    raise ValueError(f"Unsupported optimizer: {optimizer_type!r}")


def run_trials_with_validation(trials=10, param_grid=None):
    """Grid-search learning rate, batch size and optimizer using validation AUC.

    Every trial trains a freshly initialised model. No checkpoint is loaded:
    this function never saves one, so any file found on disk would come from a
    different run and would either contaminate the trial or cause it to be skipped.

    Args:
        trials: Number of models trained per configuration.
        param_grid: Dict with 'learning_rate', 'batch_size' and 'optimizer'
            lists; a built-in grid is used when None.

    Returns:
        (results, best_params): one dict per configuration with its average
        validation AUC, and the configuration with the highest average.

    Raises:
        ValueError: If the grid names an optimizer other than Adam/SGD/RMSprop.
    """
    if param_grid is None:
        # Define a grid of hyperparameters to try
        param_grid = {
            'learning_rate': [0.0005, 0.0001],
            'batch_size': [16, 32, 64],
            'optimizer': ['Adam', 'SGD', 'RMSprop'],
        }

    best_auc = 0.0
    best_params = {}
    results = []

    for learning_rate in param_grid['learning_rate']:
        for batch_size in param_grid['batch_size']:
            for optimizer_type in param_grid['optimizer']:
                print(f"\nTesting configuration: LR={learning_rate}, Batch Size={batch_size}, Optimizer={optimizer_type}")

                # Initialize DataLoader with the given batch size
                train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
                val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

                auc_scores = []  # AUC scores for this configuration

                for trial_num in range(trials):
                    # Fresh model and optimizer for every trial
                    model = initialize_model(pretrained=False)
                    optimizer = _build_optimizer(optimizer_type, model.parameters(), learning_rate)

                    # Train model with validation
                    train_model_with_validation(model, train_loader, val_loader, criterion, optimizer, num_epochs)

                    # Evaluate on validation set
                    val_auc = evaluate_model(model, val_loader)
                    auc_scores.append(val_auc)

                # Average AUC for this configuration (NaN when no trial ran)
                avg_auc = np.mean(auc_scores) if auc_scores else float('nan')
                print(f"Configuration AUC: {avg_auc:.4f}")

                results.append({'learning_rate': learning_rate, 'batch_size': batch_size,
                                'optimizer': optimizer_type, 'avg_auc': avg_auc})

                # Update the best configuration
                if avg_auc > best_auc:
                    best_auc = avg_auc
                    best_params = {'learning_rate': learning_rate,
                                   'batch_size': batch_size,
                                   'optimizer': optimizer_type}

    print("\nBest Configuration:", best_params)
    print("Best AUC:", best_auc)
    return results, best_params

# Execute the hyperparameter search with one trial per configuration.
# This grid has the same values as the default built inside run_trials_with_validation.
param_grid = {
    'learning_rate': [0.0005, 0.0001],
    'batch_size': [16, 32, 64],
    'optimizer': ['Adam', 'SGD', 'RMSprop'],
}

# Run the search, then print every configuration's result and the best one
auc_results, best_hyperparams = run_trials_with_validation(trials=1, param_grid=param_grid)
print("AUC Results for All Configurations:", auc_results)
print("Best Hyperparameters:", best_hyperparams)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import roc_auc_score
from timm import create_model
from tqdm import tqdm
import torch.nn.functional as F
import os

# Define constants
num_epochs = 1
learning_rate = 0.001
batch_size = 32
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Checkpoint file is specific to this cell's architecture: reusing the file
# written by a cell with a different model makes load_state_dict fail on resume.
checkpoint_path = '/kaggle/working/swin_base_trial_checkpoint.pth'  # Path to save the model checkpoint

# Initialize model
def initialize_model(pretrained=False):
    """Create the 'swin_base_patch4_window7_224' model with a flattened-feature linear head.

    Args:
        pretrained: Forwarded to ``create_model``.

    Returns:
        The model on ``device``; its forward flattens the backbone features and
        feeds them to a linear layer with ``len(classes)`` outputs.
    """
    model = create_model('swin_base_patch4_window7_224', pretrained=pretrained)

    # Custom forward function to flatten features
    def forward(self, x):
        x = self.forward_features(x)
        # reshape (not view) so a non-contiguous feature tensor is still flattened
        x = x.reshape(x.size(0), -1)
        return self.head(x)

    model.forward = forward.__get__(model)  # Bind custom forward to the model instance

    # Replace the head to match the number of classes; its input is
    # in_features * 7 * 7 values per sample
    # (assumes the backbone yields 7x7 positions for the inputs used here — TODO confirm)
    model.head = nn.Linear(model.head.in_features * 7 * 7, len(classes))

    return model.to(device)

# Loss function only (the optimizer is created per trial inside run_trials)
criterion = nn.BCEWithLogitsLoss()  # For multi-label classification

def save_checkpoint(model, optimizer, trial_num, path=checkpoint_path):
    """Write the trial index plus model and optimizer state dicts to ``path``."""
    state = {
        'trial': trial_num,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    torch.save(state, path)
    print(f"Checkpoint saved at trial {trial_num + 1}")

def load_checkpoint(model, optimizer, path=checkpoint_path):
    """Restore model and optimizer state from ``path`` if the file exists.

    Args:
        model: Module whose ``load_state_dict`` receives the saved weights.
        optimizer: Optimizer whose ``load_state_dict`` receives the saved state.
        path: Checkpoint file to read.

    Returns:
        The index of the next trial to run (saved trial + 1), or 0 when no
        checkpoint file is present.
    """
    if not os.path.isfile(path):
        print("No checkpoint found. Starting trials from scratch.")
        return 0

    # map_location remaps stored tensors onto the current device, so a checkpoint
    # written on a GPU session can still be loaded when only the CPU is available.
    checkpoint = torch.load(path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    start_trial = checkpoint['trial'] + 1
    print(f"Resuming training from trial {start_trial}")
    return start_trial

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train ``model`` for ``num_epochs`` passes, printing the mean batch loss per epoch."""
    model.train()
    for epoch_idx in range(num_epochs):
        epoch_loss = 0.0
        for batch_images, batch_targets in tqdm(train_loader):
            batch_images = batch_images.to(device)
            batch_targets = batch_targets.to(device)

            optimizer.zero_grad()
            logits = model(batch_images)
            # Collapse everything after the batch dimension
            logits = logits.view(logits.size(0), -1)

            batch_loss = criterion(logits, batch_targets.float())
            batch_loss.backward()
            optimizer.step()
            epoch_loss += batch_loss.item()

        print(f'Epoch [{epoch_idx+1}/{num_epochs}], Loss: {epoch_loss/len(train_loader):.4f}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """Return the macro-averaged ROC AUC of the model's sigmoid outputs on ``test_loader``."""
    model.eval()
    label_batches, prob_batches = [], []

    with torch.no_grad():
        for batch_images, batch_targets in test_loader:
            logits = model(batch_images.to(device))
            logits = logits.view(logits.size(0), -1)  # flatten after the batch dim
            label_batches.append(batch_targets.numpy())
            prob_batches.append(torch.sigmoid(logits).cpu().numpy())

    # Stack per-batch arrays into one array per side
    y_true = np.vstack(label_batches)
    y_score = np.vstack(prob_batches)

    return roc_auc_score(y_true, y_score, average='macro')

# Run independent trials for both training approaches
def run_trials(trials=10):
    """Train and evaluate up to ``trials`` independent, freshly initialised models.

    A checkpoint left by an earlier run is consulted once, only to learn which
    trial index to resume from. Its weights are NOT loaded into the new models:
    doing so per trial (as before) made every trial continue from the previous
    trial's trained weights instead of starting from scratch.

    Args:
        trials: Total number of trials, counting any already completed.

    Returns:
        List of test-set macro AUC values for the trials executed in this call.
    """
    auc_scores_scratch = []

    # Work out the resume point once, before any model is built
    start_trial = 0
    if os.path.isfile(checkpoint_path):
        # Only the 'trial' entry is needed; load onto the CPU to avoid touching the GPU
        saved = torch.load(checkpoint_path, map_location='cpu')
        start_trial = saved['trial'] + 1
        del saved
        print(f"Resuming training from trial {start_trial}")
    else:
        print("No checkpoint found. Starting trials from scratch.")

    for trial_num in range(start_trial, trials):
        # Training from scratch: new weights and new optimizer state every trial
        model_scratch = initialize_model(pretrained=False)
        optimizer_scratch = optim.Adam(model_scratch.parameters(), lr=learning_rate)

        train_model(model_scratch, train_loader, criterion, optimizer_scratch, num_epochs)
        auc_scratch = evaluate_model(model_scratch, test_loader)
        auc_scores_scratch.append(auc_scratch)

        # Save checkpoint after each trial so an interrupted run can skip finished trials
        save_checkpoint(model_scratch, optimizer_scratch, trial_num, checkpoint_path)

    return auc_scores_scratch

# Execute the trials and print the AUC values returned by run_trials
auc_scratch = run_trials()
print("AUC Scores (Training from Scratch):", auc_scratch)


In [None]:
!pip install img2vec_pytorch

In [None]:
from img2vec_pytorch import Img2Vec
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
from torchvision.transforms.functional import to_pil_image
import torch

# Initialize Img2Vec with the 'resnet18' model, using CUDA when it is available
img2vec = Img2Vec(cuda=torch.cuda.is_available(), model='resnet18')

# Function to extract features from dataset using Img2Vec
def extract_features_and_labels(data_loader):
    """Embed every image yielded by ``data_loader`` with Img2Vec.

    Returns:
        (features, labels): two NumPy arrays, one row per image, holding the
        Img2Vec vectors and the matching label arrays.
    """
    vectors, targets = [], []

    for image_batch, label_batch in data_loader:
        # Walk the batch sample by sample: each tensor goes back to a PIL image for get_vec
        for image_tensor, label_tensor in zip(image_batch, label_batch):
            vectors.append(img2vec.get_vec(to_pil_image(image_tensor)))
            targets.append(label_tensor.numpy())

    return np.array(vectors), np.array(targets)

# Extract features and labels for train and validation datasets
print("Extracting features from training data...")
train_features, train_labels = extract_features_and_labels(train_loader)

# Use the validation loader here: everything below is reported as "validation",
# and the test loader was previously passed by mistake.
print("Extracting features from validation data...")
val_features, val_labels = extract_features_and_labels(val_loader)

# Collapse each multi-hot row to a single class index (first maximal entry)
train_labels = np.argmax(train_labels, axis=1)
val_labels = np.argmax(val_labels, axis=1)

# Initialize and train the Random Forest classifier
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
print("Training Random Forest classifier...")
rf_classifier.fit(train_features, train_labels)

# Evaluate the model on validation data
val_predictions = rf_classifier.predict(val_features)

# Calculate and print accuracy and classification report
val_accuracy = accuracy_score(val_labels, val_predictions)
print("Validation Accuracy:", val_accuracy)
print("Classification Report:")
# Pass the full label set explicitly: with target_names alone the report raises
# when some class index is absent from both the targets and the predictions.
print(classification_report(
    val_labels,
    val_predictions,
    labels=np.arange(len(classes)),
    target_names=classes,
    zero_division=0,
))


In [None]:
!pip install ultralytics

In [None]:
# Import YOLO in this cell: on a fresh kernel the name is otherwise undefined
# here, because the notebook's only other import of it sits in a later cell.
from ultralytics import YOLO

# Load the 'yolov8n-cls.pt' weights and display the resulting model object
model = YOLO('yolov8n-cls.pt')
model

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import roc_auc_score
from timm import create_model
from tqdm import tqdm
import torch.nn.functional as F
from transformers import AutoModel, AutoConfig
#from torchsummary import summary
import types
from ultralytics import YOLO


# Define constants
# Epochs each trial trains for
num_epochs = 1
# Learning rate handed to the Adam optimizer in run_trials
learning_rate = 0.0001
# NOTE(review): batch_size is not read by the functions visible in this cell — confirm it is still needed
batch_size = 16
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model
def initialize_model(pretrained=True):
    """Load YOLO('yolov8n-cls.pt'), attach a linear layer, patch forward, move to device.

    NOTE(review): ``pretrained`` is never read in the body; the same weights
    file is loaded whatever value is passed.
    NOTE(review): ``model.linear`` is assigned but the patched forward never
    calls it (the new-head call is commented out), so the returned tensor is
    whatever ``self.model(x)`` produces — presumably not ``len(classes)`` wide;
    verify against the loss, which compares it with the label vectors.
    NOTE(review): the hard-coded ``in_features=768`` is not derived from the
    loaded model — confirm it matches the feature width.
    NOTE(review): whether calling the returned object dispatches to the patched
    ``forward`` depends on the YOLO wrapper's ``__call__`` — confirm.
    """
    
    # Shadows the module-level `device` with an identically computed value
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = YOLO('yolov8n-cls.pt')
    model.linear = nn.Linear(in_features=768, out_features=len(classes))
    #model.new_head = nn.Linear(768, len(classes))

    # Replacement forward: run the wrapped model and unwrap tuple outputs
    def forward_with_new_head(self, x):
        #print(f"Shape before all: {x.shape}")
        x = self.model(x)  # Original forward
        # Keep only the first element when the wrapped model returns a tuple
        if isinstance(x, tuple):
            x = x[0]
        #print(f"Shape before new_head: {x.shape}")
        #x = self.new_head(x)  # New layer
        #print(f"Shape after new_head: {x.shape}")
        return x

    # Bind the new forward function to the model instance
    
    model.forward = types.MethodType(forward_with_new_head, model)
    
    #print(model)
    #summary(model, input_size=(3,224,224))
    return model.to(device)

# Loss function only (the optimizer is created per trial inside run_trials)
criterion = nn.BCEWithLogitsLoss()  # For multi-label classification

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for images, labels in tqdm(train_loader):
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            #print('images.shape is:', images.shape)
            outputs = model(images)
            if isinstance(outputs, tuple):  # Some models may return multiple outputs
                outputs = outputs[0]  # Extract the logits
            #print('outputs1.shape is:', outputs.shape)
            outputs = outputs.view(outputs.size(0), -1)  # Flatten to [batch_size, num_classes]
            #print('outputs.shape is:', outputs.shape)
            loss = criterion(outputs, labels.float())
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """Return the macro-averaged ROC AUC of ``model`` over ``test_loader``.

    The model is switched to eval mode and run without gradient tracking.
    Outputs are flattened per sample, passed through a sigmoid and compared
    with the labels, which are converted with ``.numpy()`` directly
    (presumably CPU tensors — verify against the loader).
    """
    model.eval()
    label_chunks, score_chunks = [], []

    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            logits = model(batch_images.to(device))
            flat_logits = logits.view(logits.size(0), -1)
            label_chunks.append(batch_labels.numpy())
            score_chunks.append(torch.sigmoid(flat_logits).cpu().numpy())

    y_true = np.vstack(label_chunks)
    y_score = np.vstack(score_chunks)
    return roc_auc_score(y_true, y_score, average='macro')

# Run independent trials
def run_trials(trials=1):
    """Train and score a freshly initialised model ``trials`` times.

    Every trial calls ``initialize_model(pretrained=True)``, builds an Adam
    optimizer with the module-level ``learning_rate``, trains on the
    module-level ``train_loader`` for ``num_epochs`` epochs and evaluates on
    ``test_loader``.  Despite the "scratch" wording in the printed labels, the
    model is requested with ``pretrained=True``.

    Returns:
        A list with one macro AUC per trial.
    """
    trial_aucs = []

    for _ in range(trials):
        net = initialize_model(pretrained=True)
        adam = optim.Adam(net.parameters(), lr=learning_rate)

        train_model(net, train_loader, criterion, adam, num_epochs)
        trial_auc = evaluate_model(net, test_loader)
        print('auc_scratch is:', trial_auc)
        trial_aucs.append(trial_auc)

    return trial_aucs

# Execute the trials
auc_scratch = run_trials()
print("AUC Scores (Training from Scratch):", auc_scratch)


**Fine-tune only the final fully connected (fc) layer**

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import roc_auc_score
from timm import create_model
from tqdm import tqdm
import torch.nn.functional as F

# Define constants
num_epochs = 1
learning_rate = 0.0001
batch_size = 16
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model
def initialize_model(pretrained=True):
    """Build a ConvNeXt-Tiny in which only the final ``head.fc`` layer is trainable.

    Args:
        pretrained: forwarded to timm's ``create_model``; True loads the
            pretrained weights.

    Returns:
        The model on the selected device, with ``head.fc`` replaced by a new
        ``len(classes)``-way linear layer and every other parameter frozen.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = create_model('convnext_tiny', pretrained=pretrained)

    # Size the new classifier from the layer it replaces instead of hard-coding
    # the feature width.
    model.head.fc = nn.Linear(in_features=model.head.fc.in_features, out_features=len(classes))

    # Freeze everything except the new classifier.  The match is on the full
    # "head.fc." prefix because a bare substring test for "fc" also matches the
    # mlp.fc1 / mlp.fc2 layers inside timm's ConvNeXt blocks, which would leave
    # most of the backbone trainable.
    for name, param in model.named_parameters():
        param.requires_grad_(name.startswith("head.fc."))

    return model.to(device)


# Loss function and optimizer
criterion = nn.BCEWithLogitsLoss()  # For multi-label classification

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train ``model`` in place and print the mean batch loss of every epoch.

    Batches are moved to the module-level ``device``; model outputs are
    flattened to ``[batch, -1]`` and scored by ``criterion`` against the
    float-cast labels.
    """
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for batch_images, batch_targets in tqdm(train_loader):
            batch_images = batch_images.to(device)
            batch_targets = batch_targets.to(device)

            optimizer.zero_grad()
            raw_outputs = model(batch_images)
            # Collapse everything after the batch axis into one dimension.
            logits = raw_outputs.view(raw_outputs.size(0), -1)

            batch_loss = criterion(logits, batch_targets.float())
            batch_loss.backward()
            optimizer.step()
            running_loss += batch_loss.item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """Score ``model`` on ``test_loader`` and return the macro ROC AUC.

    Runs in eval mode under ``torch.no_grad()``.  Sigmoid probabilities and
    labels are gathered batch by batch, stacked row-wise and handed to
    ``roc_auc_score(average='macro')``.
    """
    model.eval()
    collected_labels = []
    collected_probs = []

    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            raw_outputs = model(batch_images.to(device))
            flat_outputs = raw_outputs.view(raw_outputs.size(0), -1)
            collected_labels.append(batch_labels.numpy())
            collected_probs.append(torch.sigmoid(flat_outputs).cpu().numpy())

    return roc_auc_score(
        np.vstack(collected_labels),
        np.vstack(collected_probs),
        average='macro',
    )

# Run independent trials
def run_trials(trials=4):
    """Repeat the initialise → train → evaluate cycle ``trials`` times.

    Uses the module-level ``train_loader``, ``test_loader``, ``criterion``,
    ``learning_rate`` and ``num_epochs``.  The model is always requested with
    ``pretrained=True`` even though the printed labels say "scratch".

    Returns:
        A list holding the macro AUC of each trial, in order.
    """
    collected_aucs = []

    for _ in range(trials):
        net = initialize_model(pretrained=True)
        adam = optim.Adam(net.parameters(), lr=learning_rate)

        train_model(net, train_loader, criterion, adam, num_epochs)
        current_auc = evaluate_model(net, test_loader)
        print('auc_scratch is:', current_auc)
        collected_aucs.append(current_auc)

    return collected_aucs

# Execute the trials
auc_scratch = run_trials()
print("AUC Scores (Training from Scratch):", auc_scratch)


**full fine tuning**

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import roc_auc_score
from timm import create_model
from tqdm import tqdm
import torch.nn.functional as F

# Define constants
num_epochs = 1
learning_rate = 0.0001
batch_size = 16
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model
def initialize_model(pretrained=True):
    """Build a ConvNeXt-Tiny whose head is one linear layer over the flattened feature map.

    The stock head is discarded.  The replacement is sized for a 7x7 spatial
    grid, so the model only accepts inputs whose backbone output is 7x7
    (presumably 224x224 images — confirm against the dataset transforms).

    Args:
        pretrained: forwarded to timm's ``create_model``.

    Returns:
        The patched model on the module-level ``device``.
    """
    net = create_model('convnext_tiny', pretrained=pretrained)

    # Size the new head from the old one before overwriting it:
    # channels * 7 * 7 input features, one output per class.
    flat_dim = net.head.in_features * 7 * 7
    net.head = nn.Linear(flat_dim, len(classes))

    def _flattened_forward(self, x):
        """Backbone features -> flatten per sample -> linear head."""
        feats = self.forward_features(x)
        return self.head(feats.reshape(feats.size(0), -1))

    # Bind as an instance method so model(x) uses the flattened path.
    net.forward = _flattened_forward.__get__(net)

    return net.to(device)

# Loss function and optimizer
criterion = nn.BCEWithLogitsLoss()  # For multi-label classification

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Fit ``model`` on ``train_loader`` for ``num_epochs`` epochs.

    Per batch: move the data to the module-level ``device``, zero the
    gradients, run the forward pass, flatten the output to two dimensions,
    compute ``criterion`` against the float-cast labels, backpropagate and
    step.  The average batch loss is printed once per epoch.
    """
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for image_batch, label_batch in tqdm(train_loader):
            image_batch = image_batch.to(device)
            label_batch = label_batch.to(device)

            optimizer.zero_grad()
            predictions = model(image_batch)
            predictions = predictions.view(predictions.size(0), -1)

            step_loss = criterion(predictions, label_batch.float())
            step_loss.backward()
            optimizer.step()
            running_loss += step_loss.item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """Compute the macro-averaged ROC AUC of ``model`` on ``test_loader``.

    Predictions are sigmoid-activated model outputs flattened to one row per
    sample.  Labels are taken from the loader as-is via ``.numpy()``.
    """
    model.eval()
    truth_batches, prob_batches = [], []

    with torch.no_grad():
        for image_batch, label_batch in test_loader:
            predictions = model(image_batch.to(device))
            predictions = predictions.view(predictions.size(0), -1)
            truth_batches.append(label_batch.numpy())
            prob_batches.append(torch.sigmoid(predictions).cpu().numpy())

    stacked_truth = np.vstack(truth_batches)
    stacked_probs = np.vstack(prob_batches)
    return roc_auc_score(stacked_truth, stacked_probs, average='macro')

# Run independent trials
def run_trials(trials=10):
    """Run ``trials`` independent fine-tuning runs and collect their AUCs.

    Each run builds a new model via ``initialize_model(pretrained=True)``,
    optimises all of its parameters with Adam at the module-level
    ``learning_rate`` and evaluates on the module-level ``test_loader``.
    The "scratch" wording in the printed labels does not match the
    ``pretrained=True`` request.

    Returns:
        The list of per-trial macro AUC values.
    """
    scores = []

    for _ in range(trials):
        net = initialize_model(pretrained=True)
        adam = optim.Adam(net.parameters(), lr=learning_rate)

        train_model(net, train_loader, criterion, adam, num_epochs)
        score = evaluate_model(net, test_loader)
        print('auc_scratch is:', score)
        scores.append(score)

    return scores

# Execute the trials
auc_scratch = run_trials()
print("AUC Scores (Training from Scratch):", auc_scratch)


**combination of CNN and transformer**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from sklearn.metrics import roc_auc_score

class LowLevelFeatureExtractor(nn.Module):
    """Three conv -> batch-norm -> ReLU -> 2x2 max-pool stages.

    Channel widths go ``in_channels -> 32 -> 64 -> 128``.  Each 3x3 convolution
    keeps the spatial size (stride 1, padding 1) and each pooling step halves
    it, so the output is ``[batch, 128, H // 8, W // 8]``.
    """

    def __init__(self, in_channels=3):
        super().__init__()
        widths = (in_channels, 32, 64, 128)
        # Register conv1/bn1 ... conv3/bn3 under the same attribute names and
        # in the same order as a hand-written definition would.
        for stage, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:]), start=1):
            setattr(self, f"conv{stage}", nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1))
            setattr(self, f"bn{stage}", nn.BatchNorm2d(c_out))
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        """Apply the three stages in order and return the pooled feature map."""
        for stage in (1, 2, 3):
            conv = getattr(self, f"conv{stage}")
            bn = getattr(self, f"bn{stage}")
            x = self.pool(F.relu(bn(conv(x))))
        return x

class ConvNeXtHighLevelFeatures(nn.Module):
    """Expose the convolutional feature stack of a pretrained torchvision ConvNeXt-Tiny.

    Only ``features`` is used in ``forward``.  The complete torchvision model
    is still kept as ``self.convnext``, so everything it contains remains a
    registered submodule of this class.
    """

    def __init__(self):
        super().__init__()
        backbone = models.convnext_tiny(pretrained=True)
        self.convnext = backbone
        # Alias of the backbone's feature extractor; the forward pass stops here.
        self.features = backbone.features

    def forward(self, x):
        """Return the backbone feature map for ``x``."""
        return self.features(x)

class LungDiseaseClassifier(nn.Module):
    """Multi-label classifier fusing shallow CNN features with ConvNeXt features.

    The ConvNeXt feature map is bilinearly resized to the spatial size of the
    low-level feature map, concatenated with it along the channel axis, reduced
    to 512 channels by a 1x1 convolution, flattened and passed through a
    two-layer MLP that emits one logit per class.

    Args:
        num_classes: number of output logits.
        input_size: spatial size of the input images, either an int for square
            images or a ``(height, width)`` pair.  It sizes the first linear
            layer, so it must match the images fed to ``forward``.  The default
            of 224 matches the 7x7 ConvNeXt grid that the rest of the notebook
            appears to assume — confirm against the dataset transforms.
    """

    def __init__(self, num_classes, input_size=224):
        super().__init__()
        # Initialize low-level and high-level feature extractors
        self.low_level_extractor = LowLevelFeatureExtractor(in_channels=3)
        self.high_level_extractor = ConvNeXtHighLevelFeatures()

        # Feature fusion: 128 low-level channels + 768 ConvNeXt channels -> 512
        self.fusion_conv = nn.Conv2d(128 + 768, 512, kernel_size=1)

        # The fused map has the low-level extractor's spatial size, i.e. the
        # input halved three times.  Computing the flattened width here removes
        # the dependency on a `flattened_feature_size` global.
        if isinstance(input_size, int):
            height, width = input_size, input_size
        else:
            height, width = input_size
        flattened_features = 512 * (height // 8) * (width // 8)

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flattened_features, 1024),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(1024, num_classes)
        )

    def forward(self, x):
        """Return ``[batch, num_classes]`` logits for an image batch ``x``."""
        # Step 1: Extract low-level features
        low_level_features = self.low_level_extractor(x)  # Shape: [batch, 128, H/8, W/8]

        # Step 2: Extract high-level features and resize them to the low-level grid
        high_level_features = self.high_level_extractor(x)
        high_level_features = F.interpolate(
            high_level_features,
            size=low_level_features.shape[2:],
            mode='bilinear',
            align_corners=False,
        )

        # Step 3: Fuse features along the channel dimension
        fused_features = torch.cat([low_level_features, high_level_features], dim=1)
        fused_features = self.fusion_conv(fused_features)  # Shape: [batch, 512, H/8, W/8]

        # Step 4: Classification
        return self.classifier(fused_features)

# ------------------------- Model Initialization -------------------------
# One output logit per label in `classes`.
num_classes = len(classes)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize the model and move it to the device before building the
# optimizer, as the torch.optim documentation recommends.
model = LungDiseaseClassifier(num_classes)
model.to(device)

# ------------------------- Loss and Optimizer -------------------------
criterion = nn.BCEWithLogitsLoss()  # Binary Cross Entropy for multi-label classification
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# ------------------------- Training Loop -------------------------
# Each epoch trains on `train_loader`, then reports per-class and mean ROC AUC
# on `test_loader`.  Both loaders are defined elsewhere in the notebook.
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    # Training phase
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)  # Forward pass
        # BCEWithLogitsLoss needs floating-point targets.  The cast is a no-op
        # when the labels are already float and matches the other training
        # loops in this notebook.
        loss = criterion(outputs, labels.float())
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        total_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {total_loss / len(train_loader):.4f}")

    # Evaluation phase
    model.eval()
    all_targets = []
    all_predictions = []

    with torch.no_grad():
        for images, labels in test_loader:
            # Only the images are needed on the device; the labels go straight
            # to NumPy.
            images = images.to(device)
            outputs = model(images)

            # Store probabilities and true labels
            all_predictions.append(torch.sigmoid(outputs).cpu().numpy())
            all_targets.append(labels.cpu().numpy())

    # Stack the per-batch arrays into [num_samples, num_classes]
    all_targets = np.vstack(all_targets)
    all_predictions = np.vstack(all_predictions)

    # Compute AUC for each class
    aucs = []
    for i in range(num_classes):
        try:
            auc = roc_auc_score(all_targets[:, i], all_predictions[:, i])
        except ValueError:
            # roc_auc_score raises when a column has a single label value;
            # such classes are recorded as None and left out of the mean.
            auc = None
        aucs.append(auc)

    # Compute average AUC (excluding None values)
    valid_aucs = [auc for auc in aucs if auc is not None]
    mean_auc = sum(valid_aucs) / len(valid_aucs) if valid_aucs else 0

    print(f"Epoch {epoch+1}/{num_epochs}, Test AUC (per class): {aucs}")
    print(f"Epoch {epoch+1}/{num_epochs}, Mean Test AUC: {mean_auc:.4f}")

**using InternImage**

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import roc_auc_score
from timm import create_model
from tqdm import tqdm
import torch.nn.functional as F
from transformers import AutoModel, AutoConfig
#from torchsummary import summary
import types


# Define constants
num_epochs = 1
learning_rate = 0.0001
batch_size = 16
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model
def initialize_model(pretrained=True):
    """Build an InternImage-T model with a ``len(classes)``-way linear head.

    Args:
        pretrained: when True, load the published weights with
            ``AutoModel.from_pretrained``.  When False, instantiate the
            architecture from its configuration only, which leaves the weights
            randomly initialised.

    Returns:
        The Hugging Face model on the selected device.  Its ``forward`` is
        rebound to call the inner ``model`` directly and unwrap tuple outputs.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    repo_id = "OpenGVLab/internimage_t_1k_224"

    if pretrained:
        # from_config never loads weights, so honouring `pretrained` requires
        # from_pretrained.  No half-precision dtype is requested here.
        # NOTE(review): the training loop feeds images without casting them,
        # presumably float32 — confirm before switching this path to fp16.
        model = AutoModel.from_pretrained(repo_id, trust_remote_code=True)
    else:
        my_config = AutoConfig.from_pretrained(repo_id, torch_dtype=torch.float16, trust_remote_code=True)
        # The model class lives in the repository's remote code, so the trust
        # flag is passed to from_config as well.
        # NOTE(review): whether from_config honours the config's float16 dtype
        # appears to depend on the transformers version — verify.
        model = AutoModel.from_config(my_config, trust_remote_code=True)

    # Swap the classification layer of the inner network for one sized to this task.
    model.model.head = nn.Linear(in_features=768, out_features=len(classes))

    def forward_with_new_head(self, x):
        """Call the inner network and return its first output if it is a tuple."""
        x = self.model(x)
        if isinstance(x, tuple):
            x = x[0]
        return x

    # Bind the new forward function to the model
    model.forward = types.MethodType(forward_with_new_head, model)

    return model.to(device)

# Loss function and optimizer
criterion = nn.BCEWithLogitsLoss()  # For multi-label classification

def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train ``model`` in place for ``num_epochs`` epochs over ``train_loader``.

    Batches are moved to the module-level ``device``.  If the model returns a
    tuple, its first element is used.  The output is flattened to
    ``[batch, -1]`` before ``criterion`` is applied to it and the float-cast
    labels.  The mean batch loss is printed after each epoch.
    """
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for image_batch, target_batch in tqdm(train_loader):
            image_batch = image_batch.to(device)
            target_batch = target_batch.to(device)

            optimizer.zero_grad()
            result = model(image_batch)
            if isinstance(result, tuple):
                # Keep only the first element of a multi-output result.
                result = result[0]
            scores = result.view(result.size(0), -1)

            step_loss = criterion(scores, target_batch.float())
            step_loss.backward()
            optimizer.step()
            running_loss += step_loss.item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

# Function to evaluate the model
def evaluate_model(model, test_loader):
    """Return the macro ROC AUC of ``model`` on ``test_loader``.

    The model runs in eval mode without gradients.  Each batch of outputs is
    flattened to two dimensions and converted to probabilities with a sigmoid.
    Labels are converted with ``.numpy()`` as delivered by the loader.
    """
    model.eval()
    targets_seen, probs_seen = [], []

    with torch.no_grad():
        for image_batch, target_batch in test_loader:
            scores = model(image_batch.to(device))
            scores = scores.view(scores.size(0), -1)
            targets_seen.append(target_batch.numpy())
            probs_seen.append(torch.sigmoid(scores).cpu().numpy())

    targets_matrix = np.vstack(targets_seen)
    probs_matrix = np.vstack(probs_seen)
    return roc_auc_score(targets_matrix, probs_matrix, average='macro')

# Run independent trials
def run_trials(trials=1):
    """Initialise, train and evaluate a model ``trials`` times.

    Relies on the module-level ``train_loader``, ``test_loader``,
    ``criterion``, ``learning_rate`` and ``num_epochs``.  Every trial calls
    ``initialize_model(pretrained=True)``.

    Returns:
        A list of macro AUC values, one per trial.
    """
    results = []

    for _ in range(trials):
        net = initialize_model(pretrained=True)
        adam = optim.Adam(net.parameters(), lr=learning_rate)

        train_model(net, train_loader, criterion, adam, num_epochs)
        auc_value = evaluate_model(net, test_loader)
        print('auc_scratch is:', auc_value)
        results.append(auc_value)

    return results

# Execute the trials
auc_scratch = run_trials()
print("AUC Scores (Training from Scratch):", auc_scratch)
