<a href="https://colab.research.google.com/github/supertime1/BP_PPG/blob/master/Attention_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from IPython.display import display
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
%load_ext tensorboard
import numpy as np
import os
import shutil
import glob
import wfdb
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Model
from tensorflow.keras.models import load_model 
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint
from tensorflow.keras.layers import Conv1D, BatchNormalization, Input, Add, Activation,\
MaxPooling1D,Dropout,Flatten,TimeDistributed,Bidirectional,Dense,LSTM, ZeroPadding1D, \
AveragePooling1D,GlobalMaxPooling1D, Concatenate, Permute, Dot, Multiply, RepeatVector,\
Lambda
from tensorflow.keras.initializers import glorot_uniform
import tensorflow_datasets as tfds
import multiprocessing
from datetime import datetime
import sklearn.metrics
import itertools
import io
import pickle
print(tf.__version__)

2.2.0


##ResNet-18

In [0]:
def identity_block_18(X, f, filters, stage, block):

    # defining name basis
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'
    
    # Retrieve Filters
    F1, F2 = filters
    
    # Save the input value. You'll need this later to add back to the main path. 
    X_shortcut = X
    
    # First component of main path
    X = Conv1D(filters = F1, kernel_size = f, strides = 1, padding = 'same', name = conv_name_base + '2a', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 2, name = bn_name_base + '2a')(X)
    X = Activation('relu')(X)

    
    # Second component of main path 
    X = Conv1D(filters = F2, kernel_size = f, strides = 1, padding = 'same', name = conv_name_base + '2b', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 2, name = bn_name_base + '2b')(X)
    X = Activation('relu')(X)

    # Final step: Add shortcut value to main path, and pass it through a RELU activation 
    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)
    
    
    return X

In [0]:
def ResNet18(input_shape=(750, 1), classes=1):

    # Define the input as a tensor with shape input_shape
    X_input = Input(input_shape)

    # Zero-Padding
    X = ZeroPadding1D(3)(X_input)

    # Stage 1
    X = Conv1D(64, 7, strides=2, name='conv1', kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=2, name='bn_conv1')(X)
    X = Activation('relu')(X)
    X = MaxPooling1D(3, strides=2)(X)

    # Stage 2
    X = identity_block_18(X, 3, [64, 64], stage=2, block='a')
    X = identity_block_18(X, 3, [64, 64], stage=2, block='b')


    # Stage 3 (2 lines)
    X = convolutional_block_18(X, f = 3, filters = [128, 128], stage = 3, block='a', s = 2)
    X = identity_block_18(X, 3, [128, 128], stage=3, block='b')


    # Stage 4 (2 lines)
    X = convolutional_block_18(X, f = 3, filters = [256, 256], stage = 4, block='a', s = 2)
    X = identity_block_18(X, 3, [256, 256], stage=4, block='b')

    # Stage 5 (2 lines)
    X = convolutional_block_18(X, f = 3, filters = [512, 512], stage = 5, block='a', s = 2)
    X = identity_block_18(X, 3, [512, 512], stage=5, block='b')


    # AVGPOOL (1 line).
    X = AveragePooling1D(2, name="avg_pool")(X)

    # output layer
    X = Flatten()(X)
    
    
    #X = Dense(classes, activation='sigmoid', name='fc' + str(classes), kernel_initializer = glorot_uniform(seed=0))(X)
    
    
    #Create model
    model = Model(inputs = X_input, outputs = X, name='ResNet18')

    return model

