<a href="https://colab.research.google.com/github/niklasschemmer/homework_week3/blob/main/week3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [63]:
import tensorflow_datasets as tfds
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

In [64]:
# Load the 'train' and 'test' splits of the genomics_ood dataset; with
# as_supervised=True every element is a 2-tuple, unpacked below as
# (sequence, target).
train_dataset, test_dataset = tfds.load(
    'genomics_ood',
    split=['train', 'test'],
    as_supervised=True,
)

def convert_genomic_string_to_number(sequence):
  """One-hot encode a nucleotide string tensor into a flat vector.

  Every occurrence of A, C, G and T is first replaced by a digit
  (A -> 1, C -> 2, G -> 3, T -> 0). The resulting string is split into
  single characters, parsed as numbers, one-hot encoded with depth 4 and
  finally flattened to one dimension.
  """
  base_to_digit = (('A', '1'), ('C', '2'), ('G', '3'), ('T', '0'))
  digits = sequence
  for base, digit in base_to_digit:
    digits = tf.strings.regex_replace(digits, base, digit)
  per_character = tf.strings.bytes_split(digits)
  class_indices = tf.cast(tf.strings.to_number(per_character), tf.uint8)
  return tf.reshape(tf.one_hot(class_indices, 4), (-1,))

def _prepare_genomics_data(genomics_ood, num_samples):
  """Build the input pipeline shared by the training and the test split.

  Args:
    genomics_ood: dataset yielding (sequence, target) pairs.
    num_samples: number of leading elements to keep from the dataset.

  Returns:
    A dataset of (one-hot encoded sequence, one-hot encoded target) batches.
  """
  genomics_ood = genomics_ood.take(num_samples)
  # Encode the nucleotide string as a flat one-hot vector and the label as
  # a one-hot vector of depth 10.
  genomics_ood = genomics_ood.map(
      lambda sequence, target: (convert_genomic_string_to_number(sequence),
                                tf.one_hot(target, depth=10)))
  # The encoding is deterministic, so cache it instead of recomputing it
  # on every pass over the data.
  genomics_ood = genomics_ood.cache()
  # Shuffle, batch, prefetch.
  genomics_ood = genomics_ood.shuffle(100)
  genomics_ood = genomics_ood.batch(128)
  genomics_ood = genomics_ood.prefetch(50)
  return genomics_ood

def prepare_genomics_data_training(genomics_ood):
  """Preprocess the first 100000 elements of the training split."""
  return _prepare_genomics_data(genomics_ood, 100000)

def prepare_genomics_data_test(genomics_ood):
  """Preprocess the first 1000 elements of the test split."""
  return _prepare_genomics_data(genomics_ood, 1000)


In [65]:
# Run both splits through their preprocessing pipelines.
train_dataset, test_dataset = (
    train_dataset.apply(prepare_genomics_data_training),
    test_dataset.apply(prepare_genomics_data_test),
)

In [66]:
class CustomDense(tf.keras.layers.Layer):
    """Fully connected layer computing ``activation(inputs @ w + b)``."""

    def __init__(self, activation_funtion, units):
        """Store the activation callable and the number of output units."""
        super().__init__()
        self.units = units
        self.activation = activation_funtion

    def build(self, input_shape):
        """Create the weight matrix and the bias from the input's last axis."""
        in_features = input_shape[-1]
        self.w = self.add_weight(
            shape=(in_features, self.units),
            initializer='random_normal',
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='random_normal',
            trainable=True,
        )

    def call(self, inputs):
        """Apply the affine transformation followed by the activation."""
        pre_activation = tf.matmul(inputs, self.w) + self.b
        return self.activation(pre_activation)

In [67]:
class MyModel(tf.keras.Model):
    """Two sigmoid layers of 256 units followed by a 10-unit softmax layer."""

    def __init__(self):
        super().__init__()
        self.dense1 = CustomDense(tf.nn.sigmoid, 256)
        self.dense2 = CustomDense(tf.nn.sigmoid, 256)
        self.out = CustomDense(tf.nn.softmax, 10)

    @tf.function
    def call(self, inputs):
        """Pass the inputs through dense1, dense2 and the output layer."""
        return self.out(self.dense2(self.dense1(inputs)))

