In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader


In [2]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

In [9]:
class ProteinCNN(nn.Module):
    """
    Convolutional Neural Network for protein sequence analysis.
    
    This network uses multiple convolutional layers with different filter sizes
    to capture local patterns in protein sequences, followed by fully connected
    layers to make the final prediction.
    """
    
    def __init__(self, num_amino_acids=20, num_filters=128, dropout_rate=0.5):
        """
        Initialize the CNN architecture.
        
        Args:
            num_amino_acids: Size of amino acid vocabulary (20 for standard amino acids)
            num_filters: Number of convolutional filters for each filter size
            dropout_rate: Dropout probability for regularization
        """
        super(ProteinCNN, self).__init__()
        
        # Multiple convolutional layers with different filter sizes
        # This allows us to capture patterns of different lengths
        self.conv1 = nn.Conv1d(num_amino_acids, num_filters, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(num_amino_acids, num_filters, kernel_size=5, padding=2)
        self.conv3 = nn.Conv1d(num_amino_acids, num_filters, kernel_size=7, padding=3)
        
        # Batch normalization helps with training stability
        self.batch_norm1 = nn.BatchNorm1d(num_filters)
        self.batch_norm2 = nn.BatchNorm1d(num_filters)
        self.batch_norm3 = nn.BatchNorm1d(num_filters)
        
        # Additional convolutional layers for more complex pattern detection
        self.conv4 = nn.Conv1d(num_filters * 3, num_filters * 2, kernel_size=3, padding=1)
        self.batch_norm4 = nn.BatchNorm1d(num_filters * 2)
        
        self.conv5 = nn.Conv1d(num_filters * 2, num_filters, kernel_size=3, padding=1)
        self.batch_norm5 = nn.BatchNorm1d(num_filters)
        
        # Global pooling to get a fixed-size representation
        self.global_pool = nn.AdaptiveAvgPool1d(1)
        
        # Fully connected layers for final prediction
        self.fc1 = nn.Linear(num_filters, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 1)  # Single output for regression
        
        # Dropout for regularization
        self.dropout = nn.Dropout(dropout_rate)
        
    def forward(self, x):
        """
        Forward pass through the network.
        
        Args:
            x: Input tensor of shape [batch_size, sequence_length, num_amino_acids]
            
        Returns:
            predictions: Tensor of shape [batch_size, 1] with predicted values
        """
        # Transpose for conv1d: [batch_size, num_amino_acids, sequence_length]
        x = x.transpose(1, 2)
        
        # Apply multiple convolutional layers with different filter sizes
        conv1_out = F.relu(self.batch_norm1(self.conv1(x)))
        conv2_out = F.relu(self.batch_norm2(self.conv2(x)))
        conv3_out = F.relu(self.batch_norm3(self.conv3(x)))
        
        # Concatenate outputs from different filter sizes
        # This gives us a rich representation that captures patterns of various lengths
        x = torch.cat([conv1_out, conv2_out, conv3_out], dim=1)
        
        # Apply additional convolutional layers
        x = F.relu(self.batch_norm4(self.conv4(x)))
        x = self.dropout(x)
        
        x = F.relu(self.batch_norm5(self.conv5(x)))
        x = self.dropout(x)
        
        # Global pooling to get a fixed-size representation regardless of sequence length
        x = self.global_pool(x)
        x = x.view(x.size(0), -1)  # Flatten
        
        # Fully connected layers for final prediction
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        
        # Final prediction (no activation for regression)
        x = self.fc3(x)
        
        return x

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, r2_score
import time

# Set random seeds for reproducibility
# This ensures that your results are consistent across runs
torch.manual_seed(42)
np.random.seed(42)

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Training on: {device}")

In [None]:
def train_cnn_model(model, train_loader, val_loader, num_epochs=100, learning_rate=0.001):
    """
    Train the CNN model with careful monitoring and early stopping.
    
    This function implements best practices for training neural networks on small datasets,
    including learning rate scheduling, early stopping, and comprehensive monitoring.
    """
    
    # Move model to the appropriate device (GPU if available)
    model = model.to(device)
    
    # Choose optimizer and loss function
    # Adam is generally a good choice for CNNs because it adapts the learning rate
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    
    # Mean Squared Error for regression tasks
    criterion = nn.MSELoss()
    
    # Learning rate scheduler - reduces learning rate when validation loss plateaus
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=10, factor=0.5, verbose=True
    )
    
    # Early stopping to prevent overfitting
    best_val_loss = float('inf')
    patience_counter = 0
    early_stopping_patience = 20
    
    # Track training history
    train_losses = []
    val_losses = []
    learning_rates = []
    
    print("Starting training...")
    print(f"Training batches per epoch: {len(train_loader)}")
    print(f"Validation batches per epoch: {len(val_loader)}")
    
    for epoch in range(num_epochs):
        # Training phase
        model.train()  # Set model to training mode
        train_loss = 0.0
        train_batches = 0
        
        for batch_idx, (sequences, targets) in enumerate(train_loader):
            # Move data to device
            sequences = sequences.to(device)
            targets = targets.to(device)
            
            # Zero the gradients
            optimizer.zero_grad()
            
            # Forward pass
            predictions = model(sequences)
            
            # Calculate loss
            loss = criterion(predictions.squeeze(), targets)
            
            # Backward pass
            loss.backward()
            
            # Gradient clipping to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            
            # Update parameters
            optimizer.step()
            
            train_loss += loss.item()
            train_batches += 1
            
            # Print progress every 10 batches
            if batch_idx % 10 == 0:
                print(f'Epoch {epoch+1}, Batch {batch_idx}, Loss: {loss.item():.6f}')
        
        # Calculate average training loss
        avg_train_loss = train_loss / train_batches
        
        # Validation phase
        model.eval()  # Set model to evaluation mode
        val_loss = 0.0
        val_batches = 0
        
        with torch.no_grad():  # Disable gradient computation for efficiency
            for sequences, targets in val_loader:
                sequences = sequences.to(device)
                targets = targets.to(device)
                
                predictions = model(sequences)
                loss = criterion(predictions.squeeze(), targets)
                
                val_loss += loss.item()
                val_batches += 1
        
        avg_val_loss = val_loss / val_batches
        
        # Record history
        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        learning_rates.append(optimizer.param_groups[0]['lr'])
        
        # Print epoch summary
        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'  Train Loss: {avg_train_loss:.6f}')
        print(f'  Val Loss: {avg_val_loss:.6f}')
        print(f'  Learning Rate: {optimizer.param_groups[0]["lr"]:.8f}')
        print('-' * 50)
        
        # Learning rate scheduling
        scheduler.step(avg_val_loss)
        
        # Early stopping check
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
            # Save the best model
            torch.save(model.state_dict(), 'best_cnn_model.pth')
            print(f'New best model saved with validation loss: {best_val_loss:.6f}')
        else:
            patience_counter += 1
            
        if patience_counter >= early_stopping_patience:
            print(f'Early stopping triggered after {epoch+1} epochs')
            break
    
    # Load the best model
    model.load_state_dict(torch.load('best_cnn_model.pth'))
    
    return model, train_losses, val_losses, learning_rates

