In [None]:
import re
import json
import random
from datetime import datetime
from pathlib import Path
from collections import Counter

import numpy as np
from tqdm.notebook import trange
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.data import Dataset, AUTOTUNE
from tensorflow.keras.layers import TextVectorization, LSTM, Embedding, Dropout, Dense, LeakyReLU, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard

In [None]:
# List the GPUs TensorFlow can see and show details for the first one.
gpu_devices = tf.config.list_physical_devices('GPU')

# Indexing [0] on an empty list would raise IndexError on CPU-only machines,
# so fall back to a short message instead of aborting the notebook run.
gpu_details = (
    tf.config.experimental.get_device_details(gpu_devices[0])
    if gpu_devices
    else "No GPU detected - TensorFlow will run on CPU"
)
gpu_details

In [None]:
# Shell escape: run the nvidia-smi command-line tool and show its output in the cell.
!nvidia-smi

In [None]:
# --- Reproducibility -------------------------------------------------------
RANDOM_STATE = 7

# RANDOM_STATE is passed to train_test_split, but the notebook also draws from
# Python's `random`, NumPy's global RNG and TensorFlow (weight init, dataset
# shuffling, dropout). Seed all three so a fresh Run All is repeatable.
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)
tf.random.set_seed(RANDOM_STATE)

# --- Data ------------------------------------------------------------------
SEQ_LEN = 512                      # characters per input window
VAL_SIZE = 0.05                    # fraction of windows held out by train_test_split
BATCH_SIZE = 128
SHUFFLE_BUFFER = BATCH_SIZE * 10   # buffer size passed to Dataset.shuffle

# --- Model / optimisation --------------------------------------------------
EMBEDDING_DIM = 32
DROPOUT_RATIO = 0.2                # used by the LSTM and Dropout layers
LR = 1e-4                          # Adam learning rate

# --- Training loop ---------------------------------------------------------
EPOCHS = 10
TRAIN_STEPS = 2500                 # steps_per_epoch for model.fit
VAL_STEPS = 250                    # validation_steps for model.fit
EARLY_STOP_PATIENCE = 3

# --- Logging ---------------------------------------------------------------
# One TensorBoard run directory per notebook execution, named by start time.
TB_LOGS = Path("tb_logs") / datetime.now().strftime("%Y%m%d-%H%M%S")
TB_LOGS.mkdir(exist_ok = True, parents = True)

In [None]:
# Build the training pairs: every SEQ_LEN-character window of each book (X)
# together with the single character that follows it (y).
X = []
y = []

# rglob yields files in filesystem order, which can differ between machines.
# Sorting fixes the order of the samples so the seeded split stays reproducible.
books = sorted(Path("../Data/Text/Sherlock_Holmes/").rglob("*.txt"))

if not books:
    raise FileNotFoundError("No .txt files found under ../Data/Text/Sherlock_Holmes/")

for book in books:
    # read_text opens, reads and closes the file, so no handle stays open
    # while the (long) windowing loop below runs.
    book_data = book.read_text(encoding = 'utf-8')
    # Collapse runs of spaces into a single space.
    book_data = re.sub("[ ]+", " ", book_data)
    char_len = len(book_data)

    # Stride-1 sliding window; the last valid target index is char_len - 1.
    for i in range(0, char_len - SEQ_LEN):
        X.append(book_data[i : i + SEQ_LEN])
        y.append(book_data[i + SEQ_LEN])

if not X:
    # Happens when every book is SEQ_LEN characters or shorter.
    raise ValueError(f"No book is longer than SEQ_LEN ({SEQ_LEN}) characters; nothing to train on")

# Show a few random (window, next character) pairs as a sanity check.
for i in np.random.randint(0, len(X), 5):
    print(f'Input: {X[i]!r}')
    print(f'Output: {y[i]}\n')

In [None]:
# Number of input windows and number of target characters collected so far.
len(X), len(y)

