<a href="https://colab.research.google.com/github/reitezuz/notebooks-for-NES2-2024/blob/main/lecture_07/text_classification1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple Natural Language Processing Example - Sentiment Classification

IMDB Dataset is a dataset for binary sentiment classification (positive or negative reviews). It contains a set of 25,000 highly polar movie reviews for training and 25,000 for testing.

 - http://ai.stanford.edu/~amaas/data/sentiment/
 - https://www.kaggle.com/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews

Based on: https://github.com/fchollet/deep-learning-with-python-notebooks/blob/master/chapter11_part02_sequence-models.ipynb




### Download and extract the zip file with the data

In [None]:
!curl -O https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar -xf aclImdb_v1.tar.gz
!rm -r aclImdb/train/unsup

## Prepare the data:
1. divide the train folder into two folders: train and val


In [None]:
import os, pathlib, shutil, random

# Create the directory with the validation data
base_dir = pathlib.Path("aclImdb")
#base_dir = pathlib.Path("data")
val_dir = base_dir / "val"
train_dir = base_dir / "train"
for category in ("neg", "pos"):
    os.makedirs(val_dir / category)
    files = os.listdir(train_dir / category)
    random.Random(1337).shuffle(files)
    num_val_samples = int(0.2 * len(files))
    val_files = files[-num_val_samples:]
    for fname in val_files:
        shutil.move(train_dir / category / fname,
                    val_dir / category / fname)

2. create the datasets from the directories

In [None]:
import keras

batch_size = 32

# Create the training, validation and testing data sets:
train_ds = keras.utils.text_dataset_from_directory(
    "aclImdb/train", batch_size=batch_size
)
val_ds = keras.utils.text_dataset_from_directory(
    "aclImdb/val", batch_size=batch_size
)
test_ds = keras.utils.text_dataset_from_directory(
    "aclImdb/test", batch_size=batch_size
)
text_only_train_ds = train_ds.map(lambda x, y: x)

Found 75000 files belonging to 3 classes.
Found 5000 files belonging to 2 classes.
Found 25000 files belonging to 2 classes.


## Vectorize the Data

- Define the (maximum) length of the sequences.
- Define the number of tokens to keep (skip the unimportant ones): choosing the 20,000 most important tokens is a reasonable choice.
- Note: Vectorization always runs on the CPU.

In [None]:
from keras import layers

max_length = 600      # Maximum length of each sequence (longer sequences will be truncated)
max_tokens = 20000    # Number of (most important) tokens

text_vectorization = layers.TextVectorization(
    max_tokens=max_tokens,
    output_mode="int",                 # Convert text to sequences of integer indices
    output_sequence_length=max_length, # Ensure sequences have the given fixed length
)

# This step analyzes the dataset to create a vocabulary based on the most frequent tokens
text_vectorization.adapt(text_only_train_ds)

int_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
int_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
int_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)

# Observe the data:
for inputs, targets in int_train_ds:
      print("inputs.shape:", inputs.shape)
      print("inputs.dtype:", inputs.dtype)
      print("targets.shape:", targets.shape)
      print("targets.dtype:", targets.dtype)
      print("inputs:", inputs)
      print("targets:", targets)
      break

inputs.shape: (32, 600)
inputs.dtype: <dtype: 'int64'>
targets.shape: (32,)
targets.dtype: <dtype: 'int32'>
inputs: tf.Tensor(
[[  10  214   44 ...    0    0    0]
 [2734  643   20 ...    0    0    0]
 [8081 2031    3 ...    0    0    0]
 ...
 [  10  439 1820 ...    0    0    0]
 [  74   95 2265 ...    0    0    0]
 [  99 2190    2 ...    0    0    0]], shape=(32, 600), dtype=int64)
targets: tf.Tensor([2 0 2 2 2 2 0 1 1 2 2 2 2 1 1 2 2 2 0 2 2 2 2 2 2 1 2 2 2 0 2 1], shape=(32,), dtype=int32)


## The bag of words approach - 1. Unigrams



### Preprocess the data
- configure the TextVectorization layer to return binary encoded unigrams

In [None]:
text_vectorization = layers.TextVectorization(
    max_tokens=20000,
    output_mode="multi_hot",
)
text_only_train_ds = train_ds.map(lambda x, y: x)
text_vectorization.adapt(text_only_train_ds)

binary_1gram_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
binary_1gram_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
binary_1gram_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)

