# PART 1: Data Processing

1. we are using the DNA sequence of chromosome 1 for this project

2. find the dataset here : https://www.ncbi.nlm.nih.gov/nuccore/NC_000001.11?report=fasta


3. The dataset contains the characters 'N', 'A', 'C', 'G' and 'T', hence we require data preprocessing

4. the preprocessing code contains char to index mapping to enable character embedding

5. we are creating a dataset from a fasta document



In [None]:
!pip install fasta_reader
!pip install gradio==3.50

In [2]:
from fasta_reader import read_fasta
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
from torch.utils.data import random_split
from torch.utils.data import DataLoader



In [3]:
#Helper Functions


def count_cg_pairs(dna_sequence: str) -> int:
  """
  Count the occurrences of the "CG" dinucleotide within a given DNA sequence.

  The comparison is case-sensitive: only an upper-case "C" immediately
  followed by an upper-case "G" is counted.

  Parameters:
  - dna_sequence (str): A string representing the DNA sequence to be analyzed.

  Returns:
  - int: The total count of "CG" dinucleotides found in the DNA sequence.
  """
  # "CG" cannot overlap with itself, so the non-overlapping count returned by
  # str.count is identical to a sliding two-character window count, without
  # building a temporary slice for every position.
  return dna_sequence.count("CG")


#-
def dna_sequence_into_chunks(dna_sequence: str, chunk_size=800, sequence_size=128)-> list:
  """
  Cut the start of a DNA sequence into consecutive, non-overlapping chunks.

  Parameters:
  - dna_sequence (str): The DNA sequence to be divided into chunks.
  - chunk_size (int): How many chunks to return. Default is 800.
  - sequence_size (int): How many characters each chunk spans. Default is 128.

  Returns:
  - list: Exactly 'chunk_size' strings. Only the first
    chunk_size * sequence_size characters are used; if the sequence is
    shorter than that, the trailing chunks are shorter than 'sequence_size'
    or empty.
  """
  # Start offset of every chunk: 0, sequence_size, 2 * sequence_size, ...
  starts = range(0, chunk_size * sequence_size, sequence_size)
  return [dna_sequence[start:start + sequence_size] for start in starts]


def dna_sequence_to_indices(sequence:str, char_to_index):
  """
  Translate every character of a DNA sequence into its numerical index.

  Parameters:
  - sequence (str): The DNA sequence to encode.
  - char_to_index: Mapping from a single character to its index.

  Returns:
  - list: One index per character, in the order the characters appear.
    A character missing from 'char_to_index' raises whatever the mapping's
    lookup raises (KeyError for a plain dict).
  """
  encoded = [char_to_index[nucleotide] for nucleotide in sequence]
  return encoded

def dna_sequence_to_indices_chunk(dna_sequence:str, chunk_size:int,sequence_size:int,char_to_index)->list :

    """
    Split a DNA sequence into chunks and index-encode each chunk.

    Parameters:
    - dna_sequence (str): The DNA sequence to process.
    - chunk_size (int): Number of chunks, forwarded to dna_sequence_into_chunks.
    - sequence_size (int): Characters per chunk, forwarded to dna_sequence_into_chunks.
    - char_to_index: Mapping from a single character to its index.

    Returns:
    - list: One list of indices per chunk, in chunk order.
    """
    return [
        list(dna_sequence_to_indices(chunk, char_to_index))
        for chunk in dna_sequence_into_chunks(dna_sequence, chunk_size, sequence_size)
    ]

