In [None]:
%pip install logomaker

Note: you may need to restart the kernel to use updated packages.


In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Model

import numpy as np
import requests as rq
import io, h5py

import pandas as pd
import matplotlib.pyplot as plt
import logomaker
%matplotlib inline

Matplotlib is building the font cache; this may take a moment.


In [None]:
# Download the synthetic dataset (an HDF5 file) fully into memory.
data = rq.get('https://www.dropbox.com/s/c3umbo5y13sqcfp/synthetic_dataset.h5?raw=true')
data.raise_for_status()  # fail loudly on an HTTP error instead of parsing an error page

with h5py.File(io.BytesIO(data.content), 'r') as dataset:
    # Inputs: cast to float32 and swap the last two axes so that each sample
    # ends up as (200, 4), the shape expected by the model's Input layer.
    x_train, x_valid, x_test = (
        np.array(dataset[key]).astype(np.float32).transpose([0, 2, 1])
        for key in ('X_train', 'X_valid', 'X_test')
    )

    # Targets: training labels are kept as float32, validation/test labels as int32.
    y_train = np.array(dataset['Y_train']).astype(np.float32)
    y_valid = np.array(dataset['Y_valid']).astype(np.int32)
    y_test = np.array(dataset['Y_test']).astype(np.int32)

print(x_train.shape, y_train.shape)

(21000, 200, 4) (21000, 12)


In [None]:
def generate_model(params):
  """Build and compile a Conv1D + multi-head self-attention classifier.

  The network takes inputs of shape (200, 4) and produces 12 sigmoid outputs.

  Args:
    params: sequence of exactly 12 values, unpacked in this order:
      filters   -- number of filters of the first Conv1D layer
      kernel    -- kernel size of the first Conv1D layer
      BN1       -- if truthy, add BatchNormalization after the convolution
      pool      -- max-pool size; 0 disables pooling
      stride    -- stride of the max-pool layer (ignored when pool == 0)
      heads     -- number of attention heads
      key_size  -- key dimension of each attention head
      dense     -- number of units of the hidden Dense layer
      BN2       -- if truthy, add BatchNormalization after the Dense layer
      pos       -- if truthy, add a learned positional embedding before attention
      drop      -- if truthy, apply Dropout(0.1) to the attention output
      layerNorm -- if truthy, apply LayerNormalization to the attention output

  Returns:
    A compiled tf.keras Model (Adam, lr=1e-4, binary cross-entropy) that
    tracks ROC-AUC ('auroc') and PR-AUC ('aupr').
  """
  filters, kernel, BN1, pool, stride, heads, key_size, dense, BN2, pos, drop, layerNorm = params
  inputs = layers.Input(shape=(200, 4))

  # Convolutional block. use_bias must be the boolean False: the string
  # 'false' is truthy and would silently keep the bias enabled.
  nn = layers.Conv1D(filters=filters, kernel_size=kernel, padding='same', use_bias=False)(inputs)
  if BN1:
    nn = layers.BatchNormalization()(nn)
  nn = layers.Activation('relu')(nn)
  if pool != 0:
    nn = layers.MaxPool1D(pool_size=pool, strides=stride)(nn)
  nn = layers.Dropout(0.05)(nn)

  # Optional learned positional embedding, one vector per sequence position,
  # with the same width as the feature map so the two can be summed.
  if pos:
    positions = tf.range(nn.shape[1])
    context = layers.Embedding(input_dim=nn.shape[1], output_dim=nn.shape[2])(positions)

    contextual_meaning = tf.add(nn, context)
  else:
    contextual_meaning = nn

  # Self-attention: query and value are the same tensor. The attention scores
  # are requested (as in the original graph) but not used further here.
  attention, _attention_scores = layers.MultiHeadAttention(num_heads=heads, key_dim=key_size, dropout=0)(contextual_meaning, contextual_meaning, return_attention_scores=True)

  # Chain the optional post-attention layers on the attention output so that
  # both actually feed the classifier head.
  nn = attention
  if drop:
    nn = layers.Dropout(0.1)(nn)
  if layerNorm:
    nn = layers.LayerNormalization()(nn)

  # Classifier head.
  nn = layers.Flatten()(nn)
  nn = layers.Dense(dense, use_bias=False)(nn)
  if BN2:
    nn = layers.BatchNormalization()(nn)
  nn = layers.Activation('relu')(nn)
  nn = layers.Dropout(0.5)(nn)

  output = layers.Dense(12, activation='sigmoid')(nn)

  model = Model(inputs=inputs, outputs=output)
  model.compile(optimizer=tf.keras.optimizers.Adam(0.0001), loss='binary_crossentropy', metrics=[tf.keras.metrics.AUC(curve='ROC', name='auroc'), tf.keras.metrics.AUC(curve='PR', name='aupr')])
  return model

