In [None]:
# Install the third-party packages imported in the next cell
# (transformers, keras_self_attention, attention).
# NOTE(review): no versions are pinned, so a fresh runtime may resolve different releases.
!pip install transformers keras-self-attention attention

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from google.colab import drive
from tensorflow.keras.layers import LSTM, Bidirectional, GlobalMaxPool1D, Dense, Input, Dropout, Concatenate, TimeDistributed
from transformers import DistilBertTokenizerFast, DistilBertModel, DistilBertConfig, TFDistilBertModel
from tensorflow.keras import Model
from tensorflow.keras.utils import plot_model
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from tensorflow.keras import callbacks
from keras_self_attention import SeqSelfAttention
from tensorflow import keras
from attention import Attention
import sklearn.metrics

In [None]:
# https://www.kaggle.com/jorgemf/rnn-gru-bidirectional-attentional-context

from keras import backend as K
from keras.engine.topology import Layer
from keras import initializers, regularizers, constraints
import numpy as np

def dot_product(x, kernel):
    """
    Backend-agnostic dot product of `x` with `kernel`.

    On the TensorFlow backend the kernel is given an extra trailing axis
    before `K.dot`, and that axis is squeezed out of the result again.
    On any other backend `K.dot(x, kernel)` is returned directly.

    Args:
        x (): input
        kernel (): weights
    Returns:
        The backend tensor produced by the dot product.
    """
    if K.backend() != 'tensorflow':
        return K.dot(x, kernel)

    expanded_kernel = K.expand_dims(kernel)
    product = K.dot(x, expanded_kernel)
    return K.squeeze(product, axis=-1)
    
