In [None]:
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras.layers import TextVectorization
import numpy as np
import os
import re
import string
import random
import pandas as pd


In [None]:
# Show the GPUs TensorFlow can see; an empty list means no GPU was detected.
tf.config.list_physical_devices('GPU')

<h1>Data Preparation</h1>

In [None]:
#Import data: read the two tweet exports and stack them into a single frame.
# Earlier single-file source, kept for reference:
# data = pd.read_csv("./Trump-Tweets/Donald-Tweets!.csv")
# data
df1 = pd.read_csv("./Trump-Tweets/realdonaldtrump.csv")
df2 = pd.read_csv("./Trump-Tweets/trumptweets.csv")
# NOTE(review): concat keeps each frame's own default index, so row labels repeat
# across the two files; pass ignore_index=True if unique labels are needed.
data = pd.concat([df1,df2])

In [None]:
# Preview the combined frame.
data

In [None]:
#Get Tweets: keep only the column that holds the tweet body.
tweet_text_raw = data['content']
# Alternative column name, presumably for the single-file dataset commented out above:
# tweet_text_raw = data['Tweet_Text']
print(tweet_text_raw)

In [None]:
# Clean the raw tweet text one stage at a time; every stage keeps its own name
# so intermediate results can be inspected.
#
# All patterns are passed with regex=True explicitly: since pandas 2.0
# Series.str.replace treats the pattern as a literal string by default, which
# would silently turn every substitution below into a no-op.

#Remove http links
tweet_text_no_https = tweet_text_raw.str.replace(r'http\S+', '', regex=True)
#Remove links without http (tokens of the form text/text, plus leading whitespace)
tweet_text_no_https = tweet_text_no_https.str.replace(r'\s*[^ /]+/[^ /]+', '', regex=True)
#Set all empty strings to nan (reassigned rather than inplace, so the cell can be re-run)
tweet_text_no_https = tweet_text_no_https.replace('', np.nan)
#drop nan rows
tweet_text_no_links = tweet_text_no_https.dropna()
#Remove space after @
tweet_text_better_mentions = tweet_text_no_links.str.replace(r'@\s+', '@', regex=True)
#tweet_text_no_mentions = tweet_text_no_links.str.replace(r'@\w+', '')
#Remove space after #
tweet_text_better_hashtags = tweet_text_better_mentions.str.replace(r'#\s', '#', regex=True)
#Keep only tweets longer than 40 characters
# (a boolean mask replaces the apply + np.NaN + dropna round trip; np.NaN no
# longer exists in NumPy 2.0)
tweets_long = tweet_text_better_hashtags[tweet_text_better_hashtags.str.len() > 40]
#remove emojis (and any other non-ASCII character)
tweets_no_emojis = tweets_long.apply(lambda x: x.encode('ascii', 'ignore').decode('ascii'))
tweets_lower_case = tweets_no_emojis.str.lower()
#remove unwanted punctuation
# Note: inside the character class ",-/" is a range covering , - . and /
tweet_text_no_unwanted_punctuation = tweets_lower_case.str.replace(
    r"[\"$%()*+,-/:;<>=\[\]\\_`{}\|~.]", '', regex=True
)

tweet_text_clean = tweet_text_no_unwanted_punctuation
#tweet_text_clean.to_csv('./Trump-Tweets/clean_data.csv')
# Positional lookup: after filtering, index label 10 may be missing, and because
# the source frames were concatenated it may also be duplicated.
tweet_text_clean.iloc[10]

In [None]:
# Reference for the model code below (Keras "miniature GPT" text-generation example):
# https://keras.io/examples/generative/text_generation_with_miniature_gpt/

