## Chatbot with Attention LSTM RNN

In [1]:
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pickle

# Load the question/answer records. Each record is read through its "input"
# and "output" keys below. The encoding is explicit so the read does not
# depend on the platform default.
with open('./input/final_clean_data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# Marker words wrapped around every answer so the decoder has an explicit
# first input token and an explicit stop token.
START_TOKEN = 'STARTSEQ'
END_TOKEN = 'ENDSEQ'

input_texts = [item['input'] for item in data]
target_texts = [f'{START_TOKEN} {item["output"]} {END_TOKEN}' for item in data]

# Tokenize. filters='' and lower=False disable character filtering and
# lowercasing, so the marker words above reach the vocabulary unchanged.
tokenizer_input = Tokenizer(filters='', lower=False)
tokenizer_output = Tokenizer(filters='', lower=False)
tokenizer_input.fit_on_texts(input_texts)
tokenizer_output.fit_on_texts(target_texts)

input_sequences = tokenizer_input.texts_to_sequences(input_texts)
target_sequences = tokenizer_output.texts_to_sequences(target_texts)

# Get max lengths from the 95th percentile of the observed lengths, so a few
# very long samples do not dictate the size of the padded tensors.
input_lengths = [len(seq) for seq in input_sequences]
target_lengths = [len(seq) for seq in target_sequences]

max_encoder_seq_length = int(np.percentile(input_lengths, 95))
max_decoder_seq_length = int(np.percentile(target_lengths, 95)) + 1  # +1 to accommodate END_TOKEN

# Pad sequences.
# Encoder: truncation is left at the pad_sequences default so that it stays
# identical to the preprocessing applied to user input at inference time.
encoder_input_data = pad_sequences(input_sequences, maxlen=max_encoder_seq_length, padding='post')

# Decoder (teacher forcing): seq[:-1] is the input and seq[1:] the target, i.e.
# the same sequence shifted by one step.
# truncating='post' cuts over-long answers at the tail. The default ('pre')
# would cut them at the head and drop START_TOKEN from the decoder input,
# although START_TOKEN is always the first token fed to the decoder at
# inference time.
decoder_input_data = pad_sequences(
    [seq[:-1] for seq in target_sequences],
    maxlen=max_decoder_seq_length - 1,
    padding='post',
    truncating='post',
)
decoder_target_data = pad_sequences(
    [seq[1:] for seq in target_sequences],
    maxlen=max_decoder_seq_length - 1,
    padding='post',
    truncating='post',
)
# One class per vocabulary entry plus one for index 0, which is the padding value.
decoder_target_data_one_hot = tf.keras.utils.to_categorical(
    decoder_target_data,
    num_classes=len(tokenizer_output.word_index) + 1,
).astype('float32')

# Save updated metadata so later cells can restore the same sequence lengths
# and vocabulary sizes.
with open("chatbot_meta.json", "w") as f:
    json.dump({
        "max_encoder_seq_length": max_encoder_seq_length,
        "max_decoder_seq_length": max_decoder_seq_length,
        "latent_dim": 128,
        "input_vocab_size": len(tokenizer_input.word_index) + 1,
        "target_vocab_size": len(tokenizer_output.word_index) + 1
    }, f)


In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Dot, Activation, Concatenate
from tensorflow.keras.optimizers import RMSprop

# Width of the LSTM states (also used as the decoder embedding width below).
latent_dim = 128
# +1 because index 0 is the padding value and is not part of word_index.
input_vocab_size = len(tokenizer_input.word_index) + 1
target_vocab_size = len(tokenizer_output.word_index) + 1

# Encoder: embed the question tokens and run them through an LSTM that returns
# both its per-step outputs (used by attention) and its final states (used to
# initialise the decoder).
# The embedding is learned from scratch; the zero-filled `embedding_matrix`
# that used to be allocated here was never passed to any layer and is removed.
encoder_inputs = Input(shape=(None,), name="encoder_inputs")
embedding_dim = 100
encoder_embedding = Embedding(input_vocab_size, embedding_dim, trainable=True, name="encoder_embedding")(encoder_inputs)
encoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True, name="encoder_lstm")
encoder_outputs, state_h, state_c = encoder_lstm(encoder_embedding)

