In [1]:
!pip install torch numpy matplotlib nltk  tqdm



In [2]:
!pip install ipdb



In [3]:
import nltk
from nltk.corpus import wordnet
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

nltk.download('punkt_tab')
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package punkt_tab to
[nltk_data]     /Users/bogdan/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package wordnet to /Users/bogdan/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /Users/bogdan/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


True

In [4]:
lema = WordNetLemmatizer()


def process_text(text):
    tokens = word_tokenize(text)
    tokens = [lema.lemmatize(token) for token in tokens if token.isalpha()]
    return tokens

In [5]:
from nltk.corpus import wordnet as wn
import numpy as np

# Tokenize definitions and build vocabulary
def build_wordnet_vocab():
    vocab = set()
    definitions = {}

    poses = ['a', 'r', 's', 'v', 'n']

    for pos in poses:
        for i, synset in enumerate(wn.all_synsets(pos)):
            if i >= 500:
                break

            word = synset.name().split('.')[0]
            gloss = synset.definition()
            tokens = process_text(gloss)
            definitions[word] = tokens
            vocab.update(tokens + [word])

    word_to_id = {word: idx for idx, word in enumerate(vocab)}
    id_to_word = {idx: word for word, idx in word_to_id.items()}
    return definitions, word_to_id, id_to_word

definitions, word_to_id, id_to_word = build_wordnet_vocab()

In [6]:
pos_idx = {
    'padding': 0,
    'a': 1,
    's': 2,
    'r': 3,
    'v': 4,
    'n': 5,
    'default': 6
}



In [7]:
def get_pos_weight(word):
    synsets = wn.synsets(word)
    if not synsets:
        return pos_idx['default']
    # Use the first synset (most common sense)
    pos = synsets[0].pos()
    return pos_idx.get(pos, pos_idx['default'])

In [8]:
import tqdm
import torch

In [9]:
padding_id = len(word_to_id) 

def generate_data(word_to_id, definitions, max_gloss_length=15):
    """
    Generate word IDs, gloss IDs, gloss POS IDs, and a mask for padded glosses.

    Args:
        word_to_id (dict): Mapping from words to unique IDs.
        definitions (dict): Dictionary mapping words to their gloss tokens.
        max_gloss_length (int): Maximum length of gloss sequences.

    Returns:
        word_ids (torch.Tensor): Tensor of word IDs (shape: [num_words]).
        gloss_ids (torch.Tensor): Padded tensor of gloss IDs (shape: [num_words, max_gloss_length]).
        gloss_pos_ids (torch.Tensor): Padded tensor of gloss POS weights (shape: [num_words, max_gloss_length]).
        gloss_mask (torch.Tensor): Boolean mask for non-padding tokens (shape: [num_words, max_gloss_length]).
    """
    word_ids_ = []  # List to store word IDs
    gloss_ids = []  # List to store gloss token IDs
    gloss_pos_ids = []  # List to store gloss POS weights
    gloss_masks = []  # List to store masks for padding tokens

    for word, id in word_to_id.items():
        gloss_tokens = definitions.get(word, [])  # Retrieve gloss tokens for the word
        
        # Convert gloss tokens to IDs and POS weights, truncated to max_gloss_length
        gloss_idx = [word_to_id[token] for token in gloss_tokens][:max_gloss_length]
        gloss_pos_idx = [get_pos_weight(token) for token in gloss_tokens][:max_gloss_length]
        
        # Create a mask for non-padding tokens
        mask = [1] * len(gloss_idx)
        
        # Add padding to gloss_idx, gloss_pos_idx, and mask until max_gloss_length is reached
        while len(gloss_idx) < max_gloss_length:
            gloss_idx.append(padding_id)
            gloss_pos_idx.append(pos_idx['padding'])  # Assuming `pos_idx['padding']` is predefined
            mask.append(0)  # Padding positions are 0 in the mask
        
        word_ids_.append(id)
        gloss_ids.append(gloss_idx)
        gloss_pos_ids.append(gloss_pos_idx)
        gloss_masks.append(mask)

    # Convert to tensors
    word_ids_ = torch.tensor(word_ids_, dtype=torch.long)  # Shape: [num_words]
    gloss_ids = torch.tensor(gloss_ids, dtype=torch.long)  # Shape: [num_words, max_gloss_length]
    gloss_pos_ids = torch.tensor(gloss_pos_ids, dtype=torch.long)  # Shape: [num_words, max_gloss_length]
    gloss_masks = torch.tensor(gloss_masks, dtype=torch.bool)  # Shape: [num_words, max_gloss_length]

    return word_ids_, gloss_ids, gloss_pos_ids, gloss_masks

