# Colab instructions
<details>
  <summary>This notebook - APA Isoform Prediction example</summary>
We note to the user that Lyra runs significantly faster on a Cuda-enabled GPU

## Using this notebook for your own sequence-to-value prediction tasks:

### 1. Data Format
Your input CSV files should contain at minimum:
- A 'seq' column with the sequences
- A target column with your prediction values

Modify the data loading section to match your column names:
```python
train_labels = train_df['your_target_column'].values
```

### 2. Choose the appropriate dataset type
Select based on your sequence type:
- For RNA sequences: use `RNADataset` and `one_hot_encode_sequences_RNA`
- For DNA sequences: use `DNADataset` and `one_hot_encode_dna`
- For protein sequences: use `ProteinDataset` and `one_hot_encode_protein`

### 3. Adjust model parameters
Set parameters based on your sequence type:
- `d_input`: Match your one-hot encoding dimension
  * RNA/DNA: `d_input = 4`
  * Standard proteins: `d_input = 20`
  * For custom alphabets: `d_input = number of possible characters`
- `d_output`: Set to number of target values (1 for single value regression)

```python
model = Lyra(d_input=your_dimension, d_output=1, d_model=64)
```

### 4. For protein sequences with non-natural amino acids
To modify the encoding:
1. Update the mapping array in `one_hot_encode_protein()`
2. Add new rows to the mapping array for each additional amino acid
3. Update `char_to_int` dictionary with new characters
4. Update `d_input` in the Lyra model initialization accordingly

Example for custom protein alphabet:
```python
mapping = np.array([
    [1, 0, 0, ...],  # Standard AAs as before
    [0, 0, 0, ...],  # New AA 'B'
    [0, 0, 0, ...],  # New AA 'O'
    # ... add more as needed
])
char_to_int = {c: i for i, c in enumerate('ILVFMCAGPTSYWQNHEDKRXJBO...')}
model = Lyra(d_input=len(char_to_int), d_output=1, d_model=64)
```
</details>

<details>
  <summary>General-purpose, no-code Colab notebook</summary>
  A more streamlined notebook can be found here, which has been designed for use by non-ML biologists, without having to change code for basic regression and classification tasks. https://colab.research.google.com/drive/1cbS9kBmnVYud5LQyEnbvVeQfpcvYT8nF?authuser=1#scrollTo=qLI9xMdi_DQy
</details>

<details>
  <summary>Github repo</summary>
  The full github repository can be found here: https://github.com/sameedmsiddiqui/lyra

</details>


In [None]:
!pip install torch numpy matplotlib seaborn einops scipy pandas tqdm scikit-learn

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np
import matplotlib.pyplot as plt
import seaborn
import math
from einops import rearrange, repeat
from scipy.stats import spearmanr
torch.manual_seed(42)
import pandas as pd
from tqdm.auto import tqdm
device = torch.device('cuda' if torch.cuda.is_available() else "cpu")

In [None]:
!nvidia-smi

In [3]:
def one_hot_encode_sequences_RNA(sequences):
    """
    Vectorized one-hot encoding of sequences with padding.

    Args:
    - sequences (list of str): List of sequences (e.g., sgrna or target).

    Returns:
    - Tensor: The one-hot encoded sequences as a batch.
    """
    # Define the one-hot encoding for RNA/DNA bases
    mapping = np.array([
        [1, 0, 0, 0],  # A
        [0, 1, 0, 0],  # T
        [0, 0, 1, 0],  # C
        [0, 0, 0, 1],  # G
        [0.0, 0.0, 0.0, 0.0],  # N (unknown base or padding)
        [0.0, 0.0, 0.0, 0.0]  # X (unknown base or padding)

    ])
    char_to_int = {c: i for i, c in enumerate('ATCGNX')}  # Map each base to an index

    # Find the maximum sequence length
    max_length = max(len(seq) for seq in sequences)

    # Pad sequences with 'N' to the max length
    padded_sequences = [seq.ljust(max_length, 'N') for seq in sequences]

    # Vectorized conversion of sequences to indices
    seq_indices = [[char_to_int[char] for char in seq] for seq in padded_sequences]
    encoded = np.array([mapping[seq] for seq in seq_indices])

    return torch.tensor(encoded, dtype=torch.float32)