class AttentionWithContext(Layer):
    """
    Attention operation, with a context/query vector, for temporal data.
    Supports Masking.
    Follows the work of Yang et al. [https://www.cs.cmu.edu/~diyiy/docs/naacl16.pdf]
    "Hierarchical Attention Networks for Document Classification"
    by using a context vector to assist the attention
    # Input shape
        3D tensor with shape: `(samples, steps, features)`.
    # Output shape
        2D tensor with shape: `(samples, features)`.
    # Arguments
        W_regularizer, u_regularizer, b_regularizer: regularizers (objects,
            identifiers or serialized configs) for the projection matrix `W`,
            the context vector `u` and the bias `b`.
        W_constraint, u_constraint, b_constraint: constraints for the same
            three weights, accepted in the same forms.
        bias: whether the bias `b` is created and added before the tanh.
    How to use:
    Just put it on top of an RNN Layer (GRU/LSTM/SimpleRNN) with return_sequences=True.
    The dimensions are inferred based on the output shape of the RNN.
    Note: The layer has been tested with Keras 2.0.6
    Example:
        model.add(LSTM(64, return_sequences=True))
        model.add(AttentionWithContext())
        # next add a Dense layer (for classification/regression) or whatever...
    """

    def __init__(self,
                 W_regularizer=None, u_regularizer=None, b_regularizer=None,
                 W_constraint=None, u_constraint=None, b_constraint=None,
                 bias=True, **kwargs):
        # Initialise the base Layer first so that nothing set below can be
        # overwritten by it (some Keras versions reset `supports_masking`
        # inside `Layer.__init__`).
        super(AttentionWithContext, self).__init__(**kwargs)

        self.supports_masking = True
        self.init = initializers.get('glorot_uniform')

        # `get` accepts None, identifiers, objects and serialized configs, so
        # the values produced by `get_config` round-trip through here.
        self.W_regularizer = regularizers.get(W_regularizer)
        self.u_regularizer = regularizers.get(u_regularizer)
        self.b_regularizer = regularizers.get(b_regularizer)

        self.W_constraint = constraints.get(W_constraint)
        self.u_constraint = constraints.get(u_constraint)
        self.b_constraint = constraints.get(b_constraint)

        self.bias = bias

    def build(self, input_shape):
        """Create the weights W (features x features), optional b and u (features,)."""
        assert len(input_shape) == 3

        self.W = self.add_weight(shape=(input_shape[-1], input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_W'.format(self.name),
                                 regularizer=self.W_regularizer,
                                 constraint=self.W_constraint)
        if self.bias:
            self.b = self.add_weight(shape=(input_shape[-1],),
                                     initializer='zero',
                                     name='{}_b'.format(self.name),
                                     regularizer=self.b_regularizer,
                                     constraint=self.b_constraint)

        self.u = self.add_weight(shape=(input_shape[-1],),
                                 initializer=self.init,
                                 name='{}_u'.format(self.name),
                                 regularizer=self.u_regularizer,
                                 constraint=self.u_constraint)

        super(AttentionWithContext, self).build(input_shape)

    def compute_mask(self, input, input_mask=None):
        """Return None: the mask is consumed here and not passed on."""
        # do not pass the mask to the next layers
        return None

    def call(self, x, mask=None):
        """Pool `x` over the steps axis using learned attention weights.

        Args:
            x: input tensor, `(samples, steps, features)` per the class docstring.
            mask: optional mask over the steps; masked steps get zero weight.
        Returns:
            The attention-weighted sum of `x` over axis 1.
        """
        # Hidden representation of every step: tanh(x . W + b)
        uit = dot_product(x, self.W)

        if self.bias:
            uit += self.b

        uit = K.tanh(uit)
        # Score of every step against the context vector u
        ait = dot_product(uit, self.u)

        a = K.exp(ait)

        # apply mask after the exp. will be re-normalized next
        if mask is not None:
            # Cast the mask to floatX to avoid float64 upcasting in theano
            a *= K.cast(mask, K.floatx())

        # in some cases especially in the early stages of training the sum may be almost zero
        # and this results in NaN's. A workaround is to add a very small positive number ε to the sum.
        a /= K.cast(K.sum(a, axis=1, keepdims=True) + K.epsilon(), K.floatx())

        # Add a trailing axis so the weights broadcast against x, then sum over steps
        a = K.expand_dims(a)
        weighted_input = x * a
        return K.sum(weighted_input, axis=1)

    def compute_output_shape(self, input_shape):
        """The steps axis is summed out: (first dim, last dim) of the input shape."""
        return input_shape[0], input_shape[-1]

    def get_config(self):
        """Return a serializable config for saving and re-creating the layer.

        Regularizers and constraints are stored in serialized form rather than
        as live objects, so the config stays JSON-compatible when any of them
        is set; `__init__` turns them back into objects via `.get`.
        """
        config = super().get_config().copy()
        config.update({
            'W_regularizer': regularizers.serialize(self.W_regularizer),
            'u_regularizer': regularizers.serialize(self.u_regularizer),
            'b_regularizer': regularizers.serialize(self.b_regularizer),
            'W_constraint': constraints.serialize(self.W_constraint),
            'u_constraint': constraints.serialize(self.u_constraint),
            'b_constraint': constraints.serialize(self.b_constraint),
            'bias': self.bias,
        })
        return config

In [None]:
# Set main directories: BASE_DIR is the project folder under the Drive mount point.
BASE_DIR = '/content/gdrive/MyDrive/AI_BERT_PATTERN_CLASSIFICATION/'
# Mount Google Drive at /content/gdrive so the paths under BASE_DIR can be read.
drive.mount('/content/gdrive', force_remount=True)

In [None]:
# Load the labelled questions; later cells read the 'corrected_question' and 'patterns' columns.
df = pd.read_csv(BASE_DIR + 'data/patterns.csv')

In [None]:
def preprocess(doc):
  """Lower-case `doc` and insert a space in front of every question mark."""
  lowered = doc.lower()
  return lowered.replace('?', ' ?')

In [None]:
# Store a preprocessed copy of every question next to the original text.
df['questions_cleaned'] = df['corrected_question'].apply(preprocess)

In [None]:
# Preview the first five rows.
df.head(5)

In [None]:
# Length of the longest cleaned question, counted in whitespace-separated words.
# NOTE(review): bertTokenize uses this word count as the tokenizer's max_length
# with truncation enabled and special tokens added - confirm no question is clipped.
max_len = max(len(question.split()) for question in df['questions_cleaned'].values)
print(max_len)

In [None]:
# Target labels: the 'patterns' column, one label per question.
tohot= df['patterns']
tohot

In [None]:
# Zero-filled one-hot target matrix: one row per label, 12 columns
# (12 equals the number of entries in `categories`, defined in a later cell).
onehot = np.zeros((tohot.shape[0],12))

In [None]:
# Sanity-check the matrix dimensions.
onehot.shape

In [None]:
# Class labels in one-hot column order: p0 ... p10 followed by 'p_notFound'.
categories = [f"p{x}" for x in range(11)]
categories.append('p_notFound')
print(categories)

In [None]:
# For every row, set the column that matches its pattern label to 1.
for row in range(onehot.shape[0]):
  onehot[row, categories.index(tohot[row])] = 1

In [None]:
## one hot encode target variables
#encoder = OneHotEncoder(categories=categories, sparse=False)
## transform data
#onehot = encoder.fit_transform(s)

In [None]:
# Inspect the one-hot target matrix.
onehot

## Tokenization

In [None]:
def bertTokenize(docs, tokenizer, max_length=None):
  """
  Encode every document to a fixed length and return model-ready arrays.

  Args:
    docs: iterable of strings to encode.
    tokenizer: a Hugging Face tokenizer; it is called on one string at a time
      and must return a mapping with 'input_ids' and 'attention_mask'.
    max_length: length every sequence is padded or truncated to. When omitted,
      the notebook-level `max_len` is used, so existing two-argument calls
      behave as before.

  Returns:
    (input_ids, attention_masks): two int32 NumPy arrays, one row per document.
  """
  if max_length is None:
    # Backward-compatible fallback to the global computed earlier in the notebook.
    max_length = max_len

  input_ids = []
  attention_masks = []
  for doc in docs:
    # Calling the tokenizer directly is the documented entry point (replaces encode_plus).
    encoded = tokenizer(doc, add_special_tokens=True, max_length=max_length,
                        padding='max_length', truncation=True,
                        return_attention_mask=True)
    input_ids.append(encoded['input_ids'])
    attention_masks.append(encoded['attention_mask'])
  return np.array(input_ids, dtype='int32'), np.array(attention_masks, dtype='int32')

In [None]:
# Load the pretrained fast tokenizer for the 'distilbert-base-uncased' checkpoint.
bert_tokenizer = DistilBertTokenizerFast.from_pretrained("distilbert-base-uncased")

In [None]:
# Encode all questions into fixed-length id and attention-mask arrays.
# NOTE(review): this tokenizes the raw 'corrected_question' column, not the
# 'questions_cleaned' column that max_len was computed from - confirm that is intended.
input_ids, attention_masks = bertTokenize(df['corrected_question'].values, bert_tokenizer)

## Split into train and validation sets

In [None]:
# Hold out 10% of the examples for validation; ids, masks and targets are split
# together so their rows stay aligned, and random_state=42 fixes the shuffle.
ids_train, ids_val, att_train, att_val, y_train, y_val = train_test_split(input_ids, attention_masks, onehot, test_size=0.1, random_state=42)

## Build model

In [None]:
# DistilBERT configuration with dropout and attention dropout set to 0.2 and
# hidden-state outputs switched off.
# NOTE(review): every other field takes DistilBertConfig's default - presumably
# these match the 'distilbert-base-uncased' checkpoint; confirm.
config = DistilBertConfig(dropout=0.2, attention_dropout=0.2, output_hidden_states = False)

In [None]:
# Load the pretrained TensorFlow DistilBERT model using the configuration above.
bert_model = TFDistilBertModel.from_pretrained('distilbert-base-uncased', config=config)

In [None]:
# define inputs: two int32 sequences of length max_len; they are fed the
# token ids and the attention masks respectively when the model is fitted
input_ids_in = Input(shape=(max_len,), name='input_token', dtype='int32')
input_masks_in = Input(shape=(max_len,), name='masked_token', dtype='int32')

# define embedding layer with Bert weights
# ([0] keeps the first element of the layer's outputs - presumably the
# per-token hidden states; confirm against the transformers version in use)
embedding_layer = bert_model.distilbert([input_ids_in, input_masks_in])[0]

# define lstm layer: bidirectional LSTM with 64 units per direction;
# return_sequences=True keeps one output per step for the attention layer below
lstm_layer = Bidirectional(LSTM(64, return_sequences=True, dropout=0.5))(embedding_layer)

# pool the per-step LSTM outputs into a single vector per example
att = AttentionWithContext()(lstm_layer)

# define output layer: softmax with one unit per one-hot column
output = Dense(onehot.shape[1], activation='softmax')(att)

model = Model(inputs=[input_ids_in, input_masks_in], outputs = output)

# freeze the layer named 'distilbert' so only the layers added on top are trained
for layer in model.layers:
  if layer.name == 'distilbert':
    layer.trainable = False

In [None]:
# Print the layer-by-layer summary of the assembled model.
model.summary()

In [None]:
# Categorical cross-entropy matches the one-hot targets and softmax output; accuracy is tracked as a metric.
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
# Sanity-check the target shapes for the training and validation splits.
y_train.shape

In [None]:
y_val.shape

In [None]:
# Train for at most 20 epochs in batches of 64. Early stopping watches val_loss,
# gives up after 3 epochs without improvement and restores the best weights.
# `callbacks` is documented as a list of callbacks, so the single callback is wrapped in one.
history = model.fit([ids_train, att_train], y_train, batch_size=64, epochs=20,
                    callbacks=[callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)],
                    validation_data=([ids_val, att_val], y_val))

