# Data processing

In [109]:
# Mount Google Drive at /content/drive so the training files stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [110]:
!pip install Bio
from Bio import SeqIO
import torch



In [111]:
def parse_fasta(file_path):
    """Return the sequences of all records in a FASTA file, in file order.

    Args:
        file_path: path of the FASTA file to read.

    Returns:
        A list holding the `seq` attribute of every record yielded by
        `SeqIO.parse(..., "fasta")`.
    """
    with open(file_path, 'r') as handle:
        # The comprehension is evaluated while the file is still open.
        return [entry.seq for entry in SeqIO.parse(handle, "fasta")]

# Residue alphabet: the 20 standard amino acids followed by the gap symbol.
# A residue's integer code is its position in this string.
amino_acids = 'ACDEFGHIKLMNPQRSTVWY-'

def integer_encode(sequence, max_length, pad_value=0):
    """Encode a protein sequence as a 1-D integer tensor.

    Every 'X' is first mapped to the gap symbol '-', then each residue is
    replaced by its index in `amino_acids`. Sequences shorter than
    `max_length` are right-padded with `pad_value`; longer sequences are
    returned at their full length (no truncation).

    Args:
        sequence: a `str` or any object whose `str()` is the residue string.
        max_length: minimum length of the returned tensor.
        pad_value: integer used for padding. The default 0 is also the code
            of 'A', so padding cannot be told apart from alanine unless a
            different value is passed.

    Returns:
        A `torch.long` tensor of length `max(len(sequence), max_length)`.

    Raises:
        ValueError: if the sequence contains a character that is neither in
            `amino_acids` nor 'X'.
    """
    # str() lets plain strings and sequence objects share one code path.
    residues = str(sequence).replace('X', '-')

    # Dictionary lookup instead of a linear `str.index` scan per residue.
    index_of = {aa: idx for idx, aa in enumerate(amino_acids)}
    try:
        encoding = [index_of[aa] for aa in residues]
    except KeyError:
        unknown = sorted(set(residues) - set(amino_acids))
        raise ValueError(
            f"Unsupported residue(s) {unknown} in sequence; "
            f"expected only characters from {amino_acids!r} or 'X'"
        ) from None

    # Pad the sequence to the specified maximum length
    if len(encoding) < max_length:
        encoding += [pad_value] * (max_length - len(encoding))

    # Explicit dtype keeps the result integer-typed even for empty input.
    return torch.tensor(encoding, dtype=torch.long)

In [112]:
# Location of the training FASTA file (relative path under the mounted Drive).
folder_path = 'drive/MyDrive/ae_training'
file_name = f'{folder_path}/card1_1273x130.fasta'
# Length that integer_encode pads each sequence up to.
# NOTE(review): the file name suggests 130 columns while max_len is 440 - confirm.
max_len = 440

# Parse the FASTA records and integer-encode each sequence.
sequences = parse_fasta(file_name)
encoded_sequences = [integer_encode(seq, max_len) for seq in sequences]
# torch.stack needs equally sized tensors; integer_encode pads but does not
# truncate, so this fails if any sequence is longer than max_len.
data = torch.stack(encoded_sequences)
data.shape

torch.Size([1273, 440])

# Build the autoencoder

In [113]:
!pip install keras-core
import os
# Select the PyTorch backend; the variable is set before keras_core is imported below.
os.environ["KERAS_BACKEND"] = "torch"

import keras_core as keras



In [116]:
from keras_core import backend as K
from keras_core import losses
from keras_core import ops
from keras_core import random as keras_random
from keras_core.layers import Input, Layer, LSTM, RepeatVector, Masking
from keras_core.layers import Flatten, Dense, Dropout, Lambda
from keras_core.models import Sequential, Model
from keras_core.optimizers import SGD, RMSprop, Adam


def create_lstm_vae(input_dim,
    timesteps,
    batch_size,
    intermediate_dim,
    latent_dim,
    epsilon_std=1.):

    """
    Creates an LSTM Variational Autoencoder (VAE). Returns VAE, Encoder, Generator.

    An encoder LSTM reads the masked input sequence and feeds two Dense heads
    (latent mean and latent log-variance). A latent sample is repeated
    `timesteps` times and decoded by two stacked LSTMs; the same decoder
    layers are reused by the stand-alone generator.

    # Arguments
        input_dim: int, number of features per timestep.
        timesteps: int, input timestep dimension.
        batch_size: int. Kept for backward compatibility only; the noise
            tensor is shaped from the incoming batch, so any batch size
            (including a smaller final batch) works.
        intermediate_dim: int, output shape of LSTM.
        latent_dim: int, latent z-layer shape.
        epsilon_std: float, z-layer sigma.

    # Returns
        vae: end-to-end model, compiled with RMSprop; its loss is the MSE
            reconstruction error plus the KL divergence of the latent.
        encoder: model mapping inputs to the latent mean.
        generator: model mapping latent vectors to reconstructed sequences.
    """

    class _LatentSampling(Layer):
        """Samples z = mean + std * epsilon and registers the KL penalty."""

        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            # Stateful seed source so every call draws fresh noise.
            self.seed_generator = keras_random.SeedGenerator()

        def call(self, inputs):
            z_mean, z_log_var = inputs

            # KL(q(z|x) || N(0, I)). It is added here, on concrete tensors,
            # because a loss closure over the symbolic graph tensors cannot
            # be evaluated during training.
            kl_loss = -0.5 * ops.mean(
                1 + z_log_var - ops.square(z_mean) - ops.exp(z_log_var))
            self.add_loss(kl_loss)

            # Noise is shaped like z_mean rather than (batch_size, latent_dim).
            epsilon = keras_random.normal(shape=ops.shape(z_mean),
                                          mean=0., stddev=epsilon_std,
                                          seed=self.seed_generator)
            # Scale by the standard deviation exp(0.5 * log_var); the KL term
            # above treats the second head as a log-variance.
            return z_mean + ops.exp(0.5 * z_log_var) * epsilon

    x = Input(shape=(timesteps, input_dim,))
    masked_inputs = Masking(mask_value=0.0)(x)  # Assuming 0.0 is the mask value

    # LSTM encoding
    h = LSTM(intermediate_dim)(masked_inputs)

    # VAE Z layer: mean and log-variance heads
    z_mean = Dense(latent_dim)(h)
    z_log_sigma = Dense(latent_dim)(h)

    z = _LatentSampling()([z_mean, z_log_sigma])

    # decoded LSTM layer (instances are shared with the generator below)
    decoder_h = LSTM(intermediate_dim, return_sequences=True)
    decoder_mean = LSTM(input_dim, return_sequences=True)

    h_decoded = RepeatVector(timesteps)(z)
    h_decoded = decoder_h(h_decoded)

    # decoded layer
    x_decoded_mean = decoder_mean(h_decoded)

    # end-to-end autoencoder
    vae = Model(x, x_decoded_mean)

    # encoder, from inputs to latent space
    encoder = Model(x, z_mean)

    # generator, from latent space to reconstructed inputs
    decoder_input = Input(shape=(latent_dim,))

    _h_decoded = RepeatVector(timesteps)(decoder_input)
    _h_decoded = decoder_h(_h_decoded)

    _x_decoded_mean = decoder_mean(_h_decoded)
    generator = Model(decoder_input, _x_decoded_mean)

    def vae_loss(x, x_decoded_mean):
        # Reconstruction term only; the KL term is contributed by
        # _LatentSampling through add_loss and summed in by Keras.
        return losses.mean_squared_error(x, x_decoded_mean)

    vae.compile(optimizer='rmsprop', loss=vae_loss)

    return vae, encoder, generator

# Training