class RNADataset(Dataset):
    def __init__(self, encoded_sequences, labels):
        """
        Initialize the dataset with preprocessed RNA data.

        Args:
        - encoded_sequences (Tensor): Encoded RNA sequences.
        - labels (Tensor): Corresponding labels.
        """
        self.encoded_sequences = encoded_sequences
        self.labels = labels

    def __len__(self):
        return len(self.encoded_sequences)

    def __getitem__(self, index):
        return self.encoded_sequences[index], torch.tensor(self.labels[index], dtype=torch.float32)


# One hot encoding for DNA Datasets
class DNADataset(Dataset):
    def __init__(self, encoded_sequences, labels):
        """
        Initialize the dataset with preprocessed data.

        Args:
        - encoded_sequences (Tensor): Encoded sequences.
        - labels (Tensor): Corresponding labels.
        """
        self.encoded_sequences = encoded_sequences
        self.labels = labels

    def __len__(self):
        return len(self.encoded_sequences)

    def __getitem__(self, index):
        return self.encoded_sequences[index], self.labels[index]


def one_hot_encode_dna(sequences):
    """
    Vectorized one-hot encoding of DNA sequences.

    Args:
    - sequences (list of str): List of DNA sequences.

    Returns:
    - Tensor: The one-hot encoded DNA sequences as a batch.
    """
    # Define the mapping in a vectorized form
    mapping = np.array([
        [1, 0, 0, 0],  # A
        [0, 1, 0, 0],  # C
        [0, 0, 1, 0],  # G
        [0, 0, 0, 1],  # T
        [0.25, 0.25, 0.25, 0.25],  # N (unknown base)
    ])
    char_to_int = {c: i for i, c in enumerate('ACGTN')}

    # Vectorized conversion of sequences to indices
    seq_indices = [[char_to_int.get(char.upper(), 4) for char in seq] for seq in sequences]
    encoded = np.array([mapping[seq] for seq in seq_indices])

    return torch.tensor(encoded, dtype=torch.float32)



# One hot encoding for Protein Datasets

def one_hot_encode_protein(sequences):
    """
    Vectorized one-hot encoding of protein sequences.

    Args:
    - sequences (list of str): List of protein sequences.

    Returns:
    - Tensor: The one-hot encoded DNA sequences as a batch.
    """
    # Define the mapping in a vectorized form
    # NB: depending on the dataset (i.e. if there are non-natural amino acids
    # you may have to change the one-hot encoding mapping)
    mapping = np.array([
        [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # I
        [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # L
        [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # V
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # F
        [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # M
        [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # C
        [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # A
        [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # G
        [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # P
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # T
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # S
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],  # Y
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],  # W
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],  # Q
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],  # N
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],  # H
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0],  # E
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0],  # D
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0],  # K
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],  # R
        [0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05],  # X
        [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]   # J
    ])
    char_to_int = {c: i for i, c in enumerate('ILVFMCAGPTSYWQNHEDKRXJ')}

    # Vectorized conversion of sequences to indices
    seq_indices = [[char_to_int[char] for char in seq] for seq in sequences]
    encoded = np.array([mapping[seq] for seq in seq_indices])

    return torch.tensor(encoded, dtype=torch.float32)

class ProteinDataset(Dataset):
    def __init__(self, encoded_sequences, labels):
        """
        Initialize the dataset with preprocessed data.

        Args:
        - encoded_sequences (Tensor): Encoded sequences.
        - labels (Tensor): Corresponding labels.
        """
        self.encoded_sequences = encoded_sequences
        self.labels = labels

    def __len__(self):
        return len(self.encoded_sequences)

    def __getitem__(self, index):
        return self.encoded_sequences[index], self.labels[index]




In [4]:
# Load the train, test and cross validation data



train_df = pd.read_csv("../datasets/Isoform/train.csv")
test_df = pd.read_csv("../datasets/Isoform/test.csv")
val_df = pd.read_csv("../datasets/Isoform/val.csv")

