In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DNAHybridModel(nn.Module):
    """Optimized hybrid CNN for DNA editing efficiency prediction
    Args:
        seq_length: Length of DNA sequence (default: 74)
        num_features: Number of auxiliary features (default: 5)
        conv_groups: Number of groups for grouped convolutions (default: 4)
    """
    def __init__(self, seq_length=74, num_features=5, conv_groups=4):
        super().__init__()
        
        # Validate convolution parameters
        assert conv_groups in [1, 2, 4], "conv_groups must be 1, 2, or 4"
        
        # Sequence processing pathway
        self.seq_net = nn.Sequential(
            nn.Conv1d(5, 32, kernel_size=9, padding=4),  # Input channels: 5 (A,T,G,C,x)
            nn.ReLU(inplace=True),
            nn.MaxPool1d(4),
            nn.Conv1d(32, 64, kernel_size=5, groups=conv_groups),
            nn.AdaptiveMaxPool1d(16)
        )
        
        # Feature processing pathway
        self.feat_net = nn.Sequential(
            nn.Linear(num_features, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )
        
        # Combined prediction head
        self.head = nn.Sequential(
            nn.Linear(64*16 + 32, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 1)
        )

    def forward(self, x_seq, x_feat):
        """Forward pass
        Args:
            x_seq: DNA sequence tensor [batch_size, seq_length, 5]
            x_feat: Feature tensor [batch_size, num_features]
        Returns:
            Prediction tensor [batch_size, 1]
        """
        # Process sequence
        x_seq = x_seq.permute(0, 2, 1)  # [batch, channels, seq_length]
        seq_out = self.seq_net(x_seq)
        seq_out = seq_out.flatten(start_dim=1)  # [batch, 64*16]
        
        # Process features
        feat_out = self.feat_net(x_feat)  # [batch, 32]
        
        # Combine pathways
        combined = torch.cat([seq_out, feat_out], dim=1)
        return self.head(combined)

# Example usage
if __name__ == "__main__":
    # Test configuration
    batch_size = 32
    seq_length = 74
    num_features = 5
    
    # Initialize model
    model = DNAHybridModel()
    
    # Create dummy inputs
    dummy_seq = torch.randn(batch_size, seq_length, 5)
    dummy_feat = torch.randn(batch_size, num_features)
    
    # Test forward pass
    output = model(dummy_seq, dummy_feat)
    print(f"Input sequence shape: {dummy_seq.shape}")
    print(f"Input features shape: {dummy_feat.shape}")
    print(f"Output shape: {output.shape}")

Input sequence shape: torch.Size([32, 74, 5])
Input features shape: torch.Size([32, 5])
Output shape: torch.Size([32, 1])


In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import argparse

# --------------------------
# Model Definition
# --------------------------
class DNAHybridModel(nn.Module):
    def __init__(self, seq_length=74, num_features=5):
        super().__init__()
        # Sequence pathway
        self.seq_net = nn.Sequential(
            nn.Conv1d(5, 32, kernel_size=9, padding=4),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(4),
            nn.Conv1d(32, 64, kernel_size=5),
            nn.AdaptiveMaxPool1d(16)
        )
        
        # Feature pathway
        self.feat_net = nn.Sequential(
            nn.Linear(num_features, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )
        
        # Combined head
        self.head = nn.Sequential(
            nn.Linear(64*16 + 32, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 1)
        )

    def forward(self, x_seq, x_feat):
        x_seq = x_seq.permute(0, 2, 1)  # [batch, channels, seq_len]
        seq_out = self.seq_net(x_seq).flatten(1)
        feat_out = self.feat_net(x_feat)
        return self.head(torch.cat([seq_out, feat_out], dim=1))

# --------------------------
# Dataset Definition
# --------------------------
class DNADataset(Dataset):
    def __init__(self, csv_path):
        self.df = pd.read_csv(csv_path)
        self.nt_map = {'A':0, 'T':1, 'G':2, 'C':3, 'x':4}
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        # Sequence encoding
        seq = np.zeros((74, 5), dtype=np.float32)
        for i,c in enumerate(row['WT74_On'][:74]):
            seq[i, self.nt_map.get(c,4)] = 1
        # Features and target
        features = row[['PBSlen','RTlen','Edit_len','Tm1','nGCcnt1']].values.astype(np.float32)
        target = np.array(row['Measured_PE_efficiency'], dtype=np.float32)
        return seq, features, target

# --------------------------
# Training Code
# --------------------------
def train(args):
    # Initialize
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    dataset = DNADataset(args.data_path)
    loader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True)
    model = DNAHybridModel().to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=args.lr)
    criterion = nn.MSELoss()

    # Training loop
    for epoch in range(args.epochs):
        model.train()
        total_loss = 0
        
        for seqs, feats, labels in loader:
            seqs = seqs.to(device)
            feats = feats.to(device)
            labels = labels.to(device).unsqueeze(1)
            
            optimizer.zero_grad()
            outputs = model(seqs, feats)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        print(f"Epoch {epoch+1}/{args.epochs} | Loss: {total_loss/len(loader):.4f}")