In [None]:
# Plot training loss against validation loss, one point per epoch.
pd.DataFrame(history.history).plot(y=['loss', 'val_loss'])

In [None]:
#model.save(BASE_DIR + "pattern_classifier.h5")
#del model

In [None]:
# Load the saved classifier from Drive; the custom AttentionWithContext layer
# is supplied through custom_objects.
# NOTE(review): the save cell above is commented out, so this relies on a
# 'pattern_classifier.h5' written by an earlier run, and it replaces the model
# trained in this session - confirm that is intended.
model = tf.keras.models.load_model(BASE_DIR + "pattern_classifier.h5", custom_objects={'AttentionWithContext': AttentionWithContext}) 

In [None]:
# Load the held-out test questions from the same data folder.
df_test = pd.read_csv(BASE_DIR + 'data/patterns_test.csv')

In [None]:
# Apply the same preprocessing that was applied to the training frame.
df_test['questions_cleaned'] = df_test['corrected_question'].apply(preprocess)

In [None]:
# True test labels, one pattern name per question.
tohot_test= df_test['patterns']
tohot_test

In [None]:
# Zero-filled one-hot matrix for the test labels: one row per example and one
# column per entry in `categories`. The width is taken from the list instead of
# a hard-coded 12 so the two cannot drift apart.
onehot_test = np.zeros((tohot_test.shape[0], len(categories)))
onehot_test

