In [None]:
# import data analysis libraries
import pandas as pd, numpy as np,random

In [None]:
# Mount Google Drive in Colab so the dataset and the model checkpoint stored
# under drive/MyDrive can be read and written from this runtime.
# NOTE(review): later cells use the relative path 'drive/MyDrive/...', which
# presumably relies on the working directory being /content -- confirm.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# load data
# Fixed seed so the shuffle below -- and therefore the train/validation split
# taken from the shuffled frame -- is identical on every Restart & Run All.
SEED = 42

# Read the reviews, drop rows with missing values, then shuffle all rows.
# Built as one chain (no inplace mutation) so re-running the cell is idempotent.
data = (
    pd.read_csv(
        'drive/MyDrive/Portfolio resources/Sentiment analysis & explication dataset/reviews_for_text_generation.csv',
        index_col=0,
    )
    .dropna()
    .sample(frac=1, random_state=SEED)
)
data.head()

Unnamed: 0,text,overall
17593,feeling like ish!,0
179614,"Low Quality [Control], I have purchased two of...",0
250120,"Good ""store bought"" cookies, I really like the...",1
488914,"Disgusting aftertaste!, I tried this yesterday...",0
857427,"Never actually did anything, I bought it, it c...",0


In [None]:
# number of reviews left after rows with missing values were dropped
len(data)

180000

In [None]:
# text preprocessing step-1 (creating custom standardization function)
import tensorflow as tf
from keras.layers import TextVectorization
import string
import re

# Punctuation characters to strip. The apostrophe is deliberately kept so
# contractions such as "don't" stay a single token.
stg = string.punctuation.replace("'", '')

# Character class matching any single character of `stg`. The characters are
# escaped first: dropped in raw, the sequence `\]` inside string.punctuation is
# read by the regex engine as an escaped bracket, so the backslash itself was
# never stripped and the class only worked by accident of character ordering.
punctuation_pattern = f"[{re.escape(stg)}]"


def custom_standardization(input_string):
    """Normalize raw review text before it is split into tokens.

    Steps, in order: lowercase the text, replace every newline with a space,
    then delete every punctuation character listed in ``stg`` (all of
    ``string.punctuation`` except the apostrophe).

    Args:
        input_string: string tensor (or value convertible to one) to clean.

    Returns:
        A string tensor with the cleaned text.
    """
    lowercased = tf.strings.lower(input_string)
    without_newlines = tf.strings.regex_replace(lowercased, "\n", " ")
    return tf.strings.regex_replace(without_newlines, punctuation_pattern, "")

In [None]:
# text preprocessing step-2 (implementation)
# Vocabulary budget and context length; both are reused when the model is built.
vocab_size, maxlen = 25000, 100

# The layer is capped at vocab_size - 1 tokens and emits maxlen + 1 ids per
# review, so that shifting by one position later yields maxlen-long
# input/label sequences.
vectorizer_options = dict(
    standardize=custom_standardization,
    max_tokens=vocab_size - 1,
    output_mode="int",
    output_sequence_length=maxlen + 1,
)
vectorize_layer = TextVectorization(**vectorizer_options)

# Learn the vocabulary from the review texts.
review_corpus = data['text'].values
vectorize_layer.adapt(review_corpus)

In [None]:
# saving the vectorization weights
import pickle
# Snapshot the vectorizer's configuration and learned weights first, then
# write them inside a `with` block so the file handle is always flushed and
# closed (the bare open() call used before was never closed).
vectorizer_state = {
    'config': vectorize_layer.get_config(),
    'weights': vectorize_layer.get_weights(),
}
with open("sentence_vectorizer_weights", "wb") as weights_file:
    pickle.dump(vectorizer_state, weights_file)

In [None]:
# function to turn tokenized reviews into (input, next-token label) sequence pairs
def prepare_lm_inputs_labels(text):
    """Tokenize a batch of reviews and build next-token training pairs.

    The batch is run through ``vectorize_layer`` (the adapted vectorizer
    defined earlier in this notebook; the previously referenced ``new_v`` is
    not defined anywhere here and raised NameError on a fresh kernel). The
    resulting token matrix is then shifted by one position so that the label
    at each position is the token that follows the corresponding input token.

    Args:
        text: batch of raw review strings.

    Returns:
        Tuple ``(x, y)``: ``x`` is every token column except the last,
        ``y`` is every token column except the first.
    """
    tokenized_sentences = vectorize_layer(text)
    x = tokenized_sentences[:, :-1]
    y = tokenized_sentences[:, 1:]
    return x, y