# Replace the __main__ section at the bottom with:

if __name__ == "__main__":
    # For notebook users: Set parameters directly
    class Args:
        data_path = "DT.csv"  # Your dataset filename
        batch_size = 256
        epochs = 10  # Start with fewer epochs for testing
        lr = 1e-3

    args = Args()
    
    # Verify dummy data
    dummy_model = DNAHybridModel()
    dummy_seq = torch.randn(2, 74, 5)
    dummy_feat = torch.randn(2, 5)
    print("Dummy test output shape:", dummy_model(dummy_seq, dummy_feat).shape)
    
    # Start training
    train(args)

Dummy test output shape: torch.Size([2, 1])
Epoch 1/10 | Loss: 12.4115
Epoch 2/10 | Loss: 11.4064
Epoch 3/10 | Loss: 11.0655
Epoch 4/10 | Loss: 10.7589
Epoch 5/10 | Loss: 10.4576
Epoch 6/10 | Loss: 10.1309
Epoch 7/10 | Loss: 9.8429
Epoch 8/10 | Loss: 9.5088
Epoch 9/10 | Loss: 9.2194
Epoch 10/10 | Loss: 8.9462


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# --------------------------
# Enhanced Model Definition
# --------------------------
class DNAHybridModel(nn.Module):
    def __init__(self, seq_length=74, num_features=5):
        super().__init__()
        
        # Enhanced Sequence Pathway
        self.seq_net = nn.Sequential(
            nn.Conv1d(5, 64, kernel_size=9, padding=4),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(4),
            
            nn.Conv1d(64, 128, kernel_size=5, padding=2),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.AdaptiveMaxPool1d(16)
        )
        
        # Enhanced Feature Pathway
        self.feat_net = nn.Sequential(
            nn.Linear(num_features, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU()
        )
        
        # Enhanced Combined Head
        self.head = nn.Sequential(
            nn.Linear(128*16 + 64, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

    def forward(self, x_seq, x_feat):
        x_seq = x_seq.permute(0, 2, 1)
        seq_out = self.seq_net(x_seq).flatten(1)
        feat_out = self.feat_net(x_feat)
        return self.head(torch.cat([seq_out, feat_out], dim=1))

# --------------------------
# Enhanced Dataset Handling
# --------------------------
class DNADataset(Dataset):
    def __init__(self, features, targets, sequences, feature_scaler=None, target_scaler=None):
        self.features = features
        self.targets = targets
        self.sequences = sequences
        self.nt_map = {'A':0, 'T':1, 'G':2, 'C':3, 'x':4}

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, idx):
        seq = np.zeros((74, 5), dtype=np.float32)
        for i,c in enumerate(self.sequences[idx][:74]):
            seq[i, self.nt_map.get(c,4)] = 1
        return seq, self.features[idx], self.targets[idx]

# --------------------------
# Optimized Training Code
# --------------------------
def train_model(args):
    # Load and prepare data
    df = pd.read_csv(args.data_path)
    
    # Train-Validation split
    train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)
    
    # Feature scaling
    feature_cols = ['PBSlen','RTlen','Edit_len','Tm1','nGCcnt1']
    feature_scaler = StandardScaler().fit(train_df[feature_cols].values)
    
    # Target scaling
    target_scaler = StandardScaler().fit(train_df['Measured_PE_efficiency'].values.reshape(-1,1))
    
    # Prepare datasets
    train_dataset = DNADataset(
        features = feature_scaler.transform(train_df[feature_cols].values.astype(np.float32)),
        targets = target_scaler.transform(train_df['Measured_PE_efficiency'].values.reshape(-1,1)).astype(np.float32),
        sequences = train_df['WT74_On'].values
    )
    
    val_dataset = DNADataset(
        features = feature_scaler.transform(val_df[feature_cols].values.astype(np.float32)),
        targets = target_scaler.transform(val_df['Measured_PE_efficiency'].values.reshape(-1,1)).astype(np.float32),
        sequences = val_df['WT74_On'].values
    )

    # Create loaders
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=args.batch_size, num_workers=2)

    # Initialize training
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = DNAHybridModel().to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=args.lr, weight_decay=1e-4)
    criterion = nn.SmoothL1Loss()  # More robust than MSE
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)

    best_val_loss = float('inf')
    for epoch in range(args.epochs):
        # Training phase
        model.train()
        train_loss = 0
        for seqs, feats, labels in train_loader:
            seqs, feats, labels = seqs.to(device), feats.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(seqs, feats)
            loss = criterion(outputs, labels)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()

        # Validation phase
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for seqs, feats, labels in val_loader:
                seqs, feats, labels = seqs.to(device), feats.to(device), labels.to(device)
                outputs = model(seqs, feats)
                val_loss += criterion(outputs, labels).item()

        # Update learning rate
        avg_val_loss = val_loss / len(val_loader)
        scheduler.step(avg_val_loss)

        print(f"Epoch {epoch+1}/{args.epochs}")
        print(f"  Train Loss: {train_loss/len(train_loader):.4f}")
        print(f"  Val Loss: {avg_val_loss:.4f}")
        print(f"  LR: {optimizer.param_groups[0]['lr']:.2e}")

        # Save best model
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_model.pth')

    print(f"\nTraining complete. Best validation loss: {best_val_loss:.4f}")