def dna_sequence_to_indeces_dataset(sequence: str, chunk_size=800, sequence_size=128)-> list:
    """
    Convert a DNA sequence into a dataset with each chunk labeled by its 'CG' count.

    The index encoding uses the module-level 'char_to_index' mapping, which
    therefore has to be defined before this function is called. Every
    character of the used part of 'sequence' must be a key of that mapping.

    Parameters:
    - sequence (str): The full DNA sequence to be processed.
    - chunk_size (int): Controls the number of chunks and data points generated.
    - sequence_size (int): Determines the uniform size of each chunk, in characters.

    Returns:
    - list: Dataset of tuples (chunk as indices, 'CG' count), one per chunk.
    """
    chunks = dna_sequence_into_chunks(sequence, chunk_size, sequence_size)
    # Encode the *argument*, not the module-level `cleaned_sequence`, so the
    # indices and the CG labels are always derived from the same chunks.
    chunks_ind = dna_sequence_to_indices_chunk(sequence, chunk_size, sequence_size, char_to_index)
    # Both lists hold one entry per chunk, in the same order.
    return [
        (indices, count_cg_pairs(chunk))
        for indices, chunk in zip(chunks_ind, chunks)
    ]


In [4]:
# importing the file and cleaning the DNA sequence

FILE = "/content/drive/MyDrive/sequence.fasta"

# Walk through every record so that `item` ends up bound to the last record
# in the file; its sequence is the one used for the rest of the notebook.
item = None
for item in read_fasta(FILE):
    pass
if item is None:
    # Without this check an empty file would surface as a confusing
    # NameError/AttributeError on the next line.
    raise ValueError(f"No FASTA records found in {FILE}")
sequence = item.sequence

# The raw sequence can contain characters other than the four bases
# (for example 'N' or 'R'). Only 'A', 'C', 'G' and 'T' are kept; every other
# character is dropped before the sequence is chunked.
cleaned_sequence = "".join(char for char in sequence if char in ("A", "C", "G", "T"))



In [5]:
# Character-level vocabulary: each nucleotide symbol gets an integer index so
# that it can be fed to an embedding layer.
# 'N' (unknown nucleotide) is listed first and therefore receives index 0.
vocab = ['N', 'A', 'C', 'G', 'T']
vocab_size = len(vocab)
char_to_index = dict(zip(vocab, range(vocab_size)))




In [6]:
# Creating dataset: a list of (index-encoded chunk, CG count) tuples built
# from the cleaned sequence with the default chunk_size and sequence_size.
dataset_i=dna_sequence_to_indeces_dataset(cleaned_sequence)



# Padding the dna sequence and converting to tensor


In [7]:
def dna_sequence_to_tensor(sequence, char_to_index=char_to_index, device=None):
  """
  Convert a DNA sequence to a PyTorch tensor based on a character-to-index mapping.

  Parameters:
  - sequence (str): The DNA sequence to encode. Every character must be a key
    of 'char_to_index'.
  - char_to_index: Mapping from a single character to its index. Defaults to
    the module-level mapping as it exists when this function is defined.
  - device: Device the tensor is created on. When None (the default), 'cuda'
    is used if a GPU is available and 'cpu' otherwise, so the helper also
    works on machines without a GPU.

  Returns:
  - Tensor: Long tensor of shape (1, len(sequence)); the leading axis is a
    batch axis of size one.
  """
  if device is None:
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

  indices = [char_to_index[char] for char in sequence]

  sequence_tensor = torch.tensor(indices, dtype=torch.long, device=device)

  # Add the batch axis expected by a batch_first model: (L,) -> (1, L).
  return sequence_tensor.unsqueeze(0)


def pad_and_retrieve_counts(dataset):
  """
  Turn a dataset of (indices, CG count) tuples into two aligned tensors.

  Args:
  - dataset (list of tuples): Each tuple contains a sequence as a list of indices and its "CG" count.

  Returns:
  - Tensor: The sequences stacked batch-first and padded to the length of the
    longest one (pad_sequence's default padding value is used).
  - Tensor: The "CG" counts, in the same order as the sequences.
  """
  # One tensor per sequence; lengths may differ until they are padded below.
  index_tensors = [torch.tensor(indices) for indices, _ in dataset]
  labels = [label for _, label in dataset]
  cg_counts = torch.tensor(labels)

  padded_sequences = pad_sequence(index_tensors, batch_first=True)
  return padded_sequences, cg_counts




In [8]:
# Padding the sequence: build one batch-first tensor of index sequences and a
# tensor with the matching CG counts from the dataset created above.

padded_sequences,cg_counts=pad_and_retrieve_counts(dataset_i)

