In [15]:
import torch
import torch.nn.functional as F
import torch.nn as nn
from tokenizers import Tokenizer
import os
import time
import math
import random

## 1. Configuration

In [16]:
# Artifacts read by the loading cells further down: the checkpoint is handed
# to torch.load and the tokenizer file to Tokenizer.from_file.
MODEL_PATH = 'empathetic-transformer-basic-best (2).pt'
TOKENIZER_PATH = 'wp_tokenizer.json'

# Prefer the GPU when one is visible, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# MAX_LEN is captured as the default `max_len` of generate_response_sampling
# when that function is defined. The DEFAULT_* values are the sampling
# settings reported by the interactive chat banner.
MAX_LEN = 100
DEFAULT_STRATEGY = "topk"
DEFAULT_K = 15
DEFAULT_P = 0.92
DEFAULT_TEMPERATURE = 0.75

# Special-token strings; their integer IDs are looked up in the tokenizer
# vocabulary once the tokenizer has been loaded.
PAD_TOKEN = "<PAD>"
SOS_TOKEN = "<SOS>"
EOS_TOKEN = "<EOS>"
UNK_TOKEN = "<UNK>"

print(DEVICE)

cpu


## 2. Helper Functions

In [17]:
def text_to_sequence_tokenizer(text, tokenizer):
    """Turn raw text into model-ready token IDs.

    The text is encoded with ``tokenizer.encode`` and the resulting IDs are
    framed by the module-level ``SOS_IDX`` (front) and ``EOS_IDX`` (back).

    Args:
        text: The string to encode.
        tokenizer: Object whose ``encode(text)`` result exposes an ``ids`` list.

    Returns:
        list: ``[SOS_IDX, <token ids...>, EOS_IDX]``.
    """
    token_ids = tokenizer.encode(text).ids
    return [SOS_IDX, *token_ids, EOS_IDX]

def sequence_to_text_tokenizer(sequence, tokenizer):
    """Turn token IDs back into text, ignoring padding and boundary markers.

    Every ID equal to the module-level ``PAD_IDX``, ``SOS_IDX`` or ``EOS_IDX``
    is dropped (wherever it occurs) before the rest is passed to
    ``tokenizer.decode``.

    Args:
        sequence: Iterable of token IDs.
        tokenizer: Object providing ``decode(list_of_ids)``.

    Returns:
        Whatever ``tokenizer.decode`` returns for the remaining IDs.
    """
    kept_ids = []
    for token_id in sequence:
        if token_id in [PAD_IDX, SOS_IDX, EOS_IDX]:
            continue
        kept_ids.append(token_id)
    return tokenizer.decode(kept_ids)

## 3. Generation Function

In [18]:
def _filter_top_k(probs, k):
    """Return 1-D ``probs`` with everything outside its ``k`` largest entries zeroed."""
    _, topk_indices = torch.topk(probs, k=min(k, probs.size(-1)))  # Ensure k is not > vocab size
    keep_mask = torch.zeros_like(probs)
    keep_mask.scatter_(0, topk_indices, 1.0)
    return probs * keep_mask


def _filter_top_p(probs, p):
    """Return (nucleus-filtered copy of 1-D ``probs``, id of the most likely token)."""
    sorted_probs, sorted_indices = torch.sort(probs, descending=True)
    cumulative_probs = torch.cumsum(sorted_probs, dim=-1)
    drop_sorted = cumulative_probs > p
    # Shift the drop flags right by one so the token that crosses the
    # threshold is still kept, and never drop the single most likely token.
    drop_sorted[1:] = drop_sorted[:-1].clone()
    drop_sorted[0] = False
    # Work on a copy so the caller's distribution is not modified.
    filtered = probs.clone()
    filtered[sorted_indices[drop_sorted]] = 0.0
    return filtered, sorted_indices[0].item()


