In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import numpy.random as npr
import random

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import backend as K
from keras.optimizers import Adam
from keras_nlp.layers import PositionEmbedding

In [3]:
seed = 428

np.random.seed(seed)
tf.random.set_seed(seed)
random.seed(seed)

In [4]:
def bert_module(query, key, value, embed_dim, num_head, i):
    
    # Multi headed self-attention
    attention_output = layers.MultiHeadAttention(
        num_heads=num_head,
        key_dim=embed_dim // num_head,
        name="encoder_{}/multiheadattention".format(i)
    )(query, key, value, use_causal_mask=True)
    
    # Add & Normalize
    attention_output = layers.Add()([query, attention_output])  # Skip Connection
    attention_output = layers.LayerNormalization(epsilon=1e-6)(attention_output)
    
    # Feedforward network
    ff_net = keras.models.Sequential([
        layers.Dense(2 * embed_dim, activation='relu', name="encoder_{}/ffn_dense_1".format(i)),
        layers.Dense(embed_dim, name="encoder_{}/ffn_dense_2".format(i)),
    ])

    # Apply Feedforward network
    ffn_output = ff_net(attention_output)

    # Add & Normalize
    ffn_output = layers.Add()([attention_output, ffn_output])  # Skip Connection
    ffn_output = layers.LayerNormalization(epsilon=1e-6)(ffn_output)
    
    return ffn_output

In [5]:
def get_sinusoidal_embeddings(sequence_length, embedding_dim):
    position_enc = np.array([
        [pos / np.power(10000, 2. * i / embedding_dim) for i in range(embedding_dim)]
        if pos != 0 else np.zeros(embedding_dim)
        for pos in range(sequence_length)
    ])
    position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2])  # dim 2i
    position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2])  # dim 2i+1
    return tf.cast(position_enc, dtype=tf.float32)

In [6]:
def insert_element_randomly(my_list, element):
    
    if len(my_list) > 1:

        index = random.randint(0, 2)
        
    else:
        
        index = 0

    new_list = my_list[:(4 * index)] + element + my_list[(4 * index):]
    
    return new_list

In [7]:
N = 20 # vocab_size
M = 20 # number of random words

vocabs = ['word_' + str(i) for i in range(N)] + ['random_' + str(i) for i in range(M)]

vocabs_word = ['word_' + str(i) for i in range(N)]

vocab_map = {}
for i in range(len(vocabs)):
    vocab_map[vocabs[i]] = i

In [8]:
def get_accuracy_prob(embed_dim):
    
    pairs = []

    for i in vocabs_word:
        for j in vocabs_word:
            for k in vocabs_word:
                if i != j and i != k and j != k:
                    pairs.append((i,j,k))

    indicator = np.random.choice([0, 1], size=len(pairs), p=[0.5, 0.5])

    pairs_train = [pairs[i] for i in range(len(indicator)) if indicator[i] == 1]
    pairs_test = [pairs[i] for i in range(len(indicator)) if indicator[i] == 0]
    
    sentences_train = []
    sentences_number_train = []
    sentences_test_a = []
    sentences_number_test_a = []
    sentences_test_b = []
    sentences_number_test_b = []

    x_masked_train = []
    y_masked_labels_train = []
    x_masked_test_a = []
    y_masked_labels_test_a = []
    x_masked_test_b = []
    y_masked_labels_test_b = []

    for _ in range(25000):

        random_words = random.sample(['random_' + str(i) for i in range(M)], 4)
    
        [(a,b,c), (d,e,f)] = random.sample(pairs_train, 2)

        temp = [a, b, c, a, d, e, f, d]
        temp = insert_element_randomly(temp, random_words)

        sentences_train.append(temp)
        sentences_number_train.append([vocab_map[i] for i in temp])
        x_masked_train.append([vocab_map[i] for i in temp])
        y_masked_labels_train.append([vocab_map[i] for i in temp][1:])
        
        random_words = random.sample(['random_' + str(i) for i in range(M)], 4)

        [(a,b,c), (d,e,f)] = random.sample(pairs_train, 2)

        temp = [a, b, c, b, d, e, f, e]
        temp = insert_element_randomly(temp, random_words)

        sentences_train.append(temp)
        sentences_number_train.append([vocab_map[i] for i in temp])
        x_masked_train.append([vocab_map[i] for i in temp])
        y_masked_labels_train.append([vocab_map[i] for i in temp][1:])



    for _ in range(25000):

        [(a,b,c), (d,e,f), (g,h,i)] = random.sample(pairs_test, 3)
    
        temp = [a, b, c, a, d, e, f, d, g, h, i, g]

        sentences_test_a.append(temp)
        sentences_number_test_a.append([vocab_map[i] for i in temp])
        x_masked_test_a.append([vocab_map[i] for i in temp])
        y_masked_labels_test_a.append([vocab_map[i] for i in temp][1:])

        [(a,b,c), (d,e,f), (g,h,i)] = random.sample(pairs_test, 3)
    
        temp = [a, b, c, b, d, e, f, e, g, h, i, h]

        sentences_test_b.append(temp)
        sentences_number_test_b.append([vocab_map[i] for i in temp])
        x_masked_test_b.append([vocab_map[i] for i in temp])
        y_masked_labels_test_b.append([vocab_map[i] for i in temp][1:])

    x_masked_train = np.array(x_masked_train)
    y_masked_labels_train = np.array(y_masked_labels_train)
    x_masked_test_a = np.array(x_masked_test_a)
    y_masked_labels_test_a = np.array(y_masked_labels_test_a)
    x_masked_test_b = np.array(x_masked_test_b)
    y_masked_labels_test_b = np.array(y_masked_labels_test_b)

    perm = np.random.permutation(len(x_masked_train))
    x_masked_train = x_masked_train[perm]
    y_masked_labels_train = y_masked_labels_train[perm]
    
    num_head = 2

    callback = keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = 5, restore_best_weights = True)
    inputs = layers.Input((x_masked_train.shape[1],), dtype=tf.int64)
    word_embeddings = layers.Embedding(N + M, embed_dim, name="word_embedding")(inputs)
    sinusoidal_embeddings = get_sinusoidal_embeddings(len(x_masked_train[0]), embed_dim)
    encoder_output = word_embeddings + sinusoidal_embeddings
    for i in range(1):
        encoder_output = bert_module(encoder_output, encoder_output, encoder_output, embed_dim, num_head, i)

    encoder_output = keras.layers.Lambda(lambda x: x[:,:-1,:], name='slice')(encoder_output)
    mlm_output = layers.Dense(N + M, name="mlm_cls", activation="softmax")(encoder_output)
    mlm_model = keras.Model(inputs = inputs, outputs = mlm_output)
    adam = Adam()
    mlm_model.compile(loss='sparse_categorical_crossentropy', optimizer=adam)
    history = mlm_model.fit(x_masked_train, y_masked_labels_train,
                            validation_split = 0.5, callbacks = [callback], 
                            epochs=2000, batch_size=5000, 
                            verbose=0)
    
    acc_a = []
    prob_a = []
    x_test_subset_a = x_masked_test_a[np.random.choice(x_masked_test_a.shape[0], size=1000, replace=False)]

    for sentence_number in x_test_subset_a:
        temp = keras.backend.function(inputs = mlm_model.layers[0].input, outputs = mlm_model.layers[-1].output) \
            (np.array(sentence_number).reshape(1,len(sentence_number)))
        temp = temp[:,-1,:]
        acc_a.append(1 if temp.argmax() == sentence_number[-1] else 0)
        prob_a.append(temp[0][sentence_number[-1]])
        
    acc_b = []
    prob_b = []
    x_test_subset_b = x_masked_test_b[np.random.choice(x_masked_test_b.shape[0], size=1000, replace=False)]

    for sentence_number in x_test_subset_b:
        temp = keras.backend.function(inputs = mlm_model.layers[0].input, outputs = mlm_model.layers[-1].output) \
            (np.array(sentence_number).reshape(1,len(sentence_number)))
        temp = temp[:,-1,:]
        acc_b.append(1 if temp.argmax() == sentence_number[-1] else 0)
        prob_b.append(temp[0][sentence_number[-1]])
        
    return ((np.mean(acc_a), np.mean(prob_a)), (np.mean(acc_b), np.mean(prob_b)))


