# Character-level CNN

In [0]:
 %tensorflow_version 1.x

import tensorflow as tf
print(tf.__version__)

In [0]:
from math import ceil

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.utils import shuffle
import os
import shutil
import time

In [0]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from IPython.display import clear_output
import collections
%matplotlib inline

In [0]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# NOTE(review): presumably the dataset and save directories passed to train() live under
# this mount point — confirm against the paths actually used.
from google.colab import drive
drive.mount('/content/drive')

# Helper Functions

## Dataset Creation Functions

In [0]:
def get_num_classes(data_path, sep='\t'):
    """Count the distinct class labels in the first column of a header-less delimited file.

    Args:
        data_path: Path (or file-like object) handed to ``pandas.read_csv``.
        sep: Field delimiter; a tab by default.

    Returns:
        The number of distinct values found in column 0 (a missing value counts once).
    """
    label_column = pd.read_csv(data_path, header=None, usecols=[0], sep=sep)[0]
    # dropna=False keeps parity with Series.unique(), which also reports NaN as a value.
    return label_column.nunique(dropna=False)

def create_dataset(data_path, alphabet="""abcdefghijklmnopqrstuvwxyz0123456789,;.!?:'\"/\\|_@#$%^&*~`+-=<>()[]{}""",
                   max_length=1014, batch_size=128, is_training=True, sep='\t'):
    """Build a batched ``tf.data.Dataset`` of one-hot encoded texts and their class labels.

    The input file must be header-less and delimited by ``sep`` (tab by default). Column 0
    holds the integer class label (classes start at 0) and column 1 holds the text. Any
    further columns are appended to column 1, separated by single spaces.

    Args:
        data_path: Path of the delimited data file.
        alphabet: Characters that are encoded. Characters not in the alphabet are skipped.
            Matching is case-sensitive. If a character is listed twice, its first position
            is used.
        max_length: Number of encoded characters per sample. Longer texts are truncated and
            shorter ones are zero-padded on the right.
        batch_size: Number of samples per batch.
        is_training: When true the rows are shuffled once (fixed ``random_state=42``).
        sep: Field delimiter of the data file.

    Returns:
        A tuple ``(dataset, num_iters)``. ``dataset`` yields ``(texts, labels)`` batches in
        which every text is a float32 matrix of shape ``(len(alphabet), max_length)`` with
        one one-hot column per character. ``num_iters`` is the number of batches per pass.
    """
    # keep_default_na=False stops pandas from turning texts such as "NA" or "null" into NaN,
    # which would later be encoded as the literal characters of the string "nan".
    data = pd.read_csv(data_path, header=None, sep=sep, keep_default_na=False)
    num_iters = ceil(data.shape[0] / batch_size)

    # Shuffle if it's the training set
    if is_training:
        data = shuffle(data, random_state=42)
    num_columns = data.shape[1]

    # Normalise every text column to str; cells that are still missing (e.g. short rows)
    # become empty strings so they encode as all-zero padding.
    for idx in range(1, num_columns):
        data[idx] = data[idx].fillna('').astype(str)

    # If there are more than two columns, add them to col 1 and drop the rest
    for idx in range(2, num_columns):
        data[1] += ' ' + data[idx]
    data = data.drop(list(range(2, num_columns)), axis=1).values

    alphabet = list(alphabet)
    num_symbols = len(alphabet)
    # O(1) character lookup; setdefault keeps the first position of a repeated character,
    # which is what list.index() would have returned.
    char_to_index = {}
    for position, symbol in enumerate(alphabet):
        char_to_index.setdefault(symbol, position)

    def generator():
        """Yield ``(one_hot_matrix, label)`` for every row of the loaded data."""
        for label, text in data:
            positions = np.asarray(
                [char_to_index[ch] for ch in str(text) if ch in char_to_index][:max_length],
                dtype=np.intp)
            # Rows past the end of the text stay zero, which is the right-padding.
            encoded = np.zeros((max_length, num_symbols), dtype=np.float32)
            encoded[np.arange(positions.size), positions] = 1.0
            yield encoded.T, label

    # Here we specify output types and shapes; the label shape is left unspecified.
    dataset = tf.data.Dataset.from_generator(generator, (tf.float32, tf.int32),
                                             ((num_symbols, max_length), None))
    return dataset.batch(batch_size), num_iters

