In [1]:
import torch
import torch.nn as nn
import os
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import sys
import os
# Start the search from this file's directory when __file__ is defined;
# otherwise fall back to the current working directory.
if "__file__" in locals():
    current_dir = os.path.dirname(os.path.abspath(__file__))
else:
    current_dir = os.getcwd()

# Climb one directory at a time until a directory that contains the
# 'nmt_cmr_parallels' folder is found.
root_dir = current_dir
while True:
    if os.path.isdir(os.path.join(root_dir, "nmt_cmr_parallels")):
        break
    parent_dir = os.path.dirname(root_dir)
    if parent_dir == root_dir:
        # dirname() no longer changes the path, so there is nowhere left to climb.
        raise FileNotFoundError("Could not find 'nmt_cmr_parallels' folder in any parent directories")
    root_dir = parent_dir

# Add project root to sys.path so the project package below can be imported.
if root_dir not in sys.path:
    sys.path.append(root_dir)

from nmt_cmr_parallels.data.sequence_data import load_cached_vocabulary, create_pretrained_semantic_embedding

# Data directory handed to create_pretrained_semantic_embedding further down.
DEFAULT_DATA_PATH = os.path.expanduser(os.path.join('~', '.seq_nlp_data'))

In [2]:
# When True, the special '<null>', '<SoS>' and '<EoS>' entries are added to the
# vocabulary and the checkpoint file name is chosen accordingly.
with_tokens = True

In [3]:
# Pretrained word vectors; the second argument (50) matches the embedding
# dimension used for the nn.Embedding layer built from them.
pretrained_embedding = create_pretrained_semantic_embedding(DEFAULT_DATA_PATH, 50)

# Cached vocabulary, lower-cased entry by entry.
vocab = [entry.lower() for entry in load_cached_vocabulary("peers_vocab.json")]

# Optionally surround the vocabulary with the special tokens:
# '<null>' goes first, '<SoS>' and '<EoS>' go last.
if with_tokens:
    vocab = ['<null>', *vocab, '<SoS>', '<EoS>']

In [4]:
# One 50-dimensional row per vocabulary entry.
embedding_layer = nn.Embedding(num_embeddings=len(vocab), embedding_dim=50)

# Row number of every vocabulary entry (a repeated entry keeps its last position).
word_to_index = dict(zip(vocab, range(len(vocab))))

# Initialize the weights of the Embedding layer with the pretrained vectors.
# Entries without a pretrained vector keep the values nn.Embedding gave them.
for word, index in word_to_index.items():
    if word not in pretrained_embedding:
        continue
    embedding_layer.weight.data[index] = torch.tensor(pretrained_embedding[word], dtype=torch.float32)

# Freeze the embedding layer (weight is its only parameter).
embedding_layer.weight.requires_grad = False

In [5]:
def create_data(embeddings, vocab_size):
    """Build the (input, target) tensors for the inverse-embedding task.

    Args:
        embeddings: Module exposing a ``weight`` tensor (e.g. ``nn.Embedding``).
        vocab_size: Side length of the identity matrix used as targets.

    Returns:
        Tuple ``(inputs, targets)``: ``inputs`` is an independent copy of the
        embedding weights, ``targets`` is ``torch.eye(vocab_size)`` so that
        row ``i`` is the one-hot target for embedding row ``i``.
    """
    # Copy the weights so the dataset does not share storage with the layer.
    inputs = embeddings.weight.detach().clone()
    targets = torch.eye(vocab_size)
    return inputs, targets

# Training set: x holds a copy of the embedding rows, y the matching one-hot rows.
vocab_size = len(vocab)
x, y = create_data(embeddings=embedding_layer, vocab_size=vocab_size)

# Shuffled mini-batches of 32 (embedding, one-hot) pairs.
dataset = TensorDataset(x, y)
dataloader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)

