In [None]:
from tensorflow import keras
from tensorflow.keras import layers
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one transformer encoder block to ``inputs``.

    The block consists of two sub-layers, each preceded by layer
    normalization and followed by a residual addition:

    1. multi-head self-attention (query and value are the same tensor);
    2. a position-wise feed-forward network built from two kernel-size-1
       ``Conv1D`` layers.

    Args:
        inputs: Tensor fed to the block. Its last dimension sets the number
            of filters of the final convolution so the residual sum lines up.
        head_size: ``key_dim`` given to ``MultiHeadAttention``.
        num_heads: Number of attention heads.
        ff_dim: Number of filters of the hidden (ReLU) convolution.
        dropout: Rate used inside the attention layer and by both
            ``Dropout`` layers.

    Returns:
        The feed-forward output added to the attention residual.
    """
    model_dim = inputs.shape[-1]

    # --- Self-attention sub-layer ---
    normed_inputs = layers.LayerNormalization(epsilon=1e-6)(inputs)
    self_attention = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )
    attended = self_attention(normed_inputs, normed_inputs)
    attended = layers.Dropout(dropout)(attended)
    attention_residual = attended + inputs

    # --- Feed-forward sub-layer ---
    hidden = layers.LayerNormalization(epsilon=1e-6)(attention_residual)
    hidden = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(hidden)
    hidden = layers.Dropout(dropout)(hidden)
    # Project back to the input width so the skip connection can be added.
    projected = layers.Conv1D(filters=model_dim, kernel_size=1)(hidden)
    return projected + attention_residual

def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
):
    """Build a transformer-encoder regressor with a single linear output.

    Args:
        input_shape: Shape of one example (without the batch axis).
        head_size: ``key_dim`` of every attention layer.
        num_heads: Number of attention heads per encoder block.
        ff_dim: Hidden width of each block's feed-forward part.
        num_transformer_blocks: How many encoder blocks to stack.
        mlp_units: Iterable with the width of each dense layer of the head.
        dropout: Dropout rate used inside the encoder blocks.
        mlp_dropout: Dropout rate applied after each dense layer of the head.

    Returns:
        An uncompiled ``keras.Model`` mapping the input to one linear unit.
    """
    model_input = keras.Input(shape=input_shape)

    # Stack the encoder blocks.
    encoded = model_input
    for _block in range(num_transformer_blocks):
        encoded = transformer_encoder(
            inputs=encoded,
            head_size=head_size,
            num_heads=num_heads,
            ff_dim=ff_dim,
            dropout=dropout,
        )

    # NOTE(review): with data_format="channels_first" Keras treats axis 1 as
    # channels and averages over the last axis, i.e. over the feature axis of a
    # (batch, sequence, features) input rather than over the sequence --
    # confirm this is the intended pooling.
    pooled = layers.GlobalAveragePooling1D(data_format="channels_first")(encoded)

    # MLP head: Dense(relu) + Dropout per requested width.
    head = pooled
    for units in mlp_units:
        head = layers.Dense(units, activation="relu")(head)
        head = layers.Dropout(mlp_dropout)(head)

    prediction = layers.Dense(1, activation="linear")(head)
    return keras.Model(inputs=model_input, outputs=prediction)

In [None]:
import random
import numpy as np
def sample_data(iters, size, gamma=0, seed=None):
    """Generate synthetic (likelihood, label) sequences with a hidden threshold.

    Each of the ``iters`` examples draws ``size`` likelihoods and one decision
    threshold uniformly from [0, 1]. A likelihood that lies more than ``gamma``
    away from the threshold is labelled 1 if it is >= the threshold and 0
    otherwise. A likelihood within ``gamma`` of the threshold is instead
    compared against a fresh uniform draw from
    ``[threshold - gamma, threshold + gamma]``, so labels near the threshold
    are noisy.

    Args:
        iters: Number of examples to generate.
        size: Number of (likelihood, label) points per example.
        gamma: Half-width of the noisy band around the threshold (0 = no noise).
        seed: Optional seed. When given, a private ``random.Random(seed)`` is
            used, making the call reproducible and leaving the global
            ``random`` state untouched. When ``None`` (default) the global
            ``random`` module is used, exactly as before.

    Returns:
        Tuple ``(x_data, y_data)``:
            x_data: float array of shape ``(iters, size, 2)``;
                ``[..., 0]`` holds the likelihoods, ``[..., 1]`` the 0/1 labels.
            y_data: float array of shape ``(iters, 1)`` holding the thresholds.
    """
    # Both the module and a Random instance expose ``uniform``.
    rng = random if seed is None else random.Random(seed)

    x_data = np.zeros((iters, size, 2))
    y_data = np.zeros((iters, 1))
    for i in range(iters):
        # Draw order (likelihoods, threshold, per-point noise) is kept fixed so
        # results for a given generator state do not change.
        likelihood = [rng.uniform(0, 1) for _ in range(size)]
        threshold = rng.uniform(0, 1)

        labels = []
        for l in likelihood:
            if l < threshold - gamma or l > threshold + gamma:
                # Clearly on one side of the threshold: deterministic label.
                cutoff = threshold
            else:
                # Inside the uncertainty band: compare against a jittered cutoff.
                cutoff = rng.uniform(threshold - gamma, threshold + gamma)
            labels.append(int(l >= cutoff))

        x_data[i, :, 0] = likelihood
        x_data[i, :, 1] = labels
        y_data[i, 0] = threshold
    return x_data, y_data


In [None]:
# Dataset dimensions: number of train/test examples and points per example.
TRAIN_SIZE = 10_000
TEST_SIZE = 1_000
INPUT_SIZE = 50

# Noise-free data (gamma keeps its default of 0); train is sampled before test.
x_train, y_train = sample_data(iters=TRAIN_SIZE, size=INPUT_SIZE)
x_test, y_test = sample_data(iters=TEST_SIZE, size=INPUT_SIZE)

x_train.shape

(10000, 50, 2)

In [None]:

# Shape of a single example: drop the leading (example) axis of x_train.
input_shape = x_train.shape[1:]

# Hyperparameters, grouped by the part of the network they configure.
encoder_config = dict(
    head_size=256,
    num_heads=4,
    ff_dim=4,
    num_transformer_blocks=4,
    dropout=0.25,
)
head_config = dict(
    mlp_units=[128],
    mlp_dropout=0.4,
)

model = build_model(input_shape, **encoder_config, **head_config)

In [None]:
# Mean absolute error is both the training loss and the reported metric.
adam = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(
    optimizer=adam,
    loss="mean_absolute_error",
    metrics=["mean_absolute_error"],
)

In [None]:
import datetime
import os
model.summary()

# One TensorBoard run directory per execution, named by the current timestamp.
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = os.path.join("logs", run_stamp)
tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir, histogram_freq=1)

# Early stopping: give up after 10 epochs without improvement and restore the
# best weights seen.
early_stopping = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
callbacks = [early_stopping]

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 50, 2)]      0           []                               
                                                                                                  
 layer_normalization_16 (LayerN  (None, 50, 2)       4           ['input_3[0][0]']                
 ormalization)                                                                                    
                                                                                                  
 multi_head_attention_8 (MultiH  (None, 50, 2)       11266       ['layer_normalization_16[0][0]', 
 eadAttention)                                                    'layer_normalization_16[0][0]'] 
                                                                                            

In [None]:
# Train with 10% of the training data held out for validation.
# The EarlyStopping callback(s) collected in `callbacks` are passed along with
# the TensorBoard logger; without them training always runs all 100 epochs and
# the best weights are never restored.
history = model.fit(
    x_train,
    y_train,
    validation_split=0.1,
    epochs=100,
    batch_size=100,
    callbacks=[tensorboard_callback, *callbacks],
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 6: saving model to ./
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
 9/90 [==>...........................] - ETA: 55s - loss: 0.1551 - mean_absolute_error: 0.1551
Epoch 12: saving model to ./
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 17: saving model to ./
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
19/90 [=====>........................] - ETA: 52s - loss: 0.1329 - mean_absolute_error: 0.1329
Epoch 23: saving model to ./
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 28: saving model to ./
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 34: saving model to ./
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 39: saving model to ./
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 45: saving model to ./
Epo

In [None]:
# Returns [loss, metric]; both are mean absolute error for this model.
model.evaluate(x=x_test, y=y_test, verbose=1)



[0.061850011348724365, 0.061850011348724365]

In [None]:
# Load the TensorBoard notebook extension, then open the dashboard on the
# `logs` directory (the parent of the per-run directories written by training).
%load_ext tensorboard
%tensorboard --logdir logs

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


<IPython.core.display.Javascript object>

In [None]:
# Numbers of points per example to evaluate: 1 .. INPUT_SIZE - 1.
# NOTE(review): range() excludes the full-length case INPUT_SIZE -- confirm
# that leaving it out of the sweep is intended.
xs = list(range(1,INPUT_SIZE))
# Values passed to sample_data as `gamma` (label-noise half-width).
gamma_list=[0, 0.2, 0.4, 0.6]
def generate_ys(xs, gamma_list, n_samples=3000):
    """Evaluate the global ``model`` over example sizes and noise levels.

    For every ``gamma`` in ``gamma_list`` and every size ``i`` in ``xs``,
    ``n_samples`` fresh examples with ``i`` points each are sampled with
    ``sample_data``, left-padded with zeros to ``INPUT_SIZE`` points, and
    scored with ``model.evaluate``.

    Args:
        xs: Sequence of example sizes. It is iterated once per gamma, and each
            size must not exceed ``INPUT_SIZE`` (the padding length would
            otherwise be negative).
        gamma_list: Iterable of ``gamma`` values forwarded to ``sample_data``.
        n_samples: Number of examples sampled for each (gamma, size) pair.

    Returns:
        Dict mapping each gamma to a list with the first value returned by
        ``model.evaluate`` (the loss) for each size, in the order of ``xs``.
        Every gamma gets an entry, even when ``xs`` is empty.
    """
    all_ys = {}
    for gamma in gamma_list:
        ys = []
        for i in xs:
            test, targ = sample_data(n_samples, i, gamma=gamma)
            # Left-pad with all-zero points so the input matches the model's
            # fixed sequence length.
            padding = np.zeros((n_samples, INPUT_SIZE - i, 2))
            test = np.concatenate((padding, test), axis=1)
            scores = model.evaluate(test, targ, verbose=1)
            ys.append(scores[0])
        # Stored once per gamma (outside the inner loop) so an empty `xs`
        # still yields an entry.
        all_ys[gamma] = ys
    return all_ys
# Run the sweep: result is keyed by gamma, with one loss per entry of xs.
all_ys = generate_ys(xs, gamma_list)



In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

# One curve per gamma: test MAE as a function of the number of points shown.
fig, ax = plt.subplots()
for gamma, losses in all_ys.items():
    ax.plot(xs, losses, label=gamma)
ax.legend(title="gamma")  # the bare numbers in the legend are gamma values
ax.set_title("Mean Absolute Error vs Dataset Size for Different Uncertainties")
ax.set_ylabel("MAE")
ax.set_xlabel("ICL Shot Size")
plt.show()