In [10]:
word_ids, gloss_ids, gloss_pos_ids, gloss_masks = generate_data(word_to_id, definitions)

In [11]:
word_ids.shape, gloss_ids.shape, gloss_pos_ids.shape, gloss_masks.shape

(torch.Size([4868]),
 torch.Size([4868, 15]),
 torch.Size([4868, 15]),
 torch.Size([4868, 15]))

In [12]:
from torch.utils.data import DataLoader, TensorDataset

batch_size = 16
dataset = TensorDataset(word_ids, gloss_ids, gloss_pos_ids, gloss_masks)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [13]:
import torch.nn as nn

class EmbeddingLayer(nn.Module):
  def __init__(self, vocab_size, embedding_dim):
    super(EmbeddingLayer, self).__init__()
    self.embedding = nn.Embedding(vocab_size, embedding_dim)

  def forward(self, x):
    return self.embedding(x)


In [14]:
class POSWeighting(nn.Module):
  def __init__(self, num_pos):
    super(POSWeighting, self).__init__()
    self.weights = nn.Embedding(num_pos, 1)

  def forward(self, pos_ids, gloss_embeddings):
    pos_weights = self.weights(pos_ids)
    return pos_weights * gloss_embeddings


In [15]:
class GlossAttention(nn.Module):
    def __init__(self, embedding_dim):
        super(GlossAttention, self).__init__()
        self.query = nn.Linear(embedding_dim, embedding_dim)
        self.key = nn.Linear(embedding_dim, embedding_dim)
        self.value = nn.Linear(embedding_dim, embedding_dim)
        self.softmax = nn.Softmax(dim=-1)
        
        # Initialize weights with smaller values
        self._init_weights()
        
    def _init_weights(self):
        # Initialize with smaller weights to prevent explosion
        for module in [self.query, self.key, self.value]:
            nn.init.xavier_uniform_(module.weight, gain=0.1)
            nn.init.zeros_(module.bias)
            
    def forward(self, word_embedding, gloss_embeddings, gloss_mask):
        if gloss_embeddings.dim() == 0 or gloss_embeddings.shape[1] == 0:
            return word_embedding

        # Scale factor for attention scores
        scaling_factor = float(self.query.out_features) ** -0.5
        
        # Compute Q, K, V with gradient clipping
        query = torch.clamp(self.query(word_embedding), -100, 100)
        key = torch.clamp(self.key(gloss_embeddings), -100, 100)
        value = torch.clamp(self.value(gloss_embeddings), -100, 100)

        # Calculate attention scores with scaling
        attention_scores = torch.bmm(query.unsqueeze(1), key.transpose(1, 2)) * scaling_factor
        
        # Apply mask if provided
        if gloss_mask is not None:
            attention_scores = attention_scores.masked_fill(~gloss_mask.unsqueeze(1), -1e9)
        
        # Apply softmax with numerical stability
        attention_weights = self.softmax(attention_scores)
        
        # Add small epsilon to prevent division by zero
        attention_weights = attention_weights + 1e-8
        
        # Check for NaN values
        if torch.isnan(attention_weights).any():
            print("NaN detected in attention weights")
            attention_weights = torch.where(
                torch.isnan(attention_weights),
                torch.full_like(attention_weights, 1.0 / attention_weights.size(-1)),
                attention_weights
            )

        # Compute final attention
        attended_embedding = torch.bmm(attention_weights, value)
        
        return attended_embedding.squeeze(1)