for inputs, targets in binary_1gram_train_ds:
    print("inputs.shape:", inputs.shape)
    print("inputs.dtype:", inputs.dtype)
    print("targets.shape:", targets.shape)
    print("targets.dtype:", targets.dtype)
    print("inputs[0]:", inputs[0])
    print("targets[0]:", targets[0])
    break

inputs.shape: (32, 20000)
inputs.dtype: <dtype: 'int64'>
targets.shape: (32,)
targets.dtype: <dtype: 'int32'>
inputs[0]: tf.Tensor([0 1 1 ... 0 0 0], shape=(20000,), dtype=int64)
targets[0]: tf.Tensor(1, shape=(), dtype=int32)


### Train a MLP model on binary encoded unigrams

In [None]:
import keras
from keras import layers

def get_model(max_tokens=20000, hidden_dim=16):
    inputs = keras.Input(shape=(max_tokens,))
    x = layers.Dense(hidden_dim, activation="relu")(inputs)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(1, activation="sigmoid")(x)

    model = keras.Model(inputs, outputs)
    model.compile(optimizer="rmsprop",
                  loss="binary_crossentropy",
                  metrics=["accuracy"])
    return model

model = get_model()
model.summary()
callbacks = [
    keras.callbacks.ModelCheckpoint("binary_1gram.keras",
                                    save_best_only=True)
]
model.fit(binary_1gram_train_ds.cache(),
          validation_data=binary_1gram_val_ds.cache(),
          epochs=10,
          callbacks=callbacks)
model = keras.models.load_model("binary_1gram.keras")
print(f"Test acc: {model.evaluate(binary_1gram_test_ds)[1]:.3f}")

