<a href="https://colab.research.google.com/github/reitezuz/18NES2-2025/blob/main/week_10/text_classification_IMDB1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple Natural Language Processing Example - Sentiment Classification

The IMDB dataset is a benchmark for binary sentiment classification (positive vs. negative reviews). It contains 25,000 highly polar movie reviews for training and 25,000 for testing.

 - http://ai.stanford.edu/~amaas/data/sentiment/
 - https://www.kaggle.com/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews

Based on: https://github.com/fchollet/deep-learning-with-python-notebooks/blob/master/chapter14_text-classification.ipynb


In [1]:
import os
os.environ["KERAS_BACKEND"] = "jax"



### Download and extract the zip file with the data

In [2]:
!rm -r aclImdb
!curl -O https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar -xf aclImdb_v1.tar.gz
!rm -r aclImdb/train/unsup



rm: cannot remove 'aclImdb': No such file or directory
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 80.2M  100 80.2M    0     0  57.1M      0  0:00:01  0:00:01 --:--:-- 57.1M


In [3]:
import os, pathlib, shutil, random, keras

imdb_extract_dir = pathlib.Path("aclImdb")
for path in imdb_extract_dir.glob("*/*"):
    if path.is_dir():
        print(path)


aclImdb/test/pos
aclImdb/test/neg
aclImdb/train/pos
aclImdb/train/neg


In [4]:
# observe some reviews: the file name ends with the actual grade
!cat aclImdb/train/pos/10014_8.txt


The Night Listener held my attention, with Robin Williams shining as a New York City radio host who becomes enamored with his friendship with a 14 year old boy (Rory Culkin) who is very ill. Williams has never met the boy in person, as they have only been in contact by talking on the telephone. However, Williams' ex-boyfriend (nice job from Bobby Cannavale) raises doubt about the boy, which prompts Williams to arrange a meeting with him in person. What follows makes a permanent impact on Williams in a way he does not expect. I will leave it at that. Toni Collette also stars.<br /><br />I enjoyed this film, with Toni Collette giving a memorable portrayal of Culkin's adoptive mother. Sandra Oh also starred as Williams' friend. The Night Listener is inspired by actual events, and it has a somber, almost creepy silence throughout. At times it is predictable, no thanks to some of the reviews I read before seeing the movie and just due to logic, but I liked it anyway. I enjoy Williams in rol

In [5]:
!cat aclImdb/train/neg/10024_3.txt

First lesson that some film makers (particularly those inspired by Hollywood) need to know - just 'style' does not sell. I guess Tashan when translated will mean style. Second, if you are hell bent on selling style, that does not spare you from having a decent story.<br /><br />Tashan has some story which could have sufficed with some better director. But it is not slick. For example, all three - Saif, Kareena and Akshay - are narrators at different points in the story. But this setup is not utilized to properly. There could have been a better mix and match of their narrations. Actions sequences are from the seventies.<br /><br />Cheoreography of the film is awful. I think Vaibhavi Merchant just sleep walked through this film. Vishal-Shekhar have put up a good score but it does not belong to this film. Why is there a sufi song (Dil Haara) in Tashan? Why is the cool Hinglish song (Dil Dance Maare) not on Anil Kapoor when he is the one who is English crazy? <br /><br />Akshay Kumar is th

## Prepare the data:
1. divide the train folder into two folders: train and val


In [6]:
import os, pathlib, shutil, random

# Create the directory with the validation data
base_dir = pathlib.Path("aclImdb")
val_dir = base_dir / "val"
train_dir = base_dir / "train"
test_dir = base_dir / "test"


os.makedirs(val_dir)
for category in ("neg", "pos"):
    os.makedirs(val_dir / category)
    files = os.listdir(train_dir / category)
    random.Random(1337).shuffle(files)
    num_val_samples = int(0.2 * len(files))
    val_files = files[-num_val_samples:]
    for fname in val_files:
        shutil.move(train_dir / category / fname,
                    val_dir / category / fname)



In [7]:
for path in base_dir.glob("*/*"):
    if path.is_dir():
        print(path)

aclImdb/test/pos
aclImdb/test/neg
aclImdb/train/pos
aclImdb/train/neg
aclImdb/val/pos
aclImdb/val/neg


2. create the datasets from the directories

In [8]:
import keras

batch_size = 32

# Create the training, validation and testing data sets:
train_ds = keras.utils.text_dataset_from_directory(
    train_dir, batch_size=batch_size
)
val_ds = keras.utils.text_dataset_from_directory(
    val_dir, batch_size=batch_size
)
test_ds = keras.utils.text_dataset_from_directory(
    test_dir, batch_size=batch_size
)
text_only_train_ds = train_ds.map(lambda x, y: x)

Found 20000 files belonging to 2 classes.
Found 5000 files belonging to 2 classes.
Found 25000 files belonging to 2 classes.


## The bag of words approach - 1. Unigrams



### Preprocess the data
- Configure the TextVectorization layer to return bag-of-words
- Limit the vocabulary to the most frequent tokens and drop the rare ones: keeping the 20,000 most frequent tokens is a reasonable choice.
- Note: Vectorization always runs on the CPU, so we set `num_parallel_calls` to process several batches in parallel.
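
To make the multi-hot encoding concrete, here is a minimal pure-Python sketch of what such an encoding computes for a single review. The toy vocabulary and the `multi_hot` helper are hypothetical and only illustrate the idea; this is not how `TextVectorization` is implemented. Index 0 is assumed to stand in for out-of-vocabulary tokens.

```python
import string

# Hypothetical toy vocabulary; index 0 is reserved for unknown tokens.
toy_vocab = ["[UNK]", "the", "movie", "was", "great", "awful"]
token_to_index = {tok: i for i, tok in enumerate(toy_vocab)}

def multi_hot(text, token_to_index):
    """Return a 0/1 vector marking which vocabulary tokens occur in text."""
    # Lowercase and strip punctuation, then split on whitespace.
    cleaned = text.lower().translate(str.maketrans("", "", string.punctuation))
    vector = [0] * len(token_to_index)
    for token in cleaned.split():
        # Tokens outside the vocabulary fall back to index 0.
        vector[token_to_index.get(token, 0)] = 1
    return vector

example_vector = multi_hot("The movie was great, really great!", token_to_index)
print(example_vector)  # [1, 1, 1, 1, 1, 0]
```

Word order and repetition are lost: "great" appears twice but only sets one position, and "really" is mapped to the unknown-token slot.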

In [None]:
max_tokens = 20000
max_length = 600      # Maximum length of each sequence (longer sequences will be truncated)


text_vectorization = keras.layers.TextVectorization(
    max_tokens=max_tokens,
    output_mode="multi_hot",  # alternatives: "count", "tf_idf"
    split="whitespace"
)

# create the vocabulary based on the training dataset
text_only_train_ds = train_ds.map(lambda x, y: x)
text_vectorization.adapt(text_only_train_ds)

bow_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=8)
bow_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=8)
bow_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=8)

for inputs, targets in bow_train_ds:
    print("inputs.shape:", inputs.shape)
    print("inputs.dtype:", inputs.dtype)
    print("targets.shape:", targets.shape)
    print("targets.dtype:", targets.dtype)
    print("inputs[0]:", inputs[0])
    print("targets[0]:", targets[0])
    break

In [None]:
# 20 000 input features, batch size 32
x, y = next(bow_train_ds.as_numpy_iterator())
print(x.shape, y.shape)


In [None]:
# observe the vocabulary
text_vectorization.get_vocabulary()[0:12]

In [None]:
text_vectorization.get_vocabulary()[100:108]

### Define an MLP (or a simple linear classifier)

In [None]:
from keras import layers

def get_mlp_model(max_tokens=20000, hidden_dim=None):
    inputs = keras.Input(shape=(max_tokens,))
    if hidden_dim is not None: # one hidden layer
        x = layers.Dense(hidden_dim, activation="relu")(inputs)
        x = layers.Dropout(0.5)(x)
    else: # simple linear classifier (no hidden layer)
        x = inputs
    outputs = layers.Dense(1, activation="sigmoid")(x)

    model = keras.Model(inputs, outputs)
    model.compile(optimizer="adam",
                  loss="binary_crossentropy",
                  metrics=["accuracy"])
    return model

model = get_mlp_model()
model.summary()


In [None]:
# plot the training progress:
def plot_history(history):
    history_dict = history.history
    print(history_dict.keys())

    from matplotlib import pyplot as plt

    # Plot training & validation accuracy values
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()

    # Plot training & validation loss values
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    plt.show()

In [None]:
import pandas as pd
results_df = pd.DataFrame()

### Train an MLP model on bag of words (unigrams)

In [None]:
hidden_dim = None
max_epochs = 10
model_name = "BOW_MLP.keras"

###############################################
# Define the model architecture:
model = get_mlp_model(max_tokens, hidden_dim)
model.summary()

################################################
# Train the model
import keras, time
callbacks = [
    keras.callbacks.EarlyStopping(monitor="val_loss", restore_best_weights=True, patience=5, )
]

start_time = time.time()
history = model.fit(bow_train_ds.cache(),
          validation_data=bow_val_ds.cache(),
          epochs=max_epochs,
          callbacks=callbacks)
time_fit = time.time() - start_time

###############################
# Plot the training progress:
plot_history(history)

train_loss, train_acc = model.evaluate(bow_train_ds)
val_loss, val_acc = model.evaluate(bow_val_ds)
test_loss, test_acc = model.evaluate(bow_test_ds)

print(f"Train acc: {train_acc:.3f}")
print(f"Val acc: {val_acc:.3f}")
print(f"Test acc: {test_acc:.3f}")

###############################
# Save the model:
import os
model_dir = "./models/"
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model.save(model_dir + model_name)

#################################
# Add results to the dataframe:
new_entry = {
    "Model Name" : model_name.removesuffix(".keras"),  # strip() removes a character set, not a suffix
    "Details" : str(max_tokens) + " tokens, " + str(hidden_dim),
    "Train Loss" : train_loss,
    "Val Loss" : val_loss,
    "Test Loss" : test_loss,
    "Train Acc" : train_acc,
    "Val Acc" : val_acc,
    "Test Acc" : test_acc,
    "Epochs": max_epochs,
    "Time (s)": time_fit
}

###############################
if "results_df" not in globals() or results_df.empty:
    results_df = pd.DataFrame([new_entry])
else:
    results_df = pd.concat([results_df, pd.DataFrame([new_entry])], ignore_index=True)
results_df.to_csv("results.csv", index=False)
results_df

### Apply the model to new texts

In [None]:
import keras
import tensorflow as tf

base_model = keras.models.load_model("models/BOW_MLP.keras")

raw_text_data = tf.constant([
    ["That was an awful movie, I hate it."],
    ["Not worth seeing."],
    ["That was an excellent movie, I love it. Best movie ever."],
    ["I was shocked. Such an unexpected ending! Can't wait to see it again"],
    ["I was shocked. The movie was too short. Can't wait to see it again"],
], dtype=tf.string)

# Vectorize the raw text data using the adapted TextVectorization layer
processed_raw_text_data = text_vectorization(raw_text_data)

# Make predictions using the base model with the vectorized numerical input
predictions = base_model(processed_raw_text_data)

for i in range(len(raw_text_data)):
    # Access the scalar value by indexing predictions[i] with [0]
    print(f"text {i}: {float(predictions[i][0] * 100):.2f} percent positive")

## The bag of words approach - 2. Bigrams

### Preprocess the data
- configure the TextVectorization layer to return binary encoded bigrams
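
As a rough illustration of what `ngrams=2` adds to the vocabulary, the sketch below builds the unigrams and bigrams of a short sentence in plain Python. The helper `unigrams_and_bigrams` is hypothetical and only mimics the idea; the layer itself also standardizes the text and keeps just the `max_tokens` most frequent n-grams.

```python
def unigrams_and_bigrams(text):
    """Return all single tokens followed by all pairs of adjacent tokens."""
    tokens = text.lower().split()
    bigrams = [" ".join(pair) for pair in zip(tokens, tokens[1:])]
    return tokens + bigrams

example_ngrams = unigrams_and_bigrams("not worth seeing")
print(example_ngrams)
# ['not', 'worth', 'seeing', 'not worth', 'worth seeing']
```

Bigrams such as "not worth" keep a little local word order, which a pure unigram bag of words discards.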

In [None]:
max_tokens = 30000
text_vectorization = layers.TextVectorization(
    ngrams=2,                # change here
    max_tokens=max_tokens,
    output_mode= "multi_hot", # you can try also: "count", "multi_hot", "tf_idf"
)

text_vectorization.adapt(text_only_train_ds)
bigram_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=8)
bigram_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=8)
bigram_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=8)

for inputs, targets in bigram_train_ds:
    print("inputs.shape:", inputs.shape)
    print("inputs.dtype:", inputs.dtype)
    print("targets.shape:", targets.shape)
    print("targets.dtype:", targets.dtype)
    print("inputs[0]:", inputs[0])
    print("targets[0]:", targets[0])
    break

In [None]:
x, y = next(bigram_train_ds.as_numpy_iterator())
print(x.shape, y.shape)

In [None]:
text_vectorization.get_vocabulary()[0:12]

In [None]:
text_vectorization.get_vocabulary()[100:108]

### Train an MLP model on binary encoded bigrams

In [None]:
hidden_dim = None
max_epochs = 10
model_name = "bigram_MLP.keras"

###############################################
# Define the model architecture:
model = get_mlp_model(max_tokens, hidden_dim)
model.summary()

################################################
# Train the model
import keras, time
callbacks = [
    keras.callbacks.EarlyStopping(monitor="val_loss", restore_best_weights=True, patience=5, )
]

start_time = time.time()
history = model.fit(bigram_train_ds.cache(),
          validation_data=bigram_val_ds.cache(),
          epochs=max_epochs,
          callbacks=callbacks)
time_fit = time.time() - start_time

###############################
# Plot the training progress:
plot_history(history)

train_loss, train_acc = model.evaluate(bigram_train_ds)
val_loss, val_acc = model.evaluate(bigram_val_ds)
test_loss, test_acc = model.evaluate(bigram_test_ds)

print(f"Train acc: {train_acc:.3f}")
print(f"Val acc: {val_acc:.3f}")
print(f"Test acc: {test_acc:.3f}")

###############################
# Save the model:
import os
model_dir = "./models/"
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model.save(model_dir + model_name)

#################################
# Add results to the dataframe:
new_entry = {
    "Model Name" : model_name.removesuffix(".keras"),  # strip() removes a character set, not a suffix
    "Details" : str(max_tokens) + " tokens, " + str(hidden_dim),
    "Train Loss" : train_loss,
    "Val Loss" : val_loss,
    "Test Loss" : test_loss,
    "Train Acc" : train_acc,
    "Val Acc" : val_acc,
    "Test Acc" : test_acc,
    "Epochs": max_epochs,
    "Time (s)": time_fit
}

###############################
if "results_df" not in globals() or results_df.empty:
    results_df = pd.DataFrame([new_entry])
else:
    results_df = pd.concat([results_df, pd.DataFrame([new_entry])], ignore_index=True)
# save the dataframe
results_df.to_csv("results.csv", index=False)
results_df

In [None]:
# remove the second row from df:
#results_df = results_df.drop(1)
#results_df

In [None]:
# remove big datasets from memory

del bigram_train_ds
del bigram_val_ds
del bigram_test_ds


### Apply the model to new texts

In [None]:
import keras
import tensorflow as tf

base_model = keras.models.load_model("models/bigram_MLP.keras")

raw_text_data = tf.constant([
    ["That was an awful movie, I hate it."],
    ["Not worth seeing."],
    ["That was an excellent movie, I love it. Best movie ever."],
    ["I was shocked. Such an unexpected ending! Can't wait to see it again"],
    ["I was shocked. The movie was too short. Can't wait to see it again"],
], dtype=tf.string)

# Vectorize the raw text data using the adapted TextVectorization layer
processed_raw_text_data = text_vectorization(raw_text_data)

# Make predictions using the base model with the vectorized numerical input
predictions = base_model(processed_raw_text_data)

for i in range(len(raw_text_data)):
    # Access the scalar value by indexing predictions[i] with [0]
    print(f"text {i}: {float(predictions[i][0] * 100):.2f} percent positive")

__Observation:__ better results than for unigrams

## Sequential model on one-hot vectors
- all sequences are truncated or padded to length 600
- we use a bidirectional LSTM model
- training is very slow and inefficient, because every token becomes a 20,000-dimensional one-hot vector
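
The following sketch shows the padding and truncation step in plain Python and estimates how large a single one-hot encoded batch becomes with the settings used in this notebook. The helper `pad_or_truncate` is hypothetical (the layer does this internally via `output_sequence_length`), and the size estimate assumes 4-byte floats.

```python
def pad_or_truncate(token_ids, length, pad_id=0):
    """Cut the sequence to `length` or fill it up with the padding index."""
    return token_ids[:length] + [pad_id] * max(0, length - len(token_ids))

short_seq = pad_or_truncate([5, 8, 2], length=6)
long_seq = pad_or_truncate(list(range(1, 10)), length=6)
print(short_seq)  # [5, 8, 2, 0, 0, 0]
print(long_seq)   # [1, 2, 3, 4, 5, 6]

# Number of values in one one-hot encoded batch:
# batch size 32, sequence length 600, vocabulary size 20000
values_per_batch = 32 * 600 * 20000
print(values_per_batch)                  # 384000000
print(values_per_batch * 4 / 1e9, "GB")  # 1.536 GB
```

Almost all of these values are zeros, which is why the dense embedding approach in the next section is far cheaper.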

In [None]:

from keras import layers

max_length = 600      # Maximum length of each sequence (longer sequences will be truncated)
max_tokens = 20000    # Number of (most important) tokens

text_vectorization = layers.TextVectorization(
    max_tokens=max_tokens,
    output_mode="int",                 # Convert text to sequences of integer indices
    output_sequence_length=max_length, # Ensure sequences have the given fixed length
)

# This step analyzes the dataset to create a vocabulary based on the most frequent tokens
text_vectorization.adapt(text_only_train_ds)

int_train_ds = train_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
int_val_ds = val_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
int_test_ds = test_ds.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)

# Observe the data:
for inputs, targets in int_train_ds:
    print("inputs.shape:", inputs.shape)
    print("inputs.dtype:", inputs.dtype)
    print("targets.shape:", targets.shape)
    print("targets.dtype:", targets.dtype)
    print("inputs:", inputs)
    print("targets:", targets)
    break



In [None]:
x, y = next(int_test_ds.as_numpy_iterator())
x.shape, y.shape

In [None]:
from keras import ops # Ensure ops is imported

rnn_units = 32
max_epochs = 10
model_name = "one_hot_bidir_lstm.keras"

###############################################
# one-hot encode the input sequence:
inputs = keras.Input(shape=(max_length,), dtype="int64") # Input shape is (None, max_length)
one_hot_encoded = layers.Lambda(lambda x: ops.one_hot(x, max_tokens), output_shape=(max_length, max_tokens))(inputs)
embedded = layers.Lambda(lambda x: ops.cast(x, "float32"))(one_hot_encoded)

# This 3D tensor is compatible with Bidirectional(LSTM).
x = layers.Bidirectional(layers.LSTM(rnn_units))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)

model.compile(optimizer="adam",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

################################################
# Train the model
import keras, time
callbacks = [
    keras.callbacks.EarlyStopping(monitor="val_loss", restore_best_weights=True, patience=5, )
]

start_time = time.time()
history = model.fit(int_train_ds.cache(),
          validation_data=int_val_ds.cache(),
          epochs=max_epochs,
          callbacks=callbacks)
time_fit = time.time() - start_time

###############################
# Plot the training progress:
plot_history(history)

train_loss, train_acc = model.evaluate(int_train_ds)
val_loss, val_acc = model.evaluate(int_val_ds)
test_loss, test_acc = model.evaluate(int_test_ds)

print(f"Train acc: {train_acc:.3f}")
print(f"Val acc: {val_acc:.3f}")
print(f"Test acc: {test_acc:.3f}")

###############################
# Save the model:
import os
model_dir = "./models/"
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model.save(model_dir + model_name)

#################################
# Add results to the dataframe:
new_entry = {
    "Model Name" : model_name.removesuffix(".keras"),  # strip() removes a character set, not a suffix
    "Details" : str(max_tokens) + " tokens, " + str(rnn_units),  # this model is sized by rnn_units
    "Train Loss" : train_loss,
    "Val Loss" : val_loss,
    "Test Loss" : test_loss,
    "Train Acc" : train_acc,
    "Val Acc" : val_acc,
    "Test Acc" : test_acc,
    "Epochs": max_epochs,
    "Time (s)": time_fit
}

###############################
if "results_df" not in globals() or results_df.empty:
    results_df = pd.DataFrame([new_entry])
else:
    results_df = pd.concat([results_df, pd.DataFrame([new_entry])], ignore_index=True)
# save the dataframe
results_df.to_csv("results.csv", index=False)
results_df

### Apply the model to new data

In [None]:
import keras
import tensorflow as tf

base_model = model

raw_text_data = tf.constant([
    ["That was an awful movie, I hate it."],
    ["Not worth seeing."],
    ["That was an excellent movie, I love it. Best movie ever."],
    ["I was shocked. Such an unexpected ending! Can't wait to see it again"],
    ["I was shocked. The movie was too short. Can't wait to see it again"],
], dtype=tf.string)

# Vectorize the raw text data using the adapted TextVectorization layer
processed_raw_text_data = text_vectorization(raw_text_data)

# Make predictions using the base model with the vectorized numerical input
predictions = base_model(processed_raw_text_data)

for i in range(len(raw_text_data)):
    # Access the scalar value by indexing predictions[i] with [0]
    print(f"text {i}: {float(predictions[i][0] * 100):.2f} percent positive")

## Sequential model that uses an Embedding layer trained from scratch
- __Embedding__ layer: https://keras.io/api/layers/core_layers/embedding/

`layers.Embedding(input_dim=max_tokens, output_dim=embedding_dim, mask_zero=True)`

- we set `mask_zero=True` to let the model ignore padding positions during training  
  (padding tokens with index 0 are not included in the computation of LSTM/GRU states)
  - this usually improves performance on variable-length text sequences
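
Conceptually, an embedding layer is a lookup table with one trainable row per token index, and the mask simply flags which positions hold real tokens. The sketch below illustrates both ideas in plain Python; the table sizes, the random initial values, and the `embed` helper are hypothetical and are not the Keras implementation.

```python
import random

rng = random.Random(0)
toy_vocab_size, toy_embedding_dim = 6, 3

# Hypothetical embedding table: one row of weights per token index.
embedding_table = [
    [rng.uniform(-1, 1) for _ in range(toy_embedding_dim)]
    for _ in range(toy_vocab_size)
]

def embed(token_ids):
    """Look up one dense vector per token and build the padding mask."""
    vectors = [embedding_table[i] for i in token_ids]
    mask = [i != 0 for i in token_ids]  # False marks padding (index 0)
    return vectors, mask

vectors, mask = embed([4, 2, 5, 0, 0])
print(len(vectors), len(vectors[0]))  # 5 3
print(mask)                           # [True, True, True, False, False]
```

A recurrent layer that respects the mask skips the positions marked `False`, so the trailing padding does not overwrite the state computed from the real tokens.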

In [None]:
rnn_units = 32
embedding_dim = 64 #256
max_epochs = 10
model_name = "embeddings_bidir_gru.keras"

###############################################
# Define the model architecture:
inputs = keras.Input(shape=(None,), dtype="int64")

# Add an Embedding layer to convert integer tokens into dense vectors
embedded = layers.Embedding(
      input_dim=max_tokens,
      output_dim=embedding_dim,
      mask_zero=True
    )(inputs)
x = layers.Bidirectional(layers.GRU(rnn_units))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="adam",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

################################################
# Train the model
import keras, time
callbacks = [
    keras.callbacks.EarlyStopping(monitor="val_loss", restore_best_weights=True, patience=5, )
]

start_time = time.time()
history = model.fit(int_train_ds.cache(),
          validation_data=int_val_ds.cache(),
          epochs=max_epochs,
          callbacks=callbacks)
time_fit = time.time() - start_time

###############################
# Plot the training progress:
plot_history(history)

train_loss, train_acc = model.evaluate(int_train_ds)
val_loss, val_acc = model.evaluate(int_val_ds)
test_loss, test_acc = model.evaluate(int_test_ds)

print(f"Train acc: {train_acc:.3f}")
print(f"Val acc: {val_acc:.3f}")
print(f"Test acc: {test_acc:.3f}")

###############################
# Save the model:
import os
model_dir = "./models/"
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model.save(model_dir + model_name)

#################################
# Add results to the dataframe:
new_entry = {
    "Model Name" : model_name.removesuffix(".keras"),  # strip() would also eat the leading "e"
    "Details" : str(max_tokens) + " tokens, " + str(rnn_units),  # this model is sized by rnn_units
    "Train Loss" : train_loss,
    "Val Loss" : val_loss,
    "Test Loss" : test_loss,
    "Train Acc" : train_acc,
    "Val Acc" : val_acc,
    "Test Acc" : test_acc,
    "Epochs": max_epochs,
    "Time (s)": time_fit
}

###############################
if "results_df" not in globals() or results_df.empty:
    results_df = pd.DataFrame([new_entry])
else:
    results_df = pd.concat([results_df, pd.DataFrame([new_entry])], ignore_index=True)
# save the dataframe
results_df.to_csv("results.csv", index=False)
results_df

__Observation:__
- the training is slow (in our case about 40 minutes on a Google Colab CPU)
- the model overfits early
- the results are worse than for the bigram + MLP approach, most probably because of the limit on sequence length (600 words)

### Apply the model to new data

In [None]:
# create an inference model:
import keras
inputs = keras.Input(shape=(1,), dtype="string")
model = keras.models.load_model("models/embeddings_bidir_gru.keras")
processed_inputs = text_vectorization(inputs)
outputs = model(processed_inputs)
inference_model = keras.Model(inputs, outputs)

import tensorflow as tf
raw_text_data = tf.constant([
    ["That was an awful movie, I hate it."],
    ["Not worth seeing."],
    ["That was an excellent movie, I love it. Best movie ever."],
    ["I was shocked. The movie was too short. Can't wait to see it again"],
    ["I was shocked. Such an unexpected ending! Can't wait to see it again"],
], dtype=tf.string)
predictions = inference_model(raw_text_data)
for i in range(len(raw_text_data)):
    print(f"text {i}: {float(predictions[i][0] * 100):.2f} percent positive")

## Sequential model that uses existing pretrained word embeddings
- useful for small datasets
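
The GloVe files used below are plain text with one word per line followed by its vector components. As a rough sketch of how such a file maps onto an embedding matrix, the example parses two made-up lines and fills one row per vocabulary word; the toy lines, the 3-dimensional vectors, and the toy vocabulary are hypothetical.

```python
# Two made-up lines in the "word v1 v2 ... vn" text format.
toy_glove_lines = [
    "movie 0.1 0.2 0.3",
    "great 0.4 0.5 0.6",
]

# Map each word to its vector.
toy_index = {}
for line in toy_glove_lines:
    word, *values = line.split()
    toy_index[word] = [float(v) for v in values]

# One matrix row per vocabulary entry; unknown words keep a zero row.
toy_vocabulary = ["", "[UNK]", "movie", "tashan", "great"]
toy_matrix = [toy_index.get(word, [0.0, 0.0, 0.0]) for word in toy_vocabulary]

for word, row in zip(toy_vocabulary, toy_matrix):
    print(repr(word), row)
```

Words that the pretrained file does not cover, such as the film title "tashan" here, start from an all-zero vector and carry no information when the layer is frozen.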

In [None]:
!wget http://nlp.stanford.edu/data/glove.6B.zip
!unzip -q glove.6B.zip

In [None]:
# Parse the GloVe word-embeddings file
import numpy as np
path_to_glove_file = "glove.6B.100d.txt"

embeddings_index = {}
with open(path_to_glove_file, encoding="utf-8") as f:
    for line in f:
        word, coefs = line.split(maxsplit=1)
        coefs = np.fromstring(coefs, "f", sep=" ")
        embeddings_index[word] = coefs

print(f"Found {len(embeddings_index)} word vectors.")

In [None]:
# Prepare the GloVe word-embeddings matrix
embedding_dim = 100

vocabulary = text_vectorization.get_vocabulary()
word_index = dict(zip(vocabulary, range(len(vocabulary))))

embedding_matrix = np.zeros((max_tokens, embedding_dim))
for word, i in word_index.items():
    if i < max_tokens:
        embedding_vector = embeddings_index.get(word)
        # Words without a GloVe vector keep an all-zero row
        if embedding_vector is not None:
            embedding_matrix[i] = embedding_vector

In [None]:
# Embedding layer
embedding_layer = layers.Embedding(
    max_tokens,
    embedding_dim,
    embeddings_initializer=keras.initializers.Constant(embedding_matrix),
    trainable=False,
    mask_zero=True,
)

In [None]:
hidden_dim = 32
embedding_dim = 100  # fixed by the GloVe vectors loaded above
max_epochs = 10
model_name = "embeddings_glove_gru.keras"

###############################################
# Define the model architecture:
inputs = keras.Input(shape=(None,), dtype="int64")

# Add an Embedding layer to convert integer tokens into dense vectors
embedded = embedding_layer(inputs)
x = layers.Bidirectional(layers.GRU(hidden_dim))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="adam",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

################################################
# Train the model
import keras, time
callbacks = [
    keras.callbacks.EarlyStopping(monitor="val_loss", restore_best_weights=True, patience=5, )
]

start_time = time.time()
history = model.fit(int_train_ds.cache(),
          validation_data=int_val_ds.cache(),
          epochs=max_epochs,
          callbacks=callbacks)
time_fit = time.time() - start_time

###############################
# Plot the training progress:
plot_history(history)

train_loss, train_acc = model.evaluate(int_train_ds)
val_loss, val_acc = model.evaluate(int_val_ds)
test_loss, test_acc = model.evaluate(int_test_ds)

print(f"Train acc: {train_acc:.3f}")
print(f"Val acc: {val_acc:.3f}")
print(f"Test acc: {test_acc:.3f}")

###############################
# Save the model:
import os
model_dir = "./models/"
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model.save(model_dir + model_name)

#################################
# Add results to the dataframe:
new_entry = {
    "Model Name" : model_name.removesuffix(".keras"),  # strip() would also eat the leading "e"
    "Details" : str(max_tokens) + " tokens, " + str(hidden_dim),
    "Train Loss" : train_loss,
    "Val Loss" : val_loss,
    "Test Loss" : test_loss,
    "Train Acc" : train_acc,
    "Val Acc" : val_acc,
    "Test Acc" : test_acc,
    "Epochs": max_epochs,
    "Time (s)": time_fit
}

###############################
if "results_df" not in globals() or results_df.empty:
    results_df = pd.DataFrame([new_entry])
else:
    results_df = pd.concat([results_df, pd.DataFrame([new_entry])], ignore_index=True)
# save the dataframe
results_df.to_csv("results.csv", index=False)
results_df

### Apply the model to new data

In [None]:
import keras
import tensorflow as tf

base_model = keras.models.load_model("models/embeddings_glove_gru.keras")

raw_text_data = tf.constant([
    ["That was an awful movie, I hate it."],
    ["Not worth seeing."],
    ["That was an excellent movie, I love it. Best movie ever."],
    ["I was shocked. Such an unexpected ending! Can't wait to see it again"],
    ["I was shocked. The movie was too short. Can't wait to see it again"],
], dtype=tf.string)

# Vectorize the raw text data using the adapted TextVectorization layer
processed_raw_text_data = text_vectorization(raw_text_data)

# Make predictions using the base model with the vectorized numerical input
predictions = base_model(processed_raw_text_data)

for i in range(len(raw_text_data)):
    # Access the scalar value by indexing predictions[i] with [0]
    print(f"text {i}: {float(predictions[i][0] * 100):.2f} percent positive")

__Observation:__ worse results than training the embedding from scratch, especially on domain-specific sentences (e.g., the last two)

## Sequential model that uses word embeddings pretrained by a CBOW model trained from scratch

In [None]:
imdb_vocabulary = text_vectorization.get_vocabulary()
tokenize_no_padding = keras.layers.TextVectorization(
    vocabulary=imdb_vocabulary,
    split="whitespace",
    output_mode="int",
)

#### Preprocess the data for the CBOW model
- create windows of tokens from the training data, where each window consists of context words and a target word.
- The `window_data` function generates sliding windows of tokens, and `split_label`
 separates the context (bag) from the target (label) word within each window.
- The dataset is then mapped through a tokenizer, windowed, and split into (context, target) pairs.
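
The windowing logic can be sketched without TensorFlow. The plain-Python helper below (hypothetical name `cbow_pairs`) slides a window over a short list of token ids and splits each window into its context and its center word; it uses a context size of 2 to keep the output small.

```python
def cbow_pairs(token_ids, context_size=4):
    """Return (context, target) pairs for every full window in the sequence."""
    window_size = 2 * context_size + 1
    pairs = []
    for start in range(max(len(token_ids) - window_size + 1, 0)):
        window = token_ids[start:start + window_size]
        # Context = tokens left and right of the center; target = the center.
        context = window[:context_size] + window[context_size + 1:]
        target = window[context_size]
        pairs.append((context, target))
    return pairs

toy_pairs = cbow_pairs([10, 11, 12, 13, 14, 15, 16], context_size=2)
for context, target in toy_pairs:
    print(context, "->", target)
# [10, 11, 13, 14] -> 12
# [11, 12, 14, 15] -> 13
# [12, 13, 15, 16] -> 14
```

Sequences shorter than one full window produce no pairs, matching the `tf.maximum(..., 0)` guard in `window_data`.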

In [None]:
import tensorflow as tf

context_size = 4
window_size = 2 * context_size + 1  # context on both sides plus the center word

def window_data(token_ids):
    num_windows = tf.maximum(tf.size(token_ids) - context_size * 2, 0)
    windows = tf.range(window_size)[None, :]
    windows = windows + tf.range(num_windows)[:, None]
    windowed_tokens = tf.gather(token_ids, windows)
    return tf.data.Dataset.from_tensor_slices(windowed_tokens)

def split_label(window):
    left = window[:context_size]
    right = window[context_size + 1 :]
    bag = tf.concat((left, right), axis=0)
    label = window[context_size]  # the center word of the window
    return bag, label

dataset = keras.utils.text_dataset_from_directory(
    imdb_extract_dir / "train", batch_size=None
)
dataset = dataset.map(lambda x, y: x, num_parallel_calls=8)
dataset = dataset.map(tokenize_no_padding, num_parallel_calls=8)
dataset = dataset.interleave(window_data, cycle_length=8, num_parallel_calls=8)
dataset = dataset.map(split_label, num_parallel_calls=8)



### Train the CBOW model

In [None]:
hidden_dim = 64
inputs = keras.Input(shape=(2 * context_size,))
cbow_embedding = layers.Embedding(
    max_tokens,
    hidden_dim,
)
x = cbow_embedding(inputs)
x = layers.GlobalAveragePooling1D()(x)
outputs = layers.Dense(max_tokens, activation="softmax")(x)  # probability distribution over the vocabulary
cbow_model = keras.Model(inputs, outputs)
cbow_model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["sparse_categorical_accuracy"],
)

cbow_model.summary(line_length=80)

dataset = dataset.batch(1024).cache()
cbow_model.fit(dataset, epochs=4)



#### Using the pretrained embedding for classification

In [None]:
gru_dim = 32
embedding_dim = 64 #256
max_epochs = 10
model_name = "embeddings_cbow_gru.keras"

###############################################
# Define the model architecture:
inputs = keras.Input(shape=(max_length,))
lstm_embedding = layers.Embedding(
    input_dim=max_tokens,
    output_dim=embedding_dim,
    mask_zero=True,
)
x = lstm_embedding(inputs)
x = layers.Bidirectional(layers.GRU(gru_dim))(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)


model = keras.Model(inputs, outputs, name="gru_with_cbow")

lstm_embedding.embeddings.assign(cbow_embedding.embeddings)

model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)

################################################
# Train the model
import keras, time
callbacks = [
    keras.callbacks.EarlyStopping(monitor="val_loss", restore_best_weights=True, patience=5, )
]

start_time = time.time()
history = model.fit(int_train_ds.cache(),
          validation_data=int_val_ds.cache(),
          epochs=max_epochs,
          callbacks=callbacks)
time_fit = time.time() - start_time

###############################
# Plot the training progress:
plot_history(history)

train_loss, train_acc = model.evaluate(int_train_ds)
val_loss, val_acc = model.evaluate(int_val_ds)
test_loss, test_acc = model.evaluate(int_test_ds)

print(f"Train acc: {train_acc:.3f}")
print(f"Val acc: {val_acc:.3f}")
print(f"Test acc: {test_acc:.3f}")

###############################
# Save the model:
import os
model_dir = "./models/"
if not os.path.exists(model_dir):
    os.makedirs(model_dir)
model.save(model_dir + model_name)

#################################
# Add results to the dataframe:
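new_entry = {
    # removesuffix drops only the trailing ".keras"; strip(".keras") would also
    # remove any of those characters from the start of the name.
    "Model Name" : model_name.removesuffix(".keras"),
    "Details" : str(max_tokens) + " tokens, " + str(hidden_dim),
    "Train Loss" : train_loss,
    "Val Loss" : val_loss,
    "Test Loss" : test_loss,
    "Train Acc" : train_acc,
    "Val Acc" : val_acc,
    "Test Acc" : test_acc,
    # Number of epochs actually run (early stopping may end training before max_epochs).
    "Epochs": len(history.history["loss"]),
    "Time (s)": time_fit
}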

###############################
if "results_df" not in globals() or results_df.empty:
    results_df = pd.DataFrame([new_entry])
else:
    results_df = pd.concat([results_df, pd.DataFrame([new_entry])], ignore_index=True)
# save the dataframe
results_df.to_csv("results.csv", index=False)
results_df
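
A note on deriving a display name from a file name such as `embeddings_cbow_gru.keras`: `str.strip` treats its argument as a set of characters to remove from both ends, not as a suffix, so it can also eat leading letters of the name. The short sketch below contrasts it with `str.removesuffix` (Python 3.9+) and `pathlib.Path.stem`; the variable names are illustrative.

```python
from pathlib import Path

file_name = "embeddings_cbow_gru.keras"

# strip removes any of the characters '.', 'k', 'e', 'r', 'a', 's' from both ends.
stripped = file_name.strip(".keras")
# removesuffix removes the exact trailing string only.
suffix_removed = file_name.removesuffix(".keras")
# Path.stem drops the final extension, whatever it is.
stem = Path(file_name).stem

print(stripped)        # mbeddings_cbow_gru
print(suffix_removed)  # embeddings_cbow_gru
print(stem)            # embeddings_cbow_gru
```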

In [None]:
import pandas as pd
results_df = pd.read_csv("results.csv")
results_df

In [None]:
import keras
import tensorflow as tf

base_model = keras.models.load_model("models/embeddings_cbow_gru.keras")

raw_text_data = tf.constant([
    ["That was an awful movie, I hate it."],
    ["Not worth seeing."],
    ["That was an excellent movie, I love it. Best movie ever."],
    ["I was shocked. Such an unexpected ending! Can't wait to see it again"],
    ["I was shocked. The movie was too short. Can't wait to see it again"],
], dtype=tf.string)

# Vectorize the raw text data using the adapted TextVectorization layer
processed_raw_text_data = text_vectorization(raw_text_data)

# Make predictions using the base model with the vectorized numerical input
predictions = base_model(processed_raw_text_data)

for i in range(len(raw_text_data)):
    # Access the scalar value by indexing predictions[i] with [0]
    print(f"text {i}: {float(predictions[i][0] * 100):.2f} percent positive")

## Simple Transformer model trained from scratch on word embedding

### Transformer encoder implemented as a subclassed Layer

In [None]:
import tensorflow as tf
import keras
from keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config
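
The `mask[:, tf.newaxis, :]` step in `call` reshapes a padding mask of shape `(batch, seq_len)` to `(batch, 1, seq_len)` so that it broadcasts over every query position. The single-head NumPy sketch below illustrates the effect under simplifying assumptions (one head, no learned projections); `masked_self_attention` is a hypothetical helper, not the Keras implementation.

```python
import numpy as np

def masked_self_attention(x, padding_mask):
    """x: (batch, seq_len, dim); padding_mask: (batch, seq_len), True for real tokens."""
    dim = x.shape[-1]
    scores = x @ x.transpose(0, 2, 1) / np.sqrt(dim)     # (batch, seq_len, seq_len)
    attention_mask = padding_mask[:, np.newaxis, :]       # (batch, 1, seq_len)
    scores = np.where(attention_mask, scores, -1e9)       # block attention to padded keys
    scores = scores - scores.max(axis=-1, keepdims=True)  # for numerical stability
    weights = np.exp(scores)
    weights = weights / weights.sum(axis=-1, keepdims=True)
    return weights @ x, weights

rng = np.random.default_rng(0)
x = rng.normal(size=(1, 4, 8))
padding_mask = np.array([[True, True, True, False]])      # last position is padding
output, weights = masked_self_attention(x, padding_mask)
print(weights[0].round(3))                                # last column is all zeros
```

Every row of `weights` still sums to 1, but no query position assigns any weight to the padded key.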

Define and train the Transformer model

In [None]:
vocab_size = 20000
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = layers.Embedding(vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="adam", #"rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("transformer_encoder.keras",
                                    save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, callbacks=callbacks)
model = keras.models.load_model(
    "transformer_encoder.keras",
    custom_objects={"TransformerEncoder": TransformerEncoder})
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

## Simple Transformer model trained from scratch on positional embedding

Implement positional embedding as a subclassed layer

In [None]:
class CustomMaskingLayer(layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs):
        return layers.Lambda(lambda x: tf.cast(tf.math.not_equal(x, 0), dtype=tf.bool))(inputs)


    def compute_mask(self, inputs, mask=None):
        return self.call(inputs)

    def get_config(self):
        config = super().get_config()
        return config

class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim, mask_zero=True) ## added
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions


    def compute_mask(self, inputs, mask=None):
        # Mark every non-zero token id as a real (non-padding) position.
        masking_layer = CustomMaskingLayer()
        return masking_layer(inputs)

    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config
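
As an illustration of what `call` and `compute_mask` calculate, the NumPy sketch below adds a per-position vector to each token vector and derives the padding mask from token id 0. The toy sizes and random lookup tables are assumptions for demonstration only.

```python
import numpy as np

rng = np.random.default_rng(0)
toy_vocab, toy_seq_len, toy_dim = 12, 5, 3
token_table = rng.normal(size=(toy_vocab, toy_dim))        # one vector per token id
position_table = rng.normal(size=(toy_seq_len, toy_dim))   # one vector per position

token_ids = np.array([[4, 7, 2, 0, 0],
                      [9, 1, 0, 0, 0]])                    # 0 is the padding id

positions = np.arange(token_ids.shape[-1])                 # [0, 1, 2, 3, 4]
embedded_tokens = token_table[token_ids]                   # (batch, seq_len, dim)
embedded_positions = position_table[positions]             # (seq_len, dim), broadcast over the batch
embedded = embedded_tokens + embedded_positions
mask = token_ids != 0                                      # True for real tokens

print(embedded.shape)
print(mask)
```

The same position vector is added to every sequence in the batch, so two sequences differ at a given position only through their token vectors.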

Combine the Transformer encoder with positional embedding

In [None]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim, mask_zero=True) ## added
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    '''
    def compute_mask(self, inputs, mask=None):
        #return None
        return tf.not_equal(inputs, 0)
        #return tf.cast(inputs != 0, dtype=tf.bool)
        #return tf.math.not_equal(inputs, 0)
        # Updated: Use TensorFlow compatible boolean mask creation
        mask_layer = layers.Lambda(lambda x: tf.cast(x != 0, dtype=tf.bool))
        return mask_layer(inputs)
    '''
    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

In [None]:
vocab_size = 20000
sequence_length = 600
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="adam",    #"rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

callbacks = [
    keras.callbacks.ModelCheckpoint("full_transformer_encoder.keras",
                                    save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=10, callbacks=callbacks)
model = keras.models.load_model(
    "full_transformer_encoder.keras",
    custom_objects={"TransformerEncoder": TransformerEncoder,
                    "PositionalEmbedding": PositionalEmbedding})
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")
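
The model above pools the encoder output with `GlobalAveragePooling1D`. When a padding mask is passed to that layer, padded time steps are left out of the average. The NumPy sketch below shows the difference between a plain mean and a masked mean; `masked_mean` is a hypothetical helper written for illustration, not the Keras code.

```python
import numpy as np

def masked_mean(x, mask):
    """x: (batch, seq_len, dim); mask: (batch, seq_len), True for real tokens."""
    weights = mask[..., np.newaxis].astype(x.dtype)        # (batch, seq_len, 1)
    total = (x * weights).sum(axis=1)                      # sum over real tokens only
    count = np.maximum(weights.sum(axis=1), 1.0)           # avoid division by zero
    return total / count

x = np.array([[[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]])   # last step stands in for padding
mask = np.array([[True, True, False]])

pooled_masked = masked_mean(x, mask)
pooled_plain = x.mean(axis=1)
print(pooled_masked)   # [[2. 3.]]
print(pooled_plain)
```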

## Finetuning a Pretrained Transformer

In [None]:
## Loading a pretrained Transformer

import keras_hub

tokenizer = keras_hub.models.Tokenizer.from_preset("roberta_base_en")
backbone = keras_hub.models.Backbone.from_preset("roberta_base_en")

print(tokenizer("The quick brown fox"))
backbone.summary(line_length=80)

In [None]:
# Load and process the data
from keras.utils import text_dataset_from_directory

batch_size = 16
train_dir = "aclImdb/train"
val_dir = "aclImdb/val"
test_dir = "aclImdb/test"

train_ds = text_dataset_from_directory(train_dir, batch_size=batch_size)
val_ds = text_dataset_from_directory(val_dir, batch_size=batch_size)
test_ds = text_dataset_from_directory(test_dir, batch_size=batch_size)

def preprocess(text, label):
    packer = keras_hub.layers.StartEndPacker(
        sequence_length=512,
        start_value=tokenizer.start_token_id,
        end_value=tokenizer.end_token_id,
        pad_value=tokenizer.pad_token_id,
        return_padding_mask=True,
    )
    token_ids, padding_mask = packer(tokenizer(text))
    return {"token_ids": token_ids, "padding_mask": padding_mask}, label

preprocessed_train_ds = train_ds.map(preprocess)
preprocessed_val_ds = val_ds.map(preprocess)
preprocessed_test_ds = test_ds.map(preprocess)

next(iter(preprocessed_train_ds))
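
To show what the packing step produces, here is a pure-Python sketch of the same idea: truncate, wrap with start and end tokens, pad to a fixed length, and return a padding mask. The helper `pack_with_start_end` and the default ids 0, 2 and 1 are placeholders chosen for illustration; this is a simplified stand-in, not the `keras_hub` implementation.

```python
def pack_with_start_end(token_ids, sequence_length, start_id=0, end_id=2, pad_id=1):
    """Truncate, add start/end tokens, pad, and build a padding mask (True = real token)."""
    truncated = token_ids[: sequence_length - 2]           # leave room for start and end
    packed = [start_id] + truncated + [end_id]
    padding_mask = [True] * len(packed)
    n_pad = sequence_length - len(packed)
    return packed + [pad_id] * n_pad, padding_mask + [False] * n_pad

ids, mask = pack_with_start_end([11, 12, 13], sequence_length=8)
print(ids)    # [0, 11, 12, 13, 2, 1, 1, 1]
print(mask)   # [True, True, True, True, True, False, False, False]

# A sequence that is too long is cut so that start and end tokens still fit.
long_ids, long_mask = pack_with_start_end(list(range(100, 120)), sequence_length=8)
print(long_ids)   # [0, 100, 101, 102, 103, 104, 105, 2]
```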

In [None]:
# Create and finetune the model

inputs = backbone.input
x = backbone(inputs)
x = x[:, 0, :]
x = layers.Dropout(0.1)(x)
x = layers.Dense(768, activation="relu")(x)
x = layers.Dropout(0.1)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
classifier = keras.Model(inputs, outputs)



classifier.compile(
    optimizer=keras.optimizers.Adam(5e-5),
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
classifier.fit(
    preprocessed_train_ds,
    validation_data=preprocessed_val_ds,
    epochs=1,
)

classifier.evaluate(preprocessed_test_ds)

## To be done
- bug: model name in dataframe
- change optimizer to Adam
- rerun for CBOW
- new code for Transformer
- finish and run the pretrained transformer part