# Blood-Brain Barrier Permeability Prediction using Artificial Neural Networks

This notebook implements an ANN model for predicting blood-brain barrier (BBB) permeability with:
- **Hyperparameter Tuning**: Number of layers, neurons per layer, learning rate, batch size
- **Wandb Integration**: For visualisation of results and comparison of different models
- **Feature Engineering**: Molecular descriptors + MACCS fingerprints as input to model
- **Dataset**: B3DB dataset

## Table of Contents
1. Import Libraries
2. Load and Explore Data
3. Feature Extraction
4. Data Preprocessing
5. ANN Model Architecture
6. Hyperparameter Tuning with Wandb
7. Model Training
8. Evaluation and Results

## 1. Import Required Libraries

In [None]:
# Data manipulation and visualization
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# RDKit for molecular features
from rdkit import Chem
from rdkit.Chem import Descriptors, Lipinski, Crippen, MACCSkeys

# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

# Scikit-learn
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, 
                             classification_report, confusion_matrix, roc_curve, auc,
                             roc_auc_score, matthews_corrcoef)

# Wandb for experiment tracking
import wandb

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

# Check for GPU availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Set style for plots
sns.set_style("whitegrid")
plt.style.use('seaborn-v0_8-darkgrid')

## 2. Load and Explore Data

In [None]:
# Load the dataset
data_path = Path('../data/BBBP.csv')
df = pd.read_csv(data_path)

print(f"Dataset shape: {df.shape}")
print(f"\nFirst few rows:")
print(df.head())

print(f"\nDataset info:")
print(df.info())

print(f"\nTarget distribution:")
print(df['p_np'].value_counts())
print(f"\nClass balance:")
print(df['p_np'].value_counts(normalize=True))

# Check for missing values
print(f"\nMissing values:")
print(df.isnull().sum())

## 3. Feature Extraction

We'll extract two types of features:
1. **Molecular Descriptors**: Physical and chemical properties (12 descriptors)
2. **MACCS Fingerprints**: 167-bit structural fingerprints

Total feature dimension: 12 + 167 = 179 features

In [None]:
def extract_molecular_descriptors(smiles):
    """Extract molecular descriptors from SMILES string"""
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    
    descriptors = {
        'MolWt': Descriptors.MolWt(mol),
        'LogP': Crippen.MolLogP(mol),
        'NumHDonors': Lipinski.NumHDonors(mol),
        'NumHAcceptors': Lipinski.NumHAcceptors(mol),
        'TPSA': Descriptors.TPSA(mol),
        'NumRotatableBonds': Lipinski.NumRotatableBonds(mol),
        'NumAromaticRings': Lipinski.NumAromaticRings(mol),
        'NumHeteroatoms': Lipinski.NumHeteroatoms(mol),
        'NumRings': Lipinski.RingCount(mol),
        'FractionCsp3': Lipinski.FractionCSP3(mol),
        'NumSaturatedRings': Lipinski.NumSaturatedRings(mol),
        'NumAliphaticRings': Lipinski.NumAliphaticRings(mol),
    }
    return descriptors

def extract_maccs_fingerprint(smiles):
    """Extract MACCS fingerprint from SMILES string"""
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        return None
    
    maccs = MACCSkeys.GenMACCSKeys(mol)
    return np.array(maccs)

def extract_features(smiles):
    """Extract combined features: molecular descriptors + MACCS fingerprints"""
    # Extract descriptors
    descriptors = extract_molecular_descriptors(smiles)
    if descriptors is None:
        return None
    
    # Extract MACCS fingerprints
    maccs = extract_maccs_fingerprint(smiles)
    if maccs is None:
        return None
    
    # Combine features
    descriptor_values = list(descriptors.values())
    combined_features = np.concatenate([descriptor_values, maccs])
    
    return combined_features

print("Feature extraction functions defined successfully!")

In [None]:
# Extract features for all molecules
print("Extracting features from SMILES strings...")
features_list = []
valid_indices = []

for idx, smiles in enumerate(df['SMILES']):
    features = extract_features(smiles)
    if features is not None:
        features_list.append(features)
        valid_indices.append(idx)
    
    if (idx + 1) % 500 == 0:
        print(f"Processed {idx + 1}/{len(df)} molecules...")

# Convert to numpy array
X = np.array(features_list)
y = df.loc[valid_indices, 'p_np'].values