## Plotting Functions

In [0]:
def live_plot(data_dict, figsize=(7,5), title='', xlabel=''):
    """Clear the cell output and redraw one line per entry of ``data_dict``.

    Update the dictionary and call again to replot.

    Args:
        data_dict: Mapping of legend label to the sequence of values to plot.
        figsize: Figure size passed to ``plt.figure``.
        title: Plot title.
        xlabel: Label of the x axis.
    """
    clear_output(wait=True)
    plt.figure(figsize=figsize)
    for series_name, series in data_dict.items():
        plt.plot(series, label=series_name)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.grid(True)
    # The curves grow to the right, so the legend is anchored on the left edge.
    plt.legend(loc='center left')
    plt.show()

In [0]:
# To test uncomment the following:

# fizz = collections.defaultdict(list)
# for i in range(10):
#     fizz['foo'].append(np.random.random())
#     fizz['bar'].append(np.random.random())
#     fizz['baz'].append(np.random.random())
#     live_plot(fizz)

In [0]:
def live_plot_double(dict_A, dict_B, xlabel_A='iteration', xlabel_B='epoch', figsize=(13,5)):
    """Clear the cell output and redraw two side-by-side line plots.

    Args:
        dict_A: Mapping of legend label to values for the left-hand plot.
        dict_B: Mapping of legend label to values for the right-hand plot.
        xlabel_A: x-axis label of the left-hand plot.
        xlabel_B: x-axis label of the right-hand plot.
        figsize: Size of the whole figure. Previously accepted but ignored; it is now
            honoured, and the default equals the old hard-coded size.
    """
    clear_output(wait=True)

    plt.figure(figsize=figsize)
    panels = ((dict_A, xlabel_A), (dict_B, xlabel_B))
    for position, (series_by_label, xlabel) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for label, data in series_by_label.items():
            plt.plot(data, label=label)
        plt.grid(True)
        plt.xlabel(xlabel)
        plt.legend(loc='center left')  # the plot evolves to the right

    plt.tight_layout()
    plt.show()

In [0]:
# fizz = collections.defaultdict(list)
# buzz = collections.defaultdict(list)
# for i in range(10):
#     fizz['foo'].append(np.random.random())
#     fizz['bar'].append(np.random.random())
#     buzz['baz'].append(np.random.random())
#     buzz['foobar'].append(np.random.random())
#     live_plot_double(fizz, buzz)

# The Neural Network

