## `RNN for Text Generation`

* `Import Libraries`

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

np.random.seed(42)
tf.random.set_seed(42)

## `Stateless RNN`

* `Read dataset`

In [2]:
# reading the data => using shakespeare
shakespeare_url = 'https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt'

# take the name on your local pc and url
FILE_PATH = tf.keras.utils.get_file('shakespeare.txt', shakespeare_url)

with open(FILE_PATH) as f:
    texts = f.read()

Downloading data from https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
[1m1115394/1115394[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [4]:
print(texts[:250])  # show sampels of dataset

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.



In [5]:
''.join(sorted(set(texts.lower())))   # To show all unique chars in dataset

"\n !$&',-.3:;?abcdefghijklmnopqrstuvwxyz"

In [6]:
# Tokenizing using keras
tokenizer = tf.keras.preprocessing.text.Tokenizer(char_level=True) #######
tokenizer.fit_on_texts(texts)

In [7]:
# From text to sequence
tokenizer.texts_to_sequences(['Digital Egypt Pioneers'])

[[13, 6, 21, 6, 3, 5, 12, 1, 2, 21, 16, 23, 3, 1, 23, 6, 4, 10, 2, 2, 9, 8]]

In [8]:
# From sequence to text
tokenizer.sequences_to_texts([[13, 6, 21, 6, 3, 5, 12, 1, 2, 21, 16, 23, 3, 1, 23, 6, 4, 10, 2, 2, 9, 8]])

['d i g i t a l   e g y p t   p i o n e e r s']

In [9]:
# Number of unique characters
max_id = len(tokenizer.word_index)
print('max_chars =>', max_id)

# Total number of characters in dataset # tokens
dataset_size = tokenizer.document_count
print('dataset_size =>', dataset_size)

max_chars => 39
dataset_size => 1115394


In [10]:
# Vocab dictionary --> starts from 1 not 0
tokenizer.word_index

{' ': 1,
 'e': 2,
 't': 3,
 'o': 4,
 'a': 5,
 'i': 6,
 'h': 7,
 's': 8,
 'r': 9,
 'n': 10,
 '\n': 11,
 'l': 12,
 'd': 13,
 'u': 14,
 'm': 15,
 'y': 16,
 'w': 17,
 ',': 18,
 'c': 19,
 'f': 20,
 'g': 21,
 'b': 22,
 'p': 23,
 ':': 24,
 'k': 25,
 'v': 26,
 '.': 27,
 "'": 28,
 ';': 29,
 '?': 30,
 '!': 31,
 '-': 32,
 'j': 33,
 'q': 34,
 'x': 35,
 'z': 36,
 '3': 37,
 '&': 38,
 '$': 39}

In [11]:
# Encoding to number
encoded_texts = np.array(tokenizer.texts_to_sequences([texts])) - 1  # -1 as vocab indexing starts from 1
encoded_texts = encoded_texts.ravel() # one list
encoded_texts

array([19,  5,  8, ..., 20, 26, 10])

In [13]:
# Split dataset to train and test ## no shuffle
train_size = dataset_size * 90 // 100

# Using TensorFlow Dataset
train_set = tf.data.Dataset.from_tensor_slices(encoded_texts[:train_size])
test_set = tf.data.Dataset.from_tensor_slices(encoded_texts[train_size:])

In [14]:
# Use window of 100 char, and the next one to be the target; from [0-100] features & 101 is target; and shift by one and so on
# Then from [1-101] Features & 102 is target; then shift by one and so on
n_steps = 100

# target = input shifted 1 character ahead
window_length = n_steps + 1

# Window for train & test
train_set_nested = train_set.window(window_length, shift=1, drop_remainder=True)
test_set_nested = test_set.window(window_length, shift=1, drop_remainder=True)

# show some samples
for val in train_set_nested.batch(1).take(2):
    print(val)  # it is nested, we must flatten it

<tensorflow.python.data.ops.dataset_ops._NestedVariant object at 0x78a3bf1181c0>
<tensorflow.python.data.ops.dataset_ops._NestedVariant object at 0x78a3bf118280>


In [15]:
# Flatten using flat_map function
# Imagine that {{1,2},{3,4,5,6}} --> the output will be {[1,2],[3,4],[5,6]}
train_set_flatten = train_set_nested.flat_map(lambda w: w.batch(window_length))
test_set_flatten = test_set_nested.flat_map(lambda w: w.batch(window_length))

# show some samples
for val in train_set_flatten.batch(4).take(2):
    print(val.numpy())
    print()

[[19  5  8  7  2  0 18  5  2  5 35  1  9 23 10 21  1 19  3  8  1  0 16  1
   0 22  8  3 18  1  1 12  0  4  9 15  0 19 13  8  2  6  1  8 17  0  6  1
   4  8  0 14  1  0  7 22  1  4 24 26 10 10  4 11 11 23 10  7 22  1  4 24
  17  0  7 22  1  4 24 26 10 10 19  5  8  7  2  0 18  5  2  5 35  1  9 23
  10 15  3 13  0]
 [ 5  8  7  2  0 18  5  2  5 35  1  9 23 10 21  1 19  3  8  1  0 16  1  0
  22  8  3 18  1  1 12  0  4  9 15  0 19 13  8  2  6  1  8 17  0  6  1  4
   8  0 14  1  0  7 22  1  4 24 26 10 10  4 11 11 23 10  7 22  1  4 24 17
   0  7 22  1  4 24 26 10 10 19  5  8  7  2  0 18  5  2  5 35  1  9 23 10
  15  3 13  0  4]
 [ 8  7  2  0 18  5  2  5 35  1  9 23 10 21  1 19  3  8  1  0 16  1  0 22
   8  3 18  1  1 12  0  4  9 15  0 19 13  8  2  6  1  8 17  0  6  1  4  8
   0 14  1  0  7 22  1  4 24 26 10 10  4 11 11 23 10  7 22  1  4 24 17  0
   7 22  1  4 24 26 10 10 19  5  8  7  2  0 18  5  2  5 35  1  9 23 10 15
   3 13  0  4  8]
 [ 7  2  0 18  5  2  5 35  1  9 23 10 21  1 19  3  8  1  0

In [16]:
# Shuffle and specify batch size
batch_size = 32

np.random.seed(42)
tf.random.set_seed(42)

# For Train
train_set_shuffled = train_set_flatten.shuffle(buffer_size=1000).batch(batch_size)
train_set_splitted = train_set_shuffled.map(lambda w: (w[:, :-1], w[:, 1:]))

# For Test
test_set_shuffled = test_set_flatten.shuffle(buffer_size=1000).batch(batch_size)
test_set_splitted = test_set_shuffled.map(lambda w: (w[:, :-1], w[:, 1:]))

# empty array to append in them
features = np.zeros((batch_size, n_steps))
targets = np.zeros((batch_size, n_steps))

for feat, target in train_set_splitted.batch(1).take(1):
    features = feat.numpy()
    targets = target.numpy()

In [18]:
features.shape

(1, 32, 100)

In [19]:
features[0, 0, :]  # first row in features

array([ 2,  3,  0,  2,  6,  1,  0, 22,  1,  3, 22, 11,  1, 26, 10, 10,  4,
       11, 11, 23, 10, 16,  1,  0, 24,  9,  3, 16, 27,  2, 17,  0, 16,  1,
        0, 24,  9,  3, 16, 27,  2, 26, 10, 10, 19,  5,  8,  7,  2,  0, 18,
        5,  2,  5, 35,  1,  9, 23, 10, 11,  1,  2,  0, 13,  7,  0, 24,  5,
       11, 11,  0,  6,  5, 14, 17,  0,  4,  9, 12,  0, 16,  1, 27, 11, 11,
        0,  6,  4, 25,  1,  0, 18,  3,  8,  9,  0,  4,  2,  0,  3])

In [20]:
targets[0, 0, :]  # first row in targets

array([ 3,  0,  2,  6,  1,  0, 22,  1,  3, 22, 11,  1, 26, 10, 10,  4, 11,
       11, 23, 10, 16,  1,  0, 24,  9,  3, 16, 27,  2, 17,  0, 16,  1,  0,
       24,  9,  3, 16, 27,  2, 26, 10, 10, 19,  5,  8,  7,  2,  0, 18,  5,
        2,  5, 35,  1,  9, 23, 10, 11,  1,  2,  0, 13,  7,  0, 24,  5, 11,
       11,  0,  6,  5, 14, 17,  0,  4,  9, 12,  0, 16,  1, 27, 11, 11,  0,
        6,  4, 25,  1,  0, 18,  3,  8,  9,  0,  4,  2,  0,  3, 13])

In [21]:
# I will use OHE

# For train & test
train_set_encoded = test_set_splitted.map(lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))
test_set_encoded = test_set_splitted.map(lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))

# Show some examples
for X_batch, Y_batch in train_set_encoded.take(1):
    print(X_batch.shape, Y_batch.shape)

(32, 100, 39) (32, 100)


In [22]:
# Prefetch
train_set_encoded = train_set_encoded.prefetch(1)
test_set_encoded = test_set_encoded.prefetch(1)

* `Model`

In [23]:
model = tf.keras.models.Sequential([
    tf.keras.layers.GRU(128, return_sequences=True, input_shape=[None, max_id], dropout=0.2),
    tf.keras.layers.GRU(128, return_sequences=True, dropout=0.2),

    # Use TimeDistributed # target each window # must passe from 0 to dense so we shifted -1
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(max_id, activation='softmax'))
])

model.summary()

  super().__init__(**kwargs)


In [24]:
# Compile & fit
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              metrics=['accuracy'])


