In [44]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
import numpy as np

import pandas as pd

In [12]:
def lab2onehot(seq, num_classes=None):
    """One-hot encode a tensor of integer labels.

    Args:
        seq: Integer tensor of label indices. It is used to index the rows of
            an identity matrix, so every value must lie in
            ``[0, num_classes)``.
        num_classes: Width of the one-hot vectors. When omitted, the size of
            the notebook-level ``alphabet_dict`` is used; that global is
            looked up at call time, so it must exist before the first call.

    Returns:
        Tensor of shape ``seq.shape + (num_classes,)`` whose last axis is the
        one-hot vector of the corresponding label.
    """
    if num_classes is None:
        num_classes = len(alphabet_dict)
    return torch.eye(num_classes)[seq]


def seq2lab(seq, alphabet=None):
    """Map every symbol of a sequence to its integer label.

    Args:
        seq: Iterable of symbols (e.g. a string).
        alphabet: Mapping from symbol to integer label. When omitted, the
            notebook-level ``alphabet_dict`` is used; that global is looked up
            at call time, so it must exist before the first call.

    Returns:
        1-D tensor holding one label per symbol of ``seq``.

    Raises:
        KeyError: If a symbol is not present in the alphabet.
    """
    if alphabet is None:
        alphabet = alphabet_dict
    return torch.tensor([alphabet[symbol] for symbol in seq])

In [67]:
def label(seqs, alphabet_dict):
    """Integer-encode a batch of equal-length sequences.

    Args:
        seqs: Non-empty list of sequences (e.g. strings) that all have the
            same length.
        alphabet_dict: Mapping from each symbol to its integer label.

    Returns:
        Float tensor of shape ``(len(seqs), len(seqs[0]))`` holding the label
        of every symbol.

    Raises:
        AssertionError: If ``seqs`` is empty or the sequences differ in length.
        KeyError: If a symbol is missing from ``alphabet_dict``.
    """
    assert any(seqs), "empty sequence input"
    # Compare each sequence with the first *sequence*. The earlier check used
    # len(seq[0]) (the length of the first symbol, i.e. 1) together with any(),
    # which rejected valid length-1 sequences and let ragged input through.
    seq_len = len(seqs[0])
    assert all(len(seq) == seq_len for seq in seqs), "sequences must all have the same length"
    encoded_seqs = torch.ones((len(seqs), seq_len))
    for i, seq in enumerate(seqs):
        # Assigning into the float buffer casts the integer labels to float.
        encoded_seqs[i, :] = torch.tensor([alphabet_dict[a] for a in seq])
    return encoded_seqs

def onehot(seqs, alphabet_dict):
    """One-hot encode a batch of equal-length sequences.

    Args:
        seqs: Non-empty list of sequences (e.g. strings) that all have the
            same length.
        alphabet_dict: Mapping from each symbol to its integer label; its
            size sets the width of the one-hot axis.

    Returns:
        Float tensor of shape ``(len(seqs), len(seqs[0]), len(alphabet_dict))``.

    Raises:
        AssertionError: If ``seqs`` is empty or the sequences differ in length.
        KeyError: If a symbol is missing from ``alphabet_dict``.
    """
    assert any(seqs), "empty sequence input"
    # Compare each sequence with the first *sequence*. The earlier check used
    # len(seq[0]) (the length of the first symbol, i.e. 1) together with any(),
    # which rejected valid length-1 sequences and let ragged input through.
    seq_len = len(seqs[0])
    assert all(len(seq) == seq_len for seq in seqs), "sequences must all have the same length"
    # Encode with the alphabet that was passed in rather than through helpers
    # bound to a notebook-level global, so the argument is actually honoured.
    labels = torch.tensor(
        [[alphabet_dict[a] for a in seq] for seq in seqs],
        dtype=torch.long,
    )
    # Indexing the identity matrix by label yields one one-hot row per symbol.
    return torch.eye(len(alphabet_dict))[labels]

def calculate_prob_matrix(seqs, alphabet_dict):
    """Per-position symbol frequencies of a batch of sequences.

    The sequences are one-hot encoded with ``onehot``; for every position the
    count of each symbol is divided by the total count at that position.

    Args:
        seqs: Sequences accepted by ``onehot``.
        alphabet_dict: Mapping from symbol to integer label.

    Returns:
        2-D tensor with one row per position and one column per alphabet
        entry.
    """
    encoded = onehot(seqs, alphabet_dict)
    symbol_counts = encoded.sum(dim=0)         # (positions, symbols)
    position_totals = encoded.sum(dim=[0, 2])  # (positions,)
    # A trailing axis lets each row be divided by its own position total.
    return symbol_counts / position_totals[:, None]

