In [None]:
import pandas as pd

In [None]:
eval_dataset = pd.read_csv("evaluation.csv")

In [3]:
import tensorflow as tf
import tensorflow.keras.backend as K
import tensorflow.keras.layers as tfl
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import Input

try:
    import tensorflow_hub as hub
except ModuleNotFoundError:
    %pip install tensorflow_hub
    import tensorflow_hub as hub
    
try:
    from keras_self_attention import SeqSelfAttention
except ModuleNotFoundError:
    %pip install keras-self-attention
    from keras_self_attention import SeqSelfAttention

import numpy as np


# %pip install -q transformers
# %pip install -q -U tensorflow-text
# %pip install -q tf-models-official==2.7.0

import math as m

^C
Note: you may need to restart the kernel to use updated packages.


ModuleNotFoundError: No module named 'keras_self_attention'

In [None]:
huburl = "https://tfhub.dev/google/universal-sentence-encoder/4" 
fine_tuned_module_object = hub.load(huburl)

In [5]:
def gen_random_batch(in_groups, batch_halfsize = 8):
    text_batch, reason_batch, y_hat = [], [], []
    all_groups = list(range(in_groups[0].shape[0]))
    for match_group in [True, False]:
        group_idx = np.random.choice(all_groups, size = batch_halfsize)
        text_batch += [in_groups[0][c_idx] for c_idx in group_idx]
        if match_group:
            b_group_idx = group_idx
            y_hat += [1]*batch_halfsize
        else:
            # anything but the same group
            non_group_idx = [np.random.choice([i for i in all_groups if i!=c_idx]) for c_idx in group_idx] 
            b_group_idx = non_group_idx
            y_hat += [0]*batch_halfsize
            
        reason_batch += [in_groups[1][c_idx] for c_idx in b_group_idx]
            
    return np.stack(text_batch, 0), np.stack(reason_batch, 0), np.stack(y_hat, 0)

def siam_gen(in_groups, batch_size = 32):
    while True:
        text_stack, reason_stack, y_hatstack = gen_random_batch(in_groups, batch_size//2)
        yield [text_stack, reason_stack], y_hatstack

def euclidean_distance(vectors):
    (featsA, featsB) = vectors
    sumSquared = K.sum(K.square(featsA - featsB), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sumSquared, K.epsilon()))

def contrastive_loss(y, preds, margin=1):
    # explicitly cast the true class label data type to the predicted
    # class label data type (otherwise we run the risk of having two
    # separate data types, causing TensorFlow to error out)
    y = tf.cast(y, preds.dtype)
    # calculate the contrastive loss between the true labels and
    # the predicted labels
    squaredPreds = K.square(preds)
    squaredMargin = K.square(K.maximum(margin - preds, 0))
    loss = K.mean(y * squaredPreds + (1 - y) * squaredMargin)
    return loss

In [6]:
def accuracy(y_true, y_pred):
    results = (y_pred <= 0.5).astype(int).squeeze()
    return np.mean([y_true == results])

In [4]:
def get_model():
    shared_embedding_layer1 = hub.KerasLayer(fine_tuned_module_object, trainable=True, name="first")
    shared_embedding_layer2 = hub.KerasLayer(fine_tuned_module_object, trainable=True, name="second")

    left_input = tf.keras.Input(shape=(), dtype=tf.string)
    right_input = tf.keras.Input(shape=(), dtype=tf.string)

    embedding_left_output= shared_embedding_layer1(left_input)
    a = tfl.Dropout(0.3)(embedding_left_output)
    a = tfl.Dense(64, activation="linear", kernel_regularizer='l2')(a)
    a = tfl.BatchNormalization()(a)
    a = tfl.Activation('relu')(a)
    a = tfl.Dropout(rate=0.2)(a)
    a = tfl.Dense(32, activation="linear", kernel_regularizer='l2')(a)
    a = tfl.BatchNormalization()(a)
    a = tfl.Activation('relu')(a)

    embedding_right_output= shared_embedding_layer2(right_input)
    b = tfl.Dropout(0.3)(embedding_right_output)
    b = tfl.Dense(64, activation="linear", a='l2')(b)
    b = tfl.BatchNormalization()(b)
    b = tfl.Activation('relu')(b)
    a = tfl.Dropout(rate=0.2)(a)
    b = tfl.Dense(32, activation="linear", kernel_regularizer='l2')(b)
    b = tfl.BatchNormalization()(b)
    b = tfl.Activation('relu')(b)

    # combined_features = tfl.concatenate([a, b], name = 'merge_features')
    # combined_features = tfl.Dense(16, activation = 'linear')(combined_features)
    # combined_features = tfl.BatchNormalization()(combined_features)
    # combined_features = tfl.Activation('relu')(combined_features)
    # combined_features = tfl.Dropout(rate=0.2)(combined_features)
    # combined_features = tfl.Dense(4, activation = 'linear')(combined_features)
    # combined_features = tfl.BatchNormalization()(combined_features)
    # combined_features = tfl.Activation('relu')(combined_features)
    # combined_features = tfl.Dense(1, activation = 'sigmoid')(combined_features)

    distance = tfl.Lambda(euclidean_distance)([a, b])
    # cosine_similiarity=tf.keras.layers.Dot(axes=-1,normalize=True)([a,b])

    # output = tfl.Dense(1, activation="sigmoid")(distance)
    model = Model([left_input, right_input], distance)
    return model

In [None]:
model = get_model()

In [None]:
model.summary()

In [None]:
model.compile(optimizer="adam", loss = contrastive_loss, metrics = [tf.metrics.BinaryAccuracy()])

In [None]:
loss_history = model.fit(siam_gen([df["clean_text"], df["reason"]], 64), 
                         validation_data = siam_gen([df_test["clean_text"], df_test["reason"]], 32),
                         steps_per_epoch=50, validation_steps=10, epochs = 100, 
                         verbose = True, use_multiprocessing=True)