In [None]:
# Baseline hyper-parameters, in the order generate_model unpacks them.
base = [
    64,     # filters
    19,     # kernel
    True,   # BN1
    4,      # pool
    4,      # stride
    8,      # heads
    32,     # key_size
    512,    # dense
    True,   # BN2
    False,  # pos
    True,   # drop
    True,   # layerNorm
]

# Candidate values per hyper-parameter (not consumed by the code in this cell).
filters = [4, 8, 16, 32, 64, 96, 128]
kernels = [19]
BN1 = [True, False]
pools = [0, 2, 4, 6, 10, 20, 25, 50, 100]
strides= [1, 2, 4, 6, 10]
heads = [1, 2, 4, 8, 12, 20]
key_sizes = [32, 64, 128, 256]
denses = [32, 64, 128, 256, 512]
BN2 = [True, False]

model = generate_model(base)

# Stop when the validation loss has not improved for 20 epochs
# (the best weights are NOT restored).
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=20,
    verbose=1,
    mode='min',
    restore_best_weights=False,
)
# Multiply the learning rate by 0.2 after 5 epochs without validation-loss improvement.
lr_decay = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=5,
    min_lr=1e-10,
    mode='min',
    verbose=1,
)
model.fit(
    x_train,
    y_train,
    epochs=2,
    validation_data=(x_valid, y_valid),
    callbacks=[early_stop, lr_decay],
)

model.evaluate(x_test, y_test)

In [None]:
# NOTE: index 3 is the ReLU after the first convolution only when the model has
# a BatchNormalization layer after that convolution (Input, Conv1D, BatchNorm,
# Activation), as is the case for the `base` configuration.
layer = 3        # activation layer for 1st convolutional layer
threshold = 0.5  # fraction of a filter's max activation that counts as significant
window = 20      # window size of alignment

# get feature maps of 1st convolutional layer after activation
intermediate = tf.keras.Model(inputs=model.inputs, outputs=model.layers[layer].output)
fmap = intermediate.predict(x_test)
num_filters = fmap.shape[-1]

# set the left and right window sizes
window_left = int(window/2)
window_right = window - window_left

N, L, A = x_test.shape

# W collects one (window, A) position probability matrix per filter
W = []
for filter_index in range(num_filters):
    filter_map = fmap[:, :, filter_index]

    # find regions above threshold (relative to this filter's maximum)
    coords = np.where(filter_map > np.max(filter_map)*threshold)
    x, y = coords

    # sort by activation score, strongest first
    index = np.argsort(filter_map[x, y])[::-1]
    data_index = x[index].astype(int)
    pos_index = y[index].astype(int)

    # make a sequence alignment centered about each activation (above threshold)
    seq_align = []
    for i in range(len(pos_index)):

        # determine position of window about each filter activation
        start_window = pos_index[i] - window_left
        end_window = pos_index[i] + window_right

        # keep only windows that lie fully inside the sequence; the slice end
        # is exclusive, so start == 0 and end == L are both valid
        if start_window >= 0 and end_window <= L:
            seq = x_test[data_index[i], start_window:end_window, :]
            seq_align.append(seq)

    # calculate position probability matrix
    if seq_align:
        W.append(np.mean(seq_align, axis=0))
    else:
        # no usable activation for this filter: store an all-NaN matrix of the
        # regular shape so that W stays a rectangular array
        W.append(np.full((window, A), np.nan, dtype=x_test.dtype))
W = np.array(W)


In [None]:
# One panel per first-layer filter, laid out on a grid that is num_cols wide.
fig = plt.figure(figsize=(30,5))
fig.subplots_adjust(hspace=0.1, wspace=0.1)

num_cols = 8
num_widths = int(np.ceil(num_filters/num_cols))
for n, w in enumerate(W):
    ax = fig.add_subplot(num_widths, num_cols, n+1)

    # a matrix containing NaN cannot be drawn: leave its panel untouched
    if np.isnan(w).any():
        continue

    # per-position information content (bits), used to scale the letter heights
    I = np.log2(4) + np.sum(w * np.log2(w+1e-7), axis=1, keepdims=True)
    logo = I*w

    # logomaker expects a float DataFrame: one row per position, one column per letter
    filter_len = w.shape[0]
    counts_df = pd.DataFrame(
        data=np.asarray(logo, dtype=np.float64),
        columns=list('ACGT'),
        index=list(range(filter_len)),
    )

    # plot filter representation, then strip the panel down to the logo itself
    logomaker.Logo(counts_df, ax=ax)
    for side in ('right', 'top'):
        ax.spines[side].set_visible(False)
    ax.yaxis.set_ticks_position('none')
    ax.xaxis.set_ticks_position('none')
    ax.set_xticks([])
    ax.set_yticks([])