# Creating Data loader
  - set train size, validation size, test size
  - set batch size

In [9]:
#TRAINING SET , VALIDATION SET , TEST SET
dataset = TensorDataset(padded_sequences, cg_counts.float())

# Take the total from the data instead of hard-coding it, so the three split
# sizes always add up to len(dataset), which random_split requires.
total_samples = len(dataset)  # 800 when the dataset is built with the default chunk_size
train_size = 400
val_size = 200
test_size = total_samples - train_size - val_size  # the remainder: 200 of 800
if test_size < 0:
    raise ValueError(
        f"Dataset has {total_samples} samples, fewer than train_size + val_size "
        f"({train_size + val_size})"
    )

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

batch_size_train = 1  # Example batch size for training; adjust as needed
batch_size_val_test = 1  # Batch size for validation and testing

# Only the training data is reshuffled each epoch.
train_loader = DataLoader(train_dataset, batch_size=batch_size_train, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size_val_test, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size_val_test, shuffle=False)


# Model Config
  - set model config (contains embedding layer)
  - set loss Function
  - set optimizer
  - set embedding dimension
  - set hidden output

In [28]:
class DNALSTM(nn.Module):
    """
    Embedding -> single-layer LSTM -> linear head.

    The input is a batch of index sequences, shape (batch, seq_len). The LSTM
    output at the last time step is passed through the linear layer, giving
    an output of shape (batch, output_dim).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim=1):
        """
        Parameters:
        - vocab_size (int): Number of distinct indices the embedding accepts.
        - embedding_dim (int): Size of each embedding vector (LSTM input size).
        - hidden_dim (int): Size of the LSTM hidden state.
        - output_dim (int): Number of values produced per sequence. Default is 1.
        """
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return the linear head applied to the LSTM output of the last time step."""
        per_step_output, _ = self.lstm(self.embedding(x))
        # Keep only the final time step of every sequence in the batch.
        last_step = per_step_output[:, -1, :]
        return self.fc(last_step)


In [11]:
# model Config: hyper-parameters passed to DNALSTM below
vocab_size = len(vocab)  # one embedding row per vocabulary character
embedding_dim = 248  # size of each character embedding vector
hidden_dim = 128  # size of the LSTM hidden state
output_dim = 1  # Assuming a regression task; adjust as needed for classification


In [12]:
# instantiating the LSTM model

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = DNALSTM(vocab_size, embedding_dim, hidden_dim, output_dim)
model = model.to(device)  # move the parameters to the selected device
loss_function = nn.MSELoss()  # For regression
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

# Model Training
  - set number of epoch here