def _sample_or_fallback(filtered_probs, fallback_id, warning):
    """Sample one id from renormalised ``filtered_probs``; warn and use ``fallback_id`` if its mass is ~0."""
    total = torch.sum(filtered_probs)
    if total > 1e-9:
        return torch.multinomial(filtered_probs / total, num_samples=1).item()
    print(warning)
    return fallback_id


def generate_response_sampling(
    sentence,
    model,
    tokenizer,
    device,
    max_len=MAX_LEN,
    strategy="topk",
    k=10,
    p=0.9,
    temperature=0.8
    ):
    """Generate a reply to ``sentence`` by autoregressive decoding.

    The sentence is encoded once; the decoder is then run repeatedly on the
    growing target sequence (starting from ``SOS_IDX``) and one token is
    chosen per step until ``EOS_IDX`` is produced or the length budget is
    exhausted.

    Args:
        sentence: Input text.
        model: Model exposing ``encoder``, ``decoder``, ``make_src_mask`` and
            ``make_trg_mask``; it is switched to eval mode.
        tokenizer: Tokenizer used for both encoding and decoding.
        device: Device on which the input tensors are created.
        max_len: Maximum number of source tokens (longer inputs are
            truncated) and upper bound on the target length.
        strategy: ``'greedy'`` (arg-max), ``'topk'`` (sample among the ``k``
            most likely tokens) or ``'topp'`` (nucleus sampling).
        k: Number of candidates kept for ``'topk'``.
        p: Cumulative-probability threshold for ``'topp'``.
        temperature: Positive divisor applied to the logits before softmax.

    Returns:
        The decoded response text with special tokens removed.

    Raises:
        AssertionError: If ``strategy`` is unknown or ``temperature`` <= 0.
    """
    assert strategy in ['greedy', 'topk', 'topp'], "Strategy must be 'greedy', 'topk', or 'topp'"
    assert temperature > 0, "Temperature must be positive"

    model.eval()

    tokens = text_to_sequence_tokenizer(sentence, tokenizer)

    if len(tokens) > max_len:
        print(f"Warning: Input sentence truncated to {max_len} tokens.")
        tokens = tokens[:max_len]

    trg_indices = [SOS_IDX]

    # Inference only: one no_grad scope covers the encoder pass and every
    # decoder step.
    with torch.no_grad():
        src_tensor = torch.tensor(tokens, dtype=torch.long, device=device).unsqueeze(0)
        src_mask = model.make_src_mask(src_tensor)
        enc_src = model.encoder(src_tensor, src_mask)

        for _ in range(max_len - 1):
            trg_tensor = torch.tensor(trg_indices, dtype=torch.long, device=device).unsqueeze(0)
            trg_mask = model.make_trg_mask(trg_tensor)

            output, _ = model.decoder(trg_tensor, enc_src, trg_mask, src_mask)

            # The decoder output is indexed [position, batch, vocab]; take the
            # logits of the last position for the single batch element.
            pred_token_logits = output[-1, 0, :] / temperature
            pred_token_probs = F.softmax(pred_token_logits, dim=-1)

            if strategy == 'greedy':
                next_token_id = torch.argmax(pred_token_probs).item()
            elif strategy == 'topk':
                next_token_id = _sample_or_fallback(
                    _filter_top_k(pred_token_probs, k),
                    EOS_IDX,
                    "Warning: Top-k resulted in zero probability sum. Using EOS.",
                )
            else:  # 'topp' (guaranteed by the assertion above)
                nucleus_probs, most_likely_id = _filter_top_p(pred_token_probs, p)
                next_token_id = _sample_or_fallback(
                    nucleus_probs,
                    most_likely_id,
                    "Warning: Top-p resulted in zero probability sum. Using most likely token.",
                )

            trg_indices.append(next_token_id)

            if next_token_id == EOS_IDX:
                break

    return sequence_to_text_tokenizer(trg_indices, tokenizer)

## 4. Load Tokenizer and Model