In [None]:
# For every test row, set the column that matches its pattern label to 1.
for row in range(onehot_test.shape[0]):
  onehot_test[row, categories.index(tohot_test[row])] = 1

In [None]:
# Encode the test questions with the same tokenizer used for training.
# NOTE(review): as with the training data, the raw 'corrected_question' column is tokenized.
input_ids_test, attention_masks_test = bertTokenize(df_test['corrected_question'].values, bert_tokenizer)

In [None]:
# Evaluate on the test set; return_dict=True returns the results as a dict.
evaluation = model.evaluate([input_ids_test, attention_masks_test], onehot_test, return_dict=True)
evaluation

In [None]:
# Model outputs for every test example (one row per example).
y_test_pred = model.predict([input_ids_test, attention_masks_test])
y_test_pred

In [None]:
# Turn each row of prediction scores into the label of its highest-scoring class.
tohot_test_pred = [categories[scores.argmax()] for scores in y_test_pred]

In [None]:
# Count how often each label was predicted.
pd.DataFrame(tohot_test_pred).value_counts()

In [None]:
# Confusion matrix with an explicit label order. Without `labels`, scikit-learn
# uses only the labels that occur, in sorted order ('p1', 'p10', 'p2', ...),
# and the unlabeled frame cannot be read against `categories`.
# Rows are the true labels, columns the predicted labels.
pd.DataFrame(
    sklearn.metrics.confusion_matrix(tohot_test, tohot_test_pred, labels=categories),
    index=pd.Index(categories, name='true'),
    columns=pd.Index(categories, name='predicted'),
)