In [0]:
class Char_level_cnn(object):
    """Character-level convolutional classifier assembled from TensorFlow 1.x graph ops.

    ``forward`` applies six convolution layers (max-pooling after the 1st, 2nd and 6th)
    followed by three fully connected layers, and returns unnormalised class logits.
    """

    def __init__(self, batch_size=128, num_classes=14, feature="small",
                 kernel_size=(7, 7, 3, 3, 3, 3), padding="VALID"):
        """Store the hyperparameters; no graph ops are created here.

        Args:
            batch_size: Stored on the instance; not read by any method of this class.
            num_classes: Width of the final layer and size of the confusion matrix.
            feature: ``"small"`` selects 256 filters / 1024 dense units; any other value
                selects the large variant with 1024 filters / 2048 dense units.
            kernel_size: Six kernel widths, one per convolution layer.
            padding: Padding mode used by every convolution and pooling op.
        """
        super(Char_level_cnn, self).__init__()
        self.batch_size = batch_size
        self.num_classes = num_classes
        if feature == "small":
            self.num_filters = 256
            self.stddev_initialization = 0.05
            self.num_fully_connected_features = 1024
        else:
            self.num_filters = 1024
            self.stddev_initialization = 0.02
            self.num_fully_connected_features = 2048
        # Copy into a fresh list so instances never share the default or alias the caller's.
        self.kernel_size = list(kernel_size)
        self.padding = padding

    def forward(self, input, keep_prob):
        """Build the network on ``input`` and return the logits tensor.

        Args:
            input: Rank-3 tensor; a trailing channel axis is added here. The first
                convolution's filter spans all of axis 1, and axes 1 and 2 must have a
                static size. Presumably shaped (batch, alphabet_size, sequence_length), as
                produced by ``create_dataset`` — confirm against the caller.
            keep_prob: Keep probability passed to ``tf.nn.dropout`` after fc1 and fc2.

        Returns:
            Tensor of shape ``(batch, num_classes)`` with unnormalised scores.
        """
        output = tf.expand_dims(input, -1)
        output = self._create_conv(output, [output.get_shape().as_list()[1], self.kernel_size[0], 1, self.num_filters],
                                   "conv1",
                                   3)
        output = self._create_conv(output, [1, self.kernel_size[1], self.num_filters, self.num_filters], "conv2", 3)
        output = self._create_conv(output, [1, self.kernel_size[2], self.num_filters, self.num_filters], "conv3")
        output = self._create_conv(output, [1, self.kernel_size[3], self.num_filters, self.num_filters], "conv4")
        output = self._create_conv(output, [1, self.kernel_size[4], self.num_filters, self.num_filters], "conv5")
        output = self._create_conv(output, [1, self.kernel_size[5], self.num_filters, self.num_filters], "conv6", 3)

        # Read the flattened width from the real conv output instead of the closed-form
        # (length - 96) / 27, which only holds for the default kernels with VALID padding.
        new_feature_size = self._flattened_size(output.get_shape().as_list())
        flatten = tf.reshape(output, [-1, new_feature_size])

        output = self._create_fc(flatten, [new_feature_size, self.num_fully_connected_features], "fc1", keep_prob)
        output = self._create_fc(output, [self.num_fully_connected_features, self.num_fully_connected_features], "fc2",
                                 keep_prob)
        output = self._create_fc(output, [self.num_fully_connected_features, self.num_classes], "fc3")

        return output

    @staticmethod
    def _flattened_size(static_shape):
        """Return the product of every dimension of ``static_shape`` except the first.

        Raises:
            ValueError: If any non-batch dimension is unknown (``None``).
        """
        size = 1
        for dim in static_shape[1:]:
            if dim is None:
                raise ValueError(
                    "Cannot flatten a tensor with an unknown non-batch dimension: {}".format(static_shape))
            size *= dim
        return size

    def _create_conv(self, input, shape, name_scope, pool_size=None):
        """Add a stride-1 convolution with bias and ReLU, optionally followed by max-pooling.

        Args:
            input: Rank-4 input tensor.
            shape: Filter shape ``[height, width, in_channels, out_channels]``.
            name_scope: Name scope under which the ops are created.
            pool_size: When truthy, width and stride of a max-pool along axis 2.
        """
        with tf.name_scope(name_scope):
            weight = self._initialize_weight(shape, self.stddev_initialization)
            bias = self._initialize_bias([shape[-1]])
            conv = tf.nn.conv2d(input=input, filter=weight, strides=[1, 1, 1, 1], padding=self.padding, name='conv')
            activation = tf.nn.relu(tf.nn.bias_add(conv, bias), name="relu")
            if pool_size:
                return tf.nn.max_pool(value=activation, ksize=[1, 1, pool_size, 1], strides=[1, 1, pool_size, 1],
                                      padding=self.padding, name='maxpool')
            else:
                return activation

    def _create_fc(self, input, shape, name_scope, keep_prob=None):
        """Add a dense layer (matmul + bias), followed by dropout when ``keep_prob`` is given.

        Args:
            input: Rank-2 input tensor.
            shape: Weight shape ``[in_features, out_features]``.
            name_scope: Name scope under which the ops are created.
            keep_prob: Keep probability for dropout; ``None`` disables dropout.
        """
        with tf.name_scope(name_scope):
            weight = self._initialize_weight(shape, self.stddev_initialization)
            bias = self._initialize_bias([shape[-1]])
            # NOTE(review): no non-linearity is applied after the dense layer — confirm
            # that stacking purely linear layers here is intended.
            dense = tf.nn.bias_add(tf.matmul(input, weight), bias, name="dense")
            if keep_prob is not None:
                return tf.nn.dropout(dense, keep_prob, name="dropout")
            else:
                return dense

    def _initialize_weight(self, shape, stddev):
        """Create a float32 weight variable drawn from a truncated normal distribution."""
        return tf.Variable(tf.truncated_normal(shape=shape, stddev=stddev, dtype=tf.float32, name='weight'))

    def _initialize_bias(self, shape):
        """Create a float32 bias variable initialised to zero."""
        return tf.Variable(tf.constant(0, shape=shape, dtype=tf.float32, name='bias'))

    def loss(self, logits, labels):
        """Return the mean sparse softmax cross-entropy between ``logits`` and integer ``labels``."""
        return tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels=labels))

    def accuracy(self, logits, labels):
        """Return the fraction of samples whose arg-max logit equals the label."""
        return tf.reduce_mean(tf.cast(tf.equal(tf.argmax(logits, 1), tf.cast(labels, tf.int64)), dtype=tf.float32))

    def confusion_matrix(self, logits, labels):
        """Return the ``num_classes`` x ``num_classes`` confusion matrix (labels first, predictions second)."""
        return tf.confusion_matrix(tf.cast(labels, tf.int64), tf.argmax(logits, 1), num_classes=self.num_classes)

