In [34]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Embedding, LayerNormalization, Dropout
from tensorflow.keras.models import Model
import numpy as np

# ----------------------------
# Positional Encoding
# ----------------------------
def positional_encoding(position, d_model):
    """Build a sinusoidal positional-encoding table.

    Args:
        position: number of positions (rows) to generate.
        d_model: number of encoding channels (columns).

    Returns:
        A float32 tensor of shape (1, position, d_model). Even-indexed
        channels hold sines and odd-indexed channels hold cosines of
        ``pos / 10000 ** (2 * (i // 2) / d_model)``.
    """
    positions = np.arange(position)[:, np.newaxis]
    channels = np.arange(d_model)
    # Channels 2k and 2k+1 share the same exponent.
    exponents = (2 * (channels // 2)) / np.float32(d_model)
    table = positions / np.power(10000, exponents)

    # Overwrite the raw angles in place: sine on even columns, cosine on odd.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Leading axis of size 1 so the table broadcasts over a batch dimension.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

# ----------------------------
# Multi-head Attention
# ----------------------------
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each projected with their own Dense layer,
    split into ``num_heads`` heads of width ``d_model // num_heads``, attended
    independently, merged back and passed through a final Dense layer.
    """

    def __init__(self, d_model, num_heads):
        """Create the projection layers.

        Args:
            d_model: total feature width; must be divisible by ``num_heads``
                (enforced with an ``assert``).
            num_heads: number of attention heads.
        """
        super().__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        assert d_model % num_heads == 0
        self.depth = d_model // num_heads

        # Creation order is kept stable: query, key, value, output.
        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape to (batch, seq, heads, depth), then move heads ahead of seq."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def scaled_dot_product_attention(self, q, k, v, mask):
        """Compute softmax(q @ k^T / sqrt(dk)) @ v.

        Args:
            q, k, v: query, key and value tensors.
            mask: optional tensor; it is multiplied by -1e9 and added to the
                logits, so entries equal to 1 are effectively suppressed by
                the softmax. ``None`` disables masking.

        Returns:
            Tuple ``(output, attention_weights)``.
        """
        key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
        logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)
        if mask is not None:
            logits += (mask * -1e9)
        weights = tf.nn.softmax(logits, axis=-1)
        return tf.matmul(weights, v), weights

    def call(self, v, k, q, mask=None):
        """Run attention. Note the argument order: value, key, query.

        Returns:
            Tensor reshaped to (batch, -1, d_model) and passed through the
            output Dense layer. The attention weights are discarded.
        """
        batch_size = tf.shape(q)[0]

        # Project and split in query, key, value order.
        q_heads = self.split_heads(self.wq(q), batch_size)
        k_heads = self.split_heads(self.wk(k), batch_size)
        v_heads = self.split_heads(self.wv(v), batch_size)

        context, _ = self.scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)

        # Undo the head split: heads back behind seq, then flatten heads*depth.
        context = tf.transpose(context, perm=[0, 2, 1, 3])
        merged = tf.reshape(context, (batch_size, -1, self.d_model))
        return self.dense(merged)

# ----------------------------
# Feed Forward
# ----------------------------
class PositionwiseFeedforward(tf.keras.layers.Layer):
    """Two stacked Dense layers: ``dff`` units with ReLU, then ``d_model`` units."""

    def __init__(self, d_model, dff):
        """
        Args:
            d_model: width of the output projection.
            dff: width of the hidden (ReLU) layer.
        """
        super().__init__()
        self.dense1 = Dense(dff, activation='relu')
        self.dense2 = Dense(d_model)

    def call(self, x):
        """Apply the hidden ReLU layer followed by the linear output layer."""
        hidden = self.dense1(x)
        return self.dense2(hidden)

# ----------------------------
# Transformer Block
# ----------------------------
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention sub-layer followed by a feed-forward sub-layer.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, dff, dropout_rate=0.1):
        """
        Args:
            d_model: feature width of the block.
            num_heads: number of attention heads.
            dff: hidden width of the feed-forward sub-layer.
            dropout_rate: dropout rate applied after each sub-layer.
        """
        super().__init__()
        self.att = MultiHeadAttention(d_model, num_heads)
        self.ffn = PositionwiseFeedforward(d_model, dff)
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(dropout_rate)
        self.dropout2 = Dropout(dropout_rate)

    def call(self, x, training=False, mask=None):
        """Apply the block to ``x``.

        Args:
            x: input tensor; used as value, key and query of the attention.
            training: forwarded to the dropout layers.
            mask: optional attention mask forwarded to the attention layer.
        """
        # Sub-layer 1: self-attention + dropout + residual + norm.
        attended = self.dropout1(self.att(x, x, x, mask=mask), training=training)
        normed_attention = self.layernorm1(x + attended)

        # Sub-layer 2: feed-forward + dropout + residual + norm.
        transformed = self.dropout2(self.ffn(normed_attention), training=training)
        return self.layernorm2(normed_attention + transformed)

# ----------------------------
# Encoder
# ----------------------------
class Encoder(tf.keras.layers.Layer):
    """Token embedding + positional encoding + a stack of TransformerBlocks."""

    def __init__(self, num_layers, d_model, num_heads, dff, vocab_size,
                 maximum_position_encoding, dropout_rate=0.1):
        """
        Args:
            num_layers: number of TransformerBlocks in the stack.
            d_model: embedding / feature width.
            num_heads: attention heads per block.
            dff: feed-forward hidden width per block.
            vocab_size: size of the embedding table.
            maximum_position_encoding: number of positions precomputed in the
                positional-encoding table.
            dropout_rate: dropout rate for the embeddings and for each block.
        """
        super().__init__()
        self.d_model = d_model
        self.embedding = Embedding(vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
        self.dropout = Dropout(dropout_rate)

        blocks = []
        for _ in range(num_layers):
            blocks.append(TransformerBlock(d_model, num_heads, dff, dropout_rate))
        self.enc_layers = blocks

    def call(self, x, training=False, mask=None):
        """Encode a batch of token ids.

        Args:
            x: token-id tensor; axis 1 is taken as the sequence length.
            training: forwarded to dropout and to every block.
            mask: optional attention mask forwarded to every block.
        """
        seq_len = tf.shape(x)[1]

        # Embeddings are scaled by sqrt(d_model) before the positions are added.
        scale = tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        hidden = self.embedding(x) * scale
        # Only the first seq_len rows of the precomputed table are used.
        hidden = hidden + self.pos_encoding[:, :seq_len, :]
        hidden = self.dropout(hidden, training=training)

        for block in self.enc_layers:
            hidden = block(hidden, training=training, mask=mask)
        return hidden

# ----------------------------
# Decoder
# ----------------------------
class Decoder(tf.keras.layers.Layer):
    """Token embedding + positional encoding + a stack of TransformerBlocks.

    NOTE(review): ``call`` accepts ``enc_output`` but never reads it, and the
    blocks used here only perform self-attention. As written this decoder does
    not attend to the encoder output at all.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, vocab_size,
                 maximum_position_encoding, dropout_rate=0.1):
        """
        Args:
            num_layers: number of TransformerBlocks in the stack.
            d_model: embedding / feature width.
            num_heads: attention heads per block.
            dff: feed-forward hidden width per block.
            vocab_size: size of the embedding table.
            maximum_position_encoding: number of positions precomputed in the
                positional-encoding table.
            dropout_rate: dropout rate for the embeddings and for each block.
        """
        super().__init__()
        self.d_model = d_model
        self.embedding = Embedding(vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
        self.dropout = Dropout(dropout_rate)

        blocks = []
        for _ in range(num_layers):
            blocks.append(TransformerBlock(d_model, num_heads, dff, dropout_rate))
        self.dec_layers = blocks

    def call(self, x, enc_output, training=False, look_ahead_mask=None):
        """Decode a batch of token ids.

        Args:
            x: token-id tensor; axis 1 is taken as the sequence length.
            enc_output: accepted but unused by this implementation.
            training: forwarded to dropout and to every block.
            look_ahead_mask: optional mask forwarded to every block's
                self-attention.
        """
        seq_len = tf.shape(x)[1]

        # Embeddings are scaled by sqrt(d_model) before the positions are added.
        scale = tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        hidden = self.embedding(x) * scale
        # Only the first seq_len rows of the precomputed table are used.
        hidden = hidden + self.pos_encoding[:, :seq_len, :]
        hidden = self.dropout(hidden, training=training)

        for block in self.dec_layers:
            hidden = block(hidden, training=training, mask=look_ahead_mask)
        return hidden

# ----------------------------
# Transformer Model
# ----------------------------
class Transformer(Model):
    """Encoder, decoder and a final Dense projection to the target vocabulary.

    The final Dense layer has no activation, so ``call`` returns raw logits.
    """

    def __init__(self, num_layers, d_model, num_heads, dff,
                 input_vocab_size, target_vocab_size, maximum_position_encoding,
                 dropout_rate=0.1):
        """
        Args:
            num_layers: number of blocks in both the encoder and the decoder.
            d_model: feature width shared by all layers.
            num_heads: attention heads per block.
            dff: feed-forward hidden width per block.
            input_vocab_size: vocabulary size for the encoder embedding.
            target_vocab_size: vocabulary size for the decoder embedding and
                for the output projection.
            maximum_position_encoding: positions precomputed for the
                positional-encoding tables.
            dropout_rate: dropout rate used throughout.
        """
        super().__init__()
        self.encoder = Encoder(
            num_layers, d_model, num_heads, dff,
            input_vocab_size, maximum_position_encoding, dropout_rate,
        )
        self.decoder = Decoder(
            num_layers, d_model, num_heads, dff,
            target_vocab_size, maximum_position_encoding, dropout_rate,
        )
        self.final_layer = Dense(target_vocab_size)

    def call(self, inputs, training=False, look_ahead_mask=None, padding_mask=None):
        """Run the model.

        Args:
            inputs: pair ``(inp, tar)`` of encoder and decoder token ids.
            training: forwarded to the encoder and decoder.
            look_ahead_mask: forwarded to the decoder.
            padding_mask: forwarded to the encoder as its attention mask.

        Returns:
            Logits produced by the final Dense layer from the decoder output.
        """
        source_ids, target_ids = inputs
        encoded = self.encoder(source_ids, training=training, mask=padding_mask)
        decoded = self.decoder(
            target_ids, encoded,
            training=training, look_ahead_mask=look_ahead_mask,
        )
        return self.final_layer(decoded)

In [35]:
# ----------------------------
# Data Preparation
# ----------------------------
# Read the whole corpus into memory as a single string.
with open('./data/shakespere_dataset.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# Character-level vocabulary: every distinct character, in sorted order.
vocab = sorted(set(text))
vocab_size = len(vocab)
char2idx = {ch: idx for idx, ch in enumerate(vocab)}  # character -> integer id
idx2char = np.array(vocab)                            # integer id -> character

# The full corpus as a 1-D array of integer ids.
text_as_int = np.array([char2idx[ch] for ch in text])

seq_length = 50  # shorter sequence for testing
# Each example is cut from a chunk of seq_length + 1 characters (see the
# batch() call below), so that chunk size is the correct divisor.
examples_per_epoch = len(text) // (seq_length + 1)

# Stream of single ids, grouped into fixed-size chunks; a trailing chunk
# shorter than seq_length + 1 is dropped.
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)
sequences = char_dataset.batch(seq_length+1, drop_remainder=True)

def split_input_target(chunk):
    """Split a chunk into an (input, target) pair offset by one position.

    The input is everything except the last element; the target is
    everything except the first element.
    """
    inputs = chunk[:-1]
    targets = chunk[1:]
    return inputs, targets

BATCH_SIZE = 64
BUFFER_SIZE = 10000

# chunk -> (input, target) -> shuffle -> fixed-size batches -> int32 ids.
# Incomplete final batches are dropped.
dataset = (
    sequences
    .map(split_input_target)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .map(lambda inputs, targets: (tf.cast(inputs, tf.int32), tf.cast(targets, tf.int32)))
)

# ----------------------------
# Mask
# ----------------------------
def create_look_ahead_mask(size):
    """Return a (size, size) mask with 1 strictly above the diagonal, 0 elsewhere.

    Row i therefore has 1 at every column j > i, i.e. at the positions that
    come after position i.
    """
    # band_part(..., -1, 0) keeps the diagonal and everything below it.
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle


In [40]:
# ----------------------------
# Training
# ----------------------------
# Model hyperparameters.
num_layers, num_heads = 4, 4
d_model, dff = 128, 512
dropout_rate = 0.1
maximum_position_encoding = 1000

# Encoder and decoder share the same character vocabulary.
input_vocab_size = target_vocab_size = vocab_size

transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=input_vocab_size,
    target_vocab_size=target_vocab_size,
    maximum_position_encoding=maximum_position_encoding,
    dropout_rate=dropout_rate,
)

# The model outputs raw logits, hence from_logits=True.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

@tf.function
def train_step(inp, tar):
    """Run one optimisation step and return the batch loss.

    Args:
        inp: batch of encoder token ids.
        tar: batch of target token ids. All but the last column are fed to
            the decoder; all but the first column are the labels.

    Returns:
        The scalar loss computed by ``loss_object`` for this batch.
    """
    decoder_input = tar[:, :-1]
    labels = tar[:, 1:]
    causal_mask = create_look_ahead_mask(tf.shape(decoder_input)[1])

    with tf.GradientTape() as tape:
        logits = transformer((inp, decoder_input), training=True,
                             look_ahead_mask=causal_mask)
        loss = loss_object(labels, logits)

    variables = transformer.trainable_variables
    grads = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(grads, variables))
    return loss