In [19]:

print(f"Loading tokenizer from {TOKENIZER_PATH}...")
tokenizer = Tokenizer.from_file(TOKENIZER_PATH)
print("Tokenizer loaded.")

# Source and target share one vocabulary, so both model dimensions are the
# tokenizer's vocabulary size.
INPUT_DIM = tokenizer.get_vocab_size()
OUTPUT_DIM = INPUT_DIM
print(f"Tokenizer Vocabulary Size: {INPUT_DIM}")

PAD_IDX = tokenizer.token_to_id(PAD_TOKEN)
SOS_IDX = tokenizer.token_to_id(SOS_TOKEN)
EOS_IDX = tokenizer.token_to_id(EOS_TOKEN)
UNK_IDX = tokenizer.token_to_id(UNK_TOKEN)

# A lookup that yields None means the token is absent from the vocabulary.
# Stop here instead of letting a None index reach the model and the helper
# functions, where it would fail later in a much less obvious place.
missing_special_tokens = [
    token
    for token, idx in (
        (PAD_TOKEN, PAD_IDX),
        (SOS_TOKEN, SOS_IDX),
        (EOS_TOKEN, EOS_IDX),
        (UNK_TOKEN, UNK_IDX),
    )
    if idx is None
]
if missing_special_tokens:
    print("Error: One or more special tokens not found in tokenizer vocab!")
    raise ValueError(
        f"Special tokens missing from tokenizer '{TOKENIZER_PATH}': {missing_special_tokens}"
    )

print(f"Special Token IDs: PAD={PAD_IDX}, SOS={SOS_IDX}, EOS={EOS_IDX}, UNK={UNK_IDX}")

Loading tokenizer from wp_tokenizer.json...
Tokenizer loaded.
Tokenizer Vocabulary Size: 20000
Special Token IDs: PAD=0, SOS=1, EOS=2, UNK=3


In [20]:
# Data / tokenizer settings. Note that MAX_LEN is rebound here (it was 100 in
# the configuration cell): functions defined earlier keep the old value as
# their default argument, while calls that pass max_len=MAX_LEN see 60.
DATASET_SUBSET_SIZE = None
MAX_LEN = 60
VOCAB_SIZE = 20000
TOKENIZER_FILE = "wp_tokenizer.json"

# Optimisation settings; not referenced by the inference cells visible in
# this notebook (presumably carried over from the training script).
BATCH_SIZE = 32
LEARNING_RATE = 0.0005
N_EPOCHS = 50
CLIP = 1.0
PATIENCE = 10

# Architecture used to build the Encoder/Decoder below. The checkpoint is
# loaded with load_state_dict, so these presumably have to match the values
# the checkpoint was trained with.
D_MODEL = 256
N_HEADS = 8
ENC_LAYERS = 3
DEC_LAYERS = 3
PF_DIM = 512
DROPOUT = 0.3

# Seed Python's RNG, the hash seed environment variable and the torch
# CPU/CUDA RNGs.
SEED = 1234
random.seed(SEED)
os.environ['PYTHONHASHSEED'] = str(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)


