# Transformer encoder for text classification

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from tensorflow.keras import backend as K
import numpy as np
import os, pathlib, shutil, random
import shutil

2023-05-19 09:19:19.530005: E tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/core/saved_model/write/count
2023-05-19 09:19:19.530080: E tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/core/saved_model/read/count
2023-05-19 09:19:19.530098: E tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/core/saved_model/write/api
2023-05-19 09:19:19.530110: E tensorflow/core/lib/monitoring/collection_registry.cc:77] Cannot register 2 metrics with the same name: /tensorflow/core/saved_model/read/api


### Dataset Imdb for sentences binary classification

In [2]:
# Download the dataset

url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"

dataset = tf.keras.utils.get_file("aclImdb_v1", url,
                                    untar=True, cache_dir='.',
                                    cache_subdir='')

shutil.rmtree('aclImdb/train/unsup')

Downloading data from https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz


In [2]:
# Create a validation set with the 20% of training data

base_dir = pathlib.Path("aclImdb")
val_dir = base_dir / "val"
train_dir = base_dir / "train"
for category in ("neg", "pos"):
    os.makedirs(val_dir / category)
    files = os.listdir(train_dir / category)
    random.Random(1337).shuffle(files) 
    num_val_samples = int(0.2 * len(files)) 
    val_files = files[-num_val_samples:] 
    for fname in val_files: 
        shutil.move(train_dir / category / fname, val_dir / category / fname)

FileExistsError: ignored

In [2]:
# Prepare datasets that return integers sequences

batch_size = 32
train_ds = keras.utils.text_dataset_from_directory( "aclImdb/train", batch_size=batch_size)
val_ds = keras.utils.text_dataset_from_directory("aclImdb/val", batch_size=batch_size)
test_ds = keras.utils.text_dataset_from_directory("aclImdb/test", batch_size=batch_size)

text_only_train_ds = train_ds.map(lambda x, y: x) 

max_length = 600
max_tokens = 20000
text_vectorization = TextVectorization(
 max_tokens=max_tokens,
 output_mode="int",
 output_sequence_length=max_length, 
)
text_vectorization.adapt(text_only_train_ds)
int_train_ds = train_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)
int_val_ds = val_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)
int_test_ds = test_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)

Found 25000 files belonging to 2 classes.


2023-05-19 09:19:22.324462: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE3 SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


Found 5000 files belonging to 2 classes.
Found 25000 files belonging to 2 classes.


2023-05-19 09:19:24.334346: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:164] None of the MLIR Optimization Passes are enabled (registered 2)


In [3]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim 
        self.dense_dim = dense_dim 
        self.num_heads = num_heads 
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([layers.Dense(dense_dim, activation="relu"),layers.Dense(embed_dim),])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None): 
        if mask is not None: 
            mask = mask[:, tf.newaxis, :] 
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)
    
    def get_config(self): 
        config = super().get_config()
        config.update({
        "embed_dim": self.embed_dim,
        "num_heads": self.num_heads,
        "dense_dim": self.dense_dim,
        })
        return config


class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs): 
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(input_dim=sequence_length, output_dim=output_dim) 
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions
    
    def compute_mask(self, inputs, mask=None): 
        return tf.math.not_equal(inputs, 0) 
    
    def get_config(self): 
        config = super().get_config()
        config.update({
        "output_dim": self.output_dim,
        "sequence_length": self.sequence_length,
        "input_dim": self.input_dim,
        })
        return config

In [4]:
vocab_size = 20000
sequence_length = 600
embed_dim = 256
num_heads = 2
dense_dim = 32

In [6]:
inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs) 
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="adam",
 loss="binary_crossentropy",
 metrics=[tf.keras.metrics.BinaryAccuracy()])

In [7]:
print(model.weights)