In [None]:
def causal_attention_mask(batch_size, n_dest, n_src, dtype):
    """
    Build a batched causal mask for self attention.

    Destination position i may attend to source position j only when
    i >= j - n_src + n_dest, i.e. the allowed entries form a lower triangle
    anchored at the lower right corner, so no information flows from future
    tokens to the current one.

    Returns a tensor of shape [batch_size, n_dest, n_src] cast to `dtype`.
    """
    dest_positions = tf.range(n_dest)[:, None]
    src_positions = tf.range(n_src)
    allowed = dest_positions >= src_positions - n_src + n_dest
    single_mask = tf.reshape(tf.cast(allowed, dtype), [1, n_dest, n_src])
    # Repeat along the batch axis only; the other two axes are tiled once.
    repeats = tf.concat(
        [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
    )
    return tf.tile(single_mask, repeats)


class TransformerBlock(layers.Layer):
    """One transformer block: causal self-attention followed by a feed-forward net.

    Each sub-layer's output goes through dropout, is added back to that
    sub-layer's input and is then layer-normalised.

    Arguments:
        embed_dim: Integer, width of the final Dense layer; also passed to
            MultiHeadAttention as its second positional argument.
        num_heads: Integer, number of attention heads.
        ff_dim: Integer, units of the hidden (ReLU) Dense layer.
        rate: Float, dropout rate used after both sub-layers.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)
        hidden_layer = layers.Dense(ff_dim, activation="relu")
        output_layer = layers.Dense(embed_dim)
        self.ffn = keras.Sequential([hidden_layer, output_layer])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        """Apply masked self-attention and the feed-forward net to `inputs`."""
        dims = tf.shape(inputs)
        # Axis 0 is used as the batch size and axis 1 as the sequence length.
        mask = causal_attention_mask(dims[0], dims[1], dims[1], tf.bool)
        attended = self.dropout1(self.att(inputs, inputs, attention_mask=mask))
        attention_block = self.layernorm1(inputs + attended)
        transformed = self.dropout2(self.ffn(attention_block))
        return self.layernorm2(attention_block + transformed)


In [None]:
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Arguments:
        maxlen: Integer, number of rows in the position embedding table.
        vocab_size: Integer, number of rows in the token embedding table.
        embed_dim: Integer, output width of both embeddings.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed token ids `x` and add the embedding of each position index."""
        # Positions run 0 .. (size of the last axis of x) - 1.
        seq_len = tf.shape(x)[-1]
        position_vectors = self.pos_emb(tf.range(start=0, limit=seq_len, delta=1))
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

In [None]:
# Length of the longest tweet in whitespace-separated words (a maximum, not an
# average). str.split() with no argument collapses runs of whitespace, matching
# the tokenizer's split='whitespace'; split(' ') would also count the empty
# strings left between repeated spaces and inflate the result.
max_length = max(len(tweet.split()) for tweet in tweet_text_clean)
max_length

In [None]:
# Flatten the corpus into one list of words (split on single spaces) and show
# the total word count. Despite its name, `tweets` holds individual words.
tweets = [word for tweet in tweet_text_clean for word in tweet.split(' ')]
len(tweets)

In [None]:
# Model hyperparameters read by create_model() and by the vectorization layer.
vocab_size = 20000  # Vocabulary budget; sizes the token embedding table and the output layer
maxlen = max_length  # Max sequence size
embed_dim = 256  # Embedding size for each token
num_heads = 2  # Number of attention heads
feed_forward_dim = 256  # Hidden layer size in feed forward network inside transformer


def create_model():
    """Build and compile the single-block transformer language model.

    Reads the module-level hyperparameters `maxlen`, `vocab_size`, `embed_dim`,
    `num_heads` and `feed_forward_dim`.

    Returns:
        A compiled keras.Model with two outputs: the Dense projection to
        `vocab_size` logits and the raw transformer block output. Only the
        first output has a loss attached.
    """
    token_ids = layers.Input(shape=(maxlen,), dtype=tf.int32)
    embedded = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(token_ids)
    block_output = TransformerBlock(embed_dim, num_heads, feed_forward_dim)(embedded)
    logits = layers.Dense(vocab_size)(block_output)
    model = keras.Model(inputs=token_ids, outputs=[logits, block_output])
    # No loss and optimization based on word embeddings from transformer block
    cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile("adam", loss=[cross_entropy, None])
    return model

In [None]:
batch_size = 128

# Raw-string dataset: one element per cleaned tweet, shuffled through a
# 256-element buffer and grouped into batches.
text_ds = (
    tf.data.Dataset.from_tensor_slices(tweet_text_clean)
    .shuffle(buffer_size=256)
    .batch(batch_size)
)


In [None]:

# Create a vectorization layer; it is adapted to the text in the next cell
vectorize_layer = TextVectorization(
    max_tokens=vocab_size - 1,
    output_mode="int",
    # One extra position so each sequence can be split into inputs [:-1] and targets [1:].
    output_sequence_length=maxlen + 1,
    # No built-in standardization: the text was already lower-cased and
    # stripped of unwanted punctuation in the cleaning cell.
    standardize= None,#'lower_and_strip_punctuation',
    split='whitespace',
)

In [None]:
# Build the vocabulary from the cleaned tweets, then show its size.
vectorize_layer.adapt(tweet_text_clean)
vocab = vectorize_layer.get_vocabulary()  # To get words back from token indices
len(vocab)

In [None]:
def prepare_lm_inputs_labels(text):
    """
    Turn a batch of raw strings into (inputs, targets) for next-word prediction.

    The strings are tokenized with the module-level `vectorize_layer`; the
    targets are the same token sequences shifted left by one position, so the
    target at position (i) is the word at position (i+1) of the input.
    """
    token_ids = vectorize_layer(tf.expand_dims(text, -1))
    inputs = token_ids[:, :-1]
    targets = token_ids[:, 1:]
    return inputs, targets

In [None]:
# Tokenize each batch into (inputs, targets) and prefetch batches.
# NOTE: this rebinds text_ds, so re-running the cell would map an already
# tokenized dataset; re-run the dataset-creation cell first.
text_ds = text_ds.map(prepare_lm_inputs_labels).prefetch(tf.data.AUTOTUNE)

In [None]:
class TextGenerator(keras.callbacks.Callback):
    """A callback to generate text from a trained model.
    1. Feed some starting prompt to the model
    2. Predict probabilities for the next token
    3. Sample the next token and add it to the next input

    Arguments:
        max_tokens: Integer, the number of tokens to be generated after prompt.
        start_tokens: List of integers, the token indices for the starting prompt.
        index_to_word: List of strings, obtained from the TextVectorization layer.
        top_k: Integer, sample from the `top_k` token predictions.
        print_every: Integer, print after this many epochs.

    The model's fixed input length is read from the module-level `maxlen`.
    """

    def __init__(
        self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
    ):
        # Let the Keras base class initialise its own bookkeeping attributes.
        super().__init__()
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        """Sample one token index from the `top_k` highest-scoring logits.

        The top-k logits are renormalised with a softmax and one of the
        corresponding indices is drawn with those probabilities.
        """
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        """Return the vocabulary word stored at index `number`."""
        return self.index_to_word[number]

    def _generate_tokens(self, prompt_tokens, skip_unknown=False):
        """Sample exactly `self.max_tokens` tokens that continue `prompt_tokens`.

        Arguments:
            prompt_tokens: Iterable of integer token indices; it is not modified.
            skip_unknown: If True, draws of index 1 ([UNK]) are discarded and
                re-sampled instead of being added to the output.

        Returns:
            List containing only the newly generated token indices.
        """
        context = list(prompt_tokens)
        generated = []
        # `<` rather than `<=`: produce max_tokens tokens, as documented.
        while len(generated) < self.max_tokens:
            # The model input has a fixed length, so feed the most recent
            # `maxlen` tokens; keeping only the first `maxlen` would freeze the
            # input once the context outgrows the window.
            window = context[-maxlen:]
            sample_index = len(window) - 1
            padded = window + [0] * (maxlen - len(window))
            y, _ = self.model.predict(np.array([padded]), verbose=0)
            sample_token = self.sample_from(y[0][sample_index])
            if skip_unknown and sample_token == 1:
                # Draw again from the same context rather than emitting [UNK].
                continue
            generated.append(sample_token)
            context.append(sample_token)
        return generated

    def on_epoch_end(self, epoch, logs=None):
        """Print a sample continuing `self.start_tokens` every `print_every` epochs."""
        if (epoch + 1) % self.print_every != 0:
            return
        tokens_generated = self._generate_tokens(self.start_tokens)
        txt = " ".join(
            self.detokenize(token)
            for token in list(self.start_tokens) + tokens_generated
        )
        print(f"generated text:\n{txt}\n")

    def generate_text(self, start_string_tokens) -> str:
        """Return the prompt followed by `max_tokens` sampled words.

        Arguments:
            start_string_tokens: Iterable of integer token indices for the prompt.

        Returns:
            Space-joined string of the prompt words and the generated words;
            [UNK] is never emitted as a generated word.
        """
        prompt_tokens = list(start_string_tokens)
        tokens_generated = self._generate_tokens(prompt_tokens, skip_unknown=True)
        # Join the prompt and the generated tokens exactly once; the generated
        # tokens must not be appended to a list that already contains them.
        return " ".join(
            self.detokenize(token) for token in prompt_tokens + tokens_generated
        )

In [None]:
# Reverse lookup used to tokenize prompts: vocabulary word -> its index
word_to_index = {word: index for index, word in enumerate(vocab)}

In [None]:
# Prompt used by the callback to print a sample during training.
start_prompt = "fake news"
# Words that are not in the vocabulary fall back to index 1.
start_tokens = [word_to_index.get(_, 1) for _ in start_prompt.split()]
num_tokens_generated = 40
text_gen = TextGenerator(num_tokens_generated, start_tokens, vocab)

In [None]:
# Build the compiled model and print its layer summary.
model = create_model()
model.summary()

In [None]:
# Check GPU visibility again right before training.
tf.config.list_physical_devices('GPU')

In [None]:
# Train the model; text_gen was created with the default print_every=1, so it
# prints a generated sample at the end of every epoch.
NUM_EPOCHS = 25
model.fit(text_ds, verbose=1, epochs=NUM_EPOCHS, callbacks=[text_gen])

In [None]:
# Generate a continuation for a new prompt with the trained model.
start_prompt = "America is the"
# The vocabulary was built from lower-cased text with no standardization in the
# vectorization layer, so the prompt must be lower-cased before lookup;
# otherwise "America" is unknown and falls back to index 1 ([UNK]).
start_tokens = [word_to_index.get(word, 1) for word in start_prompt.lower().split()]
text_gen.generate_text(start_tokens)

In [None]:
# Save the entire model as a SavedModel.
# model.save('./saved_model/miniature_gpt_model_200_epochs_no_dropout')

In [None]:
# loading the saved model
# loaded_model = tf.keras.models.load_model('./MyModel_tf')