# Get sequences and labels
train_seqs = one_hot_encode_sequences_RNA(train_df['seq'].values)
test_seqs = one_hot_encode_sequences_RNA(test_df['seq'].values)
val_seqs = one_hot_encode_sequences_RNA(val_df['seq'].values)

# Use 'label' as the regression target
train_labels = train_df['proximal_isoform_proportion'].values
test_labels = test_df['proximal_isoform_proportion'].values
val_labels = val_df['proximal_isoform_proportion'].values

# Create dataset objects
train_dataset = RNADataset(train_seqs, train_labels)
test_dataset = RNADataset(test_seqs, test_labels)
val_dataset = RNADataset(val_seqs, val_labels)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)


In [5]:
class PGC(nn.Module):
    def __init__(self,d_model,expansion_factor = 1.0,dropout = 0.0):
        super().__init__()

        self.d_model = d_model
        self.expansion_factor = expansion_factor
        self.dropout = dropout
        expanded_dim = int(d_model * expansion_factor)

        self.conv = nn.Conv1d(expanded_dim,
                              expanded_dim,
                              kernel_size=3,
                              padding=1,
                              groups=expanded_dim)

        self.in_proj = nn.Linear(d_model, int(d_model * expansion_factor * 2))
        self.out_norm = nn.RMSNorm(int(d_model), eps=1e-8)
        self.in_norm = nn.RMSNorm(expanded_dim * 2, eps=1e-8)
        self.out_proj = nn.Linear(expanded_dim, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, u):
        # Input projection and normalization
        xv = self.in_norm(self.in_proj(u))

        # Split projected input into two parts: x and v
        x, v = xv.chunk(2, dim=-1)

        # Depthwise convolution on x
        x_conv = self.conv(x.transpose(-1, -2)).transpose(-1, -2)

        # Gating mechanism
        gate = v * x_conv

        # Output projection and normalization
        x_out = self.out_norm(self.out_proj(gate))

        return x_out


In [6]:
class DropoutNd(nn.Module):
    def __init__(self, p: float = 0.5, tie=True, transposed=True):
        """
        tie: tie dropout mask across sequence lengths (Dropout1d/2d/3d)
        """
        super().__init__()
        if p < 0 or p >= 1:
            raise ValueError("dropout probability has to be in [0, 1), " "but got {}".format(p))
        self.p = p
        self.tie = tie
        self.transposed = transposed
        self.binomial = torch.distributions.binomial.Binomial(probs=1-self.p)

    def forward(self, X):
        """X: (batch, dim, lengths...)."""
        if self.training:
            if not self.transposed: X = rearrange(X, 'b ... d -> b d ...')
            # binomial = torch.distributions.binomial.Binomial(probs=1-self.p) # This is incredibly slow because of CPU -> GPU copying
            mask_shape = X.shape[:2] + (1,)*(X.ndim-2) if self.tie else X.shape
            # mask = self.binomial.sample(mask_shape)
            mask = torch.rand(*mask_shape, device=X.device) < 1.-self.p
            X = X * mask * (1.0/(1-self.p))
            if not self.transposed: X = rearrange(X, 'b d ... -> b ... d')
            return X
        return X