In [None]:
# Hold out a VAL_SIZE fraction of the (window, next-char) pairs for validation.
# NOTE(review): the windows were built with stride 1, so a shuffled split puts
# heavily overlapping windows in both sets - validation scores are probably
# optimistic. Confirm whether a split by book or position is wanted instead.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size = VAL_SIZE,
    random_state = RANDOM_STATE,
)

# Sizes of the four resulting lists.
tuple(len(part) for part in (X_train, y_train, X_test, y_test))

In [None]:
# Count how often each character occurs as a training target.
char_freq_dict = Counter()
char_freq_dict.update(y_train)

# The 20 most frequent target characters with their counts.
char_freq_dict.most_common(20)

In [None]:
# Drop the notebook's references to the un-split sample lists and the last
# book's text; only the train/test splits are used from here on.
del X, y, book_data

In [None]:
%%time

# Character-level tokenizer: no standardisation, one token per character.
vocab_json = Path("vocab.json")
vectorizer = TextVectorization(standardize = None, split = "character", name = 'TextVectorizer')

if not vocab_json.exists():
    # No cached vocabulary: learn it from the training windows and cache it.
    vectorizer.adapt(X_train)
    # The first two entries are skipped here; set_vocabulary is given the
    # remainder when the cache is loaded on a later run.
    vocab = vectorizer.get_vocabulary()[2:]

    with vocab_json.open("w") as vocab_file:
        vocab_file.write(json.dumps({"vocab": vocab}))
else:
    # Reuse the cached vocabulary instead of re-running adapt.
    with vocab_json.open("r") as vocab_file:
        vocab = json.loads(vocab_file.read())["vocab"]

    vectorizer.set_vocabulary(vocab)

# Whichever branch ran, read the full vocabulary back from the layer.
vocab = vectorizer.get_vocabulary()
char_count = len(vocab)
char_count

In [None]:
# Per-class loss weights: sqrt(total / (freq * char_count)), rounded to two
# decimals, so rare target characters weigh more than common ones.
# Vocabulary entries that never occur as a training target count as freq 1.
total_freq = sum(char_freq_dict.values())

class_weight_dict = {
    token_id: round(np.sqrt(total_freq / (char_freq_dict.get(token, 1) * char_count)), 2)
    for token_id, token in enumerate(vocab)
}

class_weight_dict

In [None]:
%%time

# Map the target characters to their integer token ids and flatten the
# vectorizer output to a 1-D array per split.
y_train, y_test = (
    vectorizer(target_chars).numpy().flatten()
    for target_chars in (y_train, y_test)
)
y_train.shape, y_test.shape

In [None]:
# Training pipeline: slice into samples -> shuffle -> batch -> prefetch.
train_ds = Dataset.from_tensor_slices((X_train, y_train))
train_ds = train_ds.shuffle(SHUFFLE_BUFFER)
train_ds = train_ds.batch(BATCH_SIZE)
train_ds = train_ds.prefetch(AUTOTUNE)

# Show the dataset spec and its number of batches.
train_ds, len(train_ds)

In [None]:
# Validation pipeline: slice into samples -> batch -> prefetch.
# Deliberately NOT shuffled. Only a prefix of this dataset is consumed
# (validation_steps during fit, take(100) during evaluate), and shuffling
# would make that prefix a different subset on every pass, so val_loss would
# not be comparable between epochs for EarlyStopping. The samples are already
# in random order because train_test_split shuffles before splitting.
val_ds = (
    Dataset.from_tensor_slices((X_test, y_test))
    .batch(BATCH_SIZE)
    .prefetch(AUTOTUNE)
)

# Show the dataset spec and its number of batches.
val_ds, len(val_ds)