EPOCHS = 30
for epoch in range(EPOCHS):
    total_loss = 0
    # Pre-set so an empty dataset is detected below instead of surfacing as a
    # NameError on `batch` in the print statement.
    batch = -1
    for (batch, (inp, tar)) in enumerate(dataset):
        batch_loss = train_step(inp, tar)
        total_loss += batch_loss
    num_batches = batch + 1
    if num_batches == 0:
        raise ValueError(
            "dataset yielded no batches; the corpus is too short for the "
            "configured seq_length and BATCH_SIZE"
        )
    # Convert explicitly so formatting does not depend on tensor __format__.
    mean_loss = float(total_loss) / num_batches
    print(f'Epoch {epoch+1}, Loss: {mean_loss:.4f}')

Epoch 1, Loss: 2.6636
Epoch 2, Loss: 1.9895
Epoch 3, Loss: 1.8139
Epoch 4, Loss: 1.7217
Epoch 5, Loss: 1.6621
Epoch 6, Loss: 1.6200
Epoch 7, Loss: 1.5871
Epoch 8, Loss: 1.5611
Epoch 9, Loss: 1.5413
Epoch 10, Loss: 1.5234
Epoch 11, Loss: 1.5089
Epoch 12, Loss: 1.4952
Epoch 13, Loss: 1.4834
Epoch 14, Loss: 1.4740
Epoch 15, Loss: 1.4651
Epoch 16, Loss: 1.4574
Epoch 17, Loss: 1.4502
Epoch 18, Loss: 1.4425
Epoch 19, Loss: 1.4371
Epoch 20, Loss: 1.4319
Epoch 21, Loss: 1.4260
Epoch 22, Loss: 1.4203
Epoch 23, Loss: 1.4161
Epoch 24, Loss: 1.4120
Epoch 25, Loss: 1.4083
Epoch 26, Loss: 1.4042
Epoch 27, Loss: 1.4010
Epoch 28, Loss: 1.3967
Epoch 29, Loss: 1.3944
Epoch 30, Loss: 1.3910


