<a href="https://colab.research.google.com/github/simecek/ECCB2021/blob/main/notebooks/10_Integrated_Gradients_G4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Data

In [1]:
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv1D, BatchNormalization, MaxPooling1D, Dropout, GlobalAveragePooling1D, Dense

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display, HTML

In [2]:
# get train dataset
!wget --quiet https://raw.githubusercontent.com/ML-Bioinfo-CEITEC/penguinn/master/Datasets/train_set_1_1.txt

nucleo_dic = {
    "A": 0,
    "C": 1,
    "T": 2,
    "G": 3,
    "N": 4,
}

df_train = pd.read_csv("train_set_1_1.txt", sep='\t', names=['sequence', 'label'])

# translate text labels to numbers 0, 1
labels_train = np.array(list(map((lambda x: 1 if x == 'positive' else 0), list(df_train['label']))))
dataset_train = df_train['sequence'].tolist()
# numericalize using the dictionary
dataset_ordinal_train = [[nucleo_dic[letter] for letter in sequence] for sequence in dataset_train]
# translate number values to one-hot vectors
dataset_onehot_train = tf.one_hot(dataset_ordinal_train, depth=5)

In [3]:
# get test dataset
!wget --quiet https://raw.githubusercontent.com/ML-Bioinfo-CEITEC/penguinn/master/Datasets/test_set_1_1.txt

# preprocess the test set similarly
df_test = pd.read_csv("test_set_1_1.txt", sep='\t', names=['sequence', 'label'])

labels_test = np.array(list(map((lambda x: 1 if x == 'positive' else 0), list(df_test['label']))))
dataset_test = df_test['sequence'].tolist()

# we use the same nucleo_dic as on the example before
dataset_ordinal_test = [[nucleo_dic[letter] for letter in sequence] for sequence in dataset_test]
dataset_onehot_test = tf.one_hot(dataset_ordinal_test, depth=5)

## Model

We have adapted model from our original [paper](https://www.frontiersin.org/articles/10.3389/fgene.2020.568546/full). Note it is sligtly more complex model than what we have seen yesterday.

In [4]:
model = Sequential([
        Conv1D(32, kernel_size=8, data_format='channels_last', activation='relu'),
        BatchNormalization(),
        MaxPooling1D(),
        Conv1D(16, kernel_size=8, data_format='channels_last', activation='relu'),
        BatchNormalization(),
        MaxPooling1D(),
        Conv1D(4, kernel_size=8, data_format='channels_last', activation='relu'),
        BatchNormalization(),
        MaxPooling1D(),
        Dropout(0.3),
        GlobalAveragePooling1D(),
        Dense(1)])

In [7]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(0.005),
    loss=tf.keras.losses.binary_crossentropy(from_logits=True),
    metrics=['accuracy']
)

## Training and saving the model

