In [2]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
import re
import pathlib
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.layers import TextVectorization
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Hemingway's Haiku

### Generating Haiku poems with Hemingway's writing

> A haiku is a short, unrhymed Japanese poem that is written in three lines of five, seven, and five syllables, respectively

### Workflow
1. Create a simple text generating model (based on DLWP and provided notebooks)
2. Evaluate the model
3. Improving the model by implementing different methods
4. Evaluate generated text
5. Back to step 3

### 1. Simple text generating model

In [4]:
# open text file of Hemingway's text
with open("../content/hemingwayshorts.txt", "r", encoding="utf-8") as file:
    text = file.read().lower()
text = re.sub(r"[^\w\s]", "", text)  # remove punctuation

# tokenizer layer setup
text_vectorization = tf.keras.layers.TextVectorization(
    standardize="lower_and_strip_punctuation",
    split="character",
    output_mode="int",
)

text_vectorization.adapt([text])
TOKEN_INDEX = dict(enumerate(text_vectorization.get_vocabulary()))
VOCAB_SIZE = len(text_vectorization.get_vocabulary())   # retrieve the vocab size afterwards

lm_dataset_raw = tf.data.Dataset.from_tensor_slices([text])

lm_dataset_tok = lm_dataset_raw.map(text_vectorization)

for t in lm_dataset_tok:
    # print(t)
    DATASET_LENGTH = t.shape[0]

# tokenizing
lm_dataset_flat = lm_dataset_tok.flat_map(
    lambda x: tf.data.Dataset.from_tensor_slices(x)
)

SEQUENCE_LENGTH = 100

lm_dataset_seqs = lm_dataset_flat.batch(
    SEQUENCE_LENGTH + 1,
    drop_remainder=True
)

for t in lm_dataset_seqs.take(1):
    print(t)

# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

lm_dataset_batched = (
    lm_dataset_seqs
        .repeat()
        .shuffle(BUFFER_SIZE)
        .batch(BATCH_SIZE, drop_remainder=True)
        .prefetch(tf.data.experimental.AUTOTUNE)
)

# for t in lm_dataset_batched.take(1):
#     print(t.shape)

def prepare_lm_dataset(tokens_batch):
    x = tokens_batch[:, :-1]  # [a b c d e f g] the model predicts top to bottom,
    y = tokens_batch[:, 1:]   # [b c d e f g h] a → b, a b → c, a b c → d, ..., in one go!
    return x, y

lm_dataset = lm_dataset_batched.map(prepare_lm_dataset, num_parallel_calls=4)

tf.Tensor(
[ 2  4  7  3  2 10  7  6 11  4  2  7  5 23 23 18  2 13  8 20  3  2  6 20
  2 20 11  5  9 19  8 10  2 17  5 19  6 17 21  3 11  2  8  4  2 15  5 10
  2  9  6 15  2 13 14  9 19  7  2  4  8 17  3  2  5  9 12  2  4  7  3 18
  2 15  3 11  3  2  5 13 13  2 10  8  4  4  8  9 16  2 14  9 12  3 11  2
  4  7  3  2 12], shape=(101,), dtype=int64)


In [24]:
@tf.keras.utils.register_keras_serializable("positional_embedding")

class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.token_embeddings =tf.keras.layers.Embedding(
            input_dim=self.input_dim, output_dim=self.output_dim
        )
        # position embeddings: syntactic (spatial/temporal) information
        self.position_embeddings =tf.keras.layers.Embedding(
            input_dim=self.sequence_length, output_dim=self.output_dim
        )

    def build(self, input_shape):
        # token embeddings: semantic information
        self.token_embeddings =tf.keras.layers.Embedding(
            input_dim=self.input_dim, output_dim=self.output_dim
        )
        # position embeddings: syntactic (spatial/temporal) information
        self.position_embeddings =tf.keras.layers.Embedding(
            input_dim=self.sequence_length, output_dim=self.output_dim
        )

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        embedded_tokens = self.token_embeddings(inputs)
        positions = tf.range(start=0, limit=length, delta=1) # delta: step size
        embedded_positions = self.position_embeddings(positions)
        # both embeddings are simply added together!
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return self.token_embeddings.compute_mask(inputs, mask=mask)

    def get_config(self): # retrieve config as a dict
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

