In [None]:
!pip install dgl==0.9.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting dgl==0.9.0
  Downloading dgl-0.9.0-cp39-cp39-manylinux1_x86_64.whl (6.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.2/6.2 MB[0m [31m28.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: dgl
Successfully installed dgl-0.9.0


# Graph representation

In [None]:
import torch
import dgl
import torch.nn as nn
import dgl.nn.pytorch as dglnn

class DiffusionNN(nn.Module):
    def __init__(self, num_feats, num_classes, num_layers, hidden_dim):
        super(DiffusionNN, self).__init__()
        
        self.conv_layers = nn.ModuleList()
        self.diffusion_layers = nn.ModuleList()
        self.fc_layers = nn.ModuleList()

        # Graph convolutional layers
        for i in range(num_layers):
            if i == 0:
                self.conv_layers.append(dglnn.GraphConv(num_feats, hidden_dim))
            else:
                self.conv_layers.append(dglnn.GraphConv(hidden_dim, hidden_dim))
        
        # Diffusion layers
        for i in range(num_layers):
            self.diffusion_layers.append(dglnn.GraphConv(hidden_dim, hidden_dim, norm='both', weight=False))

        # Fully connected layers
        for i in range(num_layers):
            if i == 0:
                self.fc_layers.append(nn.Linear(hidden_dim, hidden_dim))
            else:
                self.fc_layers.append(nn.Linear(hidden_dim, hidden_dim))
        
        self.fc_layers.append(nn.Linear(hidden_dim, num_classes))
        
    
    def forward(self, g, x=None):
        if x is None:
            h = g.ndata['feat']
        else:
            h = x

        # Graph convolutional layers
        for conv_layer in self.conv_layers:
            h = conv_layer(g, h)
            h = torch.relu(h)

        # Diffusion layers
        for diffusion_layer in self.diffusion_layers:
            h = diffusion_layer(g, h)
            h = torch.relu(h)

        # Fully connected layers
        for fc_layer in self.fc_layers:
            h = fc_layer(h)
            h = torch.relu(h)

        return h

DGL backend not selected or invalid.  Assuming PyTorch for now.


Setting the default backend to "pytorch". You can change it in the ~/.dgl/config.json file or export the DGLBACKEND environment variable.  Valid options are: pytorch, mxnet, tensorflow (all lowercase)


In [None]:
# Example code for testing on DNA sequences with G-quadruplexes
# Load DNA sequences with G-quadruplexes (assume each sequence is a string)
sequences = [
    'GGGGGAGGGGGA',
    'GGGGGAGGGAGGGG',
    'GGGGGAGGGAGGGGGGGGGGAGGGGGA'
]

# Convert sequences to graphs (assume each nucleotide is a node)
graphs = []
for sequence in sequences:
    num_nodes = len(sequence)
    g = dgl.DGLGraph()
    g.add_nodes(num_nodes)
    g.ndata['feat'] = torch.eye(num_nodes)  # one-hot encoding of nucleotides
    for i in range(num_nodes-1):
        g.add_edge(i, i+1)
    g.add_edge(num_nodes-1, 0)
    graphs.append(g)

# Create DiffusionNN model
model = DiffusionNN(num_feats=12, num_classes=2, num_layers=3, hidden_dim=64)

# Test forward pass on first graph
output = model(graphs[0])
print(output)

tensor([[0.0578, 0.0000],
        [0.0600, 0.0000],
        [0.0611, 0.0000],
        [0.0602, 0.0000],
        [0.0586, 0.0000],
        [0.0608, 0.0000],
        [0.0599, 0.0000],
        [0.0617, 0.0000],
        [0.0597, 0.0000],
        [0.0603, 0.0000],
        [0.0600, 0.0000],
        [0.0617, 0.0000]], grad_fn=<ReluBackward0>)


  assert input.numel() == input.storage().size(), "Cannot convert view " \


In [4]:
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import numpy as np

In [None]:
# Define dataset class
class DNAData(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        sequence = self.sequences[idx]
        label = self.labels[idx]
        return sequence, label

# Load DNA sequences with G-quadruplexes (assume each sequence is a string)
sequences = [
    'GGGGGAGGGGGA',
    'GGGGGAGGGAGGGG',
    'GGGGGAGGGAGGGGGGGGGGAGGGGGA',
    'AGGGGGG',
    'GGGGAGGGGGG',
    'GGGGGGGAGGGG'
]

# Assign labels to sequences (assume first 3 are positive examples and last 3 are negative examples)
labels = np.array([1, 1, 1, 0, 0, 0])

# Split data into train and validation sets
train_sequences, val_sequences, train_labels, val_labels = train_test_split(sequences, labels, test_size=0.2, random_state=42)

# Create dataset and dataloader objects for train and validation sets
train_dataset = DNAData(train_sequences, train_labels)
val_dataset = DNAData(val_sequences, val_labels)
batch_size = 2
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Create DiffusionNN model
model = DiffusionNN(num_feats=12, num_classes=2, num_layers=3, hidden_dim=64)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train model
num_epochs = 10
for epoch in range(num_epochs):
    train_loss = 0.0
    train_acc = 0.0
    val_loss = 0.0
    val_acc = 0.0

    # Train
    model.train()
    for i, (sequences, labels) in enumerate(train_loader):
        optimizer.zero_grad()

        # Convert sequences to graphs
        graphs = []
        for sequence in sequences:
            num_nodes = len(sequence)
            g = dgl.DGLGraph()
            g.add_nodes(num_nodes)
            g.ndata['feat'] = torch.eye(num_nodes)  # one-hot encoding of nucleotides
            for i in range(num_nodes-1):
                g.add_edge(i, i+1)
            g.add_edge(num_nodes-1, 0)
            graphs.append(g)

        # Forward pass
        outputs = model(graphs)

        # Calculate loss
        loss = criterion(outputs, labels)
        train_loss += loss.item() * batch_size

        # Backward pass and update weights
        loss.backward()
        optimizer.step()

        # Calculate accuracy
        _, preds = torch.max(outputs, 1)
        train_acc += torch.sum(preds == labels)

    # Calculate average loss and accuracy for train set
    train_loss /= len(train_dataset)

AttributeError: ignored

In [None]:
# Load DNA sequences with G-quadruplexes (assume each sequence is a string)
sequences = [    'GGGGGAGGGGGA',    'GGGGGAGGGAGGGG',    'GGGGGAGGGAGGGGGGGGGGAGGGGGA']

# Convert sequences to graphs (assume each nucleotide is a node)
graphs = []
for sequence in sequences:
    num_nodes = len(sequence)
    g = dgl.DGLGraph()
    g.add_nodes(num_nodes)
    g.ndata['feat'] = torch.eye(num_nodes)  # one-hot encoding of nucleotides
    for i in range(num_nodes-1):
        g.add_edge(i, i+1)
    g.add_edge(num_nodes-1, 0)
    graphs.append(g)

# Create DiffusionNN model
model = DiffusionNN(num_feats=4, num_classes=2, num_layers=3, hidden_dim=64)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Train the model
num_epochs = 10
for epoch in range(num_epochs):
    epoch_loss = 0.0
    epoch_acc = 0.0
    
    for graph in graphs:
        # Forward pass
        outputs = model(graph)
        
        # Compute loss
        labels = torch.tensor([0, 1])
        loss = criterion(outputs.unsqueeze(0), labels.unsqueeze(0))
        epoch_loss += loss.item()
        
        # Compute accuracy
        _, predicted = torch.max(outputs, 1)
        if predicted[0] == 1:
            epoch_acc += 1
            
        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
    # Print epoch information
    print(f'Epoch {epoch+1}/{num_epochs}: Loss: {epoch_loss/len(graphs):.4f}, Accuracy: {epoch_acc/len(graphs):.4f}')

RuntimeError: ignored

# Noise on sequence

In [None]:
import torch
import numpy as np

# Define the DNA sequence
seq = "AGCTATCGAGGGCTAGCATGCTAGCATCGACTAGCTACG"

# Create a one-hot encoded representation of the sequence
nucleotides = "ACGT"
seq_one_hot = np.zeros((len(seq), 4 ))
for i, base in enumerate(seq):
    if base in nucleotides:
        seq_one_hot[i, nucleotides.index(base)] = 1

# Add Gaussian noise to the one-hot encoded sequence
noise_std = 0.1
noise = torch.randn(seq_one_hot.shape) * noise_std
seq_noisy = seq_one_hot + noise.numpy()

# Convert the noisy sequence back to a string representation
seq_noisy_str = ""
for i in range(seq_noisy.shape[0]):
    if np.argmax(seq_noisy[i]) == 0:
        seq_noisy_str += "A"
    elif np.argmax(seq_noisy[i]) == 1:
        seq_noisy_str += "C"
    elif np.argmax(seq_noisy[i]) == 2:
        seq_noisy_str += "G"
    elif np.argmax(seq_noisy[i]) == 3:
        seq_noisy_str += "T"

print("Original sequence:", seq)
print("Noisy sequence:", seq_noisy_str)

Original sequence: AGCTATCGAGGGCTAGCATGCTAGCATCGACTAGCTACG
Noisy sequence: AGCTATCGAGGGCTAGCATGCTAGCATCGACTAGCTACG


In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score
from sklearn import metrics

# Train a diffusion

In [None]:
class DiffusionProcess(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, timesteps):
        super(DiffusionProcess, self).__init__()
        self.timesteps = timesteps
        self.layers = nn.ModuleList()
        self.layers.append(nn.Linear(input_dim, hidden_dim))
        for i in range(num_layers):
            self.layers.append(nn.Linear(hidden_dim, hidden_dim))
        self.layers.append(nn.Linear(hidden_dim, output_dim))
        
    def forward(self, x):
        for i in range(self.timesteps):
            noise = torch.randn_like(x) / np.sqrt(self.timesteps)
            x += noise
            for layer in self.layers:
                x = layer(x)
        return x

# Define the DNN model
class DNNGenerator(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dim, num_layers, timesteps):
        super(DNNGenerator, self).__init__()
        self.diffusion = DiffusionProcess(input_dim, output_dim, hidden_dim, num_layers, timesteps)
        
    def forward(self, x):
        return self.diffusion(x)

def train_dna(model, optimizer, criterion, train_loader, test_loader, num_epochs):
    train_losses = []
    test_losses = []
    train_accs = []
    test_accs = []
    roc_scores = []
    
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        train_total = 0
        train_correct = 0
        
        for i, (seqs, labels) in enumerate(train_loader):
            optimizer.zero_grad()
            
            outputs = model(seqs)
            loss = criterion(outputs, labels)
            
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item() * seqs.size(0)
            train_total += seqs.size(0)
            train_correct += (torch.argmax(outputs, 1) == torch.argmax(labels, 1)).sum().item()
            
            if (i+1) % 1000 == 0:
                print("Epoch [{}/{}], Step [{}/{}], Train Loss: {:.4f}".format(epoch+1, num_epochs, i+1, len(train_loader), loss.item()))
        
        train_loss /= train_total
        train_acc = train_correct / train_total
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        
        model.eval()
        test_loss = 0
        test_total = 0
        test_correct = 0
        y_true = []
        y_scores = []
        
        with torch.no_grad():
            for seqs, labels in test_loader:
                outputs = model(seqs)
                loss = criterion(outputs, labels)
                
                test_loss += loss.item() * seqs.size(0)
                test_total += seqs.size(0)
                test_correct += (torch.argmax(outputs, 1) == torch.argmax(labels, 1)).sum().item()
                
                y_true += labels.cpu().numpy().tolist()
                y_scores += torch.softmax(outputs, dim=1)[:, 1].cpu().numpy().tolist()
        
        test_loss /= test_total
        test_acc = test_correct / test_total
        test_losses.append(test_loss)
        test_accs.append(test_acc)
        
        roc_score = roc_auc_score(y_true, y_scores)
        roc_scores.append(roc_score)
        
        print("Epoch [{}/{}], Train Loss: {:.4f}, Train Acc: {:.4f}, Test Loss: {:.4f}, Test Acc: {:.4f}, ROC AUC: {:.4f}".format(epoch+1, num_epochs, train_loss, train_acc, test_loss, test_acc, roc_score))
    
    # Plot the loss and accuracy curves
    plt.figure(figsize=(8, 6))
    plt.plot(train_losses, label="Train Loss")
    plt.plot(test_losses, label="Test Loss")
    plt.legend()
    plt.title("Loss Curves")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.show()
    
    plt.figure(figsize=(8, 6))
    plt.plot(train_accs, label="Train Acc")
    plt.plot(test_accs, label="Test Acc")
    plt.legend()
    plt.title("Accuracy Curves")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.show()
    
    # Plot the ROC curve
    fpr, tpr, thresholds = metrics.roc_curve(y_true, y_scores)
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr)
    plt.plot([0, 1], [0, 1], linestyle="--")
    plt.title("ROC Curve (AUC = {:.4f})".format(roc_score))
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.show()
    
    return model, train_losses, test_losses, train_accs, test_accs, roc_scores

In [None]:
# Define the hyperparameters
input_dim = 4 # One-hot encoded sequence
output_dim = 4 # One-hot encoded sequence
hidden_dim = 512 # Hidden layer size
num_layers = 4 # Number of layers in the diffusion process
timesteps = 100 # Number of diffusion steps
batch_size = 32
learning_rate = 1e-4
num_epochs = 10

# Create a DNN generator instance
generator = DNNGenerator(input_dim, output_dim, hidden_dim, num_layers, timesteps)

# Define the optimizer and loss function
optimizer = torch.optim.Adam(generator.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

# Create a dummy dataset loader
train_data = torch.randn((1000, input_dim))
train_loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size)

test_data = torch.randn((1000, input_dim))
test_loader = torch.utils.data.DataLoader(test_data, batch_size=batch_size)

# Train the generator
train_dna(generator, optimizer, criterion, train_loader, test_loader, num_epochs)

ValueError: ignored

# Generate quadruplexes

In [None]:
import random

# Set the length of the DNA sequences
seq_length = 50

# Set the number of sequences for training and testing
num_train = 1000
num_test = 200

# Generate random DNA sequences with G-quadruplexes for training
train_seqs = []
train_labels = []
for i in range(num_train):
    seq = ""
    for j in range(seq_length):
        base = random.choice(["A", "C", "G", "T"])
        seq += base
        if j >= 3 and seq[j-3:j+1] in ["GGGG", "CCCC"]:
            # Add a G-quadruplex
            seq = seq[:j-3] + seq[j-3:j+1].lower() + seq[j+1:]
    train_seqs.append(seq)
    if "g" in seq.lower():
        train_labels.append(1)  # Positive label if G-quadruplex is present
    else:
        train_labels.append(0)  # Negative label if G-quadruplex is absent

# Generate random DNA sequences with G-quadruplexes for testing
test_seqs = []
test_labels = []
for i in range(num_test):
    seq = ""
    for j in range(seq_length):
        base = random.choice(["A", "C", "G", "T"])
        seq += base
        if j >= 3 and seq[j-3:j+1] in ["GGGG", "CCCC"]:
            # Add a G-quadruplex
            seq = seq[:j-3] + seq[j-3:j+1].lower() + seq[j+1:]
    test_seqs.append(seq)
    if "g" in seq.lower():
        test_labels.append(1)  # Positive label if G-quadruplex is present
    else:
        test_labels.append(0)  # Negative label if G-quadruplex is absent

In [None]:
train_seqs

In [None]:
# Define the one-hot encoding dictionary
one_hot_dict = {
    "A": [1, 0, 0, 0],
    "C": [0, 1, 0, 0],
    "G": [0, 0, 1, 0],
    "T": [0, 0, 0, 1],
    "a": [1, 0, 0, 0],
    "c": [0, 1, 0, 0],
    "g": [0, 0, 1, 0],
    "t": [0, 0, 0, 1]
}

# Convert the DNA sequences to one-hot encoding
def dna_to_one_hot(seq):
    one_hot_seq = []
    for base in seq:
        one_hot_seq.append(one_hot_dict[base])
    return np.array(one_hot_seq)

train_seqs_one_hot = np.array([dna_to_one_hot(seq) for seq in train_seqs])
test_seqs_one_hot = np.array([dna_to_one_hot(seq) for seq in test_seqs])

In [None]:
train_seqs_one_hot

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

# Define the diffusion neural network
class DiffusionNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(DiffusionNet, self).__init__()
        self.diffusion = nn.Sequential(
            nn.Linear(4, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 4)
        )
    
    def forward(self, x):
        return self.diffusion(x).view(-1, seq_length, 4)

# Set the device to use for training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the training parameters
batch_size = 32
num_epochs = 1
learning_rate = 1e-3

# Create the DataLoader for training
train_dataset = TensorDataset(torch.Tensor(train_seqs_one_hot), torch.Tensor(train_labels))
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Initialize the diffusion neural network and the optimizer
diffusion_net = DiffusionNet(input_size=seq_length*4, hidden_size=256, output_size=seq_length*4).to(device)
optimizer = optim.Adam(diffusion_net.parameters(), lr=learning_rate)

# Define the loss function and the accuracy function
loss_fn = nn.BCEWithLogitsLoss()
accuracy_fn = lambda output, target: (output.sigmoid().round() == target).sum().item() / target.numel()

# Train the diffusion neural network
train_losses = []
train_accs = []
for epoch in range(num_epochs):
    # Train for one epoch
    train_loss = 0
    train_acc = 0
    for batch_x, batch_y in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # Move the batch to the device
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        # Generate noisy inputs with diffusion
        noise = torch.randn_like(batch_x).to(device)
        noisy_batch_x = diffusion_net(noise)

        # Compute the loss and the accuracy
        loss = loss_fn(noisy_batch_x, batch_x)
        accuracy = accuracy_fn(noisy_batch_x, batch_x)

        # Backpropagate and update the parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Update the training loss and accuracy
        train_loss += loss.item() * batch_x.shape[0]
        train_acc += accuracy * batch_x.shape[0]

    # Compute the average training loss and accuracy for the epoch
    train_loss /= len(train_dataset)
    train_acc /= len(train_dataset)
    train_losses.append(train_loss)
    train_accs.append(train_acc)
    
    # Print the training loss and accuracy for the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

# Generate noisy DNA sequences with G-quadruplexes regions
num_samples = 5
with torch.no_grad():
    for i in range(num_samples):
        # Generate a random input noise
        noise = torch.randn(1, seq_length*4).to(device)

        # Generate a noisy DNA sequence with diffusion
        noisy_seq = diffusion_net(noise).sigmoid().round().cpu().numpy()[0]

        # Decode the noisy sequence to a DNA sequence
        decoded_seq = []
        for j in range(0, seq_length*4, 4):
            base = noisy_seq[j:j+4]
            if np.array_equal(base, [1, 0, 0, 0]):
                decoded_seq.append('A')
            elif np.array_equal(base, [0, 1, 0, 0]):
                decoded_seq.append('C')
            elif np.array_equal(base, [0, 0, 1, 0]):
                decoded_seq.append('G')
            elif np.array_equal(base, [0, 0, 0, 1]):
                decoded_seq.append('T')
        decoded_seq = ''.join(decoded_seq)

        # Print the noisy and decoded sequences
        print(f"Noisy sequence {i+1}: {noisy_seq}")
        print(f"Decoded sequence {i+1}: {decoded_seq}")

Epoch 1/1: 100%|██████████| 32/32 [00:03<00:00, 10.64it/s]


Epoch 1/1, Train Loss: 0.5867, Train Acc: 0.7351


RuntimeError: ignored

# Preprocess

In [7]:
def generate_DNA_seq(length):
    """Function to generate a random DNA sequence of given length"""
    bases = ['A', 'C', 'G', 'T']
    seq = ''.join(np.random.choice(bases, size=length))
    return seq

def add_gaussian_noise(seq, mean, std):
    """Function to add Gaussian noise to a given DNA sequence"""
    noise = np.random.normal(mean, std, len(seq))
    noisy_seq = seq + noise
    return noisy_seq

def generate_noisy_and_clean_seqs(num_seqs, seq_length, noise_mean, noise_std):
    """Function to generate noisy and clean DNA sequences"""
    noisy_seqs = []
    clean_seqs = []
    
    for i in range(num_seqs):
        # Generate clean DNA sequence
        clean_seq = generate_DNA_seq(seq_length)
        
        # Add Gaussian noise to clean sequence
        noisy_seq = add_gaussian_noise(clean_seq, noise_mean, noise_std)
        
        # Append to lists
        noisy_seqs.append(noisy_seq)
        clean_seqs.append(clean_seq)
    
    # Convert to numpy arrays
    noisy_seqs = np.array(noisy_seqs)
    clean_seqs = np.array(clean_seqs)
    
    return noisy_seqs, clean_seqs

In [8]:
def generate_noisy_and_clean_seqs(seq_length, num_seqs, noise_mean, noise_stddev):
    noisy_seqs = []
    clean_seqs = []
    for i in range(num_seqs):
        # Generate clean sequence
        clean_seq = np.random.choice(['A', 'T', 'C', 'G'], size=seq_length)
        clean_seq = np.array(clean_seq, dtype='|S1').astype(float)
        
        # Add Gaussian noise to clean sequence
        noise = np.random.normal(loc=noise_mean, scale=noise_stddev, size=seq_length)
        noisy_seq = clean_seq + noise
        
        # Append sequences to lists
        clean_seqs.append(clean_seq)
        noisy_seqs.append(noisy_seq)
        
    return np.array(noisy_seqs), np.array(clean_seqs)

In [None]:
import numpy as np
from scipy.ndimage.filters import gaussian_filter

# Assuming your noisy and clean sequences are stored in noisy_seqs and clean_seqs respectively
noisy_seqs_preprocessed = []
clean_seqs_preprocessed = []

# Set the standard deviation of the Gaussian noise
std_dev = 0.5

# Loop over each sequence
for noisy_seq, clean_seq in zip(noisy_seqs, clean_seqs):
    # Convert the sequences to numpy arrays
    noisy_seq = np.array(list(noisy_seq))
    clean_seq = np.array(list(clean_seq))
    
    # Apply the Gaussian noise to the noisy sequence
    noisy_seq = noisy_seq.astype(float) + np.random.normal(0, std_dev, noisy_seq.shape)
    
    # # Normalize the sequences
    # noisy_seq /= 4.
    # clean_seq /= 4.
    
    # Smooth the noisy sequence with a Gaussian filter
    noisy_seq = gaussian_filter(noisy_seq, sigma=1)
    
    # Add the preprocessed sequences to the lists
    noisy_seqs_preprocessed.append(noisy_seq)
    clean_seqs_preprocessed.append(clean_seq)

# Convert the lists to numpy arrays
noisy_seqs_preprocessed = np.array(noisy_seqs_preprocessed)
clean_seqs_preprocessed = np.array(clean_seqs_preprocessed)

# Reshape the arrays to be compatible with the U-Net input shape
noisy_seqs_preprocessed = np.expand_dims(noisy_seqs_preprocessed, axis=1)
clean_seqs_preprocessed = np.expand_dims(clean_seqs_preprocessed, axis=1)

  from scipy.ndimage.filters import gaussian_filter


# U-Net

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm

# Define the U-Net denoiser
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()

        # Encoder
        self.conv1 = nn.Conv1d(4, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(256)
        self.conv4 = nn.Conv1d(256, 512, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm1d(512)

        # Decoder
        self.up1 = nn.ConvTranspose1d(512, 256, kernel_size=2, stride=2)
        self.conv5 = nn.Conv1d(512, 256, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm1d(256)
        self.up2 = nn.ConvTranspose1d(256, 128, kernel_size=2, stride=2)
        self.conv6 = nn.Conv1d(256, 128, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm1d(128)
        self.up3 = nn.ConvTranspose1d(128, 64, kernel_size=2, stride=2)
        self.conv7 = nn.Conv1d(128, 64, kernel_size=3, padding=1)
        self.bn7 = nn.BatchNorm1d(64)
        self.conv8 = nn.Conv1d(64, 4, kernel_size=3, padding=1)

    def forward(self, x):
        # Encoder
        x1 = nn.functional.relu(self.bn1(self.conv1(x)))
        x2 = nn.functional.relu(self.bn2(self.conv2(x1)))
        x3 = nn.functional.relu(self.bn3(self.conv3(x2)))
        x4 = nn.functional.relu(self.bn4(self.conv4(x3)))

        # Decoder
        x = nn.functional.relu(self.bn5(self.conv5(torch.cat([self.up1(x4), x3], dim=1))))
        x = nn.functional.relu(self.bn6(self.conv6(torch.cat([self.up2(x), x2], dim=1))))
        x = nn.functional.relu(self.bn7(self.conv7(torch.cat([self.up3(x), x1], dim=1))))
        x = torch.sigmoid(self.conv8(x))
        return x

In [None]:
# Set the device to use for training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define the training parameters
batch_size = 32
num_epochs = 10
learning_rate = 1e-3

# Create the DataLoader for training
train_dataset = TensorDataset(torch.Tensor(noisy_seqs_preprocessed), torch.Tensor(clean_seqs_preprocessed))
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Initialize the U-Net denoiser and the optimizer
unet = UNet().to(device)
optimizer = optim.Adam(unet.parameters(), lr=learning_rate)

# Define the loss function
loss_fn = nn.MSELoss()

# Train the U-Net denoiser
train_losses = []
for epoch in range(num_epochs):
    # Train for one epoch
    train_loss = 0
    for batch_x, batch_y in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # Move the batch to the device
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        # Forward pass
        y_pred = unet(batch_x)

        # Compute the loss
        loss = loss_fn(y_pred, batch_y)

        # Backpropagate and update the parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Update the training loss
        train_loss += loss.item() * batch_x.shape[0]

    # Compute the average training loss for the epoch
    train_loss /= len(train_dataset)
    train_losses.append(train_loss)
    
    # Print the training loss for the epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}")

Epoch 1/10:   0%|          | 0/32 [00:00<?, ?it/s]


RuntimeError: ignored

# Generating 

In [1]:
def generate_sequences(num_seqs, seq_len):
    """Generate random DNA sequences with G-quadruplex regions."""
    seqs = []
    for i in range(num_seqs):
        seq = ''
        while len(seq) < seq_len:
            # Generate random nucleotide
            nuc = np.random.choice(['A', 'C', 'G', 'T'])
            seq += nuc
            # Generate G-quadruplex region
            if len(seq) % 4 == 0 and len(seq) < seq_len:
                seq += 'GGGG'
        seqs.append(seq)
    return seqs

def preprocess_sequences(seqs):
    """Preprocess DNA sequences by one-hot encoding."""
    # Define nucleotide to integer mapping
    nuc_map = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    # One-hot encode sequences
    encoded_seqs = np.zeros((len(seqs), len(seqs[0]), 4))
    for i, seq in enumerate(seqs):
        for j, nuc in enumerate(seq):
            encoded_seqs[i, j, nuc_map[nuc]] = 1
    return encoded_seqs

In [2]:
def add_gaussian_noise(seq, noise_level):
    """
    Add Gaussian noise to a DNA sequence.

    Args:
    - seq (str): DNA sequence.
    - noise_level (float): Standard deviation of the Gaussian noise.

    Returns:
    - noisy_seq (str): Noisy DNA sequence.
    """
    # Convert DNA sequence to array of integers
    seq_int = np.zeros(len(seq), dtype=int)
    for i, char in enumerate(seq):
        if char == 'A':
            seq_int[i] = 0
        elif char == 'C':
            seq_int[i] = 1
        elif char == 'G':
            seq_int[i] = 2
        elif char == 'T':
            seq_int[i] = 3

    # Add Gaussian noise
    noise = np.random.normal(scale=noise_level, size=len(seq_int))
    seq_int_noisy = np.clip(seq_int + noise, 0, 3)

    # Convert back to DNA sequence
    noisy_seq = ''
    for i in seq_int_noisy:
        if i == 0:
            noisy_seq += 'A'
        elif i == 1:
            noisy_seq += 'C'
        elif i == 2:
            noisy_seq += 'G'
        elif i == 3:
            noisy_seq += 'T'

    return noisy_seq

In [5]:
num_seqs = 1000
seq_len = 200
noise_std = 0.1

# Generate sequences with G-quadruplex regions
clean_seqs = generate_sequences(num_seqs, seq_len)

# Add Gaussian noise to sequences
noisy_seqs = add_gaussian_noise(clean_seqs, noise_std)

# Preprocess sequences by one-hot encoding
clean_seqs_preprocessed = preprocess_sequences(clean_seqs)
noisy_seqs_preprocessed = preprocess_sequences(noisy_seqs)

In [None]:
noisy_seqs_preprocessed

In [None]:
# Load the trained U-Net denoiser
denoiser = UNet()

checkpoint = torch.load('unet_checkpoint.pth')
denoiser.load_state_dict(checkpoint['model_state_dict'])
denoiser.eval()

# Generate noisy DNA sequences with G-quadruplex regions using diffusion neural network
noisy_seqs = generate_sequences(diffusion_nn, num_seqs=10, seq_length=200)

# Preprocess noisy DNA sequences for input to U-Net denoiser
noisy_seqs_preprocessed = preprocess_sequences(noisy_seqs)

# Denoise the noisy sequences using U-Net denoiser
with torch.no_grad():
    denoised_seqs_preprocessed = denoiser(torch.from_numpy(noisy_seqs_preprocessed).float())
    
# Postprocess denoised sequences to obtain final DNA sequences with G-quadruplex regions
denoised_seqs = postprocess_sequences(denoised_seqs_preprocessed)

# Print the final denoised DNA sequences
print(denoised_seqs)

# Transformer as denoiser

In [None]:
class TransformerDenoiser(nn.Module):
    def __init__(self, input_dim=4, max_length=1000, d_model=256, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=512, dropout=0.1):
        super(TransformerDenoiser, self).__init__()
        self.max_length = max_length
        self.embedding = nn.Embedding(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_length=max_length)
        self.transformer = nn.Transformer(d_model=d_model, nhead=nhead, num_encoder_layers=num_encoder_layers,
                                          num_decoder_layers=num_decoder_layers, dim_feedforward=dim_feedforward,
                                          dropout=dropout)
        self.fc = nn.Linear(d_model, input_dim)

    def forward(self, x):
        # x is of shape (batch_size, seq_length)
        x = self.embedding(x) * np.sqrt(self.max_length)
        x = self.pos_encoder(x)
        x = self.transformer(x, x)
        x = self.fc(x)
        return x
        

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_length=1000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=0.1)
        
        # Compute the positional encodings in advance
        pe = torch.zeros(max_length, d_model)
        position = torch.arange(0, max_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

In [None]:
# Define dataset class
class DNASequenceDataset(Dataset):
    def __init__(self, noisy_seqs_preprocessed, clean_seqs_preprocessed):
        self.noisy_seqs_preprocessed = noisy_seqs_preprocessed
        self.clean_seqs_preprocessed = clean_seqs_preprocessed
        
    def __len__(self):
        return len(self.noisy_seqs_preprocessed)
    
    def __getitem__(self, idx):
        noisy_seq = torch.tensor(self.noisy_seqs_preprocessed[idx], dtype=torch.float32)
        clean_seq = torch.tensor(self.clean_seqs_preprocessed[idx], dtype=torch.float32)
        return noisy_seq, clean_seq

# Define training parameters
batch_size = 32
learning_rate = 1e-4
num_epochs = 10

# Generate preprocessed noisy and clean sequences
noisy_seqs_preprocessed, clean_seqs_preprocessed = preprocess_sequences(generate_sequences(1000))

# Create dataset and dataloader
dataset = DNASequenceDataset(noisy_seqs_preprocessed, clean_seqs_preprocessed)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize model and optimizer
model = TransformerDenoiser().cuda()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Train the Transformer denoiser
train_losses = []
for epoch in range(num_epochs):
    epoch_loss = 0
    for i, (noisy_seq, clean_seq) in enumerate(dataloader):
        # Move tensors to GPU
        noisy_seq = noisy_seq.cuda()
        clean_seq = clean_seq.cuda()

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output_seq = model(noisy_seq)

        # Compute loss
        loss = nn.MSELoss()(output_seq, clean_seq)

        # Backward pass
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    train_losses.append(epoch_loss / len(dataloader))

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_losses[-1]}")

In [None]:
model = TransformerDenoiser()
model.load_state_dict(torch.load('transformer_denoiser.pt'))

# Generate some DNA sequences using the diffusion neural network
noisy_seqs = generate_sequences(10)

# Preprocess the DNA sequences by converting them to one-hot encoding
noisy_seqs_preprocessed = preprocess_sequences(noisy_seqs)

# Convert the preprocessed sequences to PyTorch tensors and pass them through the denoiser
with torch.no_grad():
    noisy_seqs_tensor = torch.tensor(noisy_seqs_preprocessed).long()
    denoised_seqs_tensor = model(noisy_seqs_tensor)

# Convert the denoised sequences back to numpy arrays
denoised_seqs_preprocessed = denoised_seqs_tensor.cpu().numpy()
denoised_seqs = postprocess_sequences(denoised_seqs_preprocessed)

# Print the denoised DNA sequences
for i in range(len(denoised_seqs)):
    print(f"Noisy sequence: {noisy_seqs[i]}")
    print(f"Denoised sequence: {denoised_seqs[i]}")