In [8]:
model.fit(
    dataset_onehot_train,
    labels_train,
    batch_size=128,
    epochs=5,
    validation_split=0.3
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<tensorflow.python.keras.callbacks.History at 0x7f235018dcd0>

In [9]:
model.save("cnn_3epochs.h5", save_format='h5')

In [None]:
model = tf.keras.models.load_model('cnn_3epochs.h5')

## Integrated Gradients

In [12]:
def generate_alphas(m_steps=50, method='riemann_trapezoidal'):
    """
    Args:
    m_steps(Tensor): A 0D tensor of an int corresponding to the number of linear
    interpolation steps for computing an approximate integral. Default is 50.
    method(str): A string representing the integral approximation method. The
       following methods are implemented:
      - riemann_trapezoidal(default)
      - riemann_left
      - riemann_midpoint
      - riemann_right
    Returns:
      alphas(Tensor): A 1D tensor of uniformly spaced floats with the shape
      (m_steps,).
      """
    m_steps_float = tf.cast(m_steps, float)

    if method == 'riemann_trapezoidal':
        alphas = tf.linspace(0.0, 1.0, m_steps+1)
    elif method == 'riemann_left':
        alphas = tf.linspace(0.0, 1.0 - (1.0 / m_steps_float), m_steps)
    elif method == 'riemann_midpoint':
        alphas = tf.linspace(1.0 / (2.0 * m_steps_float), 1.0 - 1.0 / (2.0 * m_steps_float), m_steps)
    elif method == 'riemann_right':
        alphas = tf.linspace(1.0 / m_steps_float, 1.0, m_steps)
    else:
        raise AssertionError("Provided Riemann approximation method is not valid.")

    return alphas

def generate_path_inputs(baseline, input, alphas):
    """
    Generate interpolated 'images' along a linear path at alpha intervals between a baseline tensor

    baseline: 2D, shape: (200, 4)
    input: preprocessed sample, shape: (200, 4)
    alphas: list of steps in interpolated image ,shape: (21)


    return: shape (21, 200, 4)
    """
    # Expand dimensions for vectorized computation of interpolations.
    alphas_x = alphas[:, tf.newaxis, tf.newaxis]
    baseline_x = tf.expand_dims(baseline, axis=0)
    input_x = tf.expand_dims(input, axis=0)
    delta = input_x - baseline_x
    path_inputs = baseline_x + alphas_x * delta

    return path_inputs

def compute_gradients(model, path_inputs):
    """
    compute dependency of each field on whole result, compared to interpolated 'images'

    :param model: trained model
    :param path_inputs: interpolated tensors, shape: (21, 200, 4)
    :return: shape: (21, 200, 4)
    """
    with tf.GradientTape() as tape:
        tape.watch(path_inputs)
        predictions = model(path_inputs)

        outputs = []
        for envelope in predictions:
            outputs.append(envelope[0])
        outputs = tf.convert_to_tensor(outputs, dtype=tf.float32)

    gradients = tape.gradient(outputs, path_inputs)
    return gradients

def integral_approximation(gradients, method='riemann_trapezoidal'):
    """Compute numerical approximation of integral from gradients.
    Args:
    gradients(Tensor): A 4D tensor of floats with the shape
    (m_steps, img_height, img_width, 3).
    method(str): A string representing the integral approximation method. The
    following methods are implemented:
    - riemann_trapezoidal(default)
    - riemann_left
    - riemann_midpoint
    - riemann_right
    Returns:
    integrated_gradients(Tensor): A 3D tensor of floats with the shape
    (img_height, img_width, 3).
    """
    if method == 'riemann_trapezoidal':
        grads = (gradients[:-1] + gradients[1:]) / tf.constant(2.0)
    elif method == 'riemann_left':
        grads = gradients
    elif method == 'riemann_midpoint':
        grads = gradients
    elif method == 'riemann_right':
        grads = gradients
    else:
        raise AssertionError("Provided Riemann approximation method is not valid.")

    # Average integration approximation.
    integrated_gradients = tf.math.reduce_mean(grads, axis=0)

    return integrated_gradients

def integrated_gradients(model, baseline, input, m_steps=50, method='riemann_trapezoidal',
                         batch_size=32):
    """
    Args:
      model(keras.Model): A trained model to generate predictions and inspect.
      baseline(Tensor): 2D, shape: (200, 4)
      input(Tensor): preprocessed sample, shape: (200, 4)
      m_steps(Tensor): A 0D tensor of an integer corresponding to the number of
        linear interpolation steps for computing an approximate integral.
      method(str): A string representing the integral approximation method. The
        following methods are implemented:
        - riemann_trapezoidal(default)
        - riemann_left
        - riemann_midpoint
        - riemann_right
      batch_size(Tensor): A 0D tensor of an integer corresponding to a batch
        size for alpha to scale computation and prevent OOM errors. Note: needs to
        be tf.int64 and shoud be < m_steps. Default value is 32.
    Returns:
      integrated_gradients(Tensor): A 2D tensor of floats with the same
        shape as the input tensor.
    """

    # 1. Generate alphas.
    alphas = generate_alphas(m_steps=m_steps,
                             method=method)

    # Initialize TensorArray outside loop to collect gradients. Note: this data structure
    gradient_batches = tf.TensorArray(tf.float32, size=m_steps + 1)

    # Iterate alphas range and batch computation for speed, memory efficiency, and scaling to larger m_steps.
    for alpha in tf.range(0, len(alphas), batch_size):
        from_ = alpha
        to = tf.minimum(from_ + batch_size, len(alphas))
        alpha_batch = alphas[from_:to]

        # 2. Generate interpolated inputs between baseline and input.
        interpolated_path_input_batch = generate_path_inputs(baseline=baseline,
                                                             input=input,
                                                             alphas=alpha_batch)

        # 3. Compute gradients between model outputs and interpolated inputs.
        gradient_batch = compute_gradients(model=model,
                                           path_inputs=interpolated_path_input_batch)

        # Write batch indices and gradients to TensorArray.
        gradient_batches = gradient_batches.scatter(tf.range(from_, to), gradient_batch)

    # Stack path gradients together row-wise into single tensor.
    total_gradients = gradient_batches.stack()

    # 4. Integral approximation through averaging gradients.
    avg_gradients = integral_approximation(gradients=total_gradients,
                                           method=method)

    # 5. Scale integrated gradients with respect to input.
    integrated_gradients = (input - baseline) * avg_gradients

    return integrated_gradients

def choose_validation_points(integrated_gradients):
    """
    Args:
          integrated_gradients(Tensor): A 2D tensor of floats with shape (200, 4).
    Return: List of attributes for highlighting DNA string sequence
    """
    attr = np.zeros(200)
    for i in range(200):
        for j in range(4):
            if integrated_gradients[i][j].numpy() == 0:
                continue
            attr[i] = integrated_gradients[i][j].numpy()
    return attr

def visualize_token_attrs(sequence, attrs):
    """
    Visualize attributions for given set of tokens.
    Args:
    - tokens: An array of tokens
    - attrs: An array of attributions, of same size as 'tokens',
      with attrs[i] being the attribution to tokens[i]

    Returns:
    - visualization: HTML text with colorful representation of DNA sequence
        build on model prediction
    """

    def get_color(attr):
        if attr > 0:
            red = int(128 * attr) + 127
            green = 128 - int(64 * attr)
            blue = 128 - int(64 * attr)
        else:
            red = 128 + int(64 * attr)
            green = 128 + int(64 * attr)
            blue = int(-128 * attr) + 127

        return red, green, blue

    # normalize attributions for visualization.
    bound = max(abs(max(attrs)), abs(min(attrs)))
    attrs = attrs / bound
    html_text = ""
    for i, tok in enumerate(sequence):
        r, g, b = get_color(attrs[i])
        if abs(attrs[i]) > 0.5:
          html_text += " <span style='color:rgb(%d,%d,%d);font-weight:bold'>%s</span>" % (r, g, b, tok)
        else: 
          html_text += " <span style='color:rgb(%d,%d,%d)'>%s</span>" % (r, g, b, tok)

    return html_text

In [13]:
seq = 'AAAGAAGAGACCAAGACGGAAGACCCAATCGGACCGGGAGGTCCGGGGAGACGTGTCGGGGATCGGGACTTGACTGTGCTTACCAAAGGACCTAACGGAGGGGTCCATAGGAGTCTTGCGGGACTCCCTGGCACTGGAGTAGTATCGACATAAGGGTCACGGACGTTCCATTTAGTGAGCCATTTATAAACCACTATCAA'

channel={
            'A': 0,
            'T': 1,
            'C': 2,
            'G': 3,
            'N': 4
        }
seq_onehot = tf.one_hot([channel[c] for c in seq], depth=5)

#seq_onehot = tf.convert_to_tensor(seq_onehot, dtype=tf.float32)[:,:4]
seq_onehot.shape

TensorShape([200, 5])

In [14]:
baseline = tf.zeros(shape=(200, 5))

ig_attribution = integrated_gradients(model, baseline, seq_onehot)
attrs = choose_validation_points(ig_attribution)

visualisation = visualize_token_attrs(seq, attrs)
HTML(visualisation)