# Decoder: embed the shifted answer tokens and start the LSTM from the
# encoder's final states.
decoder_inputs = Input(shape=(None,), name="decoder_inputs")
decoder_embedding = Embedding(target_vocab_size, latent_dim, name="decoder_embedding")(decoder_inputs)
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True, name="decoder_lstm")
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=[state_h, state_c])

# Attention (dot-product):
#   1. score every decoder step against every encoder step,
#   2. softmax the scores over the encoder steps,
#   3. use them to take a weighted sum of the encoder outputs,
#   4. append that context vector to the decoder output of the same step.
attention_dot = Dot(axes=[2, 2], name="attention_dot")([decoder_outputs, encoder_outputs])
attention_softmax = Activation('softmax', name="attention_softmax")(attention_dot)
attention_context = Dot(axes=[2, 1], name="attention_context")([attention_softmax, encoder_outputs])
decoder_combined_context = Concatenate(axis=-1, name="concat_context")([attention_context, decoder_outputs])

# Output: per-step probability distribution over the target vocabulary.
decoder_dense = Dense(target_vocab_size, activation='softmax', name="decoder_output")
decoder_outputs_final = decoder_dense(decoder_combined_context)

# Model
# NOTE(review): `metrics=['accuracy']` is not weighted by sample_weight, so the
# reported accuracy presumably also counts padded positions; confirm before
# reading it as a quality signal.
model = Model([encoder_inputs, decoder_inputs], decoder_outputs_final)
model.compile(optimizer=RMSprop(learning_rate=0.0005), loss='categorical_crossentropy', metrics=['accuracy'])

# Create sample weights to ignore padding tokens in loss: weight 1.0 where the
# target is a real token, 0.0 where it is the padding index 0.
sample_weights = (decoder_target_data != 0).astype('float32')

# Train
model.fit([encoder_input_data, decoder_input_data],
          decoder_target_data_one_hot,
          sample_weight=sample_weights,
          batch_size=64,
          epochs=800)

# Save model
model.save("chatbot_attention_model.keras")

# Save tokenizers (pickled; the inference cell loads them back)
with open("tokenizer_input.pkl", "wb") as f:
    pickle.dump(tokenizer_input, f)
with open("tokenizer_output.pkl", "wb") as f:
    pickle.dump(tokenizer_output, f)

# Save metadata needed to rebuild the inference models
with open("chatbot_meta.json", "w") as f:
    json.dump({
        "max_encoder_seq_length": max_encoder_seq_length,
        "max_decoder_seq_length": max_decoder_seq_length,
        "latent_dim": latent_dim,
        "input_vocab_size": input_vocab_size,
        "target_vocab_size": target_vocab_size
    }, f)


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Dot, Activation, Concatenate
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pickle
import json
import ipywidgets as widgets
from IPython.display import display, Markdown

# --- Load tokenizers and metadata ---
# NOTE: pickle.load executes arbitrary code from the file it reads; only load
# tokenizer pickles that were written by this notebook.
with open("tokenizer_input.pkl", "rb") as f:
    tokenizer_input = pickle.load(f)

with open("tokenizer_output.pkl", "rb") as f:
    tokenizer_output = pickle.load(f)

with open("chatbot_meta.json", "r") as f:
    meta = json.load(f)

# Sequence lengths and LSTM width recorded at training time.
max_encoder_seq_length = meta["max_encoder_seq_length"]
max_decoder_seq_length = meta["max_decoder_seq_length"]
latent_dim = meta["latent_dim"]

# Must be the same marker words that were wrapped around the training targets.
START_TOKEN = 'STARTSEQ'
END_TOKEN = 'ENDSEQ'

# index -> word (for turning predictions into text) and word -> index.
reverse_target_word_index = {v: k for k, v in tokenizer_output.word_index.items()}
target_word_index = tokenizer_output.word_index

# --- Load trained model ---
model = load_model("chatbot_attention_model.keras")