In [68]:
def train_step(model, input, target, loss_function, optimizer):
  """Run one optimisation step on a single batch and return its loss.

  Args:
    model: callable producing predictions for `input`; must expose
      `trainable_variables`.
    input: batch of model inputs.
    target: batch of targets handed to `loss_function`.
    loss_function: callable invoked as `loss_function(target, prediction)`.
    optimizer: object whose `apply_gradients` consumes
      (gradient, variable) pairs.

  Returns:
    The loss computed for this batch before the weights were updated.
  """
  # Only the forward pass and the loss need to be recorded on the tape.
  with tf.GradientTape() as tape:
    prediction = model(input)
    loss = loss_function(target, prediction)
  # Differentiate after leaving the tape context so the gradient
  # computation itself is not part of the recorded block.
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

def test(model, test_data, loss_function):
  """Evaluate the model on a complete dataset.

  Loss and accuracy are averaged over samples rather than over batches, so
  a smaller final batch does not get the same weight as a full one.

  Args:
    model: callable producing predictions for a batch of inputs.
    test_data: iterable of (input, target) batches with one-hot targets.
    loss_function: callable invoked as `loss_function(target, prediction)`
      whose result offers `.numpy()`. It is assumed to return the mean loss
      of the batch (the default reduction of the Keras loss classes) --
      confirm when passing a loss with a different reduction.

  Returns:
    A (test_loss, test_accuracy) tuple of scalar tensors (float32 and
    float64 respectively). Both are NaN when `test_data` is empty.
  """
  loss_sum = 0.0
  correct_predictions = 0
  samples_seen = 0

  for (inputs, target) in test_data:
    prediction = model(inputs)
    hits = np.argmax(target, axis=1) == np.argmax(prediction, axis=1)
    batch_size = hits.shape[0]
    batch_loss = loss_function(target, prediction)
    # Undo the per-batch averaging so every sample counts equally.
    loss_sum += float(batch_loss.numpy()) * batch_size
    correct_predictions += int(np.sum(hits))
    samples_seen += batch_size

  if samples_seen:
    mean_loss = loss_sum / samples_seen
    accuracy = correct_predictions / samples_seen
  else:
    # Nothing to evaluate: report NaN instead of dividing by zero.
    mean_loss = accuracy = float('nan')

  test_loss = tf.constant(mean_loss, dtype=tf.float32)
  test_accuracy = tf.constant(accuracy, dtype=tf.float64)

  return test_loss, test_accuracy

In [None]:
tf.keras.backend.clear_session()

### Hyperparameters
num_epochs = 10
learning_rate = 0.1

# Initialize the model.
model = MyModel()
# Initialize the loss: categorical cross entropy. Check out 'tf.keras.losses'.
cross_entropy_loss = tf.keras.losses.CategoricalCrossentropy()
# Initialize the optimizer: plain SGD using the learning rate chosen above.
# Check out 'tf.keras.optimizers'.
optimizer = tf.keras.optimizers.SGD(learning_rate)

# Initialize lists for later visualization. Each list receives one entry
# before training starts and one more entry per epoch.
train_losses = []

test_losses = []
test_accuracies = []

# Testing once before we begin.
test_loss, test_accuracy = test(model, test_dataset, cross_entropy_loss)
test_losses.append(test_loss)
test_accuracies.append(test_accuracy)

# Check how the model performs on the train data once before we begin.
train_loss, _ = test(model, train_dataset, cross_entropy_loss)
train_losses.append(train_loss)

# We train for num_epochs epochs.
for epoch in range(num_epochs):
    print(f'Epoch: {epoch} starting with accuracy {test_accuracies[-1]}')

    # Training (and checking in with training). The loop variables are named
    # batch_inputs/batch_targets so the builtin `input` is not rebound at
    # notebook scope.
    epoch_loss_agg = []
    for batch_inputs, batch_targets in train_dataset:
        train_loss = train_step(model, batch_inputs, batch_targets,
                                cross_entropy_loss, optimizer)
        epoch_loss_agg.append(train_loss)

    # Track the training loss as the mean over this epoch's batches.
    train_losses.append(tf.reduce_mean(epoch_loss_agg))

    # Testing, so we can track accuracy and test loss.
    test_loss, test_accuracy = test(model, test_dataset, cross_entropy_loss)
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

In [None]:
# Plot training loss, test loss and test accuracy on one figure.
# Every list holds one value recorded before training plus one value per
# epoch, so the x-axis counts epochs rather than individual training steps.
plt.figure()
line1, = plt.plot(train_losses)
line2, = plt.plot(test_losses)
line3, = plt.plot(test_accuracies)
plt.xlabel("Epochs")
plt.ylabel("Loss/Accuracy")
plt.legend((line1, line2, line3),
           ("training loss", "test loss", "test accuracy"))
plt.show()