In [None]:
def get_lstm_model(char_count: int, embedding_dim: int = 32):
    """Build and compile the next-character prediction model.

    The network takes raw strings, tokenises them with the notebook-level
    `vectorizer`, embeds the token ids, runs two stacked LSTMs and a small
    dense head, and ends in a softmax over `char_count` classes.

    Relies on the notebook globals `vectorizer`, `DROPOUT_RATIO` and `LR`.

    Args:
        char_count: Number of output classes; the embedding table is created
            with `char_count + 1` rows.
        embedding_dim: Size of each character embedding vector.

    Returns:
        A compiled Keras model using Adam(LR), sparse categorical
        cross-entropy loss and sparse categorical accuracy as its metric.
    """
    inputs = tf.keras.Input(shape = (1,), dtype = tf.string, name = 'Input')

    # Strings -> token ids -> embeddings.
    x = vectorizer(inputs)
    x = Embedding(char_count + 1, embedding_dim, name = 'EmbeddingLayer')(x)

    # Recurrent encoder: only the second LSTM collapses the sequence.
    x = LSTM(512, return_sequences = True, dropout = DROPOUT_RATIO, name = 'LSTM_1')(x)
    x = LSTM(256, dropout = DROPOUT_RATIO, name = 'LSTM_2')(x)
    x = BatchNormalization(name = 'BN_1')(x)

    # Dense head, block 1.
    x = Dense(256, name = 'Dense_1')(x)
    x = LeakyReLU(name = 'LR_1')(x)
    x = Dropout(DROPOUT_RATIO, name = 'Dropout_1')(x)

    # Dense head, block 2.
    x = Dense(128, name = 'Dense_2')(x)
    x = LeakyReLU(name = 'LR_2')(x)
    x = Dropout(DROPOUT_RATIO, name = 'Dropout_2')(x)
    x = BatchNormalization(name = 'BN_2')(x)

    # Dense head, block 3.
    x = Dense(128, name = 'Dense_3')(x)
    x = LeakyReLU(name = 'LR_3')(x)

    # One probability per class.
    outputs = Dense(char_count, activation = 'softmax', name = "Output")(x)

    text_model = tf.keras.Model(inputs = inputs, outputs = outputs, name = 'Text_Generation_Model')
    text_model.compile(
        optimizer = Adam(LR),
        loss = 'sparse_categorical_crossentropy',
        metrics = ['sparse_categorical_accuracy'],
    )
    return text_model

# Instantiate the network with the notebook's vocabulary size and embedding width.
model = get_lstm_model(char_count = char_count, embedding_dim = EMBEDDING_DIM)
model.summary()

In [None]:
%%time

# Callbacks: TensorBoard logging into this run's directory, and early stopping
# that restores the best weights once its patience is exhausted.
tensorboard = TensorBoard(log_dir = str(TB_LOGS))
earlystop = EarlyStopping(restore_best_weights = True, patience = EARLY_STOP_PATIENCE)

# Everything except the training data, collected in one place for readability.
fit_options = {
    "validation_data": val_ds,
    "epochs": EPOCHS,
    "steps_per_epoch": TRAIN_STEPS,
    "validation_steps": VAL_STEPS,
    "class_weight": class_weight_dict,
    "callbacks": [earlystop, tensorboard],
}

history = model.fit(train_ds, **fit_options)

In [None]:
# Report loss and sparse categorical accuracy on the first 100 batches drawn from val_ds.
model.evaluate(val_ds.take(100))

In [None]:
%%time

# Greedy generation: start from a random training window, predict the next
# character, slide the window forward by one character and repeat.
sample_input = random.choice(X_train)
print(f"Input:\n{sample_input}")

# The first two vocabulary entries are the vectorizer's reserved '' and
# '[UNK]' tokens. Picking either would append zero or five characters to the
# window (changing its length) and put a literal "[UNK]" in the output, so
# they are excluded from the argmax.
RESERVED_TOKEN_COUNT = 2
pred_chars = []

for _ in trange(100, desc = "Predicting chars", unit = " char"):
    pred = model.predict([sample_input], verbose = False)
    # Single-sample batch: row 0 holds the class probabilities. Add the offset
    # back after slicing so the id indexes into the full vocabulary again.
    pred_char_id = int(pred[0, RESERVED_TOKEN_COUNT:].argmax()) + RESERVED_TOKEN_COUNT
    pred_char = vocab[pred_char_id]
    pred_chars.append(pred_char)
    # Drop the oldest character and append the prediction to keep the window length fixed.
    sample_input = sample_input[1:] + pred_char

pred_output = ''.join(pred_chars)
print(f"Output:\n{pred_output}")