[<tf.Variable 'positional_embedding/embedding/embeddings:0' shape=(20000, 256) dtype=float32, numpy=
array([[ 0.04429097,  0.02982345, -0.04750621, ...,  0.03149572,
        -0.02255521, -0.01286163],
       [-0.00765695, -0.01877952,  0.02530385, ...,  0.00484483,
        -0.04495964,  0.02896006],
       [ 0.02057966, -0.03033677, -0.04477325, ..., -0.03880764,
        -0.03412137, -0.03791126],
       ...,
       [-0.0259515 , -0.03587122,  0.0267035 , ..., -0.04493271,
         0.00103974, -0.02846854],
       [-0.04507805, -0.01669551,  0.00864657, ..., -0.03672589,
        -0.01724706,  0.02994117],
       [-0.03943715,  0.04555156,  0.02501934, ..., -0.04719875,
         0.04613448, -0.02291266]], dtype=float32)>, <tf.Variable 'positional_embedding/embedding_1/embeddings:0' shape=(600, 256) dtype=float32, numpy=
array([[-0.00178373, -0.03841952, -0.04867255, ..., -0.03922776,
         0.00872876,  0.02552914],
       [ 0.02510444,  0.02715831,  0.0169317 , ..., -0.01206386,
    

In [8]:
callbacks = [
    keras.callbacks.ModelCheckpoint("encoder_model/full_transformer_encoder.h5", save_best_only=True),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True)
] 
model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, 
callbacks=callbacks)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20


<keras.callbacks.History at 0x7f24e70decb0>

In [5]:
model = keras.models.load_model(
    "encoder_model/full_transformer_encoder.h5",
    custom_objects={"TransformerEncoder": TransformerEncoder,
                    "PositionalEmbedding": PositionalEmbedding})

Evaluate the model accuracy with Float32 default weights

In [6]:
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

 36/782 [>.............................] - ETA: 4:52 - loss: 0.9583 - binary_accuracy: 0.5816

KeyboardInterrupt: 

Evaluate the model accuracy converting the weights to Float16

In [10]:
K.set_floatx('float16')

# Get the original weights
ws = model.get_weights()
print(np.unique([w.dtype for w in model.get_weights()]))

[dtype('float32')]


In [34]:
# Convert the weights to Posit <16,0> and load a new model
wsp = [w.astype(K.floatx()) for w in ws]

inputs = keras.Input(shape=(None,), dtype="float16")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs) 
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model_float16 = keras.Model(inputs, outputs)
model_float16.compile(optimizer="adam",
 loss="binary_crossentropy",
 metrics=["accuracy"])
model_float16.set_weights(wsp)

print(np.unique([w.dtype for w in model_float16.get_weights()]))

[dtype('float16')]


In [35]:
int_test_ds = int_test_ds.map(lambda x, y: (tf.cast(x, tf.float16), tf.cast(y, tf.float16)))

In [39]:
print(list(int_test_ds)[0])

(<tf.Tensor: shape=(32, 600), dtype=int64, numpy=
array([[   1,    1,  443, ...,    0,    0,    0],
       [4252,    2,  353, ...,    0,    0,    0],
       [  11, 1867,    7, ...,    0,    0,    0],
       ...,
       [  10, 1550,   11, ...,    0,    0,    0],
       [  21,    2,  214, ...,    0,    0,    0],
       [  74,  142,   34, ...,    0,    0,    0]])>, <tf.Tensor: shape=(32,), dtype=int32, numpy=
array([0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0,
       1, 1, 1, 1, 0, 1, 1, 0, 1, 1], dtype=int32)>)


In [37]:
print(f"Test acc: {model_float16.evaluate(int_test_ds)[1]:.3f}")

Test acc: 0.867


Evaluate the model accuracy converting the weights to Posit<16,0>

In [13]:
K.set_floatx('posit160')

# Get the original weights
ws = model.get_weights()
print(np.unique([w.dtype for w in model.get_weights()]))

ValueError: ignored

In [None]:
# Convert the weights to Posit <16,0> and load a new model
wsp = [w.astype(K.floatx()) for w in ws]

inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs) 
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model_posit = keras.Model(inputs, outputs)
model_posit.compile(optimizer="adam",
 loss="binary_crossentropy",
 metrics=["accuracy"])
model_posit.set_weights(wsp)

print(np.unique([w.dtype for w in model_posit.get_weights()]))

[dtype(posit160)]


In [None]:
print(f"Test acc: {model_posit.evaluate(int_test_ds)[1]:.3f}")