<a href="https://colab.research.google.com/github/makrez/BioinformaticsTools/blob/master/autoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Autoencoders for the classification of DNA sequences

### Loading libraries

In [4]:
from numpy import array
from numpy import argmax
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
import re
!pip install biopython
#import biopython

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Creating functions

In [None]:
class hot_dna:
    """
    Integer- and one-hot-encode a single DNA sequence.

    `fasta` is either a FASTA record (a header line starting with ">"
    followed by one or more sequence lines) or a bare sequence string.

    Attributes set on the instance:
      name     -- the header line (leading ">" kept), or 'unknown_sequence'
                  when the input has no header
      sequence -- the raw `fasta` argument exactly as passed in
      integer  -- (n, 1) array of integer codes, one row per symbol
      onehot   -- dense (n, k) one-hot matrix, k = number of distinct symbols

    The label encoder is fitted on this sequence alone, so the meaning and the
    number of the one-hot columns depend on which symbols occur in it.

    Raises ValueError when the input contains no sequence symbols.
    """

    def __init__(self, fasta):
        record = fasta.strip()

        # check for and grab sequence name
        if record.startswith(">"):
            header, *body = record.splitlines()
            name = header
            # FASTA sequences are commonly wrapped over several lines;
            # concatenate all of them instead of keeping only the first one
            sequence = "".join(line.strip() for line in body)
        else:
            name = 'unknown_sequence'
            # drop line breaks/blanks so they are not encoded as symbols
            sequence = "".join(record.split())

        if not sequence:
            raise ValueError("no sequence found in the given FASTA input")

        # get sequence into an array
        seq_array = array(list(sequence))

        # integer encode the sequence
        label_encoder = LabelEncoder()
        integer_encoded_seq = label_encoder.fit_transform(seq_array)

        # OneHotEncoder expects a 2D (n_samples, n_features) input
        integer_encoded_seq = integer_encoded_seq.reshape(-1, 1)

        # one hot the sequence; the encoder's default output is a sparse
        # matrix, which is densified here. This avoids the `sparse` /
        # `sparse_output` keyword, whose name differs between
        # scikit-learn versions.
        onehot_encoder = OneHotEncoder()
        onehot_encoded_seq = onehot_encoder.fit_transform(integer_encoded_seq).toarray()

        # add the attributes to self
        self.name = name
        self.sequence = fasta
        self.integer = integer_encoded_seq
        self.onehot = onehot_encoded_seq


# Flatten a list
def flatten(l):
    """Return a new list holding the items of every sub-iterable of `l`, in order (one level deep)."""
    flat = []
    for chunk in l:
        flat.extend(chunk)
    return flat


# Autoencoder

