# In [None]:  (notebook cell marker left over from the Jupyter export)
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import math
from nltk.translate.bleu_score import sentence_bleu
# Hyperparameters

# Model dimensions
input_size = 15   # number of entries in the English vocabulary
output_size = 15  # number of entries in the French vocabulary
embedding_dim = 128
hidden_size = 256
num_layers = 2

# Regularisation
dropout = 0.2       # applied to the token embeddings
lstm_dropout = 0.2  # applied between stacked LSTM layers

# Optimisation
learning_rate = 0.003
num_epochs = 500
batch_size = 1
teacher_forcing_ratio = 0.8  # chance of feeding the gold token at each decoder step

# Decoding
beam_width = 3
top_k = 3
max_len = 12  # upper bound on generated tokens per sentence
# Vocabularies. A token's id is its position in the list it is enumerated
# from, so the special tokens <SOS>, <EOS>, <PAD> take ids 0, 1, 2 in both
# languages.
word_to_idx_en = {
    token: idx
    for idx, token in enumerate([
        '<SOS>', '<EOS>', '<PAD>', 'hello', 'world', 'i', 'am', 'good',
        'how', 'are', 'you', 'my', 'name', 'is', 'fine',
    ])
}
word_to_idx_fr = {
    token: idx
    for idx, token in enumerate([
        '<SOS>', '<EOS>', '<PAD>', 'bonjour', 'monde', 'je', 'suis', 'bien',
        'comment', 'allez', 'vous', 'mon', 'nom', 'est', 'ca',
    ])
}
# Reverse lookups (id -> token) used when printing inputs and translations.
idx_to_word_en = dict(zip(word_to_idx_en.values(), word_to_idx_en.keys()))
idx_to_word_fr = dict(zip(word_to_idx_fr.values(), word_to_idx_fr.keys()))
# Toy parallel corpus as (English source ids, French target ids) pairs.
# Every sequence below ends with <EOS> (id 1).
data = [
    # English -> French
    ([3, 4, 1], [3, 4, 1]),  # hello world -> bonjour monde
    ([5, 6, 7, 1], [5, 6, 7, 1]),  # i am good -> je suis bien
    ([8, 9, 10, 1], [8, 9, 10, 1]),  # how are you -> comment allez vous
    ([11, 12, 13, 1], [11, 12, 13, 1]),  # my name is -> mon nom est
    ([5, 6, 14, 1], [5, 6, 7, 1]),  # i am fine -> je suis bien
    ([3, 1], [3, 1]),  # hello -> bonjour
    ([8, 9, 10, 14, 1], [8, 14, 9, 10, 1]),  # how are you fine -> comment ca allez vous
    ([5, 6, 1], [5, 6, 1]),  # i am -> je suis
]
# Convert to tensors and prefix every target with <SOS> (id 0).
# Seq2Seq.forward feeds tgt[:, 0] to the decoder as the start token and the
# loss skips position 0, while the decode functions start from id 0. Without
# the prefix the model never sees <SOS> as an input and is never trained to
# predict the first word of a sentence.
data = [
    (
        torch.tensor(src, dtype=torch.long),
        torch.tensor([0] + tgt, dtype=torch.long),
    )
    for src, tgt in data
]
# Improved Attention mechanism
class Attention(nn.Module):
    """Additive attention over the encoder outputs.

    Each encoder step is paired with the last-layer decoder hidden state,
    projected through a linear layer with tanh, and scored against the
    learned vector ``v``. The scores are normalised with softmax across
    the sequence dimension.
    """

    def __init__(self, hidden_size):
        super().__init__()
        # Projects the concatenated [decoder state; encoder output] pair.
        self.attn = nn.Linear(2 * hidden_size, hidden_size)
        # Scoring vector, re-drawn from U(-1/sqrt(hidden), 1/sqrt(hidden)).
        self.v = nn.Parameter(torch.rand(hidden_size))
        bound = 1.0 / math.sqrt(hidden_size)
        nn.init.uniform_(self.v, -bound, bound)

    def forward(self, hidden, encoder_outputs):
        """Return attention weights of shape [batch, seq_len].

        ``hidden`` is indexed with ``[-1]`` to select the last layer's state
        ([batch, hidden]); ``encoder_outputs`` is [batch, seq_len, hidden].
        """
        steps = encoder_outputs.size(1)
        # Tile the last-layer state so it lines up with every encoder step.
        tiled_state = hidden[-1].unsqueeze(1).repeat(1, steps, 1)
        paired = torch.cat((tiled_state, encoder_outputs), dim=2)
        energy = torch.tanh(self.attn(paired))
        scores = (self.v * energy).sum(dim=2)
        return F.softmax(scores, dim=1)