history = model.fit(train_set_encoded, epochs=2)

Epoch 1/2
[1m3483/3483[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1136s[0m 325ms/step - accuracy: 0.5679 - loss: 1.4804
Epoch 2/2


  self.gen.throw(typ, value, traceback)


[1m3483/3483[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1159s[0m 324ms/step - accuracy: 0.5421 - loss: 1.5339


In [25]:
model.save('model_char_stateless.h5')



In [26]:
model.evaluate(test_set_encoded)

[1m3483/3483[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m252s[0m 72ms/step - accuracy: 0.3708 - loss: 2.4219


[2.30216646194458, 0.39316728711128235]

In [27]:
# define a function to process the new data
def preprocess(texts):
    x = np.array(tokenizer.texts_to_sequences(texts)) - 1
    x = tf.one_hot(x, depth=max_id)
    return x


# Call the function
X_new = preprocess(['What is your nam'])
Y_pred = np.argmax(model.predict(X_new), axis=-1)

# Get the text from sequence
tokenizer.sequences_to_texts(Y_pred + 1)[0][-1]   # prediction; 1st sentence, last char

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 587ms/step


'r'

In [28]:
# Generating Fake Text
def next_char(text: str, temperature: float=1):
  # Preprocess
  X_new = preprocess([text])
  # Predict
  Y_pred = np.argmax(model.predict(X_new, verbose=0), axis=-1)
  # Get the text from sequence
  char = tokenizer.sequences_to_texts(Y_pred + 1)[0][-1]
  return char


def complete_char(text: str, n_char: int=100):
    ''' this function is to concatenate the predicted char with the whole text
    '''
    for i in range(n_char):
        text += next_char(text)
    return text

# Call the function
print(complete_char(text='t'))

thee, and yet so fast asleep.

antonio:
noble sebastian,
thou speak'st out of thy sleep. what is a st


-----

### `Statefull RNN`

#### `Stateless RNNs`:

* `Independent Batches`: Treats each batch of data as independent from the previous one. The hidden states are reset at the beginning of each batch.
* `Simpler to Implement`: Easier to manage since there is no need to carry over hidden states between batches.
* `Use Case`: Suitable for tasks where each input sequence is independent, such as processing individual sentences for sentiment analysis.

#### `Stateful RNNs`:
* `Persistent States`: Maintains hidden states across batches, allowing the model to retain information from previous batches.
* `Better for Sequential Data`: Useful for tasks requiring long-term dependencies and continuous sequences, like time series prediction or language modeling.
* `Complex Management`: Requires careful handling of states between batches and sequences to avoid incorrect state propagation.

![img.png](https://i1.wp.com/cdn-images-1.medium.com/max/940/0*oX2DCZ_9zMzNhTZ6.png?ssl=1&w=800&resize=800&ssl=1)

In [35]:
# Set seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

# Dataset parameters
dataset_size = len(encoded_texts)
train_size = dataset_size * 90 // 100

# Create TensorFlow Datasets
train_set = tf.data.Dataset.from_tensor_slices(encoded_texts[:train_size])
test_set = tf.data.Dataset.from_tensor_slices(encoded_texts[train_size:])

n_steps = 100
window_length = n_steps + 1

# Create windows for train & test sets
train_set_nested = train_set.window(window_length, shift=1, drop_remainder=True)
test_set_nested = test_set.window(window_length, shift=1, drop_remainder=True)

# Flatten the windows using flat_map
train_set_flatten = train_set_nested.flat_map(lambda w: w.batch(window_length))
test_set_flatten = test_set_nested.flat_map(lambda w: w.batch(window_length))

# Batch with fixed batch size (e.g., batch=32)
batch_size = 32
train_set_flatten = train_set_flatten.batch(batch_size)
test_set_flatten = test_set_flatten.batch(batch_size)

# Split into input (X) and output (Y)
train_set_splitted = train_set_flatten.map(lambda w: (w[:, :-1], w[:, 1:]))
test_set_splitted = test_set_flatten.map(lambda w: (w[:, :-1], w[:, 1:]))

# Ensure X_batch is of type int for One-Hot Encoding
max_id = tf.reduce_max(encoded_texts) + 1  # Calculate the number of unique tokens

# Debug: Check max_id value
print(f"max_id: {max_id}")

# Apply One-Hot Encoding and ensure correct dtype
def encode_fn(X_batch, Y_batch):
    # Debug: Check X_batch data type and shape before encoding
    print(f"X_batch dtype: {X_batch.dtype}, X_batch shape: {X_batch.shape}")
    X_batch = tf.cast(X_batch, tf.int32)  # Cast to int32 for one_hot
    # Debug: Check Y_batch shape
    print(f"Y_batch shape: {Y_batch.shape}")

    # Reshape to ensure compatibility with one_hot
    X_batch_encoded = tf.one_hot(X_batch, depth=max_id)

    return X_batch_encoded, Y_batch

train_set_encoded = train_set_splitted.map(encode_fn)
test_set_encoded = test_set_splitted.map(encode_fn)

# Prefetch to improve performance
train_set_encoded = train_set_encoded.prefetch(1)
test_set_encoded = test_set_encoded.prefetch(1)

max_id: 39
X_batch dtype: <dtype: 'int64'>, X_batch shape: (None, None)
Y_batch shape: (None, None)


AssertionError: in user code:

    File "<ipython-input-35-702bdbb798ac>", line 48, in encode_fn  *
        X_batch_encoded = tf.one_hot(X_batch, depth=max_id)

    AssertionError: Unreachable


* `Model`

In [30]:
model = tf.keras.models.Sequential([
    tf.keras.layers.GRU(128, return_sequences=True, stateful=True, input_shape=[None, max_id], dropout=0.2),
    tf.keras.layers.GRU(128, return_sequences=True, stateful=True, dropout=0.2),

    # Use TimeDistributed
    tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(max_id, activation='softmax'))
])

model.summary()

ValueError: When using `stateful=True` in a RNN, the batch size must be static. Found dynamic batch size: sequence.shape=(None, None, 39)

In [31]:
# hidden states in passed from one batch to another in the same epoch
# but after the epoch is done we start the next epoch with zero hidden states and so on
class ResetStatesCallback(tf.keras.callbacks.Callback):  ## put it in the callbacks in fit
    def on_epoch_begin(self, epoch, logs):
        self.model.reset_states()

In [None]:
# Compile & fit
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(learning_rate=0.01),
              metrics=['accuracy'])


history = model.fit(train_set_encoded, epochs=2, callbacks=[ResetStatesCallback()])

In [None]:
model.save('model_char_stateull.h5')

In [None]:
model.evaluate(test_set_encoded)

In [None]:
# Call the function
print(complete_char(text='t'))

-----