In [16]:
class WordEmbeddingPipeline(nn.Module):
  def __init__(self, vocab_size, embedding_dim, num_pos):
    super(WordEmbeddingPipeline, self).__init__()
    self.embedding_layer = EmbeddingLayer(vocab_size, embedding_dim)
    self.pos_weighting = POSWeighting(num_pos)
    self.gloss_attention = GlossAttention(embedding_dim)

  def forward(self, word_ids_, gloss_ids_, gloss_pos_ids_, gloss_masks_):
    word_embeddings_ = self.embedding_layer(word_ids_)
    gloss_embeddings = self.embedding_layer(gloss_ids_)

    weighted_gloss_embeddings = self.pos_weighting(gloss_pos_ids_, gloss_embeddings)
    attended_embeddings_ = self.gloss_attention(word_embeddings_, weighted_gloss_embeddings, gloss_masks_)

    return word_embeddings_, attended_embeddings_

In [32]:
num_pos = len(pos_idx)
vocab_size = len(word_to_id)
embedding_dim = 100


model = WordEmbeddingPipeline(vocab_size + 1, embedding_dim, num_pos)

In [33]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print("devve: ", device)

devve:  cpu


In [19]:
model.eval

<bound method Module.eval of WordEmbeddingPipeline(
  (embedding_layer): EmbeddingLayer(
    (embedding): Embedding(4869, 100)
  )
  (pos_weighting): POSWeighting(
    (weights): Embedding(7, 1)
  )
  (gloss_attention): GlossAttention(
    (query): Linear(in_features=100, out_features=100, bias=True)
    (key): Linear(in_features=100, out_features=100, bias=True)
    (value): Linear(in_features=100, out_features=100, bias=True)
    (softmax): Softmax(dim=-1)
  )
)>

In [20]:
import numpy as np
import random

In [34]:
def generate_negative_samples(word_embeddings, gloss_embeddings, num_rand_sample=1, num_hard_samples=1):
    batch_size, embedding_dim = gloss_embeddings.size()

    # Random negatives
    random_indices = [random.choice(range(batch_size)) for _ in range(batch_size * num_rand_sample)]
    random_negatives = gloss_embeddings[random_indices]

    # Hard negatives
    similarity_scores = torch.matmul(word_embeddings, gloss_embeddings.T)
    mask = torch.eye(batch_size, device=similarity_scores.device).bool()  # Exclude positive samples
    similarity_scores.masked_fill_(mask, float('-inf'))  # Mask out positives
    _, hard_negative_indices = torch.topk(similarity_scores, k=num_hard_samples, dim=-1)
    hard_negatives = gloss_embeddings[hard_negative_indices.view(-1)]  # Flatten

    # Combine random and hard negatives
    mixed_negatives = torch.cat([random_negatives, hard_negatives], dim=0)

    return mixed_negatives

In [35]:

criterion = nn.CosineEmbeddingLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=1e-6)

patience = 3  # Number of epochs to wait for improvement
min_delta = 0.00001 # Minimum change in los to be considered an improvement
best_loss = float('inf')  # Initialize best loss to infinity
epochs_without_improvement = 0

epochs = 250

for epoch in range(epochs):
  total_loss = 0.0

  with tqdm.tqdm(total=vocab_size, desc=f"Epoch {epoch + 1}/{epochs}") as pbar:
   for words_id, gloss_id, gloss_pos, gloss_mask in dataloader:
    words_id = words_id.to(device)
    gloss_id = gloss_id.to(device)
    gloss_pos = gloss_pos.to(device)
    gloss_mask = gloss_mask.to(device)
    optimizer.zero_grad()

    word_embeddings, attended_embeddings = model(words_id, gloss_id, gloss_pos, gloss_mask)
    
    positive_target = torch.ones(word_embeddings.size(0)).to(device)
    # 
    # # Generate mixed negatives
    mixed_negatives = generate_negative_samples(word_embeddings, attended_embeddings)
    # 
    # # Compute positive loss
    positive_loss = criterion(word_embeddings, attended_embeddings, positive_target)

    # Compute negative loss
    negative_target = -torch.ones(mixed_negatives.size(0)).to(device)
    negative_loss = criterion(word_embeddings.repeat(mixed_negatives.size(0) // word_embeddings.size(0), 1),
                                mixed_negatives, negative_target)
    # 
    # loss = criterion(word_embeddings, attended_embeddings)
    loss = positive_loss + negative_loss
    loss.backward()
    optimizer.step()

    total_loss += loss.item()
    pbar.update(batch_size)
    pbar.set_postfix({'Loss': total_loss / (pbar.n + 1)})

  avg_loss = total_loss / len(dataloader)
  print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss}")
  
  scheduler.step()
  
  if epoch % 10 == 0:
      model.eval()
      test()

  if avg_loss < best_loss - min_delta:
      best_loss = avg_loss
      epochs_without_improvement = 0
  else:
      epochs_without_improvement += 1
      if epochs_without_improvement >= patience:
          print(f"Early stopping triggered after {epoch + 1} epochs.")
          break  # Exit the training loop