# Lower-case `device` is a second device variable, distinct from the
# upper-case DEVICE defined in the configuration cell.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Same special-token strings as in the configuration cell, re-declared here.
PAD_TOKEN = "<PAD>"
SOS_TOKEN = "<SOS>"
EOS_TOKEN = "<EOS>"
UNK_TOKEN = "<UNK>"
SPECIAL_TOKENS = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN]

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a sequence of embeddings.

    A table of shape ``[max_len, 1, d_model]`` is precomputed and stored as
    the buffer ``pe``: even feature columns hold sines, odd columns cosines.

    Args:
        d_model: Embedding width. Both even and odd widths are supported.
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions in the precomputed table.
    """
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so only the first d_model // 2 frequencies receive a cosine. For an
        # even d_model this slice covers every frequency.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # [max_len, d_model] -> [max_len, 1, d_model] so the table broadcasts
        # over the batch dimension of sequence-first inputs.
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]

        Returns:
            Tensor of the same shape: ``x`` plus the encodings of the first
            ``seq_len`` positions, passed through dropout.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention evaluated in parallel over several heads."""

    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"

        self.d_model = d_model
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads

        # Query / key / value projections followed by the output projection.
        self.fc_q = nn.Linear(d_model, d_model)
        self.fc_k = nn.Linear(d_model, d_model)
        self.fc_v = nn.Linear(d_model, d_model)
        self.fc_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

        # sqrt(head_dim), stored as a buffer so it is saved with the module.
        self.register_buffer("scale", torch.sqrt(torch.FloatTensor([self.head_dim])))

    def _split_heads(self, projected, batch_size):
        """[seq_len, batch, d_model] -> [batch, n_heads, seq_len, head_dim]."""
        seq_len = projected.shape[0]
        per_head = projected.view(seq_len, batch_size, self.n_heads, self.head_dim)
        return per_head.permute(1, 2, 0, 3)

    def forward(self, query, key, value, mask=None):
        """
        Args:
            query: Tensor, shape [query_len, batch_size, d_model]
            key: Tensor, shape [key_len, batch_size, d_model]
            value: Tensor, shape [value_len, batch_size, d_model] (value_len == key_len)
            mask: Optional tensor broadcastable to
                  [batch_size, n_heads, query_len, key_len]; positions where
                  it equals 0 are excluded from attention.

        Returns:
            Tuple ``(x, attention)``: ``x`` has shape
            [query_len, batch_size, d_model] and ``attention`` has shape
            [batch_size, n_heads, query_len, key_len].
        """
        query_len, batch_size = query.shape[0], query.shape[1]

        q_heads = self._split_heads(self.fc_q(query), batch_size)
        k_heads = self._split_heads(self.fc_k(key), batch_size)
        v_heads = self._split_heads(self.fc_v(value), batch_size)

        scores = torch.matmul(q_heads, k_heads.transpose(-2, -1)) / self.scale.to(q_heads.device)
        if mask is not None:
            # A very negative score becomes ~0 weight after the softmax.
            scores = scores.masked_fill(mask == 0, -1e10)

        attention = self.dropout(torch.softmax(scores, dim=-1))

        context = torch.matmul(attention, v_heads)
        # Merge the heads again: [batch, heads, q_len, head_dim]
        # -> [batch, q_len, d_model] -> [q_len, batch, d_model].
        context = context.permute(0, 2, 1, 3).contiguous()
        context = context.view(batch_size, query_len, self.d_model)
        context = context.permute(1, 0, 2)

        return self.fc_o(context), attention