print(f"\nFeature extraction complete!")
print(f"Total samples: {len(X)}")
print(f"Feature dimension: {X.shape[1]}")
print(f"Valid molecules: {len(valid_indices)}/{len(df)}")
print(f"\nFeature matrix shape: {X.shape}")
print(f"Target vector shape: {y.shape}")

## 4. Data Preprocessing

Split the data into training and test sets, then standardize features.

In [None]:
# Split data into train and test sets (80-20 split)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

print(f"Training set size: {len(X_train)}")
print(f"Test set size: {len(X_test)}")
print(f"\nTraining set class distribution:")
print(pd.Series(y_train).value_counts())
print(f"\nTest set class distribution:")
print(pd.Series(y_test).value_counts())

# Standardize features (fit on training data only)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"\nData preprocessing complete!")
print(f"Scaled training data shape: {X_train_scaled.shape}")
print(f"Scaled test data shape: {X_test_scaled.shape}")

## 5. ANN Model Architecture

Define a flexible neural network architecture that supports variable number of layers and neurons.

In [None]:
class BBBPredictorANN(nn.Module):
    """
    Flexible ANN architecture for BBB permeability prediction
    
    Parameters:
    -----------
    input_size : int
        Dimension of input features
    hidden_layers : list of int
        Number of neurons in each hidden layer
    dropout_rate : float
        Dropout rate for regularization
    """
    def __init__(self, input_size, hidden_layers, dropout_rate=0.3):
        super(BBBPredictorANN, self).__init__()
        
        layers = []
        prev_size = input_size
        
        # Build hidden layers
        for hidden_size in hidden_layers:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.BatchNorm1d(hidden_size))
            layers.append(nn.Dropout(dropout_rate))
            prev_size = hidden_size
        
        # Output layer (binary classification)
        layers.append(nn.Linear(prev_size, 1))
        layers.append(nn.Sigmoid())
        
        self.network = nn.Sequential(*layers)
    
    def forward(self, x):
        return self.network(x)

print("ANN model class defined successfully!")

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device):
    """Train the ANN model and track performance"""
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []
    
    best_val_acc = 0.0
    best_model_state = None
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            # Forward pass
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y.unsqueeze(1))
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            # Calculate accuracy
            predictions = (outputs > 0.5).float()
            train_correct += (predictions == batch_y.unsqueeze(1)).sum().item()
            train_total += batch_y.size(0)
            train_loss += loss.item()
        
        # Calculate average training loss and accuracy
        avg_train_loss = train_loss / len(train_loader)
        train_acc = train_correct / train_total
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y.unsqueeze(1))
                
                predictions = (outputs > 0.5).float()
                val_correct += (predictions == batch_y.unsqueeze(1)).sum().item()
                val_total += batch_y.size(0)
                val_loss += loss.item()
        
        # Calculate average validation loss and accuracy
        avg_val_loss = val_loss / len(val_loader)
        val_acc = val_correct / val_total
        
        # Store metrics
        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)
        
        # Log to wandb
        wandb.log({
            'epoch': epoch + 1,
            'train_loss': avg_train_loss,
            'val_loss': avg_val_loss,
            'train_accuracy': train_acc,
            'val_accuracy': val_acc
        })
        
        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_state = model.state_dict().copy()
        
        # Print progress
        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}] - "
                  f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f} - "
                  f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.4f}")
    
    # Load best model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    
    return model, {
        'train_losses': train_losses,
        'val_losses': val_losses,
        'train_accs': train_accs,
        'val_accs': val_accs,
        'best_val_acc': best_val_acc
    }

def evaluate_model(model, test_loader, device):
    """Evaluate the model on test set"""
    model.eval()
    all_predictions = []
    all_labels = []
    all_probs = []
    
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            outputs = model(batch_X)
            predictions = (outputs > 0.5).float()
            
            all_predictions.extend(predictions.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())
            all_probs.extend(outputs.cpu().numpy())
    
    all_predictions = np.array(all_predictions).flatten()
    all_labels = np.array(all_labels).flatten()
    all_probs = np.array(all_probs).flatten()
    
    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions)
    recall = recall_score(all_labels, all_predictions)
    f1 = f1_score(all_labels, all_predictions)
    roc_auc = roc_auc_score(all_labels, all_probs)
    mcc = matthews_corrcoef(all_labels, all_predictions)
    
    metrics = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'roc_auc': roc_auc,
        'mcc': mcc
    }
    
    return metrics, all_predictions, all_labels, all_probs