# Training Function

## Train function

In [0]:
# Training and test files should be tab separated, with integer class in left-hand column,
# with first class index of 0
# (0, 1, 2, 3...)
def train(training_path='', test_path='', save_path='', alphabet="""abcdefghijklmnopqrstuvwxyz0123456789,;.!?:'\"/\\|_@#$%^&*~`+-=<>()[]{}""", 
          batch_size=32, feature='small', num_epochs=20, test_interval=1, plot_interval=20, 
          lr=1e-2, optimizer='adam', dropout=0.5, es_min_delta=0, es_patience=3,
          random_state=None):
    """Train a ``Char_level_cnn`` and keep the checkpoint with the lowest test loss.

    Loss curves are redrawn live while training; per-epoch test results are written to
    ``logs.txt`` inside ``save_path``.

    Args:
        training_path: Path to the training set.
        test_path: Path to the test set.
        save_path: Directory for the log file and the checkpoint. If it already exists it
            is deleted and recreated.
        alphabet: Valid characters used for the model, e.g. "GATC" for DNA classification.
        batch_size: Minibatch size.
        feature: "small" or "large" network variant.
        num_epochs: Maximum number of training epochs.
        test_interval: The test set is evaluated on epochs whose zero-based index is a
            multiple of this value.
        plot_interval: Number of training iterations between loss-plot updates.
        lr: Learning rate.
        optimizer: "sgd" selects momentum SGD (momentum 0.9) with the learning rate halved
            every three epochs, nine times; any other value selects Adam.
        dropout: Value fed to the network's ``keep_prob`` placeholder during training
            (1.0 is fed during testing).
        es_min_delta: Early stopping: minimum decrease of the test loss that counts as an
            improvement.
        es_patience: Early stopping: training stops once more than this many epochs have
            passed since the best epoch. Set to 0 to disable.
        random_state: Graph-level TensorFlow random seed, or ``None`` to leave it unset.
    """

    # Don't change this unless you are sure it will work with network architecture
    max_length = 1014

    allow_soft_placement = True
    log_device_placement = False

    # Snapshot the caller-supplied hyperparameters before any graph objects exist, so the
    # log records the plain values rather than a Tensor / optimizer object repr.
    training_parameters = {'alphabet': alphabet,
                           'batch_size': batch_size, 'feature': feature, 'num_epochs': num_epochs,
                           'learning_rate': lr, 'optimizer': optimizer, 'dropout': dropout,
                           'es_min_delta': es_min_delta, 'es_patience': es_patience,
                           'random_state': random_state}

    # Get number of classes
    num_classes = get_num_classes(training_path)
    print('Number of classes detected: %d' % (num_classes))
    model = Char_level_cnn(batch_size=batch_size, num_classes=num_classes, feature=feature)

    # Save loss history
    iteration_history = collections.defaultdict(list)
    epoch_history = collections.defaultdict(list)
    # Touch the keys so the (still empty) series exist for the first plot's legend.
    for history_key in ('train_loss', 'test_loss', 'test_accuracy'):
        epoch_history[history_key]

    progress_template = "Epoch: {}/{}, Epoch duration/sec: {}, Samples/sec: {}, Test accuracy: {}"

    # Initialize the operation graph
    with tf.Graph().as_default():
        session_conf = tf.ConfigProto(
            allow_soft_placement=allow_soft_placement,
            log_device_placement=log_device_placement)
        session_conf.gpu_options.allow_growth = True

        # Set graph random seed
        if random_state is not None:
            tf.set_random_seed(random_state)

        # Generate dataset and dataset iterator
        training_set, num_training_iters = create_dataset(training_path, alphabet, max_length,
                                                          batch_size, True)
        test_set, _ = create_dataset(test_path, alphabet, max_length, batch_size, False)
        train_iterator = training_set.make_initializable_iterator()
        test_iterator = test_set.make_initializable_iterator()

        handle = tf.placeholder(tf.string, shape=[])
        keep_prob = tf.placeholder(tf.float32, name='dropout_prob')

        iterator = tf.data.Iterator.from_string_handle(handle, training_set.output_types, training_set.output_shapes)
        texts, labels = iterator.get_next()

        # Model functions
        logits = model.forward(texts, keep_prob)
        loss = model.loss(logits, labels)
        accuracy = model.accuracy(logits, labels)
        # Actual number of samples in the current batch (the last batch may be smaller).
        samples_in_batch = tf.unstack(tf.shape(texts))[0]
        confusion = model.confusion_matrix(logits, labels)
        global_step = tf.Variable(0, name="global_step", trainable=False)

        if optimizer == "sgd":
            values = [lr]
            boundaries = []
            for i in range(1, 10):
                values.append(lr / pow(2, i))
                boundaries.append(3 * num_training_iters * i)
            learning_rate = tf.train.piecewise_constant(global_step, boundaries, values)
            trainer = tf.train.MomentumOptimizer(learning_rate, momentum=0.9)
        else:
            trainer = tf.train.AdamOptimizer(lr)

        train_op = trainer.minimize(loss, global_step=global_step)
        init = tf.global_variables_initializer()
        saver = tf.train.Saver()

        # Build the handle ops once. They are created after the model so the op order seen
        # by the model's random ops is unchanged; creating them per epoch grew the graph.
        train_handle_op = train_iterator.string_handle()
        test_handle_op = test_iterator.string_handle()

        # Start from an empty output directory
        if os.path.isdir(save_path):
            shutil.rmtree(save_path)
        os.makedirs(save_path)

        # For recording best results
        best_loss = float('inf')
        best_epoch = 0
        best_accuracy = 0
        best_confusion_matrix = []

        # For recording samples / sec
        epoch_throughput = 0
        epoch_duration = 0
        mean_test_accuracy = 0

        # The with-statement guarantees the log file is closed even if training raises.
        with open(os.path.join(save_path, "logs.txt"), "w") as output_file, \
                tf.Session(config=session_conf) as sess:
            output_file.write("Model's parameters: {}\n".format(training_parameters))

            sess.run(init)
            # A string handle stays valid across re-initialisations of its iterator.
            train_handle, test_handle = sess.run([train_handle_op, test_handle_op])

            for epoch in range(num_epochs):
                sess.run(train_iterator.initializer)
                sess.run(test_iterator.initializer)
                train_iter = 0

                # Record time
                start_time = time.time()

                # Keep track of training loss
                train_loss_ls = []
                num_samples = 0

                while True:
                    try:
                        # Perform an iteration
                        _, tr_loss, _, _, samples = sess.run(
                            [train_op, loss, accuracy, global_step, samples_in_batch],
                            feed_dict={handle: train_handle, keep_prob: dropout})
                    except (tf.errors.OutOfRangeError, StopIteration):
                        break

                    # Save loss for plotting
                    iteration_history['training_loss'].append(tr_loss)

                    # Plot loss history at regular intervals
                    if train_iter % plot_interval == 0:
                        live_plot_double(iteration_history, epoch_history)
                        print(progress_template.format(epoch + 1, num_epochs,
                                                       epoch_duration, epoch_throughput, mean_test_accuracy))

                    # Record this epoch's losses so we can get the sample-weighted mean later
                    num_samples += samples
                    train_loss_ls.append(tr_loss * samples)

                    train_iter += 1

                # Record duration of epoch
                epoch_duration = time.time() - start_time
                epoch_throughput = num_samples / epoch_duration

                # Plot train loss
                mean_train_loss = sum(train_loss_ls) / num_samples
                epoch_history['train_loss'].append(mean_train_loss)

                # Calculate the test loss
                if epoch % test_interval == 0:
                    loss_ls = []
                    accuracy_ls = []
                    confusion_matrix = np.zeros([num_classes, num_classes], np.int32)
                    num_samples = 0
                    while True:
                        try:
                            test_loss, test_accuracy, test_confusion, samples = sess.run(
                                [loss, accuracy, confusion, samples_in_batch],
                                feed_dict={handle: test_handle, keep_prob: 1.0})
                        except (tf.errors.OutOfRangeError, StopIteration):
                            break
                        loss_ls.append(test_loss * samples)
                        accuracy_ls.append(test_accuracy * samples)
                        confusion_matrix += test_confusion
                        num_samples += samples

                    # Sample-weighted means over the whole test set
                    mean_test_loss = sum(loss_ls) / num_samples
                    mean_test_accuracy = sum(accuracy_ls) / num_samples

                    output_file.write(
                        "Epoch: {}/{} \nTest loss: {} Test accuracy: {} \nTest confusion matrix: \n{}\n\n".format(
                            epoch + 1, num_epochs,
                            mean_test_loss,
                            mean_test_accuracy,
                            confusion_matrix))

                    # Update loss plot
                    epoch_history['test_loss'].append(mean_test_loss)
                    epoch_history['test_accuracy'].append(mean_test_accuracy)
                    live_plot_double(iteration_history, epoch_history)
                    print(progress_template.format(epoch + 1, num_epochs,
                                                   epoch_duration, epoch_throughput, mean_test_accuracy))

                    # Keep track of best test loss and save it to disk
                    if mean_test_loss + es_min_delta < best_loss:
                        best_loss = mean_test_loss
                        best_accuracy = mean_test_accuracy
                        best_confusion_matrix = confusion_matrix
                        best_epoch = epoch
                        saver.save(sess, os.path.join(save_path, "char_level_cnn"))
                    # Stop training if test loss is no longer decreasing
                    if epoch - best_epoch > es_patience > 0:
                        print("Stop training at epoch {}. The lowest loss achieved is {}. The corresponding accuracy achieved is {}.".format(epoch, best_loss, best_accuracy))
                        break

        # Print vital statistics
        print('')
        print("Final five test losses:")
        print('\n'.join('{}: {:.4f}'.format(*k) for k in enumerate(epoch_history['test_loss'][-5:])))
        print('')
        print("Best test loss: {:.4f}".format(best_loss))
        print("Corresponding test accuracy: {:.4f}".format(best_accuracy))
        print("Corresponding confusion matrix: \n{}\n\n".format(best_confusion_matrix))

# Model Training

In [0]:
# Example run: classify DNA sequences over the four-letter alphabet "GATC".
# Replace the placeholder paths with real locations before running.
train(
    training_path='/path/to/train.tsv',
    test_path='/path/to/test.tsv',
    save_path='/path/to/savedir',
    alphabet="GATC",
    feature='small',
    batch_size=128,
    num_epochs=20,
    lr=1e-4,
    random_state=42,
)