class AE(tf.keras.Model):
  """
  Dense autoencoder (optionally denoising) that records its training history.

  Encoder: Flatten -> Dense(128, relu) -> Dense(n_code, sigmoid).
  Decoder: Dense(128, relu) -> Dense(prod(in_size), sigmoid) -> Reshape(in_size).
  The model is compiled with Adam (learning rate 1e-3) and mean squared error.

  Args:
    in_size: shape of a single data sample (any iterable of ints).
    n_code: number of latent dimensions.
    noise_rate: fraction of input entries replaced by salt & pepper noise;
      clamped to [0, 0.99]. 0 disables denoising.
  """
  def __init__(self, in_size, n_code, noise_rate=0):
    super().__init__()

    self.data_size = list(in_size)  # shape of data sample
    self.flat_data_size = int(np.prod(self.data_size))  # plain int for Dense(units)

    self.noise_rate = min(0.99, max(0, noise_rate)) # noise rate for denoising AE
    self.denoising = self.noise_rate != 0

    self.x_d = None  # variable to keep the input data
    self.xn_d = None # input data with added noise (for DAE)

    self.x_d_val = None  # validation dataset
    self.c_d_val = None  # class labels for validation dataset

    self.history = {} # training history
    self.sample_history = {}  # history of validation sample evolution in latent space and reconstruction
    self.weights_history = {} # history of model weights (joint model can't be saved at the moment)

    # handle of the output area that `self.print` keeps overwriting
    self.out = display(IPython.display.Pretty(''), display_id=True)

    self.last_n_ep = 0  # number of epochs of last fit run

    self.n_code = n_code # number of latent dimensions

    self.encoder = None
    self.decoder = None

    self.create()

  def create(self):
    """
    Build the encoder and decoder sub-models and compile the joint model.
    """
    sample_shape = tuple(self.data_size)

    # encoder model
    self.encoder = tf.keras.Sequential(
        [
         Input(shape=sample_shape),
         Flatten(),
         Dense(128, activation='relu', kernel_initializer='he_normal', name='e_l1'),
         Dense(self.n_code, activation='sigmoid', kernel_initializer='he_normal', name='e_l2'),
        ])

    # decoder model
    self.decoder = tf.keras.Sequential(
        [
         # `shape` has to be a tuple, not a bare int
         Input(shape=(self.n_code,)),
         Dense(128, activation='relu', kernel_initializer='he_normal', name='d_l1'),
         Dense(self.flat_data_size, activation='sigmoid', kernel_initializer='he_normal', name='d_l2'),
         Reshape(target_shape=sample_shape),
        ])

    # build the model
    self.compile(
          optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
          loss=tf.keras.losses.MeanSquaredError(),
      )

  def encode(self, x):
    """Map input samples `x` to their latent codes."""
    z = self.encoder(x)
    return z

  def decode(self, z):
    """Map latent codes `z` back to data space."""
    y = self.decoder(z)
    return y

  def call(self, x):
    """Forward pass: encode `x`, then decode the resulting code."""
    z = self.encoder(x)
    y = self.decoder(z)
    return y

  class EvalNSamples(tf.keras.callbacks.Callback):
    """
    Callback that evaluates and stores the latent state and the prediction
    for validation samples after every epoch.

    Needs `ae.x_d_val` and `ae.c_d_val` to be set before construction.
    """
    def __init__(self, ae, n=32):
      super().__init__()
      self.ae = ae
      self.n_sampl = n

      self.ims_smpl = None
      self.lbls_smpl = None

      self.get_uniform_subsample()

    def get_uniform_subsample(self):
      """
      Draw self.n_sampl validation elements (with replacement) for each
      class, used to follow the evolution of the latent space.

      Class ids run from 0 to max(label); ids without any validation
      sample are skipped, since there is nothing to draw from.
      """
      ims = []
      lbls = []
      for class_idx in range(np.max(self.ae.c_d_val)+1):
        map_d = self.ae.c_d_val == class_idx
        ims_d = self.ae.x_d_val[map_d]
        if len(ims_d) == 0:
          # np.random.choice raises on an empty population
          continue

        smpl_idx = np.random.choice(len(ims_d), self.n_sampl)
        ims_d_smpl = ims_d[smpl_idx]

        ims.append(ims_d_smpl)
        lbls.append([class_idx]*self.n_sampl)

      self.ims_smpl = np.concatenate(ims)
      self.lbls_smpl = np.concatenate(lbls)

    def on_epoch_end(self, epoch, logs=None):
      """
      Store in `ae.sample_history[epoch]`:
        'x', 'l'  -- first n_sampl validation samples and their labels
        'xn'      -- their noisy version (denoising mode only)
        'y', 'z'  -- reconstruction and latent code of the (noisy) samples
        'l_unif', 'z_unif' -- labels and latent codes of the per-class subsample
      """
      samples = self.ae.x_d_val[:self.n_sampl]
      labels = self.ae.c_d_val[:self.n_sampl]
      res = {'x': samples, 'l': labels}
      if self.ae.denoising:
        samples = self.ae.add_noise(samples)
        res['xn'] = samples

      res['y'] = self.ae.predict(samples)
      res['z'] = self.ae.encoder(samples).numpy()

      res['l_unif'] = self.lbls_smpl
      res['z_unif'] = self.ae.encoder(self.ims_smpl).numpy()

      self.ae.sample_history[epoch] = res

  class SaveAE(tf.keras.callbacks.Callback):
    """
    Callback that stores the encoder and decoder weights after every epoch
    in `ae.weights_history[epoch]`.
    """
    def __init__(self, ae):
      super().__init__()
      self.ae = ae

    def on_epoch_end(self, epoch, logs=None):
      weights_encoder = self.ae.encoder.get_weights()
      weights_decoder = self.ae.decoder.get_weights()

      self.ae.weights_history[epoch] = {
          'w_encoder': weights_encoder,
          'w_decoder': weights_decoder,
      }

  def _fit(self, x, y=None, epochs=None, batch_size=None,
           validation_data=None, callbacks=None):
    """
    Here the actual model fitting is performed (delegates to keras `Model.fit`).
    Can be reimplemented in an inherited class for a custom training loop (needed for VAE).

    When `y` is None, `x` is passed on alone and `batch_size` is not
    forwarded; `fit` calls it this way with an already batched dataset.
    """
    if y is None:
      return super().fit(x=x,
                         epochs=epochs,
                         validation_data=validation_data,
                         callbacks=callbacks)

    return super().fit(x=x, y=y,
                       epochs=epochs, batch_size=batch_size,
                       validation_data=validation_data,
                       callbacks=callbacks)

  def fit(self, training_data, n_epochs,
          validation_data=None, lr=None,
          batch_size = 64,
          epoch_callback=None,
          callbacks=None
          ):
    """
    Interface for model training.
    Encapsulates the default callbacks, adding noise to the training data etc.

    Args:
      training_data: array-like of training samples.
      n_epochs: number of epochs to train for.
      validation_data: optional `(samples, class_labels)` pair. The samples
        are used as validation input and target; the labels are only used by
        the sample-evaluation callback. Without it, training runs without
        validation and without that callback.
      lr: optional learning rate to set on the optimizer before training.
      batch_size: batch size of the training dataset.
      epoch_callback: accepted for interface compatibility; currently unused.
      callbacks: optional extra keras callbacks, run after the built-in ones.
    """

    t0 = timer()

    clean = np.asarray(training_data)
    # Noise is drawn once per `fit` call, in NumPy, so that every sample gets
    # its own independent mask (no-op when denoising is off).
    noisy = self.add_noise(clean)
    if self.denoising:
      # the noise arithmetic may upcast; keep input and target dtypes equal
      noisy = noisy.astype(clean.dtype, copy=False)

    self.x_d = training_data
    self.xn_d = noisy

    # the model sees the noisy sample and has to reconstruct the clean one
    train_dataset = tf.data.Dataset.from_tensor_slices((noisy, clean))
    train_dataset = train_dataset.shuffle(60000)
    train_dataset = train_dataset.batch(batch_size)
    train_dataset = train_dataset.prefetch(5)

    log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

    if validation_data is not None:
      self.x_d_val, self.c_d_val = validation_data
      keras_validation = (self.x_d_val, self.x_d_val)
      default_callbacks = [AE.EvalNSamples(self), AE.SaveAE(self), tensorboard_callback]
    else:
      self.x_d_val, self.c_d_val = None, None
      keras_validation = None
      # EvalNSamples needs validation samples and labels, so it is left out
      default_callbacks = [AE.SaveAE(self), tensorboard_callback]

    # keep the callbacks supplied by the caller in addition to the defaults
    callbacks = default_callbacks + list(callbacks or [])

    if lr is not None:
      self.optimizer.learning_rate.assign(lr)

    self.history = self._fit(train_dataset,
                             epochs=n_epochs,
                             validation_data=keras_validation,
                             callbacks=callbacks)

    self.last_n_ep = n_epochs
    t1 = timer()
    self.print(f'fit time {t1-t0:.0f} sec')

  def add_noise(self, x):
    """
    Adds salt & pepper noise to the input data.

    Each entry of `x` is, with probability `noise_rate`, replaced by 0 or 1
    (equal probability). Returns `x` unchanged when denoising is disabled.
    The last salt/pepper draw is kept in `self.sp`.
    """

    if self.denoising:
      sh = x.shape

      noise_mask = np.random.binomial(n=1, p=self.noise_rate, size=sh)  # 1 = entry gets replaced
      sp_noise = np.random.binomial(n=1, p=0.5, size=sh)                # replacement value (0 or 1)

      x = x * (1-noise_mask) + sp_noise * noise_mask

      self.sp = sp_noise

    return x

  def print(self, msg):
    """Show `msg` in the model's output area, replacing the previous message."""
    if self.out is None:
      # no display handle available: fall back to plain stdout
      print(msg)
    else:
      self.out.update(IPython.display.Pretty(msg))

  def summary(self):
    """Print the keras summaries of the encoder and the decoder."""
    self.encoder.summary()
    self.decoder.summary()

  def plot_hist(self):
    """
    Plot training loss (and validation loss, when it was recorded)
    on a logarithmic scale.
    """
    # before the first `fit` call `self.history` is an empty dict and has
    # no `.history` attribute
    hist = getattr(self.history, 'history', None)
    if not hist:
      self.print('run `fit` first to train the model')
      return

    loss = hist['loss']
    eps = np.arange(len(loss))
    plt.semilogy(eps, loss, label='training')
    if 'val_loss' in hist:
      plt.semilogy(eps, hist['val_loss'], label='validation')
    plt.legend()
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.show()
    plt.close()

  def plot_samples(self, stride=5, fig_scale=1):
    """
    Plots input, noisy samples (for DAE) and reconstruction
    for each `stride`-th epoch and for the last recorded epoch.
    """

    hist = self.sample_history
    if not hist:
      return
    last_epoch = max(hist.keys())

    for epoch_idx, hist_el in hist.items():
      if epoch_idx % stride != 0 and epoch_idx != last_epoch:
        continue

      # rows of the mosaic: input, noisy input (if present), reconstruction
      samples = [hist_el[k] for k in ('x', 'xn', 'y') if k in hist_el]

      ny = len(samples)
      nx = len(samples[0])
      plt.figure(figsize=(fig_scale*nx, fig_scale*ny))
      m = mosaic(samples)
      plt.title(f'after epoch {int(epoch_idx)}')
      plt.imshow(m, cmap='gray', vmin=0, vmax=1)
      # the padding arguments have to be passed by keyword
      plt.tight_layout(pad=0.1, h_pad=0, w_pad=0)
      plt.show()
      plt.close()

  def run_on_trained(self, run_fn, ep=None):
    """
    Helper function to execute `run_fn(self)` with the model in the state
    after training epoch `ep` (default: last epoch of the last fit run).
    The restored weights stay loaded afterwards.
    """
    ep = ep if (ep is not None) else (self.last_n_ep-1)
    self.encoder.set_weights(self.weights_history[ep]['w_encoder'])
    self.decoder.set_weights(self.weights_history[ep]['w_decoder'])

    run_fn(self)

  def run_on_all_training_history(self, run_fn, n_ep=None):
    """
    Helper function to execute `run_fn(self)` on the model state after each
    of the first `n_ep` training epochs (default: all epochs of the last fit run).
    """
    n_ep = n_ep if (n_ep is not None) else (self.last_n_ep)
    for ep in range(n_ep):
      self.print(f'running on epoch {ep+1}/{n_ep}...')
      self.run_on_trained(run_fn, ep)
    self.print('done')

### Read data