def get_causal_attention_mask(inputs):
    print("Inputs:")
    print(inputs)
    print()
    input_shape = tf.shape(inputs)
    batch_size, sequence_length = input_shape[0], input_shape[1]
    i = tf.range(sequence_length)[:, tf.newaxis]
    j = tf.range(sequence_length)
    print(f"i:\n{i}")
    print()
    print(f"j:\n{j}")
    print()
    mask = tf.cast(i >= j, dtype="int32")
    print("Is i >= j? Boolean cast to ints. (Note the broadcasting)")
    print()
    print(mask)
    print()
    mask = tf.reshape(mask, (1, input_shape[1], input_shape[1])) # adding a batch dimension
    mult = tf.concat(
        [tf.expand_dims(batch_size, -1),
         tf.constant([1, 1], dtype=tf.int32)], axis=0)
    print("We want mask to have the same dims as input, using `tf.tile`.")
    print("Creating the right multiplier for it:")
    print()
    print(mult)
    print()
    tile = tf.tile(mask, mult)
    print("Final mask with batch dimensions:")
    print()
    print(tile)
    return tile

mask = get_causal_attention_mask(tf.random.uniform(shape=(2,10), maxval=50, dtype=tf.int32))

@tf.keras.utils.register_keras_serializable("transformer_decoder")
class TransformerDecoder(tf.keras.layers.Layer):

    # simplified class: we don't need two attention layers as we don't have data
    # flowing from an encoder!

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim                              # parameters
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.supports_masking = True                            # MASK: enforcing causality

    # new in Keras 3, see: https://keras.io/guides/making_new_layers_and_models_via_subclassing/#best-practice-deferring-weight-creation-until-the-shape-of-the-inputs-is-known
    def build(self, input_shape):
        self.attention_1 = tf.keras.layers.MultiHeadAttention(  # multi-head attention
            num_heads=self.num_heads, key_dim=self.embed_dim
        )
        self.dense_proj = tf.keras.Sequential(                  # dense layer on top: like a nonlinearity
            [tf.keras.layers.Dense(self.dense_dim, activation="relu"),
             tf.keras.layers.Dense(self.embed_dim),
             tf.keras.layers.Dropout(0.1)]
        )
        self.layernorm_1 = tf.keras.layers.LayerNormalization() # layer norm
        self.layernorm_2 = tf.keras.layers.LayerNormalization()


    # retrieve config as a dict (necessary for custom Keras layers)
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, mask=None):

        # prepare the causal mask
        causal_mask = self.get_causal_attention_mask(inputs)

        # REGULAR MASKED ATTENTION
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask) # apply the causal mask

        # residual / layer norm
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)

        # dense net / nonlinearity layer norm /
        proj_output = self.layernorm_2(self.dense_proj(attention_output_1))

        # residual
        return attention_output_1 + proj_output

Inputs:
tf.Tensor(
[[27 24 27  9  7 24 20 23 30  6]
 [44  1 42 29  3 16 18  6 44 12]], shape=(2, 10), dtype=int32)

i:
[[0]
 [1]
 [2]
 [3]
 [4]
 [5]
 [6]
 [7]
 [8]
 [9]]

j:
[0 1 2 3 4 5 6 7 8 9]

Is i >= j? Boolean cast to ints. (Note the broadcasting)

tf.Tensor(
[[1 0 0 0 0 0 0 0 0 0]
 [1 1 0 0 0 0 0 0 0 0]
 [1 1 1 0 0 0 0 0 0 0]
 [1 1 1 1 0 0 0 0 0 0]
 [1 1 1 1 1 0 0 0 0 0]
 [1 1 1 1 1 1 0 0 0 0]
 [1 1 1 1 1 1 1 0 0 0]
 [1 1 1 1 1 1 1 1 0 0]
 [1 1 1 1 1 1 1 1 1 0]
 [1 1 1 1 1 1 1 1 1 1]], shape=(10, 10), dtype=int32)

We want mask to have the same dims as input, using `tf.tile`.
Creating the right multiplier for it:

tf.Tensor([2 1 1], shape=(3,), dtype=int32)

Final mask with batch dimensions:

tf.Tensor(
[[[1 0 0 0 0 0 0 0 0 0]
  [1 1 0 0 0 0 0 0 0 0]
  [1 1 1 0 0 0 0 0 0 0]
  [1 1 1 1 0 0 0 0 0 0]
  [1 1 1 1 1 0 0 0 0 0]
  [1 1 1 1 1 1 0 0 0 0]
  [1 1 1 1 1 1 1 0 0 0]
  [1 1 1 1 1 1 1 1 0 0]
  [1 1 1 1 1 1 1 1 1 0]
  [1 1 1 1 1 1 1 1 1 1]]

 [[1 0 0 0 0 0 0 0 0 0]
  [1 1 0 0 0 0

In [9]:
EMBED_DIM = 256
LATENT_DIM = 2048
NUM_HEADS = 2
NUM_LAYERS = 5
LEARNING_RATE = 0.001

def build_model(embed_dim, latent_dim, num_heads, num_layers):
    inputs = tf.keras.Input(shape=(None,), dtype="int64")
    x = PositionalEmbedding(SEQUENCE_LENGTH, VOCAB_SIZE, embed_dim)(inputs)
    for _ in range(num_layers):
        x = TransformerDecoder(embed_dim, latent_dim, num_heads)(inputs=x) # no encoder input!
    outputs =tf.keras.layers.Dense(VOCAB_SIZE, activation="softmax")(x)    # probability distribution over the vocab
    model = tf.keras.Model(inputs, outputs)
    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer=tf.keras.optimizers.RMSprop(LEARNING_RATE),
        metrics= ["accuracy"]
    )
    return model

model1 = build_model(EMBED_DIM, LATENT_DIM, NUM_HEADS, NUM_LAYERS)

In [25]:
def sample_next(predictions, temperature=1.0):
    predictions = np.asarray(predictions).astype("float64")
    predictions = np.log(predictions) / temperature                 # temperature reweighting
    exp_preds = np.exp(predictions)                                 # these two lines are actually
    predictions = exp_preds / np.sum(exp_preds)                     # a softmax
    probas = np.random.multinomial(1, predictions, 1)               # sampling using our probabilities
    return np.argmax(probas)

class TextGenerator(tf.keras.callbacks.Callback):
    def __init__(self,
                 prompt,                                            # initial context
                 generate_length,                                   # how many words to generate
                 seq_length,
                 temperatures=(1.,),                                # a range of different temperatures
                 print_every=50):
        self.prompt = prompt
        self.generate_length = generate_length
        self.seq_length = seq_length
        self.temperatures = temperatures
        self.print_every = print_every

    def on_epoch_end(self, epoch, logs=None):
        if epoch == 0 or (epoch + 1) % self.print_every == 0:
            print()
            print()
            print("EPOCH", epoch + 1)
            print()
            print("-" * 40)
            for temperature in self.temperatures:
                msg = f"temperature {temperature}"
                print(msg)
                print("-" * len(msg))
                sentence = self.prompt                                                      # start with our prompt
                for i in range(self.generate_length):
                    tokenized_sentence = text_vectorization([sentence])                     # encode the sentence & feed to the model
                    predictions = self.model(tokenized_sentence[:, - self.seq_length + 1:]) # which gives us predictions (crop to seq_len!)
                    next_token = sample_next(predictions[0, -1, :])                         # use these to sample (get the index)
                    sampled_token = TOKEN_INDEX[next_token]                                # use the index to pick the token
                    sentence += sampled_token                                               # add it to our sentence
                print(sentence)
                print()
            print("-" * 40)


In [None]:
EPOCHS = 50

model1.fit(
    lm_dataset,
    epochs=EPOCHS,
    steps_per_epoch=DATASET_LENGTH // (SEQUENCE_LENGTH + 1) // BATCH_SIZE,
    # callbacks=[text_gen_callback, ckpt_callback]
)

In [12]:
EPOCHS = 50

model1.fit(
    lm_dataset,
    epochs=EPOCHS,
    steps_per_epoch=DATASET_LENGTH // (SEQUENCE_LENGTH + 1) // BATCH_SIZE,
    # callbacks=[text_gen_callback, ckpt_callback]
)

Epoch 1/50
[1m205/205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 119ms/step - accuracy: 0.7642 - loss: 0.7135
Epoch 2/50
[1m205/205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 112ms/step - accuracy: 0.7605 - loss: 0.7238
Epoch 3/50
[1m205/205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 111ms/step - accuracy: 0.7627 - loss: 0.7185
Epoch 4/50
[1m205/205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 111ms/step - accuracy: 0.7646 - loss: 0.7112
Epoch 5/50
[1m205/205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 112ms/step - accuracy: 0.7673 - loss: 0.7020
Epoch 6/50
[1m205/205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 112ms/step - accuracy: 0.7697 - loss: 0.6969
Epoch 7/50
[1m205/205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 111ms/step - accuracy: 0.7720 - loss: 0.6875
Epoch 8/50
[1m205/205[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 111ms/step - accuracy: 0.7742 - loss: 0.6814
Epoch 9/50
[1m2

<keras.src.callbacks.history.History at 0x7eb7043b8610>

In [22]:
# model.save('hemingway.keras')
# load = tf.keras.models.load_model('../content/hemingway.keras')
# load.summary()

  saveable.load_own_variables(weights_store.get(inner_path))


In [14]:
def generate(sentence=" ", generate_length=100, temperature=1.):
  for i in range(generate_length):
      tokenized_sentence = text_vectorization([sentence])                       # encode the sentence & feed to the model
      predictions = model(tokenized_sentence[:, - SEQUENCE_LENGTH + 1:])        # which gives us predictions  (crop to seq_len!)
      next_token = sample_next(predictions[0, -1, :], temperature)              # use these to sample (get the index)
      sampled_token = TOKEN_INDEX[next_token]                                   # use the index to pick the token
      sentence += sampled_token
      if len(sentence.split()) == 17:
        break
  return sentence

def outputhaiku(sentence):
  nlwords = ['i','he', 'she', 'it', 'they', 'for', 'and', 'nor', 'but', 'or', 'yet', 'so', 'the']  # list of linking and transition words
  words = sentence.split()
  count = 0
  lineone = ""
  linetwo = ""
  linethree = ""
  while len(lineone.split()) != 5:
    if words[count] in nlwords and len(lineone.split()) > 2:
      break
    lineone += words[count] + " "
    count += 1
  print(lineone)
  while len(linetwo.split()) != 7:
    if words[count] in nlwords and len(linetwo.split()) >2:
      break
    linetwo += words[count] + " "
    count += 1
  print(linetwo)
  while len(linethree.split()) != 5:
    if words[count] in nlwords and len(linethree.split()) > 2:
      break
    linethree += words[count] + " "
    count += 1
  print(linethree)

In [23]:
sentence = generate(sentence="summer ", generate_length=100, temperature=0.5)
print(sentence)

summer eeeepàeejaeeeerçep1âep
nöeeer6îpàeeeeeeîjöepàeeeeeeeepöa
eparesüepeàp
öeeeeeereeàeeeeeeàerçeóacneóöe


### 2. Generating initial results

In [26]:
outputhaiku(sentence=sentence)

summer eeeepàeejaeeeerçep1âep nöeeer6îpàeeeeeeîjöepàeeeeeeeepöa eparesüepeàp öeeeeeereeàeeeeeeàerçeóacneóöe 


IndexError: list index out of range

### 3. Different approach

Implementing GloVe embeddings with a simple LSTM model

In [None]:
def load_glove

In [None]:
EMBED_DIM = 256
LATENT_DIM = 2048
NUM_HEADS = 2
NUM_LAYERS = 5
LEARNING_RATE = 0.001

def build_model(embed_dim, latent_dim, num_heads, num_layers):
    inputs = tf.keras.Input(shape=(None,), dtype="int64")
    x = PositionalEmbedding(SEQUENCE_LENGTH, VOCAB_SIZE, embed_dim)(inputs)
    for _ in range(num_layers):
        x = TransformerDecoder(embed_dim, latent_dim, num_heads)(inputs=x) # no encoder input!
    outputs =tf.keras.layers.Dense(VOCAB_SIZE, activation="softmax")(x)    # probability distribution over the vocab
    model = tf.keras.Model(inputs, outputs)
    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer=tf.keras.optimizers.Adam(LEARNING_RATE),
        metrics= ["accuracy"]
    )
    return model

model = build_model(EMBED_DIM, LATENT_DIM, NUM_HEADS, NUM_LAYERS)