# --- Rebuild encoder model ---
# Reuse the trained graph from the first model input up to the encoder LSTM,
# exposing its per-step outputs and its final hidden/cell states.
encoder_inputs = model.input[0]
encoder_outputs, state_h_enc, state_c_enc = model.get_layer("encoder_lstm").output
encoder_model = Model(encoder_inputs, [encoder_outputs, state_h_enc, state_c_enc])

# --- Rebuild decoder model with attention ---
# The step-wise decoder takes the previous token, the encoder outputs and the
# previous LSTM states as explicit inputs, and returns the next-token
# distribution together with the updated states.
decoder_inputs = model.input[1]
decoder_state_input_h = Input(shape=(latent_dim,), name="decoder_state_input_h")
decoder_state_input_c = Input(shape=(latent_dim,), name="decoder_state_input_c")
# The time dimension is fixed to max_encoder_seq_length, so encoder inputs
# have to be padded to exactly that length before their outputs are fed here.
decoder_hidden_state_input = Input(shape=(max_encoder_seq_length, latent_dim), name="decoder_hidden_state_input")

# Trained layers are fetched by name so their weights are reused.
decoder_embedding = model.get_layer("decoder_embedding")(decoder_inputs)
decoder_lstm = model.get_layer("decoder_lstm")
decoder_outputs, state_h_dec, state_c_dec = decoder_lstm(
    decoder_embedding, initial_state=[decoder_state_input_h, decoder_state_input_c]
)

# The attention layers (Dot / Activation / Concatenate) are created anew here
# rather than fetched from the trained model; they mirror the wiring used at
# training time.
attention_dot = Dot(axes=[2, 2], name="attention_dot")([decoder_outputs, decoder_hidden_state_input])
attention_softmax = Activation('softmax', name="attention_softmax")(attention_dot)
attention_context = Dot(axes=[2, 1], name="attention_context")([attention_softmax, decoder_hidden_state_input])
decoder_combined_context = Concatenate(axis=-1, name="concat_context")([attention_context, decoder_outputs])

# Trained output projection, fetched by name.
decoder_dense = model.get_layer("decoder_output")
decoder_outputs_final = decoder_dense(decoder_combined_context)

decoder_model = Model(
    [decoder_inputs, decoder_hidden_state_input, decoder_state_input_h, decoder_state_input_c],
    [decoder_outputs_final, state_h_dec, state_c_dec]
)

# --- Decode function ---
def decode_sequence(input_text):
    """Generate a reply for ``input_text`` by greedy decoding.

    The text is tokenized and padded to ``max_encoder_seq_length``, encoded
    once, and then the decoder is run one token at a time. Each step feeds
    back the most probable token and the updated LSTM states. Decoding stops
    when ``END_TOKEN`` is produced or after ``max_decoder_seq_length`` steps.

    Args:
        input_text: The user's question as a plain string.

    Returns:
        The decoded words joined by single spaces. The string is empty when
        the first predicted token is ``END_TOKEN`` or no known word is
        produced.
    """
    input_seq = tokenizer_input.texts_to_sequences([input_text])
    input_seq = pad_sequences(input_seq, maxlen=max_encoder_seq_length, padding='post')
    # verbose=0: predict() otherwise prints a progress bar on every call,
    # which floods the chat transcript (one bar per generated token).
    encoder_outputs, state_h, state_c = encoder_model.predict(input_seq, verbose=0)

    # The decoder always starts from the START_TOKEN index.
    target_seq = np.array([[target_word_index[START_TOKEN]]])
    decoded_words = []

    for _ in range(max_decoder_seq_length):
        output_tokens, state_h, state_c = decoder_model.predict(
            [target_seq, encoder_outputs, state_h, state_c], verbose=0
        )
        # Greedy choice: most probable token at the last (only) time step.
        sampled_token_index = int(np.argmax(output_tokens[0, -1, :]))
        sampled_word = reverse_target_word_index.get(sampled_token_index, '')

        if sampled_word == END_TOKEN:
            break

        # An index without a word (e.g. the padding index 0) maps to '' and is
        # skipped, so it cannot leave doubled spaces in the reply.
        if sampled_word:
            decoded_words.append(sampled_word)

        # Feed the chosen token back in as the next decoder input.
        target_seq = np.array([[sampled_token_index]])

    return ' '.join(decoded_words)

