In [1]:
import os
import random
import pickle
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [7]:
# Config / hyperparameters
# Every tunable used by the cells below lives here so a re-run only needs edits in one place.
DATA_CSV = "../../dataset/tinybot_augmented_dataset.csv"   # generated file (path is relative to the notebook's working directory)
VOCAB_SIZE = 8000        # upper bound passed to the tokenizer (num_words); the effective size is min(this, words seen + 1)
MAX_LEN = 20             # max tokens for input & output (pad/truncate); the decoder sees MAX_LEN-1 after the teacher-forcing shift
EMBED_DIM = 128          # output width of both embedding layers and of the encoder projection
HIDDEN_DIM = 192         # number of units in the encoder and decoder GRUs
BATCH_SIZE = 64          # batch size passed to model.fit
EPOCHS = 150             # maximum epochs; early stopping may end training sooner
PATIENCE = 6             # epochs without val_loss improvement before EarlyStopping halts training
TFLITE_PATH = "tinybot_quantized.tflite"   # destination of the int8-quantized TFLite model

In [5]:
# 1) Load CSV
df = pd.read_csv(DATA_CSV)

# Pick the prompt/reply columns by (case-insensitive) name; if no known name is
# present, fall back to the first column for prompts and the second for replies.
user_col = next((c for c in df.columns if c.lower() in ('input_text','input','prompt','query')), df.columns[0])
bot_col  = next((c for c in df.columns if c.lower() in ('reply_text','reply','response','output')), df.columns[1] if len(df.columns)>1 else df.columns[0])
print("Using columns:", user_col, ",", bot_col)

# Drop rows where either side is missing BEFORE converting to str: astype(str)
# would otherwise turn NaN (which is also how pandas reads empty CSV cells)
# into the literal text "nan", and that text would be tokenized and trained on.
# dict.fromkeys de-duplicates the subset when both roles fell back to one column.
pairs_df = df.dropna(subset=list(dict.fromkeys([user_col, bot_col])))

user_texts = pairs_df[user_col].astype(str).tolist()
bot_texts_raw = pairs_df[bot_col].astype(str).tolist()


# 2) Add start/end tokens and normalize

def normalize(s):
    """Return ``s`` with leading and trailing whitespace removed."""
    return s.strip()

user_texts = [normalize(t) for t in user_texts]
# Wrap every reply in literal "start"/"end" marker words so the decoder learns
# where a reply begins and stops.
bot_texts = [f"start {normalize(t)} end" for t in bot_texts_raw]


# 3) Tokenize (single tokenizer for both)

# filters="" keeps punctuation attached to words; index 1 is reserved for the
# OOV token and index 0 is left for padding.
tokenizer = keras.preprocessing.text.Tokenizer(num_words=VOCAB_SIZE, oov_token='UNK', filters="")
tokenizer.fit_on_texts(user_texts + bot_texts)
word_index = tokenizer.word_index
index_word = {v:k for k,v in word_index.items()}
vocab_size = min(VOCAB_SIZE, len(word_index) + 1)
print("Tokenizer vocab size:", len(word_index), "using vocab cap:", vocab_size)

# sequences (lists of token ids, still variable length)
input_tokens = tokenizer.texts_to_sequences(user_texts)
target_tokens = tokenizer.texts_to_sequences(bot_texts)

# pad_sequences truncates from the FRONT by default (truncating='pre'), which
# would cut the leading "start" marker off any reply longer than MAX_LEN and
# leave the decoder with examples that never begin the way inference does.
# Trim over-long targets ourselves instead: keep the first MAX_LEN-1 tokens
# plus the final token (the "end" marker), so both markers survive.
target_tokens = [
    seq if len(seq) <= MAX_LEN else seq[:MAX_LEN - 1] + seq[-1:]
    for seq in target_tokens
]

# pad/truncate (inputs keep the default truncation so they match what the
# inference helper does with a user prompt)
input_seq = pad_sequences(input_tokens, maxlen=MAX_LEN, padding='post')
target_seq = pad_sequences(target_tokens, maxlen=MAX_LEN, padding='post')

# Discard examples where target has only 1 token (should be >= start + end)
valid_idx = [i for i in range(len(target_seq)) if np.count_nonzero(target_seq[i]) > 1]
input_seq = input_seq[valid_idx]
target_seq = target_seq[valid_idx]
print("Examples after filter:", input_seq.shape[0])

# teacher forcing slices: the decoder is fed tokens [0 .. n-2] and must predict
# tokens [1 .. n-1], i.e. the same sequence shifted left by one.
decoder_input = target_seq[:, :-1]    # all but last
decoder_target = target_seq[:, 1:]    # all but first
decoder_target = np.expand_dims(decoder_target, -1)  # required for sparse loss

Using columns: input_text , reply_text
Tokenizer vocab size: 299 using vocab cap: 300
Examples after filter: 15300


In [6]:
# 4) Build model: Seq2Seq + Attention
# GRU encoder/decoder trained with teacher forcing. The two inputs are
# integer token-id sequences of variable length (shape=(None,)); the output is
# a softmax over vocab_size for every decoder time step.