In [None]:
# function to create dataset
def create_Ds(x, batch_size=64, shuffle_buffer=256):
    """Build a batched, prefetching tf.data pipeline of (input, label) pairs.

    Pipeline stages, in order: slice ``x`` into individual texts, shuffle with
    a bounded buffer, group into batches, convert each batch to token
    input/label pairs via ``prepare_lm_inputs_labels``, and prefetch.

    Args:
        x: collection of raw review strings accepted by
            ``tf.data.Dataset.from_tensor_slices``.
        batch_size: number of reviews per batch. Defaults to 64, the value
            that used to be hard-coded.
        shuffle_buffer: size of the shuffle buffer. Defaults to 256, the value
            that used to be hard-coded.

    Returns:
        A ``tf.data.Dataset`` yielding ``(x, y)`` token batches.
    """
    return (
        tf.data.Dataset.from_tensor_slices(x)
        .shuffle(buffer_size=shuffle_buffer)
        .batch(batch_size)
        .map(prepare_lm_inputs_labels)
        .prefetch(tf.data.AUTOTUNE)
    )

In [None]:
# create training & validation datasets
# The first 18000 rows of the shuffled frame form the validation set;
# everything after them is used for training.
n_val = 18000
review_texts = data['text']
val_ds, train_ds = create_Ds(review_texts[:n_val]), create_Ds(review_texts[n_val:])

In [None]:
# display the dataset object to inspect the shape and dtype of its elements
train_ds

<PrefetchDataset element_spec=(TensorSpec(shape=(None, 100), dtype=tf.int64, name=None), TensorSpec(shape=(None, 100), dtype=tf.int64, name=None))>

In [None]:
# building model step-1 (build token and position embedding layer)
import keras
from keras import layers
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: number of distinct positions the position table can embed.
        vocab_size: number of distinct token ids the token table can embed.
        embed_dim: size of each embedding vector.
        **kwargs: standard Keras layer arguments (e.g. ``name``); these are
            now forwarded to the base class instead of being silently dropped.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can report the constructor arguments.
        self.maxlen = maxlen
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed token ids ``x`` and add the embedding of each position.

        Positions are numbered 0..seq_len-1 along the last axis of ``x``.
        """
        # Named seq_len (not maxlen) so the constructor argument and the
        # notebook-level `maxlen` are not shadowed.
        seq_len = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "maxlen": self.maxlen,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

In [None]:
# building model step-2 (build transformer block)
class TransformerBlock(layers.Layer):
    """Causal self-attention followed by a two-layer feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalization.

    Args:
        embed_dim: width of the block's input and output; also passed to
            ``MultiHeadAttention`` as its second positional argument.
        num_heads: number of attention heads.
        ff_dim: width of the hidden feed-forward layer.
        rate: dropout rate applied after attention and after the feed-forward
            network.
        **kwargs: standard Keras layer arguments (e.g. ``name``); these are
            now forwarded to the base class instead of being silently dropped.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Kept so get_config() can report the constructor arguments.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = layers.MultiHeadAttention(num_heads, embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs):
        """Apply masked self-attention and the feed-forward network to ``inputs``."""
        # use_causal_mask=True keeps each position from attending to later
        # positions, which is what next-token prediction requires.
        attention_output = self.att(inputs, inputs, use_causal_mask=True)
        attention_output = self.dropout1(attention_output)
        out1 = self.layernorm1(inputs + attention_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be serialized."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "num_heads": self.num_heads,
                "ff_dim": self.ff_dim,
                "rate": self.rate,
            }
        )
        return config

In [None]:
# building model step-3 (compile model)

# Model hyper-parameters
embed_dim = 256        # embedding size for each token
num_heads = 5          # number of attention heads
feed_forward_dim = 32  # hidden layer size of the feed-forward network


def create_model():
    """Build and compile the single-block transformer language model.

    The model maps `maxlen` token ids to two outputs: per-position logits over
    the vocabulary, and the transformer block's output. Only the logits are
    trained on (sparse categorical cross-entropy from logits); the second
    output has no loss attached.

    Returns:
        The compiled ``keras.Model``.
    """
    token_ids = layers.Input(shape=(maxlen,), dtype=tf.int32)
    hidden = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(token_ids)
    hidden = TransformerBlock(embed_dim, num_heads, feed_forward_dim)(hidden)
    logits = layers.Dense(vocab_size)(hidden)

    language_model = keras.Model(inputs=token_ids, outputs=[logits, hidden])
    # No loss and optimization based on word embeddings from transformer block
    language_model.compile(
        "adam",
        loss=[tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), None],
    )
    return language_model