# --------------------------
# Execution
# --------------------------
class Args:
    data_path = "DT.csv"
    batch_size = 128
    epochs = 50
    lr = 3e-4

if __name__ == "__main__":
    args = Args()
    
    # Verify model dimensions
    dummy_model = DNAHybridModel()
    dummy_seq = torch.randn(2, 74, 5)
    dummy_feat = torch.randn(2, 5)
    print("Dummy output shape:", dummy_model(dummy_seq, dummy_feat).shape)
    
    # Start training
    train_model(args)

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt

##################################
# Data Loading and Preparation
##################################
class DNADataset(Dataset):
    def __init__(self, csv_path):
        self.df = pd.read_csv(csv_path)
        self.nt_map = {'A': 0, 'T': 1, 'G': 2, 'C': 3, 'x': 4}
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        # One-hot encode sequence: shape (74, 5)
        seq = np.zeros((74, 5), dtype=np.float32)
        for i, c in enumerate(row['WT74_On'][:74]):
            seq[i, self.nt_map.get(c, 4)] = 1
        # Extra features (using 5 features as an example)
        features = row[['PBSlen', 'RTlen', 'Edit_len', 'Tm1', 'nGCcnt1']].values.astype(np.float32)
        target = np.array(row['Measured_PE_efficiency'], dtype=np.float32)
        return seq, features, target

