In [1]:
!pip uninstall tensorflow

[0m

In [2]:
import numpy as np
print(np.__version__)

2.0.2


In [3]:
!pip install tensorflow

Collecting tensorflow
  Downloading tensorflow-2.19.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting astunparse>=1.6.0 (from tensorflow)
  Downloading astunparse-1.6.3-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting flatbuffers>=24.3.25 (from tensorflow)
  Downloading flatbuffers-25.2.10-py2.py3-none-any.whl.metadata (875 bytes)
Collecting google-pasta>=0.1.1 (from tensorflow)
  Downloading google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting libclang>=13.0.0 (from tensorflow)
  Downloading libclang-18.1.1-py2.py3-none-manylinux2010_x86_64.whl.metadata (5.2 kB)
Collecting tensorboard~=2.19.0 (from tensorflow)
  Downloading tensorboard-2.19.0-py3-none-any.whl.metadata (1.8 kB)
Collecting tensorflow-io-gcs-filesystem>=0.23.1 (from tensorflow)
  Downloading tensorflow_io_gcs_filesystem-0.37.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Collecting wheel<1.0,>=0.23.0 (from astunparse>=1.6.0->tensorflow

In [4]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
import numpy as np
import os
import string
import random

In [5]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Creates a causal attention mask to ensure that each token in a sequence only attends to previous and current tokens,
    but not to future tokens. This is crucial for autoregressive models where each token should not be influenced
    by tokens that come after it in the sequence.

    The mask is designed to be applied to the attention weights in self-attention mechanisms, such as those used
    in Transformer models. It prevents information from flowing from future tokens to the current token, ensuring
    that predictions for each token depend only on tokens that precede it.

    Parameters:
    - batch_size (int): The number of sequences in the batch.
    - n_dest (int): The length of the destination sequence (number of tokens in the sequence being processed).
    - n_src (int): The length of the source sequence (typically equal to n_dest in self-attention).
    - dtype (tf.DType): The data type for the mask tensor (e.g., tf.float32, tf.int32).

    Returns:
    - tf.Tensor: A tensor of shape [batch_size, n_dest, n_src] where the upper triangle of the dot product matrix
      is masked out with zeros, and the lower triangle (including the diagonal) is filled with ones. This tensor
      can be used to mask the attention weights in a self-attention mechanism, ensuring that each token attends only
      to earlier tokens and itself, but not to future tokens.

    Example:
    >>> causal_mask = causal_attention_mask(2, 4, 4, tf.float32)
    >>> print(causal_mask)
    <tf.Tensor: shape=(2, 4, 4), dtype=float32, numpy=
    array([[[1., 0., 0., 0.],
            [1., 1., 0., 0.],
            [1., 1., 1., 0.],
            [1., 1., 1., 1.]],

           [[1., 0., 0., 0.],
            [1., 1., 0., 0.],
            [1., 1., 1., 0.],
            [1., 1., 1., 1.]]], dtype=float32)>
    """
    i = tf.range(n_dest)[:, None]
    j = tf.range(n_src)
    m = i >= j - n_src + n_dest
    mask = tf.cast(m, dtype)
    mask = tf.reshape(mask, [1, n_dest, n_src])
    mult = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )
    return tf.tile(mask, mult)

In [6]:
class TransformerBlock(layers.Layer):
    """
    A single block of the Transformer model architecture. This block combines multi-head self-attention
    and feed-forward neural networks to process input sequences.

    The TransformerBlock is designed to capture complex dependencies in sequential data by using self-attention
    mechanisms. It also includes feed-forward layers to further process the attention outputs, along with normalization
    and dropout layers to stabilize training and prevent overfitting.

    Attributes:
    - embed_dim (int): The dimension of the embedding space.
    - num_heads (int): The number of attention heads in the multi-head attention mechanism.
    - ff_dim (int): The dimension of the feed-forward network hidden layer.
    - rate (float): The dropout rate applied to the attention and feed-forward layers (default is 0.1).

    Methods:
    - call(inputs): Executes the forward pass of the Transformer block. It applies the multi-head attention, adds
      residual connections, normalizes the outputs, and processes them through a feed-forward network.

    Parameters:
    - inputs (tf.Tensor): Input tensor with shape (batch_size, seq_len, embed_dim). Represents the sequence of embeddings.

    Returns:
    - tf.Tensor: Output tensor with shape (batch_size, seq_len, embed_dim). The processed sequence after attention,
      feed-forward operations, and normalization.

    Example:
    >>> transformer_block = TransformerBlock(embed_dim=64, num_heads=4, ff_dim=128)
    >>> inputs = tf.random.uniform((32, 10, 64))  # Example input tensor with batch_size=32, seq_len=10, embed_dim=64
    >>> output = transformer_block(inputs)
    >>> print(output.shape)
    (32, 10, 64)
    """
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)  # Initializes the parent class (layers.Layer).
        # MultiHeadAttention layer to capture relationships between different positions in the sequence.
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)

        # Feed-forward network with a ReLU activation function followed by a linear layer.
        self.ffn = keras.Sequential(
            [
                layers.Dense(ff_dim, activation="relu"),  # Dense layer with ReLU activation.
                layers.Dense(embed_dim),  # Dense layer to project back to the embedding dimension.
            ]
        )

        # Layer normalization applied before and after the residual connection.
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)  # First layer normalization.

        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)  # Second layer normalization.

        # Dropout layers to prevent overfitting by randomly setting a fraction of input units to zero.
        self.dropout1 = layers.Dropout(rate)  # Dropout after the attention layer.

        self.dropout2 = layers.Dropout(rate)  # Dropout after the feed-forward network.

    def call(self, inputs):
        """
        Defines the forward pass of the Transformer block.

        Arguments:
        inputs -- The input tensor to the Transformer block.

        Returns:
        The output tensor of the Transformer block after applying attention, dropout, and feed-forward network.
        """
        input_shape = tf.shape(inputs)  # Get the shape of the input tensor.
        batch_size = input_shape[0]  # Number of sequences in the batch.
        seq_len = input_shape[1]  # Length of each sequence.

        # Create a causal attention mask to prevent attending to future tokens.
        causal_mask = causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)

        # Apply multi-head attention with the causal mask.
        attention_output = self.att(inputs, inputs, attention_mask=causal_mask)

        # Apply dropout to the attention output.
        attention_output = self.dropout1(attention_output)

        # Add the input (residual connection) to the attention output and normalize.
        out1 = self.layernorm1(inputs + attention_output)

        # Apply the feed-forward network to the normalized output.
        ffn_output = self.ffn(out1)

        # Apply dropout to the feed-forward network output.
        ffn_output = self.dropout2(ffn_output)

        # Add the normalized output of the feed-forward network to the residual connection and normalize.
        return self.layernorm2(out1 + ffn_output)


In [7]:
class TokenAndPositionEmbedding(layers.Layer):
    """
    A custom layer that combines token embeddings and positional embeddings for sequences.
    This layer is designed to convert input tokens into dense vectors and add positional information
    to each token embedding to capture the order of tokens in a sequence.

    The TokenAndPositionEmbedding layer is crucial for models that process sequential data, such as
    natural language processing models, where understanding the position of each token in the sequence
    is essential for interpreting the context and meaning.

    Attributes:
    - maxlen (int): The maximum length of the input sequences. This determines the size of the positional
      embeddings.
    - vocab_size (int): The size of the vocabulary, which determines the number of possible tokens.
    - embed_dim (int): The dimensionality of the embedding space. Each token and position is mapped to a vector of
      this size.

    Methods:
    - call(x): Applies the token and positional embeddings to the input sequences. It generates embeddings for each
      token and adds positional embeddings to these token embeddings to encode the order of tokens in the sequence.

    Parameters:
    - x (tf.Tensor): Input tensor of shape (batch_size, sequence_length), where each value represents a token index
      in the input sequences.

    Returns:
    - tf.Tensor: Output tensor of shape (batch_size, sequence_length, embed_dim), where each token index in the input
      sequences has been converted into an embedding vector, with positional information added to it.

    Example:
    >>> embedding_layer = TokenAndPositionEmbedding(maxlen=100, vocab_size=5000, embed_dim=64)
    >>> input_seq = tf.constant([[1, 5, 9], [2, 6, 3]])
    >>> output = embedding_layer(input_seq)
    >>> print(output.shape)
    (2, 3, 64)
    """
    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)  # Initializes the parent class (layers.Layer).

        # Embedding layer for tokens, maps token indices to dense vectors of size `embed_dim`.
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)

        # Embedding layer for positional encodings, maps position indices to dense vectors of size `embed_dim`.
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """
        Applies token and positional embeddings to the input tensor.

        Arguments:
        x -- The input tensor containing token indices.

        Returns:
        The tensor after adding token and positional embeddings.
        """
        maxlen = tf.shape(x)[-1]  # Get the length of the sequences from the input tensor shape.

        # Generate position indices from 0 to maxlen - 1.
        positions = tf.range(start=0, limit=maxlen, delta=1)

        # Apply the positional embedding layer to position indices.
        positions = self.pos_emb(positions)

        # Apply the token embedding layer to the input tensor.
        x = self.token_emb(x)

        # Add the token embeddings and positional embeddings.
        return x + positions

In [8]:
batch_size = 128

# Path to the dataset file
file = "/content/dataset.txt"

# Check if the file exists
if os.path.exists(file):
    # Create a dataset from the text file
    text_ds = tf.data.TextLineDataset(file)
    text_ds = text_ds.shuffle(buffer_size=256)
    text_ds = text_ds.batch(batch_size)
else:
    # Raise a FileNotFoundError with a descriptive message
    raise FileNotFoundError(f"The file at {file} does not exist.")

In [9]:
def custom_standardization(input_string):
    """Remove html line-break tags and handle punctuation"""
    lowercased = tf.strings.lower(input_string)
    stripped_html = tf.strings.regex_replace(lowercased, "<br />", " ")
    return tf.strings.regex_replace(stripped_html, f"([{string.punctuation}])", r" \1")

In [10]:
vocab_size = 20000  # Only consider the top 20k words
maxlen = 80  # Max sequence size

# Create a vectorization layer and adapt it to the text
vectorize_layer = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode="int",
    output_sequence_length=maxlen + 1,
)

In [11]:
# Preparation of the dictionary/vocabulary
vectorize_layer.adapt(text_ds)
vocab = vectorize_layer.get_vocabulary()  # To get words back from token indices
vocab[:30]

['',
 '[UNK]',
 np.str_('the'),
 np.str_('of'),
 np.str_('and'),
 np.str_('in'),
 np.str_(':'),
 np.str_('.'),
 np.str_('a'),
 np.str_('on'),
 np.str_('to'),
 np.str_('for'),
 np.str_('impact'),
 np.str_('sustainable'),
 np.str_('future'),
 np.str_('urban'),
 np.str_('energy'),
 np.str_('role'),
 np.str_('reducing'),
 np.str_('as'),
 np.str_('ai'),
 np.str_('fashion'),
 np.str_('environmental'),
 np.str_('intelligence'),
 np.str_('green'),
 np.str_('technology'),
 np.str_('education'),
 np.str_('eco'),
 np.str_('with'),
 np.str_('health')]

In [12]:
# Save the vocabulary to a text file
vocab_file_path = 'vocabulary.txt'
with open(vocab_file_path, 'w') as vocab_file:
    for word in vocab:
        vocab_file.write(f"{word}\n")

In [13]:
def prepare_lm_inputs_labels(text):
    """
    Shift word sequences by 1 position so that the target for position (i) is
    word at position (i+1). The model will use all words up till position (i)
    to predict the next word.
    """
    text = tf.expand_dims(text, -1)
    tokenized_sentences = vectorize_layer(text)
    x = tokenized_sentences[:, :-1]
    y = tokenized_sentences[:, 1:]
    return x, y

In [14]:
text_ds = text_ds.map(prepare_lm_inputs_labels)
text_ds = text_ds.prefetch(tf.data.AUTOTUNE)

In [15]:
# Hyperparameters
embed_dim = 256  # Embedding size for each token
num_heads = 8  # Number of attention heads (increased for better performance)
feed_forward_dim = 512  # Hidden layer size in feed forward network (increased for more capacity)
dropout_rate = 0.1  # Dropout rate to prevent overfitting

def create_model():
    """
    Creates and compiles a Transformer-based model for text generation.
    Returns:
        tf.keras.Model: Compiled Transformer model.
    """
    inputs = layers.Input(shape=(maxlen,), dtype=tf.int32)

    # Embedding layer with positional encoding
    embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
    x = embedding_layer(inputs)

    # Transformer block with LayerNormalization and Dropout
    transformer_block = TransformerBlock(embed_dim, num_heads, feed_forward_dim, rate=dropout_rate)
    x = transformer_block(x)

    # Output layer
    outputs = layers.Dense(vocab_size)(x)

    # Create and compile the model
    model = keras.Model(inputs=inputs, outputs=[outputs, x])

    # Compile the model with sparse categorical crossentropy loss
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(
        "adam", loss=[loss_fn, None],
    )

    return model

In [16]:
class TextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """

    def __init__(
        self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
    ):
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        return self.index_to_word[number]

    def on_epoch_end(self, epoch, logs=None):
        start_tokens = [_ for _ in self.start_tokens]
        if (epoch + 1) % self.print_every != 0:
            return
        num_tokens_generated = 0
        tokens_generated = []
        while num_tokens_generated <= self.max_tokens:
            pad_len = maxlen - len(start_tokens)
            sample_index = len(start_tokens) - 1
            if pad_len < 0:
                x = start_tokens[:maxlen]
                sample_index = maxlen - 1
            elif pad_len > 0:
                x = start_tokens + [0] * pad_len
            else:
                x = start_tokens
            x = np.array([x])
            y, _ = self.model.predict(x)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)
            num_tokens_generated = len(tokens_generated)
        txt = " ".join(
            [self.detokenize(_) for _ in self.start_tokens + tokens_generated]
        )
        print(f"generated text:\n{txt}\n")


# Tokenize starting prompt
word_to_index = {}
for index, word in enumerate(vocab):
    word_to_index[word] = index

start_prompt = "blockchain"
start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
num_tokens_generated = 40
text_gen_callback = TextGenerator(num_tokens_generated, start_tokens, vocab)

In [None]:
model = create_model()
model.fit(text_ds, verbose=2, epochs=100, callbacks=[text_gen_callback])

Epoch 1/100




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 219ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 55ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 45ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5

In [None]:
class TextPredict():
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.
    """

    def __init__(self, model, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1):
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k
        self.model = model

    def sample_from(self, logits):
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        return self.index_to_word[number]

    def generate(self, logs=None):
        start_tokens = [_ for _ in self.start_tokens]
        num_tokens_generated = 0
        tokens_generated = []
        while num_tokens_generated <= self.max_tokens:
            pad_len = maxlen - len(start_tokens)
            sample_index = len(start_tokens) - 1
            if pad_len < 0:
                x = start_tokens[:maxlen]
                sample_index = maxlen - 1
            elif pad_len > 0:
                x = start_tokens + [0] * pad_len
            else:
                x = start_tokens
            x = np.array([x])
            y, _ = self.model.predict(x)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            start_tokens.append(sample_token)
            num_tokens_generated = len(tokens_generated)
        txt = " ".join(
            [self.detokenize(_) for _ in self.start_tokens + tokens_generated]
        )
        print(f"generated text:\n{txt}\n")

In [None]:
# Tokenize starting prompt
word_to_index = {}
for index, word in enumerate(vocab):
    word_to_index[word] = index

num_tokens_generated = 20

def generateHeadling(start_prompt):
  start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
  text_predict = TextPredict(model, num_tokens_generated, start_tokens, vocab)
  text_predict.generate()

In [None]:
# Text generation
generateHeadling("the blockchain")

In [None]:
# Text generation
generateHeadling("intelligence")

In [None]:
# Text generation
generateHeadling("industry")

In [None]:
# Text generation
generateHeadling("cybercrime")

In [None]:
# Define the file paths for saving the model and weights
model_path = 'trained_model.model.h5'
weights_path = 'trained_model.weights.h5'

# Save the trained model architecture
model.save(model_path)

# Save the trained model weights
model.save_weights(weights_path)