print("Training and evaluation functions defined successfully!")

## 6. Hyperparameter Tuning with Wandb

We'll tune the following hyperparameters:
- **Number of hidden layers**: 2, 3, 4
- **Neurons per layer**: 64, 128, 256
- **Learning rate**: 0.0001, 0.001, 0.01
- **Batch size**: 16, 32, 64
- **Dropout rate**: 0.2, 0.3, 0.4

In [None]:
# Define hyperparameter search space
hyperparameter_configs = [
    # Configuration 1: 2 layers, 128 neurons
    {'num_layers': 2, 'neurons': 128, 'learning_rate': 0.001, 'batch_size': 32, 'dropout': 0.3},
    # Configuration 2: 3 layers, 128 neurons
    {'num_layers': 3, 'neurons': 128, 'learning_rate': 0.001, 'batch_size': 32, 'dropout': 0.3},
    # Configuration 3: 2 layers, 256 neurons
    {'num_layers': 2, 'neurons': 256, 'learning_rate': 0.001, 'batch_size': 32, 'dropout': 0.3},
    # Configuration 4: 3 layers, 64 neurons
    {'num_layers': 3, 'neurons': 64, 'learning_rate': 0.001, 'batch_size': 32, 'dropout': 0.3},
    # Configuration 5: 2 layers, 128 neurons, higher lr
    {'num_layers': 2, 'neurons': 128, 'learning_rate': 0.01, 'batch_size': 32, 'dropout': 0.3},
    # Configuration 6: 2 layers, 128 neurons, lower lr
    {'num_layers': 2, 'neurons': 128, 'learning_rate': 0.0001, 'batch_size': 32, 'dropout': 0.3},
    # Configuration 7: 2 layers, 128 neurons, larger batch
    {'num_layers': 2, 'neurons': 128, 'learning_rate': 0.001, 'batch_size': 64, 'dropout': 0.3},
    # Configuration 8: 2 layers, 128 neurons, smaller batch
    {'num_layers': 2, 'neurons': 128, 'learning_rate': 0.001, 'batch_size': 16, 'dropout': 0.3},
    # Configuration 9: 3 layers, 256 neurons
    {'num_layers': 3, 'neurons': 256, 'learning_rate': 0.001, 'batch_size': 32, 'dropout': 0.3},
    # Configuration 10: 4 layers, 128 neurons
    {'num_layers': 4, 'neurons': 128, 'learning_rate': 0.001, 'batch_size': 32, 'dropout': 0.3},
    # Configuration 11: 2 layers, 128 neurons, higher dropout
    {'num_layers': 2, 'neurons': 128, 'learning_rate': 0.001, 'batch_size': 32, 'dropout': 0.4},
    # Configuration 12: 2 layers, 128 neurons, lower dropout
    {'num_layers': 2, 'neurons': 128, 'learning_rate': 0.001, 'batch_size': 32, 'dropout': 0.2},
]

print(f"Total hyperparameter configurations: {len(hyperparameter_configs)}")
print("\nSample configurations:")
for i, config in enumerate(hyperparameter_configs[:3]):
    print(f"Config {i+1}: {config}")