Epoch 1/250: 4880it [00:04, 1050.36it/s, Loss=0.0667]                          


Epoch 1/250, Loss: 1.0671117520723186
Best similarity: 0.6911389231681824
Worst similarity: -0.4140896201133728
Average similarity: 0.003537511718663185


Epoch 2/250: 4880it [00:04, 1091.59it/s, Loss=0.0638]                          


Epoch 2/250, Loss: 1.0205749261574668


Epoch 3/250: 4880it [00:04, 1071.49it/s, Loss=0.0624]                          


Epoch 3/250, Loss: 0.9980231758023872


Epoch 4/250: 4880it [00:04, 1113.92it/s, Loss=0.0612]                          


Epoch 4/250, Loss: 0.9801908475453737


Epoch 5/250: 4880it [00:04, 1058.90it/s, Loss=0.0603]                          


Epoch 5/250, Loss: 0.9643614315595783


Epoch 6/250: 4880it [00:04, 1063.00it/s, Loss=0.0594]                          


Epoch 6/250, Loss: 0.951272466925324


Epoch 7/250: 4880it [00:04, 1042.85it/s, Loss=0.0586]                          


Epoch 7/250, Loss: 0.9385486264697841


Epoch 8/250: 4880it [00:05, 945.37it/s, Loss=0.058]                           


Epoch 8/250, Loss: 0.9280930943176394


Epoch 9/250: 4880it [00:04, 1052.43it/s, Loss=0.0574]                          


Epoch 9/250, Loss: 0.917869417980069


Epoch 10/250: 4880it [00:04, 1132.49it/s, Loss=0.0567]                          


Epoch 10/250, Loss: 0.907073543501682


Epoch 11/250: 4880it [00:04, 1121.07it/s, Loss=0.0558]                          


Epoch 11/250, Loss: 0.8927434053577361
Best similarity: 0.6879138946533203
Worst similarity: -0.5489588975906372
Average similarity: 0.009452993540603194


Epoch 12/250: 4880it [00:04, 1046.61it/s, Loss=0.0557]                          


Epoch 12/250, Loss: 0.8910251294980284


Epoch 13/250: 4880it [00:05, 964.59it/s, Loss=0.0557]                           


Epoch 13/250, Loss: 0.8906578771403578


Epoch 14/250: 4880it [00:04, 1066.17it/s, Loss=0.0558]                          


Epoch 14/250, Loss: 0.8927439976911076


Epoch 15/250: 4880it [00:04, 997.73it/s, Loss=0.0557]                           


Epoch 15/250, Loss: 0.8913757062349164


Epoch 16/250: 4880it [00:04, 1140.85it/s, Loss=0.0557]                          

Epoch 16/250, Loss: 0.8920072162737612
Early stopping triggered after 16 epochs.





In [None]:
for param in model.parameters():
  print(param)

In [None]:
model.eval()

In [None]:
torch.save(model.state_dict(), 'wordnet_based_embeddings.pth')

In [23]:

from sklearn.metrics.pairwise import cosine_similarity

In [24]:
def get_word_embeddings(words, word_to_id, model: WordEmbeddingPipeline):
    word_ids = [word_to_id[word] for word in words if word in word_to_id]
    word_ids = torch.tensor(word_ids)
    embeddings = model.embedding_layer.embedding(word_ids).detach().numpy()  # Get embeddings from the model
    return embeddings, word_ids

In [25]:
def calculate_similarity(word1, word2, word_to_id, model):
    embeddings, _ = get_word_embeddings([word1, word2], word_to_id, model)
    similarity = cosine_similarity(embeddings[0].reshape(1, -1), embeddings[1].reshape(1, -1))[0][0]
    return similarity