In [None]:
# build callback that generates sample text at the end of each training epoch
# Index -> word list from the adapted vectorizer. The previously referenced
# `new_v` is not defined anywhere in this notebook, so this line raised
# NameError on a fresh kernel.
vocab = vectorize_layer.get_vocabulary()
class TextGenerator(keras.callbacks.Callback):
    """Callback that prints text sampled from the model during training.

    At the end of every ``print_every``-th epoch the model is fed the prompt
    tokens and asked, one token at a time, for ``max_tokens`` further tokens.
    Each token is drawn at random from the ``top_k`` highest-scoring
    candidates, weighted by their softmax probability.

    Args:
        max_tokens: number of tokens to generate after the prompt.
        start_tokens: list of token ids forming the prompt.
        index_to_word: sequence mapping a token id to its word.
        top_k: number of highest-scoring candidates to sample from.
        print_every: generate text every this many epochs.
    """

    def __init__(
        self, max_tokens, start_tokens, index_to_word, top_k=10, print_every=1
    ):
        # Let the Keras base class set up its own attributes.
        super().__init__()
        self.max_tokens = max_tokens
        self.start_tokens = start_tokens
        self.index_to_word = index_to_word
        self.print_every = print_every
        self.k = top_k

    def sample_from(self, logits):
        """Draw one token id from the ``k`` largest entries of ``logits``."""
        logits, indices = tf.math.top_k(logits, k=self.k, sorted=True)
        indices = np.asarray(indices).astype("int32")
        preds = keras.activations.softmax(tf.expand_dims(logits, 0))[0]
        preds = np.asarray(preds).astype("float32")
        return np.random.choice(indices, p=preds)

    def detokenize(self, number):
        """Return the word for token id ``number``.

        The model emits ``vocab_size`` logits while the vectorizer is capped at
        ``vocab_size - 1`` tokens, so a sampled id can fall outside the word
        list; such ids are rendered as "[UNK]" instead of raising IndexError.
        """
        if 0 <= number < len(self.index_to_word):
            return self.index_to_word[number]
        return "[UNK]"

    def on_epoch_end(self, epoch, logs=None):
        """Generate and print a text sample if this epoch is due for one."""
        if (epoch + 1) % self.print_every != 0:
            return
        # Work on a copy so the stored prompt is not modified between epochs.
        context = list(self.start_tokens)
        tokens_generated = []
        # Strictly-less-than: the previous `<=` produced max_tokens + 1 tokens.
        while len(tokens_generated) < self.max_tokens:
            pad_len = maxlen - len(context)
            if pad_len < 0:
                # Context longer than the model input: keep the most recent
                # maxlen tokens. Keeping the first maxlen (as before) dropped
                # every newly generated token, so each further step was
                # sampled from the same unchanged context.
                x = context[-maxlen:]
                sample_index = maxlen - 1
            else:
                # Right-pad with 0 up to the model's input length and read the
                # prediction at the last real token.
                x = context + [0] * pad_len
                sample_index = len(context) - 1
            # verbose=0 keeps predict() from printing a progress bar per token.
            y, _ = self.model.predict(np.array([x]), verbose=0)
            sample_token = self.sample_from(y[0][sample_index])
            tokens_generated.append(sample_token)
            context.append(sample_token)
        txt = " ".join(
            [self.detokenize(_) for _ in self.start_tokens + tokens_generated]
        )
        print(f"generated text:\n{txt}\n")


# Build the word -> token-id lookup, then tokenize the starting prompt
word_to_index = {word: index for index, word in enumerate(vocab)}

start_prompt = "the product works fairly well"
# Words missing from the vocabulary fall back to id 1
# (presumably the vectorizer's out-of-vocabulary id -- confirm).
start_tokens = [word_to_index.get(token, 1) for token in start_prompt.split()]
num_tokens_generated = 30
text_gen_callback = TextGenerator(
    max_tokens=num_tokens_generated,
    start_tokens=start_tokens,
    index_to_word=vocab,
)

In [None]:
# import & instantiate "ModelCheckpoint" to save the model whenever the validation loss improves
from keras.callbacks import ModelCheckpoint
# Destination of the saved model on the mounted drive
filepath = "drive/MyDrive/Collab Models/review_explication_model.hdf5"

# Only overwrite the file when the monitored validation loss improves.
checkpoint = ModelCheckpoint(
    filepath=filepath,
    monitor='val_loss',
    mode='auto',
    save_best_only=True,
    verbose=1,
)

In [None]:
# train model
model = create_model()
# Train for 30 epochs; after each epoch the callbacks print a text sample
# and save the model if the validation loss improved.
model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=30,
    verbose=2,
    callbacks=[text_gen_callback, checkpoint],
)

Epoch 1/30
