# Part 1: Build CpG Detector

Here we have a simple problem, given a DNA sequence (of N, A, C, G, T), count the number of CpGs in the sequence (consecutive CGs).

We have defined a few helper functions / parameters for performing this task.

We need you to build a LSTM model and train it to complish this task in PyTorch.

A good solution will be a model that can be trained, with high confidence in correctness.

In [1]:
from typing import Sequence
from functools import partial
import random
import torch
import numpy as np
import random

In [2]:
# DO NOT CHANGE HERE
def set_seed(seed=13):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed(13)
def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
    for _ in range(n_seqs):
        yield [random.randint(0, 4) for _ in range(seq_len)]
# Use this for getting x label
# def rand_sequence(n_seqs: int, seq_len: int=128) -> Sequence[int]:
#     for i in range(n_seqs):
#         yield [random.randint(0, 4) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs

# Alphabet helpers
alphabet = 'NACGT'
dna2int = { a: i for a, i in zip(alphabet, range(5))}
int2dna = { i: a for a, i in zip(alphabet, range(5))}

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [3]:
for a, i in zip(alphabet, range(5)):
  print(a,i)

N 0
A 1
C 2
G 3
T 4


In [8]:
# some config
LSTM_HIDDEN = 128
LSTM_LAYER = 1
batch_size = 32
learning_rate = 0.0001
epoch_num = 30

In [6]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

def prepare_data(num_samples=100):
    # Prepare the training and test data
    X_dna_seqs_train = list(rand_sequence(num_samples))
    temp = ["".join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train]
    print(temp)
    y_dna_seqs = [count_cpgs(seq) for seq in temp]
    return X_dna_seqs_train, y_dna_seqs

In [7]:
# Generate training and test data
train_x, train_y = prepare_data(2048)
test_x, test_y = prepare_data(512)

# Convert the training data to PyTorch tensors
train_x_tensor = torch.tensor(train_x)
train_y_tensor = torch.tensor(train_y).float().unsqueeze(1)  # Ensure train_y is a column vector

# Convert the test data to PyTorch tensors
test_x_tensor = torch.tensor(test_x)
test_y_tensor = torch.tensor(test_y).float().unsqueeze(1)  # Ensure test_y is a column vector

# Create a TensorDataset
train_dataset = TensorDataset(train_x_tensor, train_y_tensor)
test_dataset = TensorDataset(test_x_tensor, test_y_tensor)

# Create a DataLoader for the training data
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Create a DataLoader for the test data
test_data_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