In [None]:
def run_experiment(config, X_train, y_train, X_test, y_test, input_size, num_epochs=100):
    """
    Run a single experiment with given hyperparameters
    
    Parameters:
    -----------
    config : dict
        Hyperparameter configuration
    X_train, y_train : numpy arrays
        Training data
    X_test, y_test : numpy arrays
        Test data
    input_size : int
        Input feature dimension
    num_epochs : int
        Number of training epochs
    
    Returns:
    --------
    metrics : dict
        Test set performance metrics
    """
    # Initialize wandb run
    run = wandb.init(
        project="bbb-permeability-ann",
        config=config,
        reinit=True
    )
    
    # Split training data into train and validation sets
    X_tr, X_val, y_tr, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
    )
    
    # Create data loaders
    train_dataset = TensorDataset(
        torch.FloatTensor(X_tr),
        torch.FloatTensor(y_tr)
    )
    val_dataset = TensorDataset(
        torch.FloatTensor(X_val),
        torch.FloatTensor(y_val)
    )
    test_dataset = TensorDataset(
        torch.FloatTensor(X_test),
        torch.FloatTensor(y_test)
    )
    
    train_loader = DataLoader(train_dataset, batch_size=config['batch_size'], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=config['batch_size'], shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=config['batch_size'], shuffle=False)
    
    # Create model
    hidden_layers = [config['neurons']] * config['num_layers']
    model = BBBPredictorANN(
        input_size=input_size,
        hidden_layers=hidden_layers,
        dropout_rate=config['dropout']
    ).to(device)
    
    # Define loss function and optimizer
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=config['learning_rate'])
    
    # Train model
    print(f"\nTraining with config: {config}")
    model, history = train_model(
        model, train_loader, val_loader, criterion, optimizer, num_epochs, device
    )
    
    # Evaluate on test set
    metrics, predictions, labels, probs = evaluate_model(model, test_loader, device)
    
    # Log final metrics to wandb
    wandb.log({
        'test_accuracy': metrics['accuracy'],
        'test_precision': metrics['precision'],
        'test_recall': metrics['recall'],
        'test_f1_score': metrics['f1_score'],
        'test_roc_auc': metrics['roc_auc'],
        'test_mcc': metrics['mcc']
    })
    
    # Log confusion matrix
    cm = confusion_matrix(labels, predictions)
    wandb.log({
        "confusion_matrix": wandb.plot.confusion_matrix(
            probs=None,
            y_true=labels.astype(int),
            preds=predictions.astype(int),
            class_names=["Non-permeable", "Permeable"]
        )
    })
    
    print(f"Test Accuracy: {metrics['accuracy']:.4f}")
    print(f"Test F1-Score: {metrics['f1_score']:.4f}")
    print(f"Test ROC-AUC: {metrics['roc_auc']:.4f}")
    
    run.finish()
    
    return metrics, model, history

print("Experiment function defined successfully!")

## 7. Run Hyperparameter Tuning Experiments

Now we'll run all hyperparameter configurations and track results with Wandb.

In [None]:
# Run hyperparameter tuning experiments
# Note: Set to smaller number of configs for faster testing
# To run all configs, use: configs_to_run = hyperparameter_configs

# For demonstration, we'll run first 3 configs (you can change this)
configs_to_run = hyperparameter_configs[:3]  # Change to hyperparameter_configs for all

input_size = X_train_scaled.shape[1]
num_epochs = 100

results = []
best_f1 = 0
best_config = None
best_model = None

print("Starting hyperparameter tuning...")
print(f"Total configurations to test: {len(configs_to_run)}\n")

for i, config in enumerate(configs_to_run):
    print(f"\n{'='*60}")
    print(f"Experiment {i+1}/{len(configs_to_run)}")
    print(f"{'='*60}")
    
    metrics, model, history = run_experiment(
        config=config,
        X_train=X_train_scaled,
        y_train=y_train,
        X_test=X_test_scaled,
        y_test=y_test,
        input_size=input_size,
        num_epochs=num_epochs
    )
    
    # Store results
    result = {**config, **metrics}
    results.append(result)
    
    # Track best model
    if metrics['f1_score'] > best_f1:
        best_f1 = metrics['f1_score']
        best_config = config
        best_model = model
    
    print(f"\nCurrent best F1-score: {best_f1:.4f}")

print(f"\n{'='*60}")
print("Hyperparameter tuning complete!")
print(f"{'='*60}")

## 8. Results Analysis

Let's analyze the results from all experiments.

In [None]:
# Create results DataFrame
results_df = pd.DataFrame(results)

# Sort by F1-score
results_df_sorted = results_df.sort_values('f1_score', ascending=False)

print("Top 5 configurations by F1-score:")
print(results_df_sorted.head())

print(f"\n\nBest Configuration:")
print(f"{'='*60}")
for key, value in best_config.items():
    print(f"{key:20s}: {value}")
print(f"{'='*60}")
print(f"Best F1-score: {best_f1:.4f}")

# Save results to CSV
results_path = Path('../figures/BBBP/')
results_path.mkdir(parents=True, exist_ok=True)
results_df_sorted.to_csv(results_path / 'ann_hyperparameter_results.csv', index=False)
print(f"\nResults saved to {results_path / 'ann_hyperparameter_results.csv'}")