class PositionwiseFeedforward(nn.Module):
    """Two-layer feed-forward network applied to each position independently."""

    def __init__(self, d_model, pf_dim, dropout=0.1):
        super().__init__()
        # Expand to pf_dim, then project back to d_model.
        self.fc1 = nn.Linear(d_model, pf_dim)
        self.fc2 = nn.Linear(pf_dim, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, d_model]

        Returns:
            Tensor with the same shape as ``x``.
        """
        hidden = self.fc1(x)
        hidden = torch.relu(hidden)
        hidden = self.dropout(hidden)
        return self.fc2(hidden)

class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward, each wrapped in a
    residual connection followed by layer normalisation."""

    def __init__(self, d_model, n_heads, pf_dim, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.ff = PositionwiseFeedforward(d_model, pf_dim, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """
        Args:
            src: Tensor, shape [src_len, batch_size, d_model]
            src_mask: Mask handed to the self-attention sub-layer.

        Returns:
            Tensor with the same shape as ``src``.
        """
        # Sub-layer 1: self-attention (queries, keys and values are all src).
        attended, _ = self.self_attn(src, src, src, src_mask)
        after_attention = self.norm1(src + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward.
        transformed = self.ff(after_attention)
        return self.norm2(after_attention + self.dropout(transformed))

class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention, attention over the encoder
    output, and feed-forward, each with a residual connection and layer norm."""

    def __init__(self, d_model, n_heads, pf_dim, dropout=0.1):
        super().__init__()
        self.masked_self_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.encoder_attn = MultiHeadAttention(d_model, n_heads, dropout)
        self.ff = PositionwiseFeedforward(d_model, pf_dim, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """
        Args:
            trg: Tensor, shape [trg_len, batch_size, d_model] (target sequence embeddings)
            enc_src: Tensor, shape [src_len, batch_size, d_model] (encoder output)
            trg_mask: Mask handed to the target self-attention.
            src_mask: Mask handed to the encoder-decoder attention.

        Returns:
            Tuple ``(trg, attention)``: the transformed target tensor and the
            attention weights returned by the encoder-decoder attention.
        """
        # Sub-layer 1: self-attention over the target sequence.
        self_attended, _ = self.masked_self_attn(trg, trg, trg, trg_mask)
        state = self.norm1(trg + self.dropout(self_attended))

        # Sub-layer 2: target positions query the encoder output.
        cross_attended, attention = self.encoder_attn(state, enc_src, enc_src, src_mask)
        state = self.norm2(state + self.dropout(cross_attended))

        # Sub-layer 3: position-wise feed-forward.
        transformed = self.ff(state)
        state = self.norm3(state + self.dropout(transformed))

        return state, attention

class Encoder(nn.Module):
    """The Transformer Encoder stack.

    Token IDs are embedded, scaled by sqrt(d_model), passed through dropout,
    given positional encodings and then run through ``n_layers`` encoder
    layers.

    Args:
        input_dim: Vocabulary size of the token embedding.
        d_model: Embedding / hidden width.
        n_layers: Number of ``EncoderLayer`` blocks.
        n_heads: Attention heads per layer.
        pf_dim: Inner width of the feed-forward sub-layer.
        dropout: Dropout probability used throughout.
        max_len: Requested number of positions; the positional table is
            always built with at least 5000 rows.
    """
    def __init__(self, input_dim, d_model, n_layers, n_heads, pf_dim, dropout, max_len=MAX_LEN):
        super().__init__()
        self.tok_embedding = nn.Embedding(input_dim, d_model)
        self.pos_embedding = PositionalEncoding(d_model, dropout, max_len=max(max_len, 5000))
        self.layers = nn.ModuleList([EncoderLayer(d_model, n_heads, pf_dim, dropout)
                                     for _ in range(n_layers)])
        self.dropout = nn.Dropout(dropout)
        self.register_buffer("scale", torch.sqrt(torch.FloatTensor([d_model])))

    def forward(self, src, src_mask):
        """
        Args:
            src: Tensor, shape [batch_size, src_len] (input token IDs)
            src_mask: Tensor, mask for padding in the source sequence

        Returns:
            Tensor, shape [src_len, batch_size, d_model].
        """
        scale = self.scale.to(src.device)
        src = self.dropout(self.tok_embedding(src) * scale)

        # The layers and the positional encoding work sequence-first:
        # [batch, src_len, d_model] -> [src_len, batch, d_model].
        src = src.permute(1, 0, 2)

        src = self.pos_embedding(src)

        for layer in self.layers:
            src = layer(src, src_mask)

        return src

class Decoder(nn.Module):
    """The Transformer Decoder stack.

    Target token IDs are embedded, scaled by sqrt(d_model), passed through
    dropout, given positional encodings, run through ``n_layers`` decoder
    layers and finally projected to vocabulary logits.

    Args:
        output_dim: Vocabulary size (embedding rows and logit width).
        d_model: Embedding / hidden width.
        n_layers: Number of ``DecoderLayer`` blocks.
        n_heads: Attention heads per layer.
        pf_dim: Inner width of the feed-forward sub-layer.
        dropout: Dropout probability used throughout.
        max_len: Requested number of positions; the positional table is
            always built with at least 5000 rows.
    """
    def __init__(self, output_dim, d_model, n_layers, n_heads, pf_dim, dropout, max_len=MAX_LEN):
        super().__init__()
        self.tok_embedding = nn.Embedding(output_dim, d_model)
        self.pos_embedding = PositionalEncoding(d_model, dropout, max_len=max(max_len, 5000))
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, pf_dim, dropout)
                                     for _ in range(n_layers)])
        self.fc_out = nn.Linear(d_model, output_dim)
        self.dropout = nn.Dropout(dropout)
        self.register_buffer("scale", torch.sqrt(torch.FloatTensor([d_model])))

    def forward(self, trg, enc_src, trg_mask, src_mask):
        """
        Args:
            trg: Tensor, shape [batch_size, trg_len] (target token IDs)
            enc_src: Tensor, shape [src_len, batch_size, d_model] (encoder output)
            trg_mask: Tensor, mask for target self-attention
            src_mask: Tensor, mask for encoder-decoder attention (source padding)

        Returns:
            Tuple ``(output, attention)``: ``output`` holds the logits with
            shape [trg_len, batch_size, output_dim]; ``attention`` is the
            encoder-decoder attention of the last layer, or ``None`` when the
            decoder has no layers.
        """
        scale = self.scale.to(trg.device)
        trg = self.dropout(self.tok_embedding(trg) * scale)

        # The layers and the positional encoding work sequence-first:
        # [batch, trg_len, d_model] -> [trg_len, batch, d_model].
        trg = trg.permute(1, 0, 2)

        trg = self.pos_embedding(trg)

        # Defined up front so a decoder built with n_layers == 0 still
        # returns a well-defined value instead of raising UnboundLocalError.
        attention = None
        for layer in self.layers:
            trg, attention = layer(trg, enc_src, trg_mask, src_mask)

        output = self.fc_out(trg)

        return output, attention

class Seq2SeqTransformer(nn.Module):
    """The main Seq2Seq Transformer model: an encoder, a decoder and the
    attention masks that connect them.

    Args:
        encoder: Module called as ``encoder(src, src_mask)``.
        decoder: Module called as ``decoder(trg, enc_src, trg_mask, src_mask)``.
        src_pad_idx: Padding token ID in source sequences.
        trg_pad_idx: Padding token ID in target sequences.
        device: Device recorded at construction time. It is stored as
            ``self.device`` for backward compatibility but is only used as a
            fallback: it is not updated by ``model.to(...)``, so masks are
            built on the device of the tensors they describe instead.
    """
    def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def _runtime_device(self):
        """Device the parameters currently live on (``self.device`` if there are none)."""
        first_param = next(self.parameters(), None)
        return self.device if first_param is None else first_param.device

    def make_src_mask(self, src):
        """Creates a mask for the source sequence to ignore padding tokens.

        Args:
            src: Tensor, shape [batch_size, src_len] (source token IDs)

        Returns:
            Bool tensor, shape [batch_size, 1, 1, src_len], on ``src``'s
            device; True where the token is not padding.
        """
        return (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)

    def make_trg_mask(self, trg):
        """Creates a mask for the target sequence to hide padding and future tokens.

        Args:
            trg: Tensor, shape [batch_size, trg_len] (target token IDs)

        Returns:
            Bool tensor, shape [batch_size, 1, trg_len, trg_len], on
            ``trg``'s device; entry [b, 0, i, j] is True when position ``j``
            is not padding and ``j <= i``.
        """
        trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)

        trg_len = trg.shape[1]
        # Lower-triangular matrix: position i may attend to positions <= i.
        trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device=trg.device)).bool()

        return trg_pad_mask & trg_sub_mask

    def forward(self, src, trg):
        """
        Args:
            src: Tensor, shape [batch_size, src_len] (source token IDs)
            trg: Tensor, shape [batch_size, trg_len] (target token IDs)

        Returns:
            Tuple ``(output, attention)`` exactly as returned by the decoder.
        """
        # Follow the parameters rather than the constructor-time device so the
        # model keeps working after being moved with ``.to(...)``.
        target_device = self._runtime_device()
        src = src.to(target_device)
        trg = trg.to(target_device)

        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)

        enc_src = self.encoder(src, src_mask)

        output, attention = self.decoder(trg, enc_src, trg_mask, src_mask)

        return output, attention


print("Initializing model components...")

# The encoder is built before the decoder; both are placed on `device`.
enc = Encoder(
    input_dim=INPUT_DIM,
    d_model=D_MODEL,
    n_layers=ENC_LAYERS,
    n_heads=N_HEADS,
    pf_dim=PF_DIM,
    dropout=DROPOUT,
    max_len=MAX_LEN,
).to(device)
dec = Decoder(
    output_dim=OUTPUT_DIM,
    d_model=D_MODEL,
    n_layers=DEC_LAYERS,
    n_heads=N_HEADS,
    pf_dim=PF_DIM,
    dropout=DROPOUT,
    max_len=MAX_LEN,
).to(device)

# The same padding ID is used for the source and the target side.
model = Seq2SeqTransformer(
    encoder=enc,
    decoder=dec,
    src_pad_idx=PAD_IDX,
    trg_pad_idx=PAD_IDX,
    device=device,
).to(device)


Using device: cpu
Initializing model components...


In [21]:
print(DEVICE)
# Inference is pinned to the CPU from here on, whatever was detected earlier.
DEVICE = torch.device('cpu')
if not os.path.exists(MODEL_PATH):
    print(f"Error: Model file not found at {MODEL_PATH}")
    raise FileNotFoundError(f"Model not found: {MODEL_PATH}")
else:
    try:
        print(f"Loading entire model object from {MODEL_PATH}...")
        # The checkpoint is consumed as a state_dict and copied into the model
        # built above. NOTE(review): torch.load unpickles the file, so only
        # load checkpoints that come from a trusted source.
        state_dict = torch.load(MODEL_PATH, map_location=DEVICE)
        model.load_state_dict(state_dict)
        print("Model loaded successfully.")
        model.to(DEVICE)
        # `.to()` moves parameters and buffers but not the plain `device`
        # attribute stored on the model, which its mask helpers consult;
        # keep it in sync with where the weights now live.
        model.device = DEVICE
        model.eval()
    except FileNotFoundError:
        # This case is handled above, but kept for robustness
        print(f"Error: Model file not found at {MODEL_PATH}")
        raise
    except Exception as e:
        print(f"Error loading model: {e}")
        print("Check model file integrity and Python environment compatibility.")
        # Bare raise re-raises the active exception with its traceback intact.
        raise

cpu
Loading entire model object from empathetic-transformer-basic-best (2).pt...
Model loaded successfully.


In [26]:
# Fixed prompts spanning a range of emotional situations, used as a quick
# qualitative check of the loaded model.
test_prompts = [
    "I just got a promotion at work! I'm so happy.",
    "I feel really lonely these days. No one seems to understand me.",
    "My best friend just moved to another country, and I miss them so much.",
    "I failed my exam even after studying so hard. I feel so disappointed.",
    "I helped a stranger today, and it made me feel really good.",
    "I'm really nervous about my job interview tomorrow.",
    "I just finished a marathon! I feel so accomplished.",
    "My pet passed away last night, and I’m heartbroken.",
    "I got stuck in traffic for hours today. It was so frustrating!",
    "I found out I’m going to be a parent! I’m overjoyed but also a little scared.",
    "I’ve been feeling really unmotivated lately. I don’t know what to do.",
    "I had a great conversation with an old friend today. It felt amazing!",
    "I lost my wallet today. Now I have to replace everything.",
    "I just tried a new hobby, and I think I love it!",
    "Someone criticized my work today, and it made me feel insecure.",
    "I’m struggling with my mental health, and I don’t know how to talk about it.",
    "My birthday is coming up, but I don’t feel excited this year.",
    "I finally confronted someone who hurt me in the past. It was really hard.",
    "I got my dream job! I can’t believe it’s happening.",
    "I feel so stuck in life. I don’t know what my next step should be."
]

for prompt in test_prompts:
    # Inputs are created on DEVICE, the device the model was moved to when
    # the checkpoint was loaded (not the lower-case `device` the model was
    # originally constructed on).
    response = generate_response_sampling(
        prompt, model, tokenizer, DEVICE, max_len=MAX_LEN,
        strategy="topp", p=0.9, temperature=0.8
    )
    print(f"User: {prompt}")
    print(f"Bot:  {response}")
    print("-" * 20)


User: I just got a promotion at work! I'm so happy.
Bot:  that is a lot of fun !
--------------------
User: I feel really lonely these days. No one seems to understand me.
Bot:  Yeah - I guess if I would go a good one and he can be used to get .
--------------------
User: My best friend just moved to another country, and I miss them so much.
Bot:  Oh no ! Did you talk to the wedding ?
--------------------
User: I failed my exam even after studying so hard. I feel so disappointed.
Bot:  That sounds really so sad . Do you do at least ?
--------------------
User: I helped a stranger today, and it made me feel really good.
Bot:  That must have been a school ##_comma_ and I bet you can get ?
--------------------
User: I'm really nervous about my job interview tomorrow.
Bot:  Why did you get ?
--------------------
User: I just finished a marathon! I feel so accomplished.
Bot:  How do you think ?
--------------------
User: My pet passed away last night, and I’m heartbroken.
Bot:  I know . How

## 5. Interactive Inference Loop

In [28]:
print("\n--- Empathetic Chatbot Ready ---")
print(f"(Device: {DEVICE}, Strategy: {DEFAULT_STRATEGY}, Temp: {DEFAULT_TEMPERATURE}, K: {DEFAULT_K}, P: {DEFAULT_P})")
print("Enter your message (or type 'quit' to exit):")

while True:
    # Reading input is handled separately from generation: if stdin is closed
    # or unavailable (for example a non-interactive run), input() raises on
    # every call, and treating that as a recoverable generation error would
    # make the loop spin forever.
    try:
        input_sentence = input("You: ")
    except (KeyboardInterrupt, EOFError, NotImplementedError):
        print("\nExiting...")
        break

    if input_sentence.lower().strip() == 'quit':
        break
    if not input_sentence.strip():
        continue

    try:
        start_time = time.time()

        # Use the settings announced in the banner above, and the device the
        # model was moved to when the checkpoint was loaded.
        response = generate_response_sampling(
            input_sentence, model, tokenizer, DEVICE, max_len=MAX_LEN,
            strategy=DEFAULT_STRATEGY, k=DEFAULT_K, p=DEFAULT_P,
            temperature=DEFAULT_TEMPERATURE
        )

        end_time = time.time()
        print(f"Bot: {response}")
        print(f"(Generated in {end_time - start_time:.2f} seconds)")
        print("-" * 20)

    except KeyboardInterrupt:
        print("\nExiting...")
        break
    except Exception as e:
        # A failure on one message is reported and the chat continues.
        print(f"\nAn error occurred during generation: {e}")

print("\nInference finished.")


--- Empathetic Chatbot Ready ---
(Device: cpu, Strategy: topk, Temp: 0.75, K: 15, P: 0.92)
Enter your message (or type 'quit' to exit):
You: I feel like dying
Bot: I agree . That is a tough way to have someone .
(Generated in 0.18 seconds)
--------------------
You: quit

Inference finished.