# --- Jupyter chatbot interface ---
# Messages that are answered directly instead of being sent to the model.
exit_commands = {"bye", "exit", "quit", "goodbye"}
greeting_inputs = {"hi", "hello", "hey", "hi there", "hey there"}
greeting_response = "Hello! I'm your financial assistant. Ask me anything about finance terms."

# Transcript pane and single-line question field.
output_area = widgets.Output()
input_box = widgets.Text(
    placeholder="Type your question here...",
    layout=widgets.Layout(width='90%'),
)

def handle_message(widget):
    """Process one submitted chat message and print the bot's reply.

    Blank submissions are ignored. Exit words disable the input box, greetings
    get a canned answer, and anything else is sent to ``decode_sequence``.
    All output goes to ``output_area``.

    Args:
        widget: The widget whose ``value`` holds the submitted text.
    """
    message = widget.value.strip()
    if not message:
        return
    input_box.value = ""

    # Exit and greeting words are matched case-insensitively.
    lowered = message.lower()

    with output_area:
        print(f"You: {message}")

        if lowered in exit_commands:
            print("FinancialBot: Goodbye! Stay financially smart!")
            input_box.disabled = True
            return

        if lowered in greeting_inputs:
            print(f"FinancialBot: {greeting_response}")
            return

        # UI boundary: report any failure in the transcript instead of
        # letting it escape the widget callback.
        try:
            reply = decode_sequence(message)
            if not reply:
                print("FinancialBot: I'm not sure I understood that. Can you rephrase?")
            else:
                print("FinancialBot:", reply)
        except Exception as e:
            print("FinancialBot: Sorry, something went wrong.")
            print(f"[Debug]: {e}")

# Run handle_message when the user submits the text box.
# NOTE(review): Text.on_submit is reported as deprecated in recent ipywidgets
# releases; confirm against the installed version.
input_box.on_submit(handle_message)

# Render the heading, the input box and the transcript pane, in that order.
display(Markdown("### FinancialBot Chat Interface"))
display(input_box)
display(output_area)

# Seed the transcript with a welcome line.
with output_area:
    print("FinancialBot: Hello! Ask me any finance question, or type 'exit' to quit.")


## Chatbot Using a Transformer-Based Model

## Using Google's flan-t5-base model
- Trained on GPU in a dedicated conda environment

In [None]:
# Check that PyTorch is configured to use the Apple Metal (MPS) backend;
# prints True when the backend is available.
import torch
print(torch.backends.mps.is_available())

True


In [None]:
import json
import torch
from transformers import (
    T5TokenizerFast,
    T5ForConditionalGeneration,
    Seq2SeqTrainingArguments,
    Seq2SeqTrainer,
    DataCollatorForSeq2Seq,
)
from datasets import load_dataset, Dataset

# Detect Apple MPS GPU; fall back to the CPU when it is not available.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")

# Load data. The encoding is explicit so the read does not depend on the
# platform default.
with open("./input/final_clean_data.json", encoding="utf-8") as f:
    data = json.load(f)

# Clean and load dataset: records whose "output" is empty or whitespace-only
# are dropped, since they give the model nothing to learn from.
clean_data = [ex for ex in data if ex["output"].strip()]
dataset = Dataset.from_list(clean_data)

# Load tokenizer/model and move the model to the selected device.
model_name = "google/flan-t5-base"
tokenizer = T5TokenizerFast.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name).to(device)