In [113]:
class Autoencoder(nn.Module):
    """1-D convolutional autoencoder over ``(batch, channels, length)`` input.

    The encoder widens the channel axis ``num_channels -> 32 -> 64`` and the
    decoder mirrors it back with transposed convolutions, ending in a sigmoid.
    Every layer uses kernel 3, stride 1 and padding 1, so the length axis is
    left unchanged.

    Args:
        num_channels: Number of channels of the input (and of the output).
    """

    def __init__(self, num_channels):
        super().__init__()

        # Settings shared by all four convolutions.
        conv_args = dict(kernel_size=3, stride=1, padding=1)

        self.encoder = nn.Sequential(
            nn.Conv1d(num_channels, 32, **conv_args),
            nn.ReLU(),
            nn.Conv1d(32, 64, **conv_args),
            nn.ReLU(),
        )

        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(64, 32, output_padding=0, **conv_args),
            nn.ReLU(),
            nn.ConvTranspose1d(32, num_channels, output_padding=0, **conv_args),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Encode ``x`` and decode it again; returns the reconstruction."""
        return self.decoder(self.encoder(x))

In [114]:
# NOTE(review): machine-specific absolute path; point this at your own copy
# of the aligned-compactor CSV before running.
aligned_csv_path = "C:/Users/khoah/PhD_Documents/SPLASH/compactor_classified_small_aligned.csv"
df_aligned = pd.read_csv(aligned_csv_path)

# Symbol -> channel index used by the encoding helpers.
# Presumably "p" is a padding symbol and "-" an alignment gap — TODO confirm.
alphabet_dict = {
    "p": 0,
    "-": 1,
    "A": 2,
    "C": 3,
    "G": 4,
    "T": 5,
}

In [115]:
# One list of aligned compactors per anchor_index group.
compactors_by_anchor = df_aligned.groupby("anchor_index").agg(list)
msas = compactors_by_anchor.aligned_compactor.to_list()

# Per-position symbol-frequency matrix for every group.
msa_prob_encoded = [
    calculate_prob_matrix(msa, alphabet_dict)
    for msa in msas
]

# Pad along the position axis with zeros so all matrices share one length,
# then swap the last two axes to get (batch, symbols, positions), the
# channels-first layout that Conv1d expects.
padded_sequences = pad_sequence(
    msa_prob_encoded,
    batch_first=True,
    padding_value=0,
)
input_seqs = torch.swapaxes(padded_sequences, 1, 2)

In [116]:
# Build the autoencoder with six input channels, one per alphabet_dict entry.
model = Autoencoder(6)

# Reconstruction objective (mean squared error), optimised with Adam.
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Full-batch training: each epoch is a single gradient step on all of input_seqs.
num_epochs = 10
for epoch in range(num_epochs):
    # Clear gradients from the previous step before computing new ones.
    optimizer.zero_grad()

    # Reconstruct the input and score the reconstruction against it.
    outputs = model(input_seqs)
    print(outputs.shape)
    loss = criterion(outputs, input_seqs)

    # Backpropagate and update the weights.
    loss.backward()
    optimizer.step()

    # Report progress.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

torch.Size([2207, 6, 126])
Epoch [1/10], Loss: 0.2222
torch.Size([2207, 6, 126])
Epoch [2/10], Loss: 0.2172
torch.Size([2207, 6, 126])
Epoch [3/10], Loss: 0.2121
torch.Size([2207, 6, 126])
Epoch [4/10], Loss: 0.2072
torch.Size([2207, 6, 126])
Epoch [5/10], Loss: 0.2021
torch.Size([2207, 6, 126])
Epoch [6/10], Loss: 0.1968
torch.Size([2207, 6, 126])
Epoch [7/10], Loss: 0.1910
torch.Size([2207, 6, 126])
Epoch [8/10], Loss: 0.1845
torch.Size([2207, 6, 126])
Epoch [9/10], Loss: 0.1774
torch.Size([2207, 6, 126])
Epoch [10/10], Loss: 0.1695