In [10]:
accs_a = 0
probs_a = 0
accs_b = 0
probs_b = 0

for _ in range(10):
    
    ((acc_a, prob_a), (acc_b, prob_b)) = get_accuracy_prob(10)
    
    print((acc_a, prob_a))
    print((acc_b, prob_b))
    
    accs_a += acc_a/10
    probs_a += prob_a/10
    accs_b += acc_b/10
    probs_b += prob_b/10
    
print((accs_a, probs_a))
print((accs_b, probs_b))

(0.0, 0.0104504265)
(0.0, 0.010562966)
(0.007, 0.024185756)
(0.004, 0.023544364)
(0.039, 0.050357062)
(0.041, 0.049098082)
(0.051, 0.044412825)
(0.047, 0.04345424)
(0.071, 0.05135692)
(0.079, 0.053065434)
(0.046, 0.049459204)
(0.06, 0.051130176)
(0.006, 0.013703612)
(0.012, 0.01466186)
(0.0, 0.0009088933)
(0.0, 0.00092142023)
(0.0, 0.0010364049)
(0.0, 0.0010705065)
(0.051, 0.049393263)
(0.064, 0.049239486)
(0.0271, 0.02952643660828471)
(0.0307, 0.029674853483447804)


In [11]:
accs_a = 0
probs_a = 0
accs_b = 0
probs_b = 0

for _ in range(10):
    
    ((acc_a, prob_a), (acc_b, prob_b)) = get_accuracy_prob(100)
    
    print((acc_a, prob_a))
    print((acc_b, prob_b))
    
    accs_a += acc_a/10
    probs_a += prob_a/10
    accs_b += acc_b/10
    probs_b += prob_b/10
    
print((accs_a, probs_a))
print((accs_b, probs_b))

(0.0, 6.859806e-05)
(0.0, 6.848894e-05)
(0.092, 0.054427147)
(0.118, 0.05483171)
(0.073, 0.052245557)
(0.073, 0.05311937)
(0.093, 0.052629348)
(0.096, 0.05261145)
(0.0, 0.0011938497)
(0.0, 0.0011900436)
(0.056, 0.04822184)
(0.068, 0.048953418)
(0.134, 0.1018675)
(0.47, 0.22347645)
(0.081, 0.052851494)
(0.089, 0.05308263)
(0.051, 0.050534338)
(0.049, 0.050597157)
(0.0, 0.0001349365)
(0.0, 0.00013389719)
(0.058, 0.0414174606979941)
(0.09630000000000001, 0.053806461959902664)