# Preprocessing
def preprocess(example):
    """Tokenize one question/answer record for seq2seq training.

    The question is prefixed with ``"question: "``. Question and answer are
    each padded or truncated to 64 tokens. Padding positions in the labels are
    replaced by -100 so that they are ignored by the loss.

    Args:
        example: A record with string fields ``"input"`` and ``"output"``.

    Returns:
        The tokenizer output for the question, with an added ``"labels"`` list.
    """
    model_inputs = tokenizer(
        "question: " + example["input"],
        max_length=64,
        padding="max_length",
        truncation=True,
    )
    # `text_target=` replaces the deprecated `as_target_tokenizer()` context
    # manager for tokenizing labels.
    target = tokenizer(
        text_target=example["output"],
        max_length=64,
        padding="max_length",
        truncation=True,
    )
    pad_id = tokenizer.pad_token_id
    model_inputs["labels"] = [
        token_id if token_id != pad_id else -100
        for token_id in target["input_ids"]
    ]
    return model_inputs

# Tokenize every record; the raw text columns are dropped so only the
# tokenizer outputs and labels remain.
tokenized_dataset = dataset.map(preprocess, remove_columns=["input", "output"])
data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model)

# Training args
training_args = Seq2SeqTrainingArguments(
    output_dir="./flan_t5_mps_model",
    num_train_epochs=15,
    per_device_train_batch_size=4,
    save_steps=100,  # checkpoint every 100 steps ...
    save_total_limit=2,  # ... keeping only the two most recent ones
    logging_dir="./logs",
    logging_steps=10,
    report_to="none",  # no external experiment tracker
    predict_with_generate=True,  # NOTE(review): presumably only relevant for evaluation/prediction; no eval dataset is passed below
    fp16=False  # AMP not supported on MPS
)

# No eval_dataset is given, so this run trains without evaluation.
# NOTE(review): the `tokenizer=` argument of the trainer is reported as
# deprecated in newer transformers releases; confirm against the installed version.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator
)

trainer.train()

# Save the fine-tuned weights and the tokenizer side by side so that the
# inference cells can load both from one directory.
model.save_pretrained("./flan_t5_mps_model")
tokenizer.save_pretrained("./flan_t5_mps_model")
print("Model and tokenizer saved to ./flan_t5_mps_model")

In [None]:
import torch
from transformers import T5TokenizerFast, T5ForConditionalGeneration
import ipywidgets as widgets
from IPython.display import display, Markdown

# Pick the Apple MPS device when available, otherwise the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Load the fine-tuned model and its tokenizer from the local training output
# directory (no download is attempted).
model_path = "./flan_t5_mps_model"

tokenizer = T5TokenizerFast.from_pretrained(model_path, local_files_only=True)
model = T5ForConditionalGeneration.from_pretrained(model_path, local_files_only=True).to(device)

# Inference function
def generate_answer(question):
    """Return a sampled answer from the fine-tuned model for ``question``.

    The question is prefixed with ``"question: "``, the same prefix used in
    training. Decoding uses sampling, so repeated calls may give different
    answers.

    Args:
        question: The user's question as a string.

    Returns:
        The generated text with special tokens removed.
    """
    encoded = tokenizer(
        "question: " + question,
        return_tensors="pt",
        truncation=True,
        padding=True,
    )
    input_ids = encoded.input_ids.to(device)

    # Sampling configuration for generation.
    sampling_options = {
        "max_length": 64,
        "do_sample": True,
        "top_k": 50,
        "top_p": 0.95,
        "temperature": 0.9,
        "repetition_penalty": 1.5,
    }
    generated = model.generate(input_ids=input_ids, **sampling_options)

    first_sequence = generated[0]
    return tokenizer.decode(first_sequence, skip_special_tokens=True)

# UI: transcript pane plus a single-line question field.
output_area = widgets.Output()
input_box = widgets.Text(
    placeholder="Ask a finance question...",
    layout=widgets.Layout(width='90%'),
)

def handle_submit(widget):
    """Answer one submitted question and print the exchange.

    The input box is cleared on every submission. Blank submissions produce
    no output. Failures while generating are reported in ``output_area``
    instead of propagating out of the callback.

    Args:
        widget: The widget whose ``value`` holds the submitted text.
    """
    query = widget.value.strip()
    input_box.value = ""

    if query:
        with output_area:
            print(f"You: {query}")
            try:
                answer = generate_answer(query)
                print("Bot:", answer)
            except Exception as e:
                print("Bot: Something went wrong.")
                print(f"[Error]: {e}")