In [13]:
def train_model__(model, data_loader, loss_function, optimizer, device, num_epochs=100):
    """
    Train 'model' for 'num_epochs' passes over 'data_loader'.

    After every epoch the average per-batch loss is printed.

    Parameters:
    - model: Module to train; it must already live on 'device'.
    - data_loader: Iterable of (sequences, labels) batches.
    - loss_function: Callable taking (outputs, labels) and returning a scalar loss tensor.
    - optimizer: Optimizer that updates the model's parameters.
    - device: Device each batch is moved to before the forward pass.
    - num_epochs (int): Number of passes over 'data_loader'. Default is 100.

    Raises:
    - ValueError: If 'data_loader' yields no batches in an epoch.
    """
    model.train()  # Set the model to training mode
    for epoch in range(num_epochs):
        total_loss = 0.0
        num_batches = 0

        for sequences, labels in data_loader:
            sequences = sequences.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            # Drop only the trailing feature axis, (batch, 1) -> (batch,), so the
            # outputs match the labels' shape. A bare squeeze() would also drop
            # the batch axis when the batch holds a single sample and make the
            # loss fall back to broadcasting (with a size-mismatch warning).
            outputs = model(sequences).squeeze(-1)
            loss = loss_function(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()  # Accumulate the loss
            num_batches += 1

        if num_batches == 0:
            raise ValueError("data_loader yielded no batches; nothing to train on")

        # Calculate the average loss over all batches
        avg_loss = total_loss / num_batches
        print(f'Epoch {epoch + 1}/{num_epochs}, Average Loss: {avg_loss:.4f}')


In [14]:
#Train
# Pass the `device` the model was moved to when it was instantiated rather than
# a literal "cuda", so training also runs when no GPU is available.
train_model__(model, train_loader, loss_function, optimizer, device)


  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 1/100, Average Loss: 7.3681
Epoch 2/100, Average Loss: 5.9913
Epoch 3/100, Average Loss: 5.6659
Epoch 4/100, Average Loss: 5.2249
Epoch 5/100, Average Loss: 4.8205
Epoch 6/100, Average Loss: 4.1057
Epoch 7/100, Average Loss: 3.7829
Epoch 8/100, Average Loss: 3.5796
Epoch 9/100, Average Loss: 3.3339
Epoch 10/100, Average Loss: 3.5374
Epoch 11/100, Average Loss: 3.0597
Epoch 12/100, Average Loss: 3.1327
Epoch 13/100, Average Loss: 3.7372
Epoch 14/100, Average Loss: 3.3960
Epoch 15/100, Average Loss: 2.6874
Epoch 16/100, Average Loss: 2.5967
Epoch 17/100, Average Loss: 2.3733
Epoch 18/100, Average Loss: 1.9852
Epoch 19/100, Average Loss: 1.6415
Epoch 20/100, Average Loss: 1.7117
Epoch 21/100, Average Loss: 1.4361
Epoch 22/100, Average Loss: 1.0767
Epoch 23/100, Average Loss: 1.3426
Epoch 24/100, Average Loss: 1.0264
Epoch 25/100, Average Loss: 0.8859
Epoch 26/100, Average Loss: 0.9148
Epoch 27/100, Average Loss: 0.8143
Epoch 28/100, Average Loss: 0.7718
Epoch 29/100, Average Loss: 1

# Save or Load model
  - First snippet contains code to save model
  - Second snippet contains code to load model

## save model here

In [15]:
#Save
# Only the learned parameters (the state_dict) are written, so loading them
# later requires building a DNALSTM with the same dimensions first.
torch.save(model.state_dict(), "/content/drive/MyDrive/cpg_detector_lstm.pth")


## load model here

In [16]:
# These dimensions must match the ones the checkpoint was trained with.
vocab_size = 5
embedding_dim = 248
hidden_dim = 128
output_dim = 1  # Assuming a regression task; adjust as needed for classification

# Recomputed here so this cell does not rely on an earlier cell having defined
# `device`; falls back to the CPU when no GPU is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_new = DNALSTM(vocab_size, embedding_dim, hidden_dim, output_dim)
# map_location remaps the stored tensors, so a checkpoint saved from a GPU
# session can also be loaded on a CPU-only machine.
model_new.load_state_dict(
    torch.load("/content/drive/MyDrive/cpg_detector_lstm.pth", map_location=device)
)
model_new.to(device)


DNALSTM(
  (embedding): Embedding(5, 248)
  (lstm): LSTM(248, 128, batch_first=True)
  (fc): Linear(in_features=128, out_features=1, bias=True)
)

# Model Evaluation
  - choose a model and data loader for evaluation
  - we use average loss here

In [17]:
def evaluate_model(model, data_loader, device, criterion=None):
    """
    Print the average per-batch loss of 'model' over 'data_loader'.

    Parameters:
    - model: Module to evaluate; it is switched to evaluation mode.
    - data_loader: Iterable of (sequences, labels) batches.
    - device: Device each batch is moved to before the forward pass.
    - criterion: Optional loss callable taking (predictions, labels). When
      None (the default), the module-level 'loss_function' is used.

    Raises:
    - ValueError: If 'data_loader' yields no batches.
    """
    if criterion is None:
        # Fall back to the notebook-level loss object.
        criterion = loss_function

    model.eval()  # Set the model to evaluation mode
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():  # No need to track gradients during evaluation
        for sequences, labels in data_loader:
            sequences, labels = sequences.to(device), labels.to(device)
            # (batch, 1) -> (batch,) so predictions line up with the labels.
            # Without this the loss broadcasts both operands to (batch, batch)
            # and reports a wrong value for batches larger than one.
            predictions = model(sequences).squeeze(-1)
            loss = criterion(predictions, labels.float())
            total_loss += loss.item()
            num_batches += 1

    if num_batches == 0:
        raise ValueError("data_loader yielded no batches; nothing to evaluate")

    avg_loss = total_loss / num_batches  # Calculate average loss
    print(f"Average Loss: {avg_loss:.4f}")

In [18]:
# Report the average loss of the trained model on the held-out test split.
evaluate_model(model, test_loader, device)

Average Loss: 0.9743


  return F.mse_loss(input, target, reduction=self.reduction)


# PART 2: Loading trained model and using Gradio app
  - Contains helper function for Gradio

In [29]:
# importing a trained model

# These dimensions must match the ones the checkpoint was trained with.
vocab_size = 5
embedding_dim = 248
hidden_dim = 128
output_dim = 1  # Assuming a regression task; adjust as needed for classification

# Recomputed here so this cell does not rely on an earlier cell having defined
# `device`; falls back to the CPU when no GPU is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_new = DNALSTM(vocab_size, embedding_dim, hidden_dim, output_dim)
# map_location remaps the stored tensors, so a checkpoint saved from a GPU
# session can also be loaded on a CPU-only machine.
model_new.load_state_dict(
    torch.load("/content/drive/MyDrive/cpg_detector_lstm.pth", map_location=device)
)
model_new.to(device)


DNALSTM(
  (embedding): Embedding(5, 248)
  (lstm): LSTM(248, 128, batch_first=True)
  (fc): Linear(in_features=128, out_features=1, bias=True)
)

In [30]:
import gradio as gr

# Helper function for the Gradio interface
def predict_cg_count(sequence) -> int :
    """
    Predicts 'CG' count in a DNA sequence using a pre-trained model.

    Whitespace (spaces, tabs, line breaks) is removed and the input is
    upper-cased before it is encoded with the module-level 'char_to_index'
    mapping and passed to the module-level 'model_new'.

    Args:
    - sequence (str): Input DNA sequence.

    Returns:
    - int: Predicted 'CG' count, rounded to the nearest integer and never
      negative. An input with no characters left after cleaning yields 0.

    Raises:
    - ValueError: If the sequence contains a character that is not in
      'char_to_index'.
    """
    # A multi-line text box can deliver line breaks and stray spaces.
    cleaned = "".join(sequence.split()).upper()
    if not cleaned:
        # An empty sequence contains no 'CG' pair, and the model cannot take
        # the last time step of a zero-length input.
        return 0

    unknown = sorted(set(cleaned) - set(char_to_index))
    if unknown:
        raise ValueError(f"Unsupported characters in DNA sequence: {', '.join(unknown)}")

    # Build the input on whichever device the model's parameters live on.
    model_device = next(model_new.parameters()).device
    sequence_tensor = dna_sequence_to_tensor(cleaned, char_to_index, device=model_device)
    with torch.no_grad():
        prediction = model_new(sequence_tensor)

    # The regression output is a float: round to the nearest count instead of
    # truncating toward zero, and clamp because a count cannot be negative.
    return max(0, round(prediction.item()))




In [27]:
# Create the Gradio interface

# One text box in, one number out; each submission is passed to predict_cg_count.
interface = gr.Interface(
    fn=predict_cg_count,
    inputs=gr.Textbox(label="DNA Sequence Here", placeholder="Enter DNA Sequence...", lines=2),
    outputs=gr.Number(label="Predicted CG Count"),
    title="DNA CG Counter",
    description="This app predicts the count of 'CG' pairs in a DNA sequence. Enter a DNA sequence to get started."
)

# Launch the app
# With debug=True the call keeps running in a notebook so errors and logs stay
# visible; pass debug=False to launch() to turn that off.
if __name__ == "__main__":
    interface.launch(debug=True)

Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
Running on public URL: https://8f2c3f34a0d718f90c.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://8f2c3f34a0d718f90c.gradio.live