class S4DKernel(nn.Module):
    """Generate convolution kernel from diagonal SSM parameters."""

    def __init__(self, d_model, N=64, dt_min=0.001, dt_max=0.1, lr=None):
        super().__init__()
        # Generate dt
        H = d_model
        log_dt = torch.rand(H) * (
            math.log(dt_max) - math.log(dt_min)
        ) + math.log(dt_min)

        C = torch.randn(H, N // 2, dtype=torch.cfloat)
        self.C = nn.Parameter(torch.view_as_real(C))
        self.register("log_dt", log_dt, lr)

        log_A_real = torch.log(0.5 * torch.ones(H, N//2))
        A_imag = math.pi * repeat(torch.arange(N//2), 'n -> h n', h=H)
        self.register("log_A_real", log_A_real, lr)
        self.register("A_imag", A_imag, lr)

    def forward(self, L):
        """
        returns: (..., c, L) where c is number of channels (default 1)
        """

        # Materialize parameters
        dt = torch.exp(self.log_dt) # (H)
        C = torch.view_as_complex(self.C) # (H N)
        A = -torch.exp(self.log_A_real) + 1j * self.A_imag # (H N)

        # Vandermonde multiplication
        dtA = A * dt.unsqueeze(-1)  # (H N)
        K = dtA.unsqueeze(-1) * torch.arange(L, device=A.device) # (H N L)
        C = C * (torch.exp(dtA)-1.) / A
        K = 2 * torch.einsum('hn, hnl -> hl', C, torch.exp(K)).real

        return K

    def register(self, name, tensor, lr=None):
        """Register a tensor with a configurable learning rate and 0 weight decay"""

        if lr == 0.0:
            self.register_buffer(name, tensor)
        else:
            self.register_parameter(name, nn.Parameter(tensor))

            optim = {"weight_decay": 0.0}
            if lr is not None: optim["lr"] = lr
            setattr(getattr(self, name), "_optim", optim)


class S4D(nn.Module):
    def __init__(self, d_model, d_state=64, dropout=0.0, transposed=True, **kernel_args):
        super().__init__()

        self.h = d_model
        self.n = d_state
        self.d_output = self.h
        self.transposed = transposed

        self.D = nn.Parameter(torch.randn(self.h))
        # SSM Kernel
        self.kernel = S4DKernel(self.h, N=self.n, **kernel_args)
        # Pointwise
        self.activation = nn.GELU()
        dropout_fn = DropoutNd
        self.dropout = dropout_fn(dropout) if dropout > 0.0 else nn.Identity()

        # position-wise output transform to mix features
        self.output_linear = nn.Sequential(
            nn.Conv1d(self.h, 2*self.h, kernel_size=1),
            nn.GLU(dim=-2),
        )

    def forward(self, u, **kwargs): # absorbs return_output and transformer src mask
        """ Input and output shape (B, H, L) """
        if not self.transposed: u = u.transpose(-1, -2)
        L = u.size(-1)
        # Compute SSM Kernel
        k = self.kernel(L=L) # (H L)

        # Convolution
        k_f = torch.fft.rfft(k, n=2*L)  # (H L)
        u_f = torch.fft.rfft(u, n=2*L) # (B H L)
        y = torch.fft.irfft(u_f*k_f, n=2*L)[..., :L] # (B H L)

        # Compute D term in state space equation - essentially a skip connection
        y = y + u * self.D.unsqueeze(-1)

        y = self.dropout(self.activation(y))
        y = self.output_linear(y)
        if not self.transposed: y = y.transpose(-1, -2)
        return y

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Lyra(nn.Module):
    def __init__(self, d_input, d_output,d_model, d_state=64,
                 dropout=0.2, transposed=False, **kernel_args):
        super().__init__()

        # Projects input features into the model dimension
        self.encoder = nn.Linear(d_input, d_model)

        # First Projected Gated Convolution (PGC) layer - typically used to reduce dimensionality
        self.pgc1 = PGC(d_model, expansion_factor=0.25, dropout=dropout)

        # Second PGC layer - expands dimensionality (inverse of above)
        self.pgc2 = PGC(d_model, expansion_factor=2, dropout=dropout)

        # Core state space layer (S4D: Structured State Space for Sequences)
        # Handles long-range dependencies efficiently
        self.s4d = S4D(d_model, d_state=d_state, dropout=dropout, transposed=transposed, **kernel_args)

        # RMS normalization helps with training stability
        self.norm = nn.RMSNorm(d_model)

        # Final linear layer to project back to the output space
        self.decoder = nn.Linear(d_model, d_output)

        # Dropout for regularization
        self.dropout = nn.Dropout(dropout)

    def forward(self, u):
        # Encode input into model dimension
        x = self.encoder(u)

        # Pass through first PGC (dimensionality reduction)
        x = self.pgc1(x)

        # Pass through second PGC (dimensionality expansion)
        x = self.pgc2(x)

        # Save intermediate representation
        z = x

        # Normalize before applying SSM (structured sequence modeling)
        z = self.norm(z)

        # Apply S4D and add residual connection
        x = self.dropout(self.s4d(z)) + x

        # Global average pooling across sequence dimension
        x = x.mean(dim=1)

        # (Optional) further dropout layer — commented out for now
        #x = self.dropout(x)

        # Decode final output
        x = self.decoder(x)
        return x

In [None]:
model = Lyra(d_input = 4, d_output = 1, d_model = 64).to(device)
print(model)
print(sum(p.numel() for p in model.parameters()))

In [None]:
# Initialize lists to store performance metrics for different data volumes
lyra_metrics = []
transformer_metrics = []

num_epochs = 25  # Adjust as needed

from scipy.stats import spearmanr
from sklearn.metrics import r2_score

def calculate_r2(y_true, y_pred):
    # Calculate R^2 score
    return r2_score(y_true, y_pred)

criterion = nn.MSELoss()

# Train models with different data volumes
# Train lyra model
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
best_lyra_loss = float('inf')
best_lyra_r2 = float('-inf')  # Track the best R^2 score
best_model_state = None

for epoch in range(num_epochs):
    model.train()
    lyra_train_loss = 0
    lyra_train_true = []
    lyra_train_pred = []

    for inputs, labels in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Lyra Training'):
        inputs, labels = inputs.to(device), labels.to(device)

        lyra_outputs = model(inputs)
        lyra_loss = criterion(lyra_outputs.squeeze(), labels)
        optimizer.zero_grad()
        lyra_loss.backward()
        optimizer.step()

        lyra_train_loss += lyra_loss.item()
        lyra_train_true.append(labels.detach().cpu().numpy())
        lyra_train_pred.append(lyra_outputs.squeeze().detach().cpu().numpy())

    # Evaluate on validation set
    model.eval()
    lyra_loss = 0
    lyra_true = []
    lyra_pred = []

    with torch.no_grad():
        for inputs, labels in tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} - Lyra Validation'):
            inputs, labels = inputs.to(device), labels.to(device)

            lyra_outputs = model(inputs)
            lyra_loss = criterion(lyra_outputs.squeeze(), labels)
            lyra_loss += lyra_loss.item()
            lyra_true.append(labels.detach().cpu().numpy())
            lyra_pred.append(lyra_outputs.squeeze().detach().cpu().numpy())

        lyra_true = np.concatenate(lyra_true)
        lyra_pred = np.concatenate(lyra_pred)

    # Calculate and print validation statistics for this epoch
    epoch_r2 = calculate_r2(lyra_true, lyra_pred)
    print(f"\nEpoch {epoch+1} Validation Statistics:")
    print(f"Validation Loss: {lyra_loss/len(val_loader):.6f}")
    print(f"R^2 Score: {epoch_r2:.4f}")

    # Save the best model based on Validation performance
    if epoch_r2 > best_lyra_r2:
        best_lyra_r2 = epoch_r2
        best_model_state = model.state_dict()

    if lyra_loss < best_lyra_loss:
        best_lyra_loss = lyra_loss

# Load the best model and evaluate on test set
model.load_state_dict(best_model_state)
model.eval()
lyra_test_loss = 0
lyra_test_true = []
lyra_test_pred = []

with torch.no_grad():
    for inputs, labels in tqdm(test_loader, desc='Final Test Evaluation'):
        inputs, labels = inputs.to(device), labels.to(device)
        lyra_outputs = model(inputs)
        lyra_loss = criterion(lyra_outputs.squeeze(), labels)
        lyra_test_loss += lyra_loss.item()
        lyra_test_true.append(labels.detach().cpu().numpy())
        lyra_test_pred.append(lyra_outputs.squeeze().detach().cpu().numpy())

    lyra_test_true = np.concatenate(lyra_test_true)
    lyra_test_pred = np.concatenate(lyra_test_pred)

# Calculate final test metrics
final_test_r2 = calculate_r2(lyra_test_true, lyra_test_pred)
final_test_loss = lyra_test_loss / len(test_loader)

# Save the results
lyra_metrics.append({
    '_val_loss': best_lyra_loss,
    'best_val_r2': best_lyra_r2,
    'test_loss': final_test_loss,
    'test_r2': final_test_r2
})
display(lyra_metrics)