# Run handle_submit when the user submits the text box, then render the
# heading, the input box and the transcript pane, in that order.
# NOTE(review): Text.on_submit is reported as deprecated in recent ipywidgets
# releases; confirm against the installed version.
input_box.on_submit(handle_submit)
display(Markdown("### 💬 FinancialBot (FLAN-T5 MPS)"))
display(input_box)
display(output_area)

# Seed the transcript with a ready message.
with output_area:
    print("Bot: Ready! Ask me anything about finance.")


## ROUGE evaluation

In [None]:

import json
import torch
from transformers import T5TokenizerFast, T5ForConditionalGeneration

# Set device: Apple MPS when available, otherwise the CPU.
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

# Load model and tokenizer from the local fine-tuning output directory.
model_path = "./flan_t5_mps_model"
tokenizer = T5TokenizerFast.from_pretrained(model_path, local_files_only=True)
model = T5ForConditionalGeneration.from_pretrained(model_path, local_files_only=True).to(device)

# Load data. The encoding is explicit so the read does not depend on the
# platform default.
with open("./input/final_clean_data.json", encoding="utf-8") as f:
    data = json.load(f)

# Evaluate on only 20 samples
# NOTE(review): these samples come from the same JSON file that the training
# cell reads, so the resulting score is not a held-out estimate.
subset = data[:20]
predictions = []
references = []

for example in subset:
    input_text = "question: " + example["input"]
    encoded = tokenizer(
        input_text,
        return_tensors="pt",
        truncation=True,
        padding="max_length",
        max_length=64,
    )
    input_ids = encoded.input_ids.to(device)
    # The input is padded to 64 tokens, so pass the attention mask explicitly
    # instead of relying on generate() to infer the padded positions.
    attention_mask = encoded.attention_mask.to(device)

    # Greedy decoding (do_sample=False) keeps the evaluation deterministic.
    with torch.no_grad():
        output_ids = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_length=64,
            do_sample=False,
        )
    prediction = tokenizer.decode(output_ids[0], skip_special_tokens=True)

    predictions.append(prediction)
    references.append(example["output"])

# Basic ROUGE-L
def lcs(X, Y):
    """Return the length of the longest common subsequence of ``X`` and ``Y``.

    Args:
        X: First sequence (anything supporting iteration and ``len``).
        Y: Second sequence (anything supporting indexing/iteration and ``len``).

    Returns:
        The number of elements in the longest subsequence shared by both.
    """
    width = len(Y)
    # Only the previous row of the DP table is needed to build the next one.
    previous_row = [0] * (width + 1)
    for x_item in X:
        current_row = [0]
        for col, y_item in enumerate(Y, start=1):
            if x_item == y_item:
                # Extend the best match that ends just before both items.
                current_row.append(previous_row[col - 1] + 1)
            else:
                # Otherwise carry over the better result of skipping one item.
                current_row.append(max(previous_row[col], current_row[col - 1]))
        previous_row = current_row
    return previous_row[width]

def rouge_l(pred, ref):
    """Return the ROUGE-L F1 score between a prediction and a reference.

    Both strings are split on whitespace. Precision and recall are the
    longest-common-subsequence length divided by the prediction length and
    the reference length respectively.

    Args:
        pred: Predicted text.
        ref: Reference text.

    Returns:
        The F1 score as a float, or 0.0 when the texts share no token.
    """
    pred_tokens = pred.split()
    ref_tokens = ref.split()

    overlap = lcs(pred_tokens, ref_tokens)
    if not overlap:
        # Also covers an empty prediction or an empty reference.
        return 0.0

    precision = overlap / len(pred_tokens)
    recall = overlap / len(ref_tokens)
    total = precision + recall
    if not total:
        return 0.0
    return (2 * precision * recall) / total

# Score every prediction against its reference and average the results.
rouge_scores = [rouge_l(p, r) for p, r in zip(predictions, references)]
# An empty evaluation set yields 0.0 instead of raising ZeroDivisionError.
avg_rouge_l = sum(rouge_scores) / len(rouge_scores) if rouge_scores else 0.0
avg_rouge_l


0.41238167885565946