In [43]:
# ----------------------------
# Generate Text (Chatbot Response)
# ----------------------------
def generate_response(model, start_string, num_generate=200):
    """Sample a character-level continuation of ``start_string``.

    At every step the whole sequence generated so far is fed to ``model`` as
    both encoder and decoder input, the logits of the last position are
    sampled with ``tf.random.categorical`` (so the output is stochastic), and
    the sampled id is appended to the sequence.

    Args:
        model: callable taking ``(inp, tar)`` plus ``training`` and
            ``look_ahead_mask`` keyword arguments and returning logits of
            shape (batch, seq, vocab).
        start_string: prompt text. Characters that are not keys of
            ``char2idx`` are skipped when building the model input.
        num_generate: number of characters to sample.

    Returns:
        ``start_string`` followed by the generated characters.

    Raises:
        ValueError: if no character of ``start_string`` is in ``char2idx``,
            because the model cannot be run on an empty sequence.
    """
    prompt_ids = [char2idx[ch] for ch in start_string if ch in char2idx]
    if not prompt_ids:
        raise ValueError(
            "start_string must contain at least one character from the "
            "training vocabulary"
        )
    # Shape (1, prompt_len): a batch holding the single prompt.
    input_eval = tf.constant([prompt_ids], dtype=tf.int32)

    text_generated = []
    for _ in range(num_generate):
        # Use the same causal mask as in training. Without it, earlier
        # positions attend to later ones inside the stacked blocks and the
        # last position sees hidden states the model was never trained on.
        look_ahead_mask = create_look_ahead_mask(int(input_eval.shape[1]))
        predictions = model((input_eval, input_eval), training=False,
                            look_ahead_mask=look_ahead_mask)

        # Logits of the last time step only: (1, vocab_size).
        last_logits = predictions[:, -1, :]
        predicted_id = int(
            tf.random.categorical(last_logits, num_samples=1)[0, 0].numpy()
        )

        text_generated.append(idx2char[predicted_id])

        # Append the sampled id, keeping the dtype of the running sequence.
        next_token = tf.constant([[predicted_id]], dtype=input_eval.dtype)
        input_eval = tf.concat([input_eval, next_token], axis=-1)

    return start_string + ''.join(text_generated)