In [None]:
def setup_data_loaders(sequences, targets, batch_size=32, train_split=0.8):
    """
    Create training and validation data loaders.
    
    This function handles the train/validation split and creates PyTorch DataLoaders
    that will feed batches of data to our model during training.
    """
    
    # Create dataset
    dataset = ProteinDataset(sequences, targets, max_length=500)
    
    # Calculate split sizes
    total_size = len(dataset)
    train_size = int(train_split * total_size)
    val_size = total_size - train_size
    
    print(f"Total dataset size: {total_size}")
    print(f"Training set size: {train_size}")
    print(f"Validation set size: {val_size}")
    
    # Split dataset
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
    
    # Create data loaders
    train_loader = DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=True,  # Shuffle training data
        num_workers=2,  # Use multiple workers for faster data loading
        pin_memory=True if torch.cuda.is_available() else False
    )
    
    val_loader = DataLoader(
        val_dataset, 
        batch_size=batch_size, 
        shuffle=False,  # Don't shuffle validation data
        num_workers=2,
        pin_memory=True if torch.cuda.is_available() else False
    )
    
    return train_loader, val_loader, dataset

In [None]:
def evaluate_model(model, data_loader, dataset):
    """
    Evaluate the trained model and calculate performance metrics.
    
    This function provides comprehensive evaluation including predictions
    on both normalized and original scales.
    """
    
    model.eval()
    all_predictions = []
    all_targets = []
    
    with torch.no_grad():
        for sequences, targets in data_loader:
            sequences = sequences.to(device)
            targets = targets.to(device)
            
            predictions = model(sequences)
            
            all_predictions.extend(predictions.squeeze().cpu().numpy())
            all_targets.extend(targets.cpu().numpy())
    
    # Convert to numpy arrays
    all_predictions = np.array(all_predictions)
    all_targets = np.array(all_targets)
    
    # Calculate metrics on normalized scale
    mse_normalized = mean_squared_error(all_targets, all_predictions)
    r2_normalized = r2_score(all_targets, all_predictions)
    
    # Denormalize for interpretable metrics
    targets_original = [dataset.denormalize_target(t) for t in all_targets]
    predictions_original = [dataset.denormalize_target(p) for p in all_predictions]
    
    mse_original = mean_squared_error(targets_original, predictions_original)
    r2_original = r2_score(targets_original, predictions_original)
    
    print("Model Evaluation Results:")
    print(f"R² Score (original scale): {r2_original:.4f}")
    print(f"MSE (original scale): {mse_original:.4f}")
    print(f"RMSE (original scale): {np.sqrt(mse_original):.4f}")
    
    return {
        'predictions_original': predictions_original,
        'targets_original': targets_original,
        'r2_score': r2_original,
        'mse': mse_original,
        'rmse': np.sqrt(mse_original)
    }