In [0]:
def convolutional_block_18(X, f, filters, stage, block, s = 2):
    
    # defining name basis
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'
    
    # Retrieve Filters
    F1, F2 = filters
    
    # Save the input value
    X_shortcut = X


    ##### MAIN PATH #####
    # First component of main path 
    X = Conv1D(filters = F1, kernel_size = f, strides = s, padding = 'valid', name = conv_name_base + '2a', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 2, name = bn_name_base + '2a')(X)
    X = Activation('relu')(X)

    # Second component of main path (≈3 lines)
    X = Conv1D(filters = F2, kernel_size = f, strides = 1, padding = 'same', name = conv_name_base + '2b', kernel_initializer = glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis = 2, name = bn_name_base + '2b')(X)
    X = Activation('relu')(X)


    ##### SHORTCUT PATH #### (≈2 lines)
    X_shortcut = Conv1D(filters = F1, kernel_size = f, strides = s, padding = 'valid', name = conv_name_base + '1',
                        kernel_initializer = glorot_uniform(seed=0))(X_shortcut)
    X_shortcut = BatchNormalization(axis = 2, name = bn_name_base + '1')(X_shortcut)

    # Final step: Add shortcut value to main path, and pass it through a RELU activation (≈2 lines)
    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)
    
    
    return X

##Attention Model

In [0]:
def one_step_attention(a, s_prev): 
  """
  Performs one step of attention: Outputs a context vector computed as a dot product of the attention weights
  "alphas" and the hidden states "a" of the Bi-LSTM.
  
  Arguments:
  a -- hidden state output of the Bi-LSTM, numpy-array of shape (m, Tx, 2*n_a)
  s_prev -- previous hidden state of the (post-attention) LSTM, numpy-array of shape (m, n_s)
  
  Returns:
  context -- context vector, input of the next (post-attention) LSTM cell
  """
  s_prev = RepeatVector(Tx)(s_prev)
  concat = Concatenate(axis=-1)([a, s_prev])
  e = Dense(10, activation = "tanh")(concat)
  energies = Dense(1, activation = "relu")(e)
  alphas = Activation(tf.nn.softmax(energies,axis=1), name = 'attention_weights')
  context = Dot(axes = 1)([alphas,a])
  
  return context

In [0]:
def Attention(Tx, Ty, n_a, n_s, input_image_size, output_size):
  """
  Arguments:
  Tx -- length of the input sequence = 10
  Ty -- length of the output sequence = 10, similar to one-to-one LSTMs
  n_a -- hidden state size of the Bi-LSTM = 32
  n_s -- hidden state size of the post-attention LSTM = 16
  input_image_size -- size of the input image = 750
  output_size -- size of output = 2, since there are two BP values

  Returns:
  model -- Keras model instance
  """
  # Define the inputs of your model with a shape (Tx,)
  # Define s0 and c0, initial hidden state for the decoder(post) LSTM of shape (n_s,)
  X = Input(shape = (Tx, input_image_size))
  s0 = Input(shape = (n_s, ), name = 's0')
  c0 = Input(shape = (n_s, ), name = 'c0')
  s = s0
  c = c0

  #Initialize empty list of outputs
  outputs = []

  a = Bidirectional(LSTM(n_a, return_sequences=True))(X)

  for t in range(Ty):
    context = one_step_attention(a, s)
    s, _, c = LSTM(n_s, return_state = True)(context, initial_state = [s, c])
    out = Dense(output_size)(s)
    outputs.append(out)
  
  outputs = Dense(2)(outputs) 
  model = Model(inputs = [X, s0, c0], outputs = outputs)

  return attention

##ResNet-18 + Attention

In [0]:
Tx=10 #Number of input images
Ty=10 #Number of post-LSTM cells
n_a=32 #Number of pre-LSTM states
n_s=16 #Number of post-LSTM states
input_image_size=750 #Input image size
output_size=2 #output of each post-LSTM cells before applying the FINAL dense layer

In [24]:
resnet = ResNet18(input_shape = (input_image_size,1), classes = 1)
attention = Attention(Tx, Ty, n_a, n_s, input_image_size, output_size)

TypeError: ignored

In [0]:
model = tf.keras.Sequential([
        tf.keras.layers.TimeDistributed(resnet, input_shape = (Tx, input_image_size,1)),
        attention
])