class InverseEmbeddingMLP(nn.Module):
    """Two-layer perceptron that maps an embedding vector to vocabulary logits.

    Architecture: Linear(embedding_dim -> 512) -> ReLU -> Linear(512 -> vocab_size).
    """

    def __init__(self, embedding_dim, vocab_size):
        """Create the layers.

        Args:
            embedding_dim: Size of the input embedding vectors.
            vocab_size: Number of output logits (one per vocabulary entry).
        """
        super().__init__()
        # Attribute names are part of the saved state_dict keys; keep them stable.
        self.fc1 = nn.Linear(embedding_dim, 512)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(512, vocab_size)

    def forward(self, x):
        """Return unnormalized vocabulary scores for the embedding(s) ``x``."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

# Network sized from the embedding layer and the vocabulary.
model = InverseEmbeddingMLP(
    embedding_dim=embedding_layer.embedding_dim,
    vocab_size=vocab_size,
)
# Cross-entropy between the logits and the one-hot target rows.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

def train(model, dataloader, criterion, optimizer, epochs):
    """Run a plain mini-batch training loop.

    Args:
        model: Module called on each batch of inputs.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping the model's parameters.
        epochs: Number of full passes over ``dataloader``.

    Returns:
        None. The loss of every 100th step (starting with step 0) of each
        epoch is printed.
    """
    for epoch in range(epochs):
        i = 0
        for batch_inputs, batch_labels in dataloader:
            # Standard step: clear old gradients, forward, backward, update.
            optimizer.zero_grad()
            loss = criterion(model(batch_inputs), batch_labels)
            loss.backward()
            optimizer.step()
            if not i % 100:
                print(f"Epoch {epoch+1}/{epochs}, Step {i}, Loss: {loss.item()}")
            i += 1

# Fit the inverse-embedding model for 200 passes over the dataloader.
train(
    model=model,
    dataloader=dataloader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=200,
)

Epoch 1/200, Step 0, Loss: 7.43808650970459
Epoch 2/200, Step 0, Loss: 7.330111503601074
Epoch 3/200, Step 0, Loss: 7.072533130645752
Epoch 4/200, Step 0, Loss: 6.789166450500488
Epoch 5/200, Step 0, Loss: 6.608606815338135
Epoch 6/200, Step 0, Loss: 6.20540714263916
Epoch 7/200, Step 0, Loss: 5.933229446411133
Epoch 8/200, Step 0, Loss: 5.491092681884766
Epoch 9/200, Step 0, Loss: 5.146666049957275
Epoch 10/200, Step 0, Loss: 4.6481428146362305
Epoch 11/200, Step 0, Loss: 4.003405570983887
Epoch 12/200, Step 0, Loss: 3.7342138290405273
Epoch 13/200, Step 0, Loss: 3.2699272632598877
Epoch 14/200, Step 0, Loss: 3.016238212585449
Epoch 15/200, Step 0, Loss: 2.2846860885620117
Epoch 16/200, Step 0, Loss: 1.8921103477478027
Epoch 17/200, Step 0, Loss: 1.6490181684494019
Epoch 18/200, Step 0, Loss: 1.6009495258331299
Epoch 19/200, Step 0, Loss: 1.089294672012329
Epoch 20/200, Step 0, Loss: 1.0453386306762695
Epoch 21/200, Step 0, Loss: 0.9235116243362427
Epoch 22/200, Step 0, Loss: 0.790096

In [None]:
#Evaluation
# Every non-special vocabulary word should be mapped back to its own index
# when its embedding vector is passed through the trained model.
special_tokens = {'<SoS>', '<EoS>', '<null>'}
eval_words = [word for word in vocab if word not in special_tokens]
eval_indices = torch.tensor([word_to_index[word] for word in eval_words], dtype=torch.long)

# Inference only: switch the model to eval mode and skip autograd bookkeeping.
model.eval()
with torch.no_grad():
    # Look the vectors up in the frozen embedding layer rather than in
    # pretrained_embedding: these are the exact float32 rows the model was
    # trained on, and they also exist for words that have no pretrained vector
    # (indexing pretrained_embedding with such a word would raise KeyError).
    retrieved = model(embedding_layer(eval_indices))
    predicted_indices = torch.argmax(retrieved, dim=1)

# Collect every miss so a failure reports which words were not recovered
# instead of stopping at the first one.
is_correct = (predicted_indices == eval_indices).tolist()
misclassified = [word for word, ok in zip(eval_words, is_correct) if not ok]
print(f"Recovered {len(eval_words) - len(misclassified)}/{len(eval_words)} words from their embeddings")
assert not misclassified, (
    f"{len(misclassified)} word(s) were not recovered from their embedding, "
    f"e.g. {misclassified[:10]}"
)

In [7]:
# Pick the checkpoint file name that matches the vocabulary variant that was
# trained, so a run without special tokens does not overwrite the tokens file.
path = 'inverse_embedding_tokens.pt' if with_tokens else 'inverse_embedding.pt'
checkpoint_path = os.path.join(os.getcwd(), path)

# Only the parameters (state_dict) are stored, not the module object itself.
torch.save(model.state_dict(), checkpoint_path)