In [47]:
# ----------------------------
# Interactive Chat
# ----------------------------
def chat():
    """Interactive read-generate-print loop on stdin/stdout.

    Reads a line with ``input()``, generates 100 characters of continuation
    with the module-level ``transformer`` and prints only the continuation.
    Typing ``quit`` (any case, surrounding whitespace ignored) ends the loop.
    Input containing no character known to ``char2idx`` is rejected with a
    message instead of being passed to the model, which cannot run on an
    empty sequence.
    """
    print("=== Shakespeare Chatbot ===")
    print("Type 'quit' to exit.\n")
    while True:
        user_input = input("You: ")
        if user_input.strip().lower() == "quit":
            break
        if not any(ch in char2idx for ch in user_input):
            print("Bot: (please type at least one character from my vocabulary)")
            continue
        response = generate_response(transformer, user_input, num_generate=100)
        print("Bot:", response[len(user_input):])  # show only continuation

# ----------------------------
# Example Usage after Training
# ----------------------------
# Starts the interactive loop; chat() keeps reading from input() until the
# user types 'quit'.
# NOTE(review): because this waits on input(), the cell presumably stalls or
# fails under non-interactive execution (e.g. Restart & Run All) — confirm.
chat()

=== Shakespeare Chatbot ===
Type 'quit' to exit.



You:  hi baby


Bot: , why staide, but fine:
At wo when I afteringes, ly, me.
A jat y the med tith thernormeth, t ash, on


You:  what


Bot:  down angliants should none,
Thuncy bagot, lark h ast yede itashone ionck, aths tonde th me: at whin


You:  quit