['CCAAAAAANTACNGATNCANCGGACCAGTTGCGCNCTCGTTAGGTACAATCCCGCTCGATGTATAACANGGGNGNAAGGCAGCCACTATAGATNTTNCTTCACAATNAANGTTTNGCNTNACNGCCTT', 'NGTCNNCCAGTTTGAACNGTAACGACGTACATCGACGNCGGNNNTNCAGGTGCNCTGAANNCAACCTGCNCANCGGTTGNCTCANCATANAANGCNCCANAATGATGTNTNNTATCTGCATCCNCNNA', 'NNTCNCGATCNNNCGGGGCAAGCNTNTCCCAACGAGGCAGTTGAGGANTNGTAANNTACNGTTNAGNTNACTANTGCAGNTNACTTNTGCATNTGATNNCAGCACACTNCAAATNNTATTNGCGTTGT', 'TGNGTCCGNACGTNGGCNGGGGTNNCANATCCTAGTCNANNANTGACANAANNAGTAGNGTANNCTNTTANGCGCACCTTGCCANNANNCAATCNNGGTNNTTGAGAGCCNNGNNCGNGANTAACAGG', 'NCCCCANAATAGNTGNGGTAATGNGGCGGCNTAGATTNGGNTTGATNTANAGNGCGNCAGGTAGAGTGNGGAATGANNNGTTGGTAGNGGTGNTTTNNGAANNGTAGGNGNACGTAAANATNCAGAGN', 'TANGNNTNGTAATNNGTAGCAGTAAATTTNTGTAGTGACCGNNGGAACNGATCTTTNTAGCAGNGCNANCGNNNNCNTATGTGTNGTNTCTCNGGNTACANTANGTCGCGAGANTTGCNNCGAGNNGT', 'NGTCANTTCNCGNATCNCGNGAGNCAACGACTGANNCANAACCTGTCNNACNTAANGGGCCTCNGCAGNGANNANCAANCGAATTCNNCGGGGGNCTGTACCGGTNTANCTNCCGTGTGANAGTNTAN', 'GGGTAGTNCGGNCACGCACCCCANCNAACCTCNGNATGGGCGNNCAANNNNGAAAGACNTGTNNGTTGNGCNTC

In [9]:
# Define the model
class CpGPredictor(nn.Module):
    def __init__(self, input_size=5, hidden_size=128, num_layers=1, output_size=1):
        super(CpGPredictor, self).__init__()
        # Define the LSTM layer
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Define the classifier layer
        self.classifier = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden and cell states for each batch
        h0 = torch.zeros(self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        c0 = torch.zeros(self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)

        # Forward propagate LSTM

        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)

        # Decode the hidden state of the last time step
        out = self.classifier(out[:, -1, :])
        return out

# Instantiate the model
model = CpGPredictor(input_size=5, hidden_size=LSTM_HIDDEN, num_layers=LSTM_LAYER, output_size=1)

In [10]:
model

CpGPredictor(
  (lstm): LSTM(5, 128, batch_first=True)
  (classifier): Linear(in_features=128, out_features=1, bias=True)
)

In [12]:
# Function to count the number of parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Get the number of parameters
num_params = count_parameters(model)
print(f'The model has {num_params} trainable parameters')

The model has 69249 trainable parameters


In [13]:
# Loss function and optimizer
loss_fn = nn.MSELoss()  # Use Mean Squared Error loss for regression
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [16]:
# Training loop
num_epochs = 30
for epoch in range(num_epochs):
    t_loss = 0.0
    t_correct = 0
    t_total = 0
    for x_batch, y_batch in train_data_loader:
        # Forward pass
        # print(x_batch.shape)
        x_one_hot = nn.functional.one_hot(x_batch, num_classes=5).float()
        outputs = model(x_one_hot)
        loss = loss_fn(outputs, y_batch)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        t_loss += loss.item()
        # Calculate training accuracy
        _, predicted = torch.max(outputs.data, 1)
        t_total += y_batch.size(0)
        t_correct += (predicted == y_batch).sum().item()

    # Calculate average loss and accuracy for this epoch
    avg_loss = t_loss / len(train_data_loader)
    avg_acc = t_correct / t_total
    print(f'Training Accuracy : Epoch [{epoch+1}/{epoch_num}], Average Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.4f},train Loss: {loss.item()}')

    # Reset the total loss and accuracy for the next epoch
    t_loss = 0.0
    t_correct = 0
    t_total = 0

    # Print loss for each epoch
    # print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}')
    # Validation loop
    model.eval()  # Set the model to evaluation mode
    val_loss = 0.0
    vt_correct = 0
    vt_total = 0
    with torch.no_grad():
        for x_batch, y_batch in test_data_loader:
            x_one_hot = nn.functional.one_hot(x_batch, num_classes=5).float()
            outputs = model(x_one_hot)
            # outputs = model(x_batch)
            test_loss = loss_fn(outputs, y_batch)
            val_loss += test_loss.item()
            # Calculate validation accuracy
            _, predicted = torch.max(outputs.data, 1)
            vt_total += y_batch.size(0)
            vt_correct += (predicted == y_batch).sum().item()
    # Calculate average loss and accuracy for this epoch
    avg_loss = val_loss / len(train_data_loader)
    avg_acc = vt_correct / vt_total
    print(f'Validation Accuracy : Epoch [{epoch+1}/{epoch_num}], Average Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.4f}, Test Loss: {test_loss.item()}')
    # print(f'Epoch [{epoch+1}/{epoch_num}],Test Loss: {test_loss.item()}')


Training Accuracy : Epoch [1/30], Average Loss: 4.1939, Accuracy: 0.1250,train Loss: 3.453348159790039
Validation Accuracy : Epoch [1/30], Average Loss: 1.0409, Accuracy: 0.1250, Test Loss: 4.338506698608398
Training Accuracy : Epoch [2/30], Average Loss: 4.1921, Accuracy: 0.1250,train Loss: 5.21331787109375
Validation Accuracy : Epoch [2/30], Average Loss: 1.0388, Accuracy: 0.1250, Test Loss: 4.26591157913208
Training Accuracy : Epoch [3/30], Average Loss: 4.1933, Accuracy: 0.1250,train Loss: 3.7794759273529053
Validation Accuracy : Epoch [3/30], Average Loss: 1.0394, Accuracy: 0.1250, Test Loss: 4.295933723449707
Training Accuracy : Epoch [4/30], Average Loss: 4.1910, Accuracy: 0.1250,train Loss: 3.676661729812622
Validation Accuracy : Epoch [4/30], Average Loss: 1.0388, Accuracy: 0.1250, Test Loss: 4.183655738830566
Training Accuracy : Epoch [5/30], Average Loss: 4.1970, Accuracy: 0.1250,train Loss: 4.931069374084473
Validation Accuracy : Epoch [5/30], Average Loss: 1.0384, Accuracy

In [18]:
# Evaluation loop
model.eval()  # Set the model to evaluation mode
res_gs = []
res_pred = []
if torch.cuda.is_available():
    device = torch.device('cuda')
    print('Using GPU:', torch.cuda.get_device_name(0))
else:
    device = torch.device('cpu')

with torch.no_grad():  # Deactivate autograd for evaluation to save memory and speed up computations
    for x_batch, y_batch in test_data_loader:
        x_one_hot = nn.functional.one_hot(x_batch, num_classes=5).float()
        x_one_hot = x_one_hot.to(device)
        y_batch = y_batch.to(device)
        outputs = model(x_one_hot)
        res_gs.extend(y_batch.tolist())
        res_pred.extend(outputs.squeeze().tolist())
        # outputs = model(x_batch)
    test_loss = loss_fn(outputs, y_batch)
    print("test_loss : ",test_loss)

# Convert the lists to numpy arrays for easier analysis
res_gs = np.array(res_gs)
res_pred = np.array(res_pred)

# Print the results or perform any other evaluation metrics you want


test_loss :  tensor(4.2656)


In [19]:
print("Ground Truth Labels:", res_gs)
print("Predicted Labels:", res_pred)

Ground Truth Labels: [[ 4.]
 [ 4.]
 [ 7.]
 [ 5.]
 [ 4.]
 [ 4.]
 [ 6.]
 [ 8.]
 [ 6.]
 [ 9.]
 [ 7.]
 [ 6.]
 [ 4.]
 [ 9.]
 [ 4.]
 [ 3.]
 [ 6.]
 [ 2.]
 [ 9.]
 [ 7.]
 [ 9.]
 [ 7.]
 [ 6.]
 [ 3.]
 [ 5.]
 [ 5.]
 [ 2.]
 [ 4.]
 [ 4.]
 [ 5.]
 [ 4.]
 [ 3.]
 [ 7.]
 [ 5.]
 [ 6.]
 [ 6.]
 [ 5.]
 [ 8.]
 [ 3.]
 [ 6.]
 [ 5.]
 [ 5.]
 [ 6.]
 [ 7.]
 [ 3.]
 [ 6.]
 [ 6.]
 [ 4.]
 [ 4.]
 [ 6.]
 [ 5.]
 [ 6.]
 [ 6.]
 [ 6.]
 [ 9.]
 [ 7.]
 [ 8.]
 [10.]
 [ 7.]
 [ 5.]
 [ 5.]
 [ 3.]
 [ 7.]
 [ 4.]
 [ 8.]
 [ 4.]
 [ 6.]
 [ 6.]
 [ 3.]
 [ 7.]
 [ 5.]
 [ 5.]
 [ 4.]
 [ 0.]
 [ 3.]
 [ 4.]
 [ 5.]
 [ 5.]
 [ 7.]
 [ 6.]
 [ 6.]
 [ 4.]
 [ 4.]
 [ 2.]
 [ 4.]
 [ 7.]
 [ 7.]
 [ 6.]
 [12.]
 [ 4.]
 [ 1.]
 [ 7.]
 [ 3.]
 [ 9.]
 [ 6.]
 [ 4.]
 [ 5.]
 [ 6.]
 [ 2.]
 [ 7.]
 [ 2.]
 [ 9.]
 [ 5.]
 [ 3.]
 [ 3.]
 [ 7.]
 [ 8.]
 [ 9.]
 [ 7.]
 [ 4.]
 [ 7.]
 [ 4.]
 [ 3.]
 [ 8.]
 [ 4.]
 [ 5.]
 [ 4.]
 [ 4.]
 [ 2.]
 [ 5.]
 [ 4.]
 [ 5.]
 [ 2.]
 [ 7.]
 [ 5.]
 [ 5.]
 [ 3.]
 [ 6.]
 [ 7.]
 [ 2.]
 [ 5.]
 [ 6.]
 [ 5.]
 [ 8.]
 [ 3.]
 [ 6.]
 [ 3.]
 [ 6.]
 [ 4.]
 [ 4.]

In [20]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Calculate evaluation metrics
mae = mean_absolute_error(res_gs, res_pred)
mse = mean_squared_error(res_gs, res_pred)
r2 = r2_score(res_gs, res_pred)

# Print the evaluation metrics
print(f'Mean Absolute Error (MAE): {mae:.4f}')
print(f'Mean Squared Error (MSE): {mse:.4f}')
print(f'R-squared (R2) Score: {r2:.4f}')

Mean Absolute Error (MAE): 1.5871
Mean Squared Error (MSE): 4.1466
R-squared (R2) Score: 0.0025


In [22]:
# Save the model
model_path = 'CP_count_model.pth'
torch.save(model.state_dict(), model_path)

In [25]:
#Prediction
import torch
import torch.nn.functional as F

# Define the mapping from nucleotides to integers
nucleotide_to_index = {'N': 0, 'A': 1, 'C': 2, 'G': 3, 'T': 4}

# Function to convert a string of nucleotides to a tensor
def string_to_tensor(s, mapping):
    # Convert the string to a list of integers
    indices = [mapping[c] for c in s]
    # Convert the list to a tensor
    tensor = torch.tensor(indices)
    # One-hot encode the tensor
    one_hot = F.one_hot(tensor, num_classes=len(mapping))
    # Add a batch dimension
    one_hot = one_hot.unsqueeze(0).float()
    return one_hot

# Test string
test_string = "NAACGTANCGCGC"

# Convert the test string to a tensor
test_tensor = string_to_tensor(test_string, nucleotide_to_index)

# Load the model (assuming it's the same as before)
model = CpGPredictor()
state_dict = torch.load("/content/CP_count_model.pth")

# Remove the unexpected keys from the state dictionary
unexpected_keys = ['lstm.weight_ih_l1', 'lstm.weight_hh_l1', 'lstm.bias_ih_l1', 'lstm.bias_hh_l1']
for key in unexpected_keys:
    if key in state_dict:
        print("key--------",key)
        del state_dict[key]

model.load_state_dict(state_dict, strict=False)
model.eval()

# Pass the tensor through the model
with torch.no_grad():
    prediction = model(test_tensor)

# Print the prediction
print(prediction)

tensor([[3.2908]])


# Part 2: what if the DNA sequences are not the same length

In [26]:
# hint we will need following imports
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

In [27]:
# DO NOT CHANGE HERE
random.seed(13)

# Use this for getting x label
def rand_sequence_var_len(n_seqs: int, lb: int=16, ub: int=128) -> Sequence[int]:
    for i in range(n_seqs):
        seq_len = random.randint(lb, ub)
        yield [random.randint(1, 5) for _ in range(seq_len)]

# Use this for getting y label
def count_cpgs(seq: str) -> int:
    cgs = 0
    for i in range(0, len(seq) - 1):
        dimer = seq[i:i+2]
        # note that seq is a string, not a list
        if dimer == "CG":
            cgs += 1
    return cgs


# Alphabet helpers
alphabet = 'NACGT'
dna2int = {a: i for a, i in zip(alphabet, range(1, 6))}
int2dna = {i: a for a, i in zip(alphabet, range(1, 6))}
dna2int.update({"pad": 0})
int2dna.update({0: "<pad>"})

intseq_to_dnaseq = partial(map, int2dna.get)
dnaseq_to_intseq = partial(map, dna2int.get)

In [28]:
def prepare_data(num_samples=100, min_len=16, max_len=128):
    # Generate the training data
    X_dna_seqs_train = list(rand_sequence_var_len(num_samples, min_len, max_len))

    # Convert the integer sequences to DNA sequences
    temp = ["".join(intseq_to_dnaseq(seq)) for seq in X_dna_seqs_train]

    # Count the CpGs in each DNA sequence
    y_dna_seqs = [count_cpgs(seq) for seq in temp]

    # Convert the DNA sequences to integer sequences
    X_int_seqs_train = [list(dnaseq_to_intseq(seq)) for seq in temp]

    # Pad the sequences to the same length
    X_padded = pad_sequence([torch.tensor(seq) for seq in X_int_seqs_train], batch_first=True)

    return X_padded, torch.tensor(y_dna_seqs, dtype=torch.float32)

min_len, max_len = 64, 128
train_x, train_y = prepare_data(2048, min_len, max_len)
test_x, test_y = prepare_data(512, min_len, max_len)

In [29]:
test_x

tensor([[5, 4, 1,  ..., 0, 0, 0],
        [5, 3, 1,  ..., 0, 0, 0],
        [4, 1, 2,  ..., 0, 0, 0],
        ...,
        [2, 2, 3,  ..., 0, 0, 0],
        [3, 2, 1,  ..., 0, 0, 0],
        [2, 4, 3,  ..., 0, 0, 0]])

In [30]:
test_y

tensor([ 5.,  2.,  1.,  4.,  3.,  5.,  5.,  2.,  2.,  3.,  4.,  7.,  4.,  5.,
         2., 10.,  2.,  5.,  7.,  2.,  3.,  3.,  5.,  5.,  5.,  3.,  2.,  2.,
         6.,  4.,  4.,  2.,  2.,  2.,  1.,  4.,  7.,  7.,  2.,  5.,  1.,  2.,
         7.,  2.,  3.,  3.,  3.,  4.,  2.,  3.,  4.,  5.,  3.,  5.,  5.,  7.,
         1.,  2.,  4.,  2.,  3.,  2.,  4.,  4.,  5.,  3.,  4.,  8.,  3.,  7.,
         7.,  5.,  3.,  5.,  3.,  2.,  4.,  4.,  6.,  6.,  4.,  2.,  2.,  2.,
         5., 10.,  4.,  5.,  5.,  7.,  4.,  6.,  4.,  2.,  3.,  5.,  3.,  2.,
         7.,  3.,  0.,  3.,  4.,  6.,  2.,  2.,  2.,  2.,  1.,  1.,  4.,  5.,
         7.,  3.,  1.,  8.,  5.,  4.,  6.,  3.,  4.,  4.,  5.,  7.,  6.,  3.,
         5.,  4.,  1.,  4.,  2.,  4.,  5.,  5.,  4.,  6.,  3.,  1.,  5.,  2.,
         3.,  5.,  6.,  6.,  5.,  6.,  7.,  4.,  3.,  0.,  4.,  1.,  5.,  1.,
         5.,  7.,  0.,  5.,  1.,  5.,  1.,  5., 11.,  6.,  3.,  2.,  2.,  3.,
         0.,  4.,  3.,  5.,  2.,  5.,  3.,  3.,  1.,  0.,  6.,  

In [31]:
class MyDataset(torch.utils.data.Dataset):
    def __init__(self, lists, labels) -> None:
        self.lists = lists
        self.labels = labels

    def __getitem__(self, index):
        return torch.LongTensor(self.lists[index]), self.labels[index]

    def __len__(self):
        return len(self.lists)


# this will be a collate_fn for dataloader to pad sequence
# class PadSequence:
    #TODO
from torch.nn.utils.rnn import pad_sequence

class PadSequence:
    def __call__(self, batch):
        # Separate sequences and labels from the batch
        sequences, labels = zip(*batch)

        # Convert sequences to tensors
        sequences = [torch.tensor(seq) for seq in sequences]

        # Pad the sequences
        padded_sequences = pad_sequence(sequences, batch_first=True)

        # Convert labels to a tensor
        labels = torch.stack(labels)

        return padded_sequences, labels
# Assuming train_x and train_y are the padded sequences and labels from the prepare_data function
train_dataset = MyDataset(train_x, train_y)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, collate_fn=PadSequence())

# You can do the same for the test dataset
test_dataset = MyDataset(test_x, test_y)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, collate_fn=PadSequence())

In [32]:
test_loader

<torch.utils.data.dataloader.DataLoader at 0x7f2b038665f0>

In [33]:
import torch.nn as nn

class CpGPredictor(nn.Module):
    def __init__(self, input_size=5, hidden_size=128, num_layers=1, output_size=1):
        super(CpGPredictor, self).__init__()
        # Define the LSTM layer
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Define the classifier layer
        self.classifier = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden and cell states for each batch
        h0 = torch.zeros(self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)
        c0 = torch.zeros(self.lstm.num_layers, x.size(0), self.lstm.hidden_size).to(x.device)

        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_size)

        # Decode the hidden state of the last time step
        out = self.classifier(out[:, -1, :])
        return out

In [35]:
model = CpGPredictor(input_size=5, hidden_size=LSTM_HIDDEN, num_layers=LSTM_LAYER, output_size=1)

In [36]:
model

CpGPredictor(
  (lstm): LSTM(5, 128, batch_first=True)
  (classifier): Linear(in_features=128, out_features=1, bias=True)
)

In [37]:
# Function to count the number of parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Get the number of parameters
num_params = count_parameters(model)
print(f'The model has {num_params} trainable parameters')

The model has 69249 trainable parameters


In [38]:
loss_fn = nn.MSELoss()  # Use Mean Squared Error loss for regression
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [39]:
# Training loop
num_epochs = 30
for epoch in range(num_epochs):
    t_loss = 0.0
    t_correct = 0
    t_total = 0
    for x_batch, y_batch in train_data_loader:
        # Forward pass
        # print(x_batch.shape)
        x_one_hot = nn.functional.one_hot(x_batch, num_classes=5).float()
        outputs = model(x_one_hot)
        loss = loss_fn(outputs, y_batch)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        t_loss += loss.item()
        # Calculate training accuracy
        _, predicted = torch.max(outputs.data, 1)
        t_total += y_batch.size(0)
        t_correct += (predicted == y_batch).sum().item()

    # Calculate average loss and accuracy for this epoch
    avg_loss = t_loss / len(train_data_loader)
    avg_acc = t_correct / t_total
    print(f'Training Accuracy : Epoch [{epoch+1}/{epoch_num}], Average Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.4f},train Loss: {loss.item()}')

    t_loss = 0.0
    t_correct = 0
    t_total = 0

    # Print loss for each epoch
    # print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}')
    # Validation loop
    model.eval()  # Set the model to evaluation mode
    val_loss = 0.0
    vt_correct = 0
    vt_total = 0
    with torch.no_grad():
        for x_batch, y_batch in test_data_loader:
            x_one_hot = nn.functional.one_hot(x_batch, num_classes=5).float()
            outputs = model(x_one_hot)
            # outputs = model(x_batch)
            test_loss = loss_fn(outputs, y_batch)
            val_loss += test_loss.item()
            # Calculate validation accuracy
            _, predicted = torch.max(outputs.data, 1)
            vt_total += y_batch.size(0)
            vt_correct += (predicted == y_batch).sum().item()
    # Calculate average loss and accuracy for this epoch
    avg_loss = val_loss / len(train_data_loader)
    avg_acc = vt_correct / vt_total
    print(f'Validation Accuracy : Epoch [{epoch+1}/{epoch_num}], Average Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.4f}, Test Loss: {test_loss.item()}')
    # print(f'Epoch [{epoch+1}/{epoch_num}],Test Loss: {test_loss.item()}')


Training Accuracy : Epoch [1/30], Average Loss: 10.1157, Accuracy: 0.1250,train Loss: 3.7187302112579346
Validation Accuracy : Epoch [1/30], Average Loss: 1.0389, Accuracy: 0.1250, Test Loss: 4.229731559753418
Training Accuracy : Epoch [2/30], Average Loss: 4.2142, Accuracy: 0.1250,train Loss: 3.1197426319122314
Validation Accuracy : Epoch [2/30], Average Loss: 1.0419, Accuracy: 0.1250, Test Loss: 4.357424736022949
Training Accuracy : Epoch [3/30], Average Loss: 4.2139, Accuracy: 0.1250,train Loss: 7.700796127319336
Validation Accuracy : Epoch [3/30], Average Loss: 1.0392, Accuracy: 0.1250, Test Loss: 4.189371109008789
Training Accuracy : Epoch [4/30], Average Loss: 4.2078, Accuracy: 0.1250,train Loss: 4.551928520202637
Validation Accuracy : Epoch [4/30], Average Loss: 1.0391, Accuracy: 0.1250, Test Loss: 4.19688606262207
Training Accuracy : Epoch [5/30], Average Loss: 4.2056, Accuracy: 0.1250,train Loss: 4.165190696716309
Validation Accuracy : Epoch [5/30], Average Loss: 1.0409, Accur

In [40]:
# Evaluation loop
model.eval()  # Set the model to evaluation mode
res_gs = []
res_pred = []

with torch.no_grad():  # Deactivate autograd for evaluation to save memory and speed up computations
    for x_batch, y_batch in test_data_loader:
        x_one_hot = nn.functional.one_hot(x_batch, num_classes=5).float()
        x_one_hot = x_one_hot.to(device)
        y_batch = y_batch.to(device)
        outputs = model(x_one_hot)
        res_gs.extend(y_batch.tolist())
        res_pred.extend(outputs.squeeze().tolist())
        # outputs = model(x_batch)
    test_loss = loss_fn(outputs, y_batch)
res_gs = np.array(res_gs)
res_pred = np.array(res_pred)

# Print the results or perform any other evaluation metrics you want
print("Ground Truth Labels:", res_gs)
print("Predicted Labels:", res_pred)

Ground Truth Labels: [[ 4.]
 [ 4.]
 [ 7.]
 [ 5.]
 [ 4.]
 [ 4.]
 [ 6.]
 [ 8.]
 [ 6.]
 [ 9.]
 [ 7.]
 [ 6.]
 [ 4.]
 [ 9.]
 [ 4.]
 [ 3.]
 [ 6.]
 [ 2.]
 [ 9.]
 [ 7.]
 [ 9.]
 [ 7.]
 [ 6.]
 [ 3.]
 [ 5.]
 [ 5.]
 [ 2.]
 [ 4.]
 [ 4.]
 [ 5.]
 [ 4.]
 [ 3.]
 [ 7.]
 [ 5.]
 [ 6.]
 [ 6.]
 [ 5.]
 [ 8.]
 [ 3.]
 [ 6.]
 [ 5.]
 [ 5.]
 [ 6.]
 [ 7.]
 [ 3.]
 [ 6.]
 [ 6.]
 [ 4.]
 [ 4.]
 [ 6.]
 [ 5.]
 [ 6.]
 [ 6.]
 [ 6.]
 [ 9.]
 [ 7.]
 [ 8.]
 [10.]
 [ 7.]
 [ 5.]
 [ 5.]
 [ 3.]
 [ 7.]
 [ 4.]
 [ 8.]
 [ 4.]
 [ 6.]
 [ 6.]
 [ 3.]
 [ 7.]
 [ 5.]
 [ 5.]
 [ 4.]
 [ 0.]
 [ 3.]
 [ 4.]
 [ 5.]
 [ 5.]
 [ 7.]
 [ 6.]
 [ 6.]
 [ 4.]
 [ 4.]
 [ 2.]
 [ 4.]
 [ 7.]
 [ 7.]
 [ 6.]
 [12.]
 [ 4.]
 [ 1.]
 [ 7.]
 [ 3.]
 [ 9.]
 [ 6.]
 [ 4.]
 [ 5.]
 [ 6.]
 [ 2.]
 [ 7.]
 [ 2.]
 [ 9.]
 [ 5.]
 [ 3.]
 [ 3.]
 [ 7.]
 [ 8.]
 [ 9.]
 [ 7.]
 [ 4.]
 [ 7.]
 [ 4.]
 [ 3.]
 [ 8.]
 [ 4.]
 [ 5.]
 [ 4.]
 [ 4.]
 [ 2.]
 [ 5.]
 [ 4.]
 [ 5.]
 [ 2.]
 [ 7.]
 [ 5.]
 [ 5.]
 [ 3.]
 [ 6.]
 [ 7.]
 [ 2.]
 [ 5.]
 [ 6.]
 [ 5.]
 [ 8.]
 [ 3.]
 [ 6.]
 [ 3.]
 [ 6.]
 [ 4.]
 [ 4.]

In [41]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Calculate evaluation metrics
mae = mean_absolute_error(res_gs, res_pred)
mse = mean_squared_error(res_gs, res_pred)
r2 = r2_score(res_gs, res_pred)

# Print the evaluation metrics
print(f'Mean Absolute Error (MAE): {mae:.4f}')
print(f'Mean Squared Error (MSE): {mse:.4f}')
print(f'R-squared (R2) Score: {r2:.4f}')

Mean Absolute Error (MAE): 1.5880
Mean Squared Error (MSE): 4.1785
R-squared (R2) Score: -0.0052


In [42]:
# Save the model
model_path = 'CP_count_model_padded.pth'
torch.save(model.state_dict(), model_path)

In [43]:
import torch
import torch.nn.functional as F

# Define the mapping from nucleotides to integers
nucleotide_to_index = {'N': 0, 'A': 1, 'C': 2, 'G': 3, 'T': 4}

# Function to convert a string of nucleotides to a tensor
def string_to_tensor(s, mapping):
    # Convert the string to a list of integers
    indices = [mapping[c] for c in s]
    # Convert the list to a tensor
    tensor = torch.tensor(indices)
    # One-hot encode the tensor
    one_hot = F.one_hot(tensor, num_classes=len(mapping))
    # Add a batch dimension
    one_hot = one_hot.unsqueeze(0).float()
    return one_hot

# Test string
test_string = "NAACGTANCGCGGCGGGCCCCC"

# Convert the test string to a tensor
test_tensor = string_to_tensor(test_string, nucleotide_to_index)

# Load the model (assuming it's the same as before)
model = CpGPredictor()
state_dict = torch.load("/content/CP_count_model.pth")

# Remove the unexpected keys from the state dictionary
unexpected_keys = ['lstm.weight_ih_l1', 'lstm.weight_hh_l1', 'lstm.bias_ih_l1', 'lstm.bias_hh_l1']
for key in unexpected_keys:
    if key in state_dict:
        print("key--------",key)
        del state_dict[key]

model.load_state_dict(state_dict, strict=False)
model.eval()

# Pass the tensor through the model
with torch.no_grad():
    prediction = model(test_tensor)

# Print the prediction
print(prediction)

tensor([[5.0299]])