##################################
# Model 1: LSTM-Based Hybrid Model
##################################
class LSTMHybridModel(nn.Module):
    def __init__(self, hidden_dim=64, lstm_layers=2, num_features=5):
        super().__init__()
        # LSTM for sequence processing (bidirectional)
        self.lstm = nn.LSTM(input_size=5, hidden_size=hidden_dim, 
                            num_layers=lstm_layers, batch_first=True, 
                            bidirectional=True)
        # Feature processing network
        self.feat_net = nn.Sequential(
            nn.Linear(num_features, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )
        # Combined head: concatenates LSTM output and feature network output
        self.head = nn.Sequential(
            nn.Linear(hidden_dim * 2 + 32, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 1)
        )
        
    def forward(self, x_seq, x_feat):
        lstm_out, _ = self.lstm(x_seq)  # x_seq: (batch, seq_length, 5)
        lstm_out = lstm_out[:, -1, :]    # Use last time-step's output
        feat_out = self.feat_net(x_feat)
        combined = torch.cat([lstm_out, feat_out], dim=1)
        return self.head(combined)

##################################
# Model 2: CNN-Based Hybrid Model
##################################
class CNNHybridModel(nn.Module):
    def __init__(self, num_features=5):
        super().__init__()
        # CNN for sequence processing
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels=5, out_channels=32, kernel_size=9, padding=4),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(2),
            nn.Conv1d(32, 64, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.AdaptiveMaxPool1d(16)
        )
        # Feature processing network
        self.feat_net = nn.Sequential(
            nn.Linear(num_features, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )
        # Combined head
        self.head = nn.Sequential(
            nn.Linear(64 * 16 + 32, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 1)
        )
        
    def forward(self, x_seq, x_feat):
        # x_seq shape: (batch, 74, 5); need to permute for CNN: (batch, channels, length)
        x_seq = x_seq.permute(0, 2, 1)
        cnn_out = self.cnn(x_seq)  # output shape: (batch, 64, 16)
        cnn_out = cnn_out.view(cnn_out.size(0), -1)  # Flatten
        feat_out = self.feat_net(x_feat)
        combined = torch.cat([cnn_out, feat_out], dim=1)
        return self.head(combined)

##################################
# Model 3: Transformer-Based Hybrid Model
##################################
class TransformerHybridModel(nn.Module):
    def __init__(self, num_features=5, embed_dim=32, num_heads=4, num_layers=2, dropout=0.1):
        super().__init__()
        self.embed = nn.Linear(5, embed_dim)  # Project one-hot (5-dim) to embedding
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # Feature processing network
        self.feat_net = nn.Sequential(
            nn.Linear(num_features, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )
        # Combined head: after transformer, we do mean pooling over sequence positions
        self.head = nn.Sequential(
            nn.Linear(embed_dim + 32, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 1)
        )
        
    def forward(self, x_seq, x_feat):
        # x_seq shape: (batch, seq_length, 5)
        x = self.embed(x_seq)  # (batch, seq_length, embed_dim)
        # Transformer expects shape (seq_length, batch, embed_dim)
        x = x.transpose(0, 1)
        transformer_out = self.transformer(x)  # (seq_length, batch, embed_dim)
        transformer_out = transformer_out.mean(dim=0)  # Mean pooling: (batch, embed_dim)
        feat_out = self.feat_net(x_feat)
        combined = torch.cat([transformer_out, feat_out], dim=1)
        return self.head(combined)

##################################
# Training and Evaluation Utilities
##################################
def train_model(model, train_loader, val_loader, epochs=30, lr=1e-3, weight_decay=1e-5):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)
    criterion = nn.MSELoss()
    
    train_losses, val_losses = [], []
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for seqs, feats, labels in train_loader:
            seqs, feats, labels = seqs.to(device), feats.to(device), labels.to(device).unsqueeze(1)
            optimizer.zero_grad()
            outputs = model(seqs, feats)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        avg_train_loss = total_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for seqs, feats, labels in val_loader:
                seqs, feats, labels = seqs.to(device), feats.to(device), labels.to(device).unsqueeze(1)
                outputs = model(seqs, feats)
                loss = criterion(outputs, labels)
                total_val_loss += loss.item()
        avg_val_loss = total_val_loss / len(val_loader)
        val_losses.append(avg_val_loss)
        
        scheduler.step()
        print(f"Epoch {epoch+1}/{epochs} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")
    
    # Optionally, plot the loss curves
    plt.figure(figsize=(8,6))
    plt.plot(range(1, epochs+1), train_losses, label="Train Loss")
    plt.plot(range(1, epochs+1), val_losses, label="Validation Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training and Validation Loss")
    plt.legend()
    plt.show()

def evaluate_model(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    actuals, predictions = [], []
    with torch.no_grad():
        for seqs, feats, labels in test_loader:
            seqs, feats = seqs.to(device), feats.to(device)
            preds = model(seqs, feats).cpu().numpy().flatten()
            predictions.extend(preds)
            actuals.extend(labels.numpy())
    mse = mean_squared_error(actuals, predictions)
    r2 = r2_score(actuals, predictions)
    print(f"Test MSE: {mse:.4f}, R²: {r2:.4f}")
    return mse, r2

##################################
# Main: Train and Compare Models
##################################
if __name__ == "__main__":
    # Load dataset
    dataset = DNADataset("DT.csv")
    # Split into train (80%), val (10%), test (10%)
    total = len(dataset)
    train_size = int(0.8 * total)
    val_size = int(0.1 * total)
    test_size = total - train_size - val_size
    train_set, val_set, test_set = random_split(dataset, [train_size, val_size, test_size])
    
    train_loader = DataLoader(train_set, batch_size=128, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=128)
    test_loader = DataLoader(test_set, batch_size=128)
    
    # Define models to compare
    models = {
        "LSTMHybrid": LSTMHybridModel(),
        "CNNHybrid": CNNHybridModel(),
        "TransformerHybrid": TransformerHybridModel()
    }
    
    results = {}
    for name, model in models.items():
        print(f"\nTraining {name} model...")
        train_model(model, train_loader, val_loader, epochs=30, lr=1e-3, weight_decay=1e-5)
        print(f"Evaluating {name} model on test set...")
        mse, r2 = evaluate_model(model, test_loader)
        results[name] = {"MSE": mse, "R2": r2}
    
    print("\nSummary of Results:")
    for model_name, metrics in results.items():
        print(f"{model_name}: MSE = {metrics['MSE']:.4f}, R² = {metrics['R2']:.4f}")





Training LSTMHybrid model...
Epoch 1/30 | Train Loss: 13.3949 | Val Loss: 12.9395
Epoch 2/30 | Train Loss: 13.0607 | Val Loss: 12.8094
Epoch 3/30 | Train Loss: 12.9645 | Val Loss: 12.8335
Epoch 4/30 | Train Loss: 12.9370 | Val Loss: 12.7679
Epoch 5/30 | Train Loss: 12.9146 | Val Loss: 12.8531
Epoch 6/30 | Train Loss: 12.8407 | Val Loss: 12.7159


KeyboardInterrupt: 