In [26]:
word1 = "sun"
word2 = "hot"
similarity = calculate_similarity(word1, word2, word_to_id, model)
print(f"Similarity between '{word1}' and '{word2}': {similarity}")

Similarity between 'sun' and 'hot': -0.001956711523234844


In [27]:
def predict_word(word, top_k=5):
    """Predicts the top k most similar words to the given word."""

    # Get the embedding of the input word
    word_embedding = model.embedding_layer.embedding(torch.tensor(word_to_id[word], device=device))

    # Calculate cosine similarity with all other words in the vocabulary
    similarities = cosine_similarity(word_embedding.cpu().detach().numpy().reshape(1, -1),
                                     model.embedding_layer.embedding.weight.cpu().detach().numpy())

    # Get the indices of the top k most similar words
    top_k_indices = np.argsort(similarities[0])[::-1][1:top_k + 1]  # Exclude the input word itself

    # Get the predicted words
    predicted_words = [id_to_word[idx] for idx in top_k_indices]

    return predicted_words

# Example usage:
target_word = "sun"
predicted_words = predict_word(target_word, top_k=10)
print(f"Target word: {target_word}")
print(f"Predicted words: {predicted_words}")

Target word: sun
Predicted words: ['participation', 'plant', 'onshore', 'reciprocal', 'assumed', 'satisfactorily', 'grant', 'drug', 'medicinal', 'effectively']


In [28]:
def check_embedding_quality(word):
    # Get the embedding of the input word
    word_embedding = model.embedding_layer.embedding(torch.tensor(word_to_id[word], device=device))

    # get gloss
    gloss = definitions[word]

    #calculate the weighted average of the the embeddings of the gloss
    gloss_embedding = np.zeros_like(word_embedding.detach().numpy())
    total_weight = 0
    for token in gloss:
        if token in word_to_id:
            pos = get_pos_weight(token)
            token_embedding = model.embedding_layer.embedding(torch.tensor(word_to_id[token], device=device))
            weight = model.pos_weighting.weights(torch.tensor(pos, device=device)).detach().numpy()
            gloss_embedding += token_embedding.detach().numpy() * weight
            total_weight += weight


    gloss_embedding /= total_weight
    # print(f'gloss_embedding: {gloss_embedding}')

    # print(f'nacho: {word_embedding.cpu().detach().numpy().reshape(1, -1)}')


    #check similarity of the word and its gloss embedding
    similarity = cosine_similarity(word_embedding.cpu().detach().numpy().reshape(1, -1),
                                   gloss_embedding.reshape(1, -1))[0][0]

    # print(f'word: {word}')
    # print(f'gloss: {gloss}')
    # print(f'embedding: {word_embedding}')
    # print(word_embedding.shape)
    # print(f'gloss_embedding: {gloss_embedding}')
    # print(gloss_embedding.shape)
    # print(f'similarity: {similarity}')

    return similarity


words_to_test = ['valediction', 'retreat', 'breaking_away', 'forced_landing', 'penetration',  'underachievement']

for word in words_to_test:
    similarity = check_embedding_quality(word)
    print(f"Similarity between '{word}' and its gloss: {similarity}")

Similarity between 'valediction' and its gloss: -0.06518992781639099
Similarity between 'retreat' and its gloss: 0.044464316219091415
Similarity between 'breaking_away' and its gloss: 0.0010470787528902292
Similarity between 'forced_landing' and its gloss: -0.11445791274309158
Similarity between 'penetration' and its gloss: -0.02045590989291668
Similarity between 'underachievement' and its gloss: -0.11939556151628494


In [29]:
def test():
    best_sim = float('-inf')
    worst_sim = float('inf')
    avg_sim = 0
    for word in definitions.keys():
        similarity = check_embedding_quality(word)
        avg_sim += similarity
        best_sim = max(best_sim, similarity)
        worst_sim = min(worst_sim, similarity)
    
    avg_sim /= len(definitions)
    print(f"Best similarity: {best_sim}")
    print(f"Worst similarity: {worst_sim}")
    print(f"Average similarity: {avg_sim}")