In [86]:
import numpy as np
%tensorflow_version 2.x
import tensorflow as tf
import tensorflow_datasets as tfds

Function for turning tensor string into a one-hot vector 

In [87]:
def string_to_one_hot(strg):
  vocab = {'A':'1', 'C': '2', 'G':'3', 'T':'0'}
  
  for key in vocab.keys(): #'ACTG' #'1230'
    strg = tf.strings.regex_replace(strg, key, rewrite = vocab[key])
    #strg = tf.strings.regex_replace(strg, pattern = 'A', rewrite = 1, replace_global = True)
  #strg = tf.strings.regex_replace(strg, pattern = "C", rewrite = 2, replace_global = True)
  #strg = tf.strings.regex_replace(strg, pattern = 'T', rewrite = 3, replace_global = True
  split = tf.strings.bytes_split(strg)
  labels = tf.cast(tf.strings.to_number(split), tf.uint8)
  onehot = tf.one_hot(labels, 4)
  onehot = tf.reshape(onehot, (-1,)) #e.g. [0001]
  return onehot

Load the Data-Set

In [88]:
test_data, training_data = tfds.load('genomics_ood', as_supervised = True, split = ['test[0:1000]', 'train[0:100000]'])


In [89]:

training_data = training_data.map(lambda seq, label: (string_to_one_hot(seq), tf.one_hot(label, 10)))
training_data = training_data.batch(128)
training_data = training_data.shuffle(buffer_size=128)
training_data = training_data.prefetch(20)

test_data = test_data.map(lambda seq, label: (string_to_one_hot(seq), tf.one_hot(label, 10)))
test_data = test_data.batch(128)
test_data = test_data.shuffle(buffer_size=128)
test_data = test_data.prefetch(20)


The Model


In [90]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer

class Model(Model):
  def __init__(self):
    super(Model,self).__init__()

    self.hidden_layer_1 = tf.keras.layers.Dense(units=256, activation=tf.keras.activations.sigmoid)
    self.hidden_layer_2 = tf.keras.layers.Dense(units=256, activation=tf.keras.activations.sigmoid)
    self.output_layer = tf.keras.layers.Dense(units=10, activation = tf.keras.activations.softmax)

  def call(self, input):
    input = self.hidden_layer_1(input)
    input = self.hidden_layer_2(input)
    input = self.output_layer(input)
    return input

Training

In [91]:
def train_step(model, input, target, loss_function, optimizer):
  #write a custom training loop
  #allows for automatic differentiation: automatically computes the derivative of a function
  #by repeatedly applying the chain rule
  with tf.GradientTape() as tape:
    #first: make a prediction based on model and loss
    prediction = model(input)
    #compute the loss given the prediction and the target
    loss = loss_function(prediction, target)
    #now we need the partial deriatives of the loss with respect to all the weights
    #this is where gradienttape comes in handy and allows for easy computation of all the partial derivatives
    gradients = tape.gradient(loss, model.training_variables)
  #last we apply the computed weight updates
  optimizer.apply_gradients(zip(gradients,model.trainable_variables))

  #to be able to take record of the error produced by our network we return the loss
  return loss

Testing


In [92]:
'''Testing the performance of our model. By computing the loss and '''
def test(model, test_data, loss_function):
  for (input, target) in test_data:
    prediction = model(input)

    test_losses = []
    test_accuracies = []

    computed_loss = loss_function(prediction, target)
    sample_test_accuracy =  np.argmax(target, axis=1) == np.argmax(prediction, axis=1)
    sample_test_accuracy = np.mean(sample_test_accuracy)
    test_losses.append(computed_loss.numpy())
    test_accuracies.append(np.mean(sample_test_accuracy))

  
  loss = np.mean(test_losses)
  test_accuracy = np.mean(test_accuracies)
  return loss, test_accuracy


Initialization

In [93]:
#tf.keras.backend.clear_session()

#Initialize model
model = Model()

#hyperparameters
epochs = 10
learning_rate = 0.1 

loss = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.SGD(learning_rate)

running_average_factor = 0.95

#To keep track of the processes, we use several lists
training_steps = []
train_losses= []

test_losses = []
test_accuracies = []

#test once before training the model
test_loss, test_accuracy = test(model, training_data, loss)

#how does the model do on training data before training?
train_loss = test(model, training_data, loss)
train_losses.append(train_loss)

#now start to train
for epoch in range(epochs):
  training_steps.append(epoch)
  #run through the current batch

  training_data = training_data.shuffle(buffer_size=128)
  test_data = test_data.shuffle(buffer_size=128)

  running_average = 0
  for i,(input, label) in enumerate(training_dataset):
    training_loss = test_step(model, input, label, loss, optimizer)

    #record how the loss evolves over one epoch
    running_average = running_average_factor * running_average  + (1 - running_average_factor) * train_loss 

  training_losses.append(running_average)

  #now evaluate the model performance on test set
  test_loss, test_accur = test(test_data, loss, optimizer)

  test_losses.append(test_loss)
  test_accuracies.append(test_accur)



NameError: ignored

Visualization

In [None]:
#do the visualization
#test loss and training loss
plt.figure()
line1, = plt.plot(train_losses)
line2, = plt.plot(test_losses)
plt.xlabel("Training steps")
plt.ylabel("Loss")
plt.legend((line1,line2),("training","test"))
plt.show()

In [None]:
#test accuracy
plt.figure()
plt.plot(test_accuracies)
plt.xlabel("Training steps")
plt.ylabel("Test accuracy")
plt.show()