# Enhanced Encoder
class EncoderLSTM(nn.Module):
    """Embeds source token ids and encodes them with a unidirectional LSTM."""

    def __init__(self, input_size, embedding_dim, hidden_size, num_layers, dropout, lstm_dropout):
        super().__init__()
        # Inter-layer dropout is only requested when the LSTM is stacked.
        inter_layer_dropout = lstm_dropout if num_layers > 1 else 0
        self.embedding = nn.Embedding(input_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=inter_layer_dropout,
            batch_first=True,
            bidirectional=False,
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Encode ``src`` (batch-first token ids).

        Returns the per-step LSTM outputs followed by the final hidden and
        cell states.
        """
        token_vectors = self.dropout(self.embedding(src))
        outputs, final_state = self.lstm(token_vectors)
        hidden, cell = final_state
        return outputs, hidden, cell

# Enhanced Decoder with Attention
class DecoderLSTM(nn.Module):
    """Single-step LSTM decoder that attends over the encoder outputs."""

    def __init__(self, output_size, embedding_dim, hidden_size, num_layers, dropout, lstm_dropout):
        super().__init__()
        self.output_size = output_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(output_size, embedding_dim)
        self.attention = Attention(hidden_size)
        # The LSTM consumes the token embedding concatenated with the context vector.
        self.lstm = nn.LSTM(
            input_size=embedding_dim + hidden_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=lstm_dropout if num_layers > 1 else 0,
            batch_first=True,
        )
        # The output layer sees the LSTM output concatenated with the context vector.
        self.fc = nn.Linear(2 * hidden_size, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, hidden, cell, encoder_outputs):
        """Run one decoding step.

        Args:
            tgt: Current input token ids, shape [batch].
            hidden, cell: LSTM state carried over from the previous step.
            encoder_outputs: Encoder outputs, shape [batch, seq_len, hidden].

        Returns:
            ``(prediction, hidden, cell, attn_weights)`` where ``prediction``
            holds the vocabulary logits for this step.
        """
        token_vec = self.dropout(self.embedding(tgt.unsqueeze(1)))  # [batch, 1, emb]

        attn_weights = self.attention(hidden, encoder_outputs)  # [batch, seq_len]
        context = attn_weights.unsqueeze(1).bmm(encoder_outputs)  # [batch, 1, hidden]

        step_input = torch.cat((token_vec, context), dim=2)
        step_output, (hidden, cell) = self.lstm(step_input, (hidden, cell))

        features = torch.cat((step_output.squeeze(1), context.squeeze(1)), dim=1)
        return self.fc(features), hidden, cell, attn_weights

# Seq2Seq Model
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that unrolls the decoder one target step at a time."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, tgt, teacher_forcing_ratio=0.8):
        """Return logits of shape [batch, tgt_len, vocab].

        Position 0 of the result stays zero: ``tgt[:, 0]`` is only used as
        the first decoder input. At every later step the next input is the
        gold token with probability ``teacher_forcing_ratio``, otherwise the
        decoder's own argmax prediction.
        """
        n_steps = tgt.size(1)
        logits = torch.zeros(src.size(0), n_steps, self.decoder.output_size).to(src.device)

        memory, hidden, cell = self.encoder(src)
        step_input = tgt[:, 0]
        for t in range(1, n_steps):
            step_logits, hidden, cell, _ = self.decoder(step_input, hidden, cell, memory)
            logits[:, t] = step_logits
            if random.random() < teacher_forcing_ratio:
                step_input = tgt[:, t]
            else:
                step_input = step_logits.argmax(1)
        return logits

# Enhanced training function
def train(model, data, optimizer, criterion, num_epochs, device):
    """Train ``model`` on ``data`` one sentence pair at a time.

    Args:
        model: Callable as ``model(src, tgt, teacher_forcing_ratio)`` and
            returning logits of shape [1, tgt_len, vocab].
        data: Iterable of ``(src, tgt)`` 1-D token-id tensors. It is copied
            before shuffling, so the caller's container keeps its order.
        optimizer: Optimizer over ``model.parameters()``.
        criterion: Loss taking ``(logits [N, vocab], targets [N])``.
        num_epochs: Number of passes over ``data``.
        device: Device the tensors are moved to.

    Returns:
        The lowest average epoch loss seen (``inf`` if ``num_epochs`` is 0).

    Raises:
        ValueError: If ``data`` is empty.

    The teacher-forcing probability is read from the module-level
    ``teacher_forcing_ratio``.
    """
    samples = list(data)
    if not samples:
        # The original divided by len(data) and failed with ZeroDivisionError.
        raise ValueError("train() needs at least one (src, tgt) pair")

    model.train()
    best_loss = float('inf')
    for epoch in range(num_epochs):
        epoch_loss = 0.0
        random.shuffle(samples)  # new order each epoch, caller's list untouched
        for src, tgt in samples:
            src = src.unsqueeze(0).to(device)
            tgt = tgt.unsqueeze(0).to(device)
            optimizer.zero_grad()
            output = model(src, tgt, teacher_forcing_ratio)
            # Position 0 is only the decoder's first input, so drop it from
            # both the logits and the targets before computing the loss.
            output_dim = output.shape[-1]
            logits = output[:, 1:].reshape(-1, output_dim)
            targets = tgt[:, 1:].reshape(-1)
            loss = criterion(logits, targets)
            loss.backward()
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            epoch_loss += loss.item()
        avg_loss = epoch_loss / len(samples)
        best_loss = min(best_loss, avg_loss)
        if (epoch + 1) % 50 == 0:
            print(f'Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}, Best Loss: {best_loss:.4f}')
    return best_loss
# Improved decoding strategies
def greedy_decode(model, src, max_len, start_token=0, device='cpu'):
    """Translate ``src`` by taking the highest-scoring token at every step.

    Args:
        model: Object exposing ``eval()``, ``encoder`` and ``decoder``.
        src: Source token ids for one sentence (list of ints or 1-D tensor).
        max_len: Maximum number of decoder steps.
        start_token: Token id fed to the decoder at the first step.
        device: Device the tensors are created on.

    Returns:
        The generated words looked up in ``idx_to_word_fr``. The start token
        and EOS are not included, and ids missing from that vocabulary are
        dropped.
    """
    model.eval()
    with torch.no_grad():
        # as_tensor accepts both lists and tensors without a copy warning.
        src = torch.as_tensor(src, dtype=torch.long).unsqueeze(0).to(device)
        encoder_outputs, hidden, cell = model.encoder(src)
        token_ids = []
        decoder_input = torch.tensor([start_token], dtype=torch.long, device=device)
        for _ in range(max_len):
            prediction, hidden, cell, _attn = model.decoder(
                decoder_input, hidden, cell, encoder_outputs
            )
            decoder_input = prediction.argmax(1)  # shape [1], reused as next input
            pred_token = decoder_input.item()
            if pred_token == 1:  # EOS token
                break
            token_ids.append(pred_token)
    return [idx_to_word_fr[idx] for idx in token_ids if idx in idx_to_word_fr]
def beam_search_decode(model, src, beam_width, max_len, start_token=0, device='cpu'):
    """Translate ``src`` with beam search, ranking by length-normalised log-probability.

    Args:
        model: Object exposing ``eval()``, ``encoder`` and ``decoder``.
        src: Source token ids for one sentence (list of ints or 1-D tensor).
        beam_width: Number of hypotheses kept after each step (>= 1).
        max_len: Maximum number of decoder steps.
        start_token: Token id every hypothesis starts from.
        device: Device the tensors are created on.

    Returns:
        Words of the best hypothesis looked up in ``idx_to_word_fr``, without
        the start token or EOS; ids missing from that vocabulary are dropped.

    Raises:
        ValueError: If ``beam_width`` is smaller than 1.
    """
    if beam_width < 1:
        raise ValueError("beam_width must be at least 1")

    model.eval()
    with torch.no_grad():
        src = torch.as_tensor(src, dtype=torch.long).unsqueeze(0).to(device)
        encoder_outputs, hidden, cell = model.encoder(src)
        # Each beam is (token ids, summed log-prob, hidden state, cell state).
        beams = [([start_token], 0.0, hidden, cell)]
        for _ in range(max_len):
            # Once every hypothesis has emitted EOS, further steps cannot
            # change the ranking, so stop early.
            if all(seq[-1] == 1 for seq, _score, _h, _c in beams):
                break
            candidates = []
            for seq, score, h, c in beams:
                if seq[-1] == 1:  # EOS token: carry the finished beam forward
                    candidates.append((seq, score, h, c))
                    continue
                decoder_input = torch.tensor([seq[-1]], dtype=torch.long, device=device)
                prediction, new_h, new_c, _attn = model.decoder(
                    decoder_input, h, c, encoder_outputs
                )
                log_probs = F.log_softmax(prediction, dim=1)
                # Never ask topk for more entries than the vocabulary holds.
                k = min(beam_width, log_probs.size(1))
                top_log_probs, top_idx = log_probs.topk(k)
                for lp, idx in zip(top_log_probs[0].tolist(), top_idx[0].tolist()):
                    candidates.append((seq + [idx], score + lp, new_h, new_c))
            # Keep top beams, ranked by average log-prob per token.
            beams = sorted(
                candidates, key=lambda beam: beam[1] / len(beam[0]), reverse=True
            )[:beam_width]
    best_seq = beams[0][0][1:]  # Remove start token
    return [idx_to_word_fr[idx] for idx in best_seq if idx != 1 and idx in idx_to_word_fr]
def top_k_sampling_decode(model, src, top_k, max_len, start_token=0, device='cpu', temperature=0.8):
    """Translate ``src`` by sampling each token from the ``top_k`` most likely ones.

    Args:
        model: Object exposing ``eval()``, ``encoder`` and ``decoder``.
        src: Source token ids for one sentence (list of ints or 1-D tensor).
        top_k: Number of highest-probability tokens to sample from (>= 1);
            capped at the vocabulary size.
        max_len: Maximum number of decoder steps.
        start_token: Token id fed to the decoder at the first step.
        device: Device the tensors are created on.
        temperature: Positive divisor applied to the logits before softmax.

    Returns:
        The sampled words looked up in ``idx_to_word_fr``, without the start
        token or EOS; ids missing from that vocabulary are dropped.

    Raises:
        ValueError: If ``top_k`` < 1 or ``temperature`` <= 0.
    """
    if top_k < 1:
        raise ValueError("top_k must be at least 1")
    if temperature <= 0:
        # Dividing the logits by zero would yield inf/nan probabilities.
        raise ValueError("temperature must be positive")

    model.eval()
    with torch.no_grad():
        src = torch.as_tensor(src, dtype=torch.long).unsqueeze(0).to(device)
        encoder_outputs, hidden, cell = model.encoder(src)
        token_ids = []
        decoder_input = torch.tensor([start_token], dtype=torch.long, device=device)
        for _ in range(max_len):
            prediction, hidden, cell, _attn = model.decoder(
                decoder_input, hidden, cell, encoder_outputs
            )
            # Apply temperature
            probs = F.softmax(prediction / temperature, dim=1)
            # Top-k sampling: renormalise the k best and draw one of them.
            top_probs, top_idx = probs.topk(min(top_k, probs.size(1)))
            top_probs = top_probs / top_probs.sum(dim=1, keepdim=True)
            choice = torch.multinomial(top_probs[0], 1).item()
            pred_token = top_idx[0][choice].item()
            if pred_token == 1:  # EOS token
                break
            token_ids.append(pred_token)
            decoder_input = torch.tensor([pred_token], dtype=torch.long, device=device)
    return [idx_to_word_fr[idx] for idx in token_ids if idx in idx_to_word_fr]
# BLEU score evaluation
def evaluate_bleu(reference, candidate):
    """Return the unigram BLEU score of ``candidate`` against ``reference``.

    Both arguments are lists of tokens. An empty reference or candidate
    scores 0.0 without calling ``sentence_bleu``.
    """
    if not candidate or not reference:
        return 0.0
    # 1-gram BLEU for small sequences
    return sentence_bleu([reference], candidate, weights=(1, 0, 0, 0))
# Main execution
def main():
    """Train the toy translation model and compare the three decoders.

    Builds the encoder/decoder from the module-level hyperparameters, trains
    on ``data``, then prints the greedy, beam-search and top-k translations
    of a few sentences together with their unigram BLEU scores.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Initialize model
    encoder = EncoderLSTM(input_size, embedding_dim, hidden_size, num_layers, dropout, lstm_dropout).to(device)
    decoder = DecoderLSTM(output_size, embedding_dim, hidden_size, num_layers, dropout, lstm_dropout).to(device)
    model = Seq2Seq(encoder, decoder).to(device)

    # Initialize optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    criterion = nn.CrossEntropyLoss(ignore_index=2)  # Ignore PAD token

    print("Starting training...")
    train(model, data, optimizer, criterion, num_epochs, device)

    # Test cases: (English token ids without <EOS>, reference French words)
    test_cases = [
        ([3, 4], ['bonjour', 'monde']),  # hello world
        ([5, 6, 7], ['je', 'suis', 'bien']),  # i am good
        ([8, 9, 10], ['comment', 'allez', 'vous']),  # how are you
        ([11, 12, 13], ['mon', 'nom', 'est']),  # my name is
    ]
    eos_idx = word_to_idx_en['<EOS>']

    print("\n" + "="*60)
    print("EVALUATION RESULTS")
    print("="*60)
    for src, ref in test_cases:
        print(f"\nInput (English): {' '.join([idx_to_word_en[idx] for idx in src])}")
        # Every training source ends with <EOS>, so encode the test
        # sentences in the same format the encoder was trained on.
        model_src = src + [eos_idx]
        greedy_out = greedy_decode(model, model_src, max_len, device=device)
        beam_out = beam_search_decode(model, model_src, beam_width, max_len, device=device)
        topk_out = top_k_sampling_decode(model, model_src, top_k, max_len, device=device)
        print(f"Reference (French): {' '.join(ref)}")
        print(f"Greedy: {' '.join(greedy_out)}")
        print(f"Beam Search: {' '.join(beam_out)}")
        print(f"Top-K Sampling: {' '.join(topk_out)}")
        # BLEU scores
        greedy_bleu = evaluate_bleu(ref, greedy_out)
        beam_bleu = evaluate_bleu(ref, beam_out)
        topk_bleu = evaluate_bleu(ref, topk_out)
        print(f"BLEU Scores - Greedy: {greedy_bleu:.3f}, Beam: {beam_bleu:.3f}, Top-K: {topk_bleu:.3f}")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()