def plot_training_history(train_losses, val_losses, learning_rates):
    """
    Visualize the training process to understand model behavior.
    
    These plots help you understand whether your model is learning properly,
    overfitting, or if you need to adjust hyperparameters.
    """
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # Plot training and validation losses
    axes[0, 0].plot(train_losses, label='Training Loss', color='blue')
    axes[0, 0].plot(val_losses, label='Validation Loss', color='red')
    axes[0, 0].set_xlabel('Epoch')
    axes[0, 0].set_ylabel('Loss')
    axes[0, 0].set_title('Training and Validation Loss')
    axes[0, 0].legend()
    axes[0, 0].grid(True)
    
    # Plot learning rate schedule
    axes[0, 1].plot(learning_rates, color='green')
    axes[0, 1].set_xlabel('Epoch')
    axes[0, 1].set_ylabel('Learning Rate')
    axes[0, 1].set_title('Learning Rate Schedule')
    axes[0, 1].set_yscale('log')
    axes[0, 1].grid(True)
    
    # Plot loss difference (overfitting indicator)
    loss_diff = np.array(val_losses) - np.array(train_losses)
    axes[1, 0].plot(loss_diff, color='purple')
    axes[1, 0].set_xlabel('Epoch')
    axes[1, 0].set_ylabel('Validation Loss - Training Loss')
    axes[1, 0].set_title('Overfitting Indicator')
    axes[1, 0].axhline(y=0, color='black', linestyle='--', alpha=0.5)
    axes[1, 0].grid(True)
    
    # Plot validation loss with trend
    axes[1, 1].plot(val_losses, color='red', alpha=0.7)
    # Add trend line
    z = np.polyfit(range(len(val_losses)), val_losses, 1)
    p = np.poly1d(z)
    axes[1, 1].plot(range(len(val_losses)), p(range(len(val_losses))), 
                   color='black', linestyle='--', label='Trend')
    axes[1, 1].set_xlabel('Epoch')
    axes[1, 1].set_ylabel('Validation Loss')
    axes[1, 1].set_title('Validation Loss with Trend')
    axes[1, 1].legend()
    axes[1, 1].grid(True)
    
    plt.tight_layout()
    plt.show()