In [27]:
import torch
import torch.nn.functional as F
from torch.optim import Adam
from transformers import T5ForConditionalGeneration, T5Tokenizer

In [28]:
# Load the T5 tokenizer and models
tokenizer = T5Tokenizer.from_pretrained("t5-small")
english_to_lingo_model = T5ForConditionalGeneration.from_pretrained("t5-small")
lingo_to_english_model = T5ForConditionalGeneration.from_pretrained("t5-small")
embedding_model = T5ForConditionalGeneration.from_pretrained("t5-small")

In [29]:
# Define the environment that rewards Lingo translations
class LingoTranslationEnvironment:
    def __init__(self, tokenizer, english_to_lingo_model, lingo_to_english_model, embedding_model):
        self.tokenizer = tokenizer
        self.english_to_lingo_model = english_to_lingo_model
        self.lingo_to_english_model = lingo_to_english_model
        self.embedding_model = embedding_model

    def get_reward(self, english_text, lingo_text, back_to_english_text):
        # Convert texts to embeddings
        english_embedding = self._get_text_embedding(english_text)
        back_to_english_embedding = self._get_text_embedding(back_to_english_text)

        # Compute similarity between the original English text and back-to-English translation
        similarity = F.cosine_similarity(
            english_embedding.unsqueeze(0),
            back_to_english_embedding.unsqueeze(0),
        )

        # Compute the reward based on translation similarity and Lingo token length
        reward = similarity - 0.1 * len(lingo_text.split())

        return reward.item()

    def _get_text_embedding(self, text):
        inputs = self.tokenizer(text, return_tensors="pt", padding=True, truncation=True)

        # Create a tensor with the same shape as input_ids filled with the decoder_start_token_id
        decoder_input_ids = torch.full_like(inputs["input_ids"], self.embedding_model.config.decoder_start_token_id)

        outputs = self.embedding_model(**inputs, decoder_input_ids=decoder_input_ids, output_hidden_states=True)
        hidden_states = outputs.encoder_hidden_states[-1]  # Get the last hidden state
        return hidden_states.mean(dim=1).squeeze()  # Average the token embeddings and remove the batch dimension


In [30]:
# Initialize the environment
env = LingoTranslationEnvironment(tokenizer, english_to_lingo_model, lingo_to_english_model, embedding_model)

In [31]:
# Train the agent using Proximal Policy Optimization (PPO)
def train_agent_ppo(english_texts, num_epochs, num_rollouts, optimizer):
    for epoch in range(num_epochs):
        rewards = []
        for text in english_texts:
            for _ in range(num_rollouts):
                with torch.no_grad():
                    # Generate Lingo translation
                    english_input = tokenizer(text, return_tensors="pt", padding=True, truncation=True).input_ids
                    lingo_translation = english_to_lingo_model.generate(english_input)
                    lingo_text = tokenizer.decode(lingo_translation[0], skip_special_tokens=True)
                    
                    # Generate back-to-English translation
                    lingo_input = tokenizer(lingo_text, return_tensors="pt", padding=True, truncation=True).input_ids
                    back_to_english_tokens = lingo_to_english_model.generate(lingo_input)
                    back_to_english_text = tokenizer.decode(back_to_english_tokens[0], skip_special_tokens=True)

                # Calculate the reward for this rollout
                reward = env.get_reward(text, lingo_text, back_to_english_text)
                rewards.append(reward)

        # Compute the average reward
        avg_reward = sum(rewards) / len(rewards)
        print(f"Epoch {epoch + 1}/{num_epochs}: Average reward = {avg_reward}")

        # Update the models using the PPO algorithm
        optimizer.zero_grad()
        loss = -torch.tensor(avg_reward, requires_grad=True)  # Minimize the negative reward
        loss.backward()
        optimizer.step()

In [32]:
# Define the training data and optimizer
english_texts = [
    "hello",
    "world",
    "how are you",
    "goodbye",
    "please",
    "thank you",
    "yes",
    "no",
    "good morning",
    "good night",
    "happy",
    "sad",
]

optimizer = Adam(list(english_to_lingo_model.parameters()) + list(lingo_to_english_model.parameters()), lr=5e-5)

# Train the agent
num_epochs = 500
num_rollouts = 10
train_agent_ppo(english_texts, num_epochs, num_rollouts, optimizer)

# Save the models
english_to_lingo_model.save_pretrained("english_to_lingo")
lingo_to_english_model.save_pretrained("lingo_to_english")

# Test the models
test_english_texts = [
    "hello",
    "how are you",
    "goodbye",
    "good morning",
]

for text in test_english_texts:
    # Generate Lingo translation
    lingo_translation = english_to_lingo_model.generate(
        **tokenizer(text, return_tensors="pt")
    )
    lingo_text = tokenizer.decode(lingo_translation[0], skip_special_tokens=True)

    # Generate back-to-English translation
    back_to_english_translation = lingo_to_english_model.generate(
        **tokenizer(lingo_text, return_tensors="pt")
    )
    back_to_english_text = tokenizer.decode(back_to_english_translation[0], skip_special_tokens=True)

    print(f"Original English: {text}")
    print(f"Lingo: {lingo_text}")
    print(f"Back to English: {back_to_english_text}\n")



Epoch 1/500: Average reward = 0.2387471248706182
Epoch 2/500: Average reward = 0.2387471248706182