Epoch 1/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 19ms/step - accuracy: 0.7348 - loss: 0.5297 - val_accuracy: 0.8702 - val_loss: 0.3333
Epoch 2/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 5ms/step - accuracy: 0.8842 - loss: 0.3244 - val_accuracy: 0.8726 - val_loss: 0.3164
Epoch 3/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.9026 - loss: 0.2811 - val_accuracy: 0.8740 - val_loss: 0.3317
Epoch 4/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.9078 - loss: 0.2641 - val_accuracy: 0.8766 - val_loss: 0.3466
Epoch 5/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 5ms/step - accuracy: 0.9111 - loss: 0.2443 - val_accuracy: 0.8740 - val_loss: 0.3752
Epoch 6/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.9174 - loss: 0.2373 - val_accuracy: 0.8704 - val_loss: 0.3897
Epoch 7/10
[1m625/625[0m

## The bag of words approach - 2. Bigrams

### Preprocess the data
- configure the TextVectorization layer to return binary encoded bigrams

In [None]:
text_vectorization = layers.TextVectorization(
    ngrams=2,
    max_tokens=20000,
    output_mode="multi_hot",
)

text_vectorization.adapt(text_only_train_ds)
binary_2gram_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
binary_2gram_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
binary_2gram_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)

for inputs, targets in binary_2gram_train_ds:
    print("inputs.shape:", inputs.shape)
    print("inputs.dtype:", inputs.dtype)
    print("targets.shape:", targets.shape)
    print("targets.dtype:", targets.dtype)
    print("inputs[0]:", inputs[0])
    print("targets[0]:", targets[0])
    break

inputs.shape: (32, 20000)
inputs.dtype: <dtype: 'int64'>
targets.shape: (32,)
targets.dtype: <dtype: 'int32'>
inputs[0]: tf.Tensor([1 1 1 ... 0 0 0], shape=(20000,), dtype=int64)
targets[0]: tf.Tensor(1, shape=(), dtype=int32)


### Train a MLP model on binary encoded bigrams

In [None]:
model = get_model()
model.summary()
callbacks = [
    keras.callbacks.ModelCheckpoint("binary_2gram.keras",
                                    save_best_only=True)
]
model.fit(binary_2gram_train_ds.cache(),
          validation_data=binary_2gram_val_ds.cache(),
          epochs=10,
          callbacks=callbacks)
model = keras.models.load_model("binary_2gram.keras")
print(f"Test acc: {model.evaluate(binary_2gram_test_ds)[1]:.3f}")

Epoch 1/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 15ms/step - accuracy: 0.7655 - loss: 0.4948 - val_accuracy: 0.8850 - val_loss: 0.2942
Epoch 2/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.9009 - loss: 0.2781 - val_accuracy: 0.8876 - val_loss: 0.3025
Epoch 3/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step - accuracy: 0.9224 - loss: 0.2364 - val_accuracy: 0.8866 - val_loss: 0.3198
Epoch 4/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.9313 - loss: 0.2217 - val_accuracy: 0.8868 - val_loss: 0.3467
Epoch 5/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step - accuracy: 0.9378 - loss: 0.2015 - val_accuracy: 0.8814 - val_loss: 0.3532
Epoch 6/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 4ms/step - accuracy: 0.9418 - loss: 0.1897 - val_accuracy: 0.8824 - val_loss: 0.3943
Epoch 7/10
[1m625/625[0m

### Apply the model to new texts

In [None]:
# create an inference model:
import keras
inputs = keras.Input(shape=(1,), dtype="string")
processed_inputs = text_vectorization(inputs)
outputs = model(processed_inputs)
inference_model = keras.Model(inputs, outputs)

In [None]:
# use the model:
import tensorflow as tf
raw_text_data = tf.convert_to_tensor([
    ["That was an awful movie, I hate it."],
    ["That was an excellent movie, I love it."],
    ["I was shocked. The movie was too short. Can't wait to see it again"],
])
predictions = inference_model(raw_text_data)
for i in range(len(raw_text_data)):
    print(f"text {i}: {float(predictions[i] * 100):.2f} percent positive")

text 0: 29.20 percent positive
text 1: 76.49 percent positive
text 2: 64.91 percent positive


## 3. Sequential model - 1. One-hot vectors
- the training is very unefficient

In [None]:
import tensorflow as tf

inputs = keras.Input(shape=(None,), dtype="int64")
# Use Lambda layer to wrap tf.one_hot and specify output_shape
embedded = layers.Lambda(lambda x: tf.one_hot(x, depth=max_tokens),
                        output_shape=(max_length, max_tokens))(inputs)
x = layers.Bidirectional(layers.LSTM(32))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("one_hot_bidir_lstm.keras", save_best_only=True)]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=10, callbacks=callbacks)


#model = keras.models.load_model("one_hot_bidir_lstm.keras")
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Epoch 1/10
[1m  79/2344[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m3:48:06[0m 6s/step - accuracy: 0.1765 - loss: -1.8908

KeyboardInterrupt: 

## 4. Sequential model that uses an Embedding layer trained from scratch

In [None]:
inputs = keras.Input(shape=(None,), dtype="int64")
# Add an Embedding layer to convert integer tokens into dense vectors
embedded = layers.Embedding(input_dim=max_tokens, output_dim=256)(inputs)
x = layers.Bidirectional(layers.LSTM(32))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("embeddings_bidir_gru.keras",
                                    save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=10, callbacks=callbacks)
model = keras.models.load_model("embeddings_bidir_gru.keras")
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Epoch 1/10
[1m 152/2344[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m12:30[0m 343ms/step - accuracy: 0.1663 - loss: -4.1635

## 5. Sequential model that uses an Embedding layer with masking enabled

In [None]:
inputs = keras.Input(shape=(None,), dtype="int64")
embedded = layers.Embedding(
    input_dim=max_tokens, output_dim=256, mask_zero=True)(inputs) # masking
x = layers.Bidirectional(layers.LSTM(32))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("embeddings_bidir_gru_with_masking.keras",
                                    save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=10, callbacks=callbacks)
model = keras.models.load_model("embeddings_bidir_gru_with_masking.keras")
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Epoch 1/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m276s[0m 433ms/step - accuracy: 0.6839 - loss: 0.5689 - val_accuracy: 0.8474 - val_loss: 0.3502
Epoch 2/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m267s[0m 427ms/step - accuracy: 0.8593 - loss: 0.3360 - val_accuracy: 0.8738 - val_loss: 0.2978
Epoch 3/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m319s[0m 422ms/step - accuracy: 0.8957 - loss: 0.2596 - val_accuracy: 0.8408 - val_loss: 0.3691
Epoch 4/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m325s[0m 426ms/step - accuracy: 0.9204 - loss: 0.2100 - val_accuracy: 0.8764 - val_loss: 0.3075
Epoch 5/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m242s[0m 386ms/step - accuracy: 0.9400 - loss: 0.1617 - val_accuracy: 0.8750 - val_loss: 0.3436
Epoch 6/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m246s[0m 393ms/step - accuracy: 0.9612 - loss: 0.1160 - val_accuracy: 0.8740 - val_loss: 0.3976
Epoc

## 6. Sequential model that uses pretrained word embeddings

In [None]:
!wget http://nlp.stanford.edu/data/glove.6B.zip
!unzip -q glove.6B.zip

'wget' is not recognized as an internal or external command,
operable program or batch file.
'unzip' is not recognized as an internal or external command,
operable program or batch file.


In [None]:
# Parse the GloVe word-embeddings file
import numpy as np
path_to_glove_file = "glove.6B.100d.txt"

embeddings_index = {}
with open(path_to_glove_file) as f:
    for line in f:
        word, coefs = line.split(maxsplit=1)
        coefs = np.fromstring(coefs, "f", sep=" ")
        embeddings_index[word] = coefs

print(f"Found {len(embeddings_index)} word vectors.")

UnicodeDecodeError: 'charmap' codec can't decode byte 0x9d in position 2776: character maps to <undefined>

In [None]:
# Prepare the GloVe word-embeddings matrix
embedding_dim = 100

vocabulary = text_vectorization.get_vocabulary()
word_index = dict(zip(vocabulary, range(len(vocabulary))))

embedding_matrix = np.zeros((max_tokens, embedding_dim))
for word, i in word_index.items():
    if i < max_tokens:
        embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

In [None]:
# Embedding layer
embedding_layer = layers.Embedding(
    max_tokens,
    embedding_dim,
    embeddings_initializer=keras.initializers.Constant(embedding_matrix),
    trainable=False,
    mask_zero=True,
)

#### Model that uses a pretrained Embedding layer

In [None]:
inputs = keras.Input(shape=(None,), dtype="int64")
embedded = embedding_layer(inputs)
x = layers.Bidirectional(layers.LSTM(32))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("glove_embeddings_sequence_model.keras",
                                    save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=10, callbacks=callbacks)
model = keras.models.load_model("glove_embeddings_sequence_model.keras")
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Epoch 1/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 56ms/step - accuracy: 0.6302 - loss: 0.6318 - val_accuracy: 0.7868 - val_loss: 0.4610
Epoch 2/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m38s[0m 60ms/step - accuracy: 0.7797 - loss: 0.4693 - val_accuracy: 0.8036 - val_loss: 0.4269
Epoch 3/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 59ms/step - accuracy: 0.8208 - loss: 0.4100 - val_accuracy: 0.8348 - val_loss: 0.3754
Epoch 4/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 57ms/step - accuracy: 0.8373 - loss: 0.3751 - val_accuracy: 0.8450 - val_loss: 0.3525
Epoch 5/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m46s[0m 65ms/step - accuracy: 0.8471 - loss: 0.3562 - val_accuracy: 0.8516 - val_loss: 0.3318
Epoch 6/10
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 57ms/step - accuracy: 0.8594 - loss: 0.3330 - val_accuracy: 0.8560 - val_loss: 0.3249
Epoch 7/10
[1m6

## 7. Simple Transformer model trained from scratch on batch of words

### Transformer encoder implemented as a subclassed Layer

In [None]:
import tensorflow as tf
import keras
from keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

Define and train the Transformer model

In [None]:
vocab_size = 20000
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = layers.Embedding(vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("transformer_encoder.keras",
                                    save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, callbacks=callbacks)
model = keras.models.load_model(
    "transformer_encoder.keras",
    custom_objects={"TransformerEncoder": TransformerEncoder})
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Epoch 1/20
[1m625/625[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m792s[0m 1s/step - accuracy: 0.5648 - loss: 0.8661 - val_accuracy: 0.7908 - val_loss: 0.4528
Epoch 2/20
[1m276/625[0m [32m━━━━━━━━[0m[37m━━━━━━━━━━━━[0m [1m6:59[0m 1s/step - accuracy: 0.8119 - loss: 0.4094

KeyboardInterrupt: 

## 8. Simple Transformer model trained from scratch on positional embedding

Implement positional embedding as a subclassed layer

In [None]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim, mask_zero=True) ## added
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return None
        #return tf.not_equal(inputs, 0)
        #return tf.cast(inputs != 0, dtype=tf.bool)
        #return tf.math.not_equal(inputs, 0)
        # Updated: Use TensorFlow compatible boolean mask creation
        mask_layer = layers.Lambda(lambda x: tf.cast(x != 0, dtype=tf.bool))
        return mask_layer(inputs)

    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

Combine the Transformer encoder with positional embedding

In [None]:

vocab_size = 20000
sequence_length = 600
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("full_transformer_encoder.keras",
                                    save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, callbacks=callbacks)
model = keras.models.load_model(
    "full_transformer_encoder.keras",
    custom_objects={"TransformerEncoder": TransformerEncoder,
                    "PositionalEmbedding": PositionalEmbedding})
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

Epoch 1/20
[1m590/625[0m [32m━━━━━━━━━━━━━━━━━━[0m[37m━━[0m [1m50s[0m 1s/step - accuracy: 0.6197 - loss: 0.7609

KeyboardInterrupt: 