# Encoder
enc_inputs = keras.Input(shape=(None,), name="encoder_inputs")
# mask_zero=True makes id 0 (the padding value) a masked position downstream.
enc_emb = layers.Embedding(vocab_size, EMBED_DIM, mask_zero=True, name="enc_emb")(enc_inputs)
# return_sequences gives per-step outputs for attention; return_state gives the
# final state that seeds the decoder GRU below.
enc_outputs, enc_state = layers.GRU(HIDDEN_DIM, return_sequences=True, return_state=True, name="encoder_gru")(enc_emb)

# Decoder
dec_inputs = keras.Input(shape=(None,), name="decoder_inputs")
dec_emb_layer = layers.Embedding(vocab_size, EMBED_DIM, mask_zero=True, name="dec_emb")
dec_emb = dec_emb_layer(dec_inputs)

# Project encoder outputs to embedding dim (so Attention queries/values match)
# i.e. HIDDEN_DIM -> EMBED_DIM per encoder time step.
enc_proj = layers.Dense(EMBED_DIM, name="enc_proj")(enc_outputs)

# Attention (Keras Attention expects same last dim for query/value)
attn = layers.Attention(name="attention_layer")
# combine decoder embeddings and context at each time step:
# We'll compute context for each decoder time step by using Attention(dec_emb, enc_proj)
# Note the query is the decoder *embedding*, not the decoder GRU state, so the
# context for a step depends only on that step's input token and the encoder.
context = attn([dec_emb, enc_proj])   # shape: (batch, dec_time, EMBED_DIM)
decoder_combined = layers.Concatenate(axis=-1, name="dec_concat")([dec_emb, context])

# Decoder GRU starts from the encoder's final state; its returned state is
# discarded here because training consumes whole sequences at once.
decoder_gru = layers.GRU(HIDDEN_DIM, return_sequences=True, return_state=True, name="decoder_gru")
dec_outputs, _ = decoder_gru(decoder_combined, initial_state=enc_state)
decoder_dense = layers.Dense(vocab_size, activation="softmax", name="decoder_dense")
decoder_outputs = decoder_dense(dec_outputs)

# Sparse loss: targets are integer ids (with a trailing axis of size 1), not one-hot.
model = keras.Model([enc_inputs, dec_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 enc_emb (Embedding)            (None, None, 128)    38400       ['encoder_inputs[0][0]']         
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 encoder_gru (GRU)              [(None, None, 192),  185472      ['enc_emb[0][0]']                
                                 (None, 192)]                                                 

In [8]:
# Callbacks: checkpoints & early stopping
# The checkpoint keeps only the best-val_loss weights on disk; early stopping
# halts after PATIENCE stagnant epochs and rolls the in-memory model back to
# the best epoch.

ckpt_path = "tiny_bot_attn.h5"
callbacks = [
    keras.callbacks.ModelCheckpoint(ckpt_path, save_best_only=True, monitor='val_loss'),
    keras.callbacks.EarlyStopping(monitor='val_loss', patience=PATIENCE, restore_best_weights=True)
]


# Shuffle once, with a fixed seed, before fitting. validation_split carves the
# validation set from the LAST 10% of the arrays as passed in (Keras does this
# before its own per-epoch shuffling), so on a CSV whose rows are grouped or
# ordered the validation set would not be representative and val_loss -- which
# drives both callbacks above -- would be misleading.
shuffle_order = np.random.default_rng(42).permutation(input_seq.shape[0])

# Train / fine-tune
history = model.fit(
    [input_seq[shuffle_order], decoder_input[shuffle_order]],
    decoder_target[shuffle_order],
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    validation_split=0.1,
    callbacks=callbacks,
    verbose=1
)


# Save model + tokenizer (the tokenizer is needed at inference time to map
# text to the same ids the model was trained on)
model.save("tinybot_attn_final.h5")
with open("tokenizer.pkl", "wb") as f:
    pickle.dump(tokenizer, f)
print("Saved: tinybot_attn_final.h5 and tokenizer.pkl")

Epoch 1/150
Epoch 2/150
Epoch 3/150
Epoch 4/150
Epoch 5/150
Epoch 6/150
Epoch 7/150
Saved: tinybot_attn_final.h5 and tokenizer.pkl


In [9]:
# 8) Inference helper (greedy generation)
reverse_word_index = {v:k for k,v in tokenizer.word_index.items()}

def generate_reply_greedy(input_text, max_len=18):
    """Generate a reply for ``input_text`` by greedy decoding.

    The prompt is normalized, tokenized and padded exactly like the training
    inputs. Decoding starts from the "start" marker and repeatedly appends the
    arg-max token predicted for the current position until the "end" marker is
    produced or the step budget is exhausted.

    Args:
        input_text: Raw user prompt.
        max_len: Maximum number of tokens to generate. It is capped at
            ``MAX_LEN - 1`` because that is the decoder input length; going
            past it would make ``pad_sequences`` drop the oldest tokens
            (including the start marker) from the decoder context.

    Returns:
        The generated words joined by single spaces, without the seed token
        and without the "start"/"end" marker words.
    """
    s = normalize(input_text)
    seq_in = tokenizer.texts_to_sequences([s])
    seq_in = pad_sequences(seq_in, maxlen=MAX_LEN, padding='post')

    start_token = tokenizer.word_index.get("start", tokenizer.word_index.get("START", None))
    end_token = tokenizer.word_index.get("end", tokenizer.word_index.get("END", None))
    if start_token is None:
        # Fallback seed only when there really is no start marker. A missing
        # *end* marker must not discard a valid start token.
        start_token = 1

    dec_len = MAX_LEN - 1                 # decoder_input length used at training
    steps = min(max_len, dec_len)         # never outgrow the decoder window
    dec_seq = [start_token]
    for _ in range(steps):
        dec_input = pad_sequences([dec_seq], maxlen=dec_len, padding='post')
        preds = model.predict([seq_in, dec_input], verbose=0)
        # The distribution for the next token sits at the position of the
        # last token fed in; the cap on `steps` keeps this index in range.
        next_id = int(np.argmax(preds[0, len(dec_seq) - 1]))
        dec_seq.append(next_id)
        if end_token is not None and next_id == end_token:
            break

    # Skip the seed token by position (so a fallback seed never leaks into the
    # reply), ignore padding/unknown ids, then drop the marker words.
    words = [reverse_word_index[i] for i in dec_seq[1:] if i in reverse_word_index]
    words = [w for w in words if w not in ("start","end","START","END")]
    return " ".join(words).strip()

# Smoke test: run a handful of prompts through the greedy decoder and print
# each prompt next to the generated reply.
tests = [
    "Hi",
    "What can you do?",
    "Turn on the light",
    "It's too hot",
    "Is the fridge on?",
]
for t in tests:
    reply = generate_reply_greedy(t)
    print("User:", t, "-> Bot:", reply)

User: Hi -> Bot: setting quiet mode and dimming lights 😴
User: What can you do? -> Bot: welcome home! turning on your favorite lights 🏠
User: Turn on the light -> Bot: got it! powering down the heater 💡
User: It's too hot -> Bot: switching on the lights 💡
User: Is the fridge on? -> Bot: got it! powering down the moment.


In [None]:
# 9) Convert to quantized TFLite (full integer, representative dataset)
# Create representative dataset generator for calibration
def rep_gen(num_samples=1000):
    """Yield random training pairs as calibration samples for the converter.

    Each sample is a two-element list ``[encoder_batch, decoder_batch]`` with
    a batch dimension of 1, drawn uniformly (with replacement) from
    ``input_seq`` / ``decoder_input``.

    The arrays are cast to the dtypes the Keras model declares for its inputs
    rather than a hard-coded int32: the inputs are created without an explicit
    dtype, and a calibration sample whose dtype differs from the model input
    is rejected during conversion.

    Args:
        num_samples: Number of calibration samples to yield.
    """
    # tf.as_dtype accepts both tf.DType objects and dtype strings.
    enc_dtype, dec_dtype = (tf.as_dtype(t.dtype).as_numpy_dtype for t in model.inputs)
    for _ in range(num_samples):
        idx = random.randint(0, input_seq.shape[0]-1)
        yield [np.array([input_seq[idx]], dtype=enc_dtype), np.array([decoder_input[idx]], dtype=dec_dtype)]

# Convert
# First attempt: full-integer quantization calibrated with rep_gen. If that
# raises, retry with a plain-builtin conversion and write a second file.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
# Set optimizations and supported ops
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = rep_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Set input/output type to int8
# NOTE(review): the model inputs are token ids; an int8 input tensor can hold
# at most 256 distinct values, which is fewer than the vocabulary printed
# above (300) -- confirm ids survive quantization before relying on this file.
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

try:
    tflite_model = converter.convert()
    with open(TFLITE_PATH, "wb") as f:
        f.write(tflite_model)
    print("Saved quantized TFLite:", TFLITE_PATH)
except Exception as e:
    # Best effort: report the failure and fall through to the fallback below.
    print("TFLite conversion failed:", e)
    # If INT8 conversion fails on complex ops, retry with a fresh converter
    # restricted to the regular builtin op set and no representative dataset.
    # No float16 target type is set here.
    # NOTE(review): Optimize.DEFAULT is still enabled, which per the TF Lite
    # docs quantizes weights (dynamic range) when no representative dataset is
    # given -- so the output is probably not pure float32 despite the file
    # name and message below; confirm.
    try:
        converter = tf.lite.TFLiteConverter.from_keras_model(model)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]
        tflite_model = converter.convert()
        with open("tinybot_float32.tflite","wb") as f:
            f.write(tflite_model)
        print("Saved fallback float tflite: tinybot_float32.tflite")
    except Exception as e2:
        # Both attempts failed; nothing is written for the fallback.
        print("Fallback tflite conversion also failed:", e2)