<a href="https://colab.research.google.com/github/ysj9909/NLP_practice/blob/main/Word2Vec_skipgram.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Skip-gram (negative sampling) Model 코드 구현 연습!!**

* paper link: https://proceedings.neurips.cc/paper/2013/file/9aa42b31882ec039965f3c4923ce901b-Paper.pdf

* Dataset : text9 corpus

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np



# Read the whole corpus and tokenise it on single spaces.
# The `with` statement closes the file on exit, so no explicit close() is needed.
with open("text9.txt", "r") as f:
  words = f.read().split(" ")

# Device configuration: use CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Hyper-parameters
window_size = 2      # context words taken on each side of a centre word
embed_dim = 300      # dimensionality of the learned word vectors
learning_rate = 9    # initial step size of the manual gradient update
num_epochs = 3       # passes over the training pairs

# Vocabulary tables, all keyed in first-occurrence order:
#   word2idx : word -> integer id
#   idx2word : integer id -> word
#   word2freq: word -> raw count at first, then (below) the negative-sampling probability
word2idx = {}
idx2word = {}
word2freq = {}

idx = 0
for word in words:
  if word not in word2idx:
    word2idx[word] = idx
    word2freq[word] = 1
    idx2word[idx] = word
    idx += 1
  else:
    word2freq[word] += 1

# Turn raw counts into the unigram^(3/4) noise distribution.
# Words seen only once get probability 0, which also marks them as "ignored" downstream.
# (The accumulator is deliberately not called `sum`, so the builtin stays usable.)
num_tokens = len(words)
freq_total = 0
for word, count in word2freq.items():
  if count <= 1:
    word2freq[word] = 0
  else:
    word2freq[word] = (count / num_tokens) ** (3 / 4)
    freq_total += word2freq[word]
vocab_size = len(word2freq)

# Normalise so that the kept words' probabilities add up to 1.
for word in word2freq:
  word2freq[word] /= freq_total

def make_one_hot(idx, vocab_size):
  """Return a NumPy vector of length `vocab_size` that is 1 at `idx` and 0 elsewhere."""
  vec = np.zeros(vocab_size)
  vec[idx] = 1
  return vec

# Build (centre word, context word) training pairs from the start of the corpus.
# Only the first `num_center_positions` positions are used as centre words.
num_center_positions = 10000

center_ids = []
context_ids = []
# Stay `window_size` away from both ends so every context offset is a valid index.
stop = min(window_size + num_center_positions, len(words) - window_size)
for i in range(window_size, stop):
  if word2freq[words[i]] == 0:
    continue  # centre words with zero sampling probability are skipped
  for j in range(-window_size, window_size + 1):
    if j != 0:
      center_ids.append(word2idx[words[i]])
      context_ids.append(word2idx[words[i + j]])

# One row per pair: `inputs` holds the centre word as a one-hot float vector and
# `targets` the id of the paired context word. The one-hot rows are written
# straight into a pre-allocated tensor instead of going through nested Python
# lists of NumPy arrays.
targets = torch.LongTensor(context_ids)
inputs = torch.zeros(len(center_ids), vocab_size)
inputs[torch.arange(len(center_ids)), torch.LongTensor(center_ids)] = 1

train_dataset = (inputs, targets)
print(inputs.size())
print(targets.size())


class skipgram_negative_sampling(nn.Module):
  """Skip-gram word2vec model trained with negative sampling.

  `W` maps a one-hot centre word to its embedding and `W_out` scores that
  embedding against every vocabulary word.

  The training and query methods read the module-level tables `word2freq`
  (word -> sampling probability, 0 for ignored words) and `word2idx`
  (word -> id).
  """

  def __init__(self, vocab_size, embed_dim):
    super(skipgram_negative_sampling, self).__init__()

    self.W = nn.Linear(vocab_size, embed_dim, bias = False)
    self.W_out = nn.Linear(embed_dim, vocab_size, bias = False)

  def train(self, train_dataset = True, num_epochs = None, k = 15, window_size = 2, lr = learning_rate, batch_size = 100):
    """Fit the model with mini-batch gradient descent on the negative-sampling loss.

    Args:
      train_dataset: `(inputs, targets)` where `inputs` holds one one-hot row
        per training pair and `targets` the matching context-word ids.
        A bool is also accepted and forwarded to `nn.Module.train(mode)`, so
        that `model.eval()` / `model.train()` keep working.
      num_epochs: number of passes over the dataset (required with a dataset).
      k: number of negative samples drawn per training pair.
      window_size: unused; kept for call compatibility.
      lr: initial step size; it is divided by 3 after every epoch.
      batch_size: number of training pairs per update.

    Raises:
      TypeError: if a dataset is given without `num_epochs`.
    """
    # nn.Module.eval() calls self.train(False); keep that contract intact.
    if isinstance(train_dataset, bool):
      return super(skipgram_negative_sampling, self).train(train_dataset)
    if num_epochs is None:
      raise TypeError("train() requires num_epochs when given a training dataset")

    inputs, targets = train_dataset
    dev = self.W.weight.device
    params = (self.W.weight, self.W_out.weight)

    # Noise distribution indexed by word id, built once instead of once per sample.
    noise_probs = np.zeros(self.W.in_features)
    for word, prob in word2freq.items():
      noise_probs[word2idx[word]] = prob

    for epoch in range(num_epochs):
      # Step through every pair; the last batch may be smaller than batch_size.
      for i in range(0, len(inputs), batch_size):
        input_batch = inputs[i : i + batch_size].to(dev)
        target_batch = targets[i : i + batch_size].to(dev)
        n = input_batch.size(0)

        outputs = self.W_out(self.W(input_batch))       # (n, vocab_size)

        # k negative word ids per pair, drawn with replacement from the noise distribution.
        negatives = np.random.choice(len(noise_probs), size = (n, k), p = noise_probs)
        negatives = torch.as_tensor(negatives, dtype = torch.long, device = dev)

        # Score of each pair's own context word (taken from THIS batch) and of its negatives.
        pos_scores = outputs.gather(1, target_batch.unsqueeze(1)).squeeze(1)   # (n,)
        neg_scores = outputs.gather(1, negatives)                              # (n, k)
        # logsigmoid is the numerically stable form of log(sigmoid(x)).
        loss = -(F.logsigmoid(pos_scores) + F.logsigmoid(-neg_scores).sum(dim = 1)).mean()

        for p in params:
          p.grad = None
        loss.backward()
        # Plain gradient step, applied in place so the Parameter objects stay the same.
        with torch.no_grad():
          for p in params:
            p -= lr * p.grad
            p.grad = None

        print(f"Epoch [{epoch + 1} / {num_epochs}],Step [{i} / {len(inputs)}], Loss : {loss.item()}")
      lr /= 3

  def compute_sim(self, word1_vec, word2_vec):
    """Return the cosine similarity of two vectors as a 0-dim tensor."""
    norm_word1 = torch.sum(word1_vec ** 2) ** (1 / 2)
    norm_word2 = torch.sum(word2_vec ** 2) ** (1 / 2)
    return torch.sum(word1_vec * word2_vec) / (norm_word1 * norm_word2)

  def _embedding(self, word_id):
    """Return the embedding of `word_id`.

    Column `word_id` of `W.weight` is what `self.W` yields for that word's one-hot vector.
    """
    return self.W.weight[:, word_id]

  def _rank_candidates(self, query_vec, exclude_ids = ()):
    """Return `(words, sims)`: every word with non-zero sampling probability whose
    id is not in `exclude_ids`, and its cosine similarity to `query_vec`."""
    excluded = set(int(word_id) for word_id in exclude_ids)
    words = []
    ids = []
    for w, freq in word2freq.items():
      if freq > 0 and word2idx[w] not in excluded:
        words.append(w)
        ids.append(word2idx[w])

    with torch.no_grad():
      id_tensor = torch.tensor(ids, dtype = torch.long, device = self.W.weight.device)
      candidates = self.W.weight[:, id_tensor].t()      # (num_candidates, embed_dim)
      query = query_vec.to(candidates.device)
      sims = (candidates @ query) / (candidates.norm(dim = 1) * query.norm())
    return words, sims

  def most_similar(self, word_vec, exclude_ids = ()):
    """Return the word whose embedding has the highest positive cosine similarity
    to `word_vec`, or None when no candidate has similarity above 0.

    Args:
      word_vec: query embedding.
      exclude_ids: word ids that must not be returned.
    """
    words, sims = self._rank_candidates(word_vec, exclude_ids)
    if not words:
      return None
    # Undefined similarities (zero-norm vectors) can never win.
    sims = torch.where(torch.isnan(sims), torch.full_like(sims, float("-inf")), sims)
    best = int(torch.argmax(sims))
    if not sims[best] > 0:
      return None
    return words[best]

  def analogic_test(self, word1, word2, word3):
    """Return the word closest to `vec(word1) - vec(word2) + vec(word3)`.

    All three arguments are word ids. The query words themselves are left out
    of the search, as is usual for analogy evaluation; otherwise one of the
    inputs is typically its own nearest neighbour.
    """
    with torch.no_grad():
      query = self._embedding(word1) - self._embedding(word2) + self._embedding(word3)
    return self.most_similar(query, exclude_ids = (word1, word2, word3))

  def similar_words(self, word_id, k):
    """Return up to `k` `(word, similarity)` pairs for the words most similar
    to `word_id`, best first. The query word itself is not included."""
    with torch.no_grad():
      query = self._embedding(word_id)
    words, sims = self._rank_candidates(query, exclude_ids = (word_id,))
    order = torch.argsort(sims, descending = True)[:k]
    return [(words[j], sims[j]) for j in order.tolist()]



# Build the model, move it to the selected device, and train it.
model = skipgram_negative_sampling(vocab_size, embed_dim)
model = model.to(device)
model.train(train_dataset, num_epochs)

# Persist the learned weights.
checkpoint_path = "word2vec_neg15_params.ckpt"
torch.save(model.state_dict(), checkpoint_path)

torch.Size([38540, 24429])
torch.Size([38540])




Epoch [1 / 3],Step [0 / 38540], Loss : 11.09066390991211
Epoch [1 / 3],Step [100 / 38540], Loss : 11.086169242858887
Epoch [1 / 3],Step [200 / 38540], Loss : 11.083160400390625
Epoch [1 / 3],Step [300 / 38540], Loss : 11.079219818115234
Epoch [1 / 3],Step [400 / 38540], Loss : 11.068778038024902
Epoch [1 / 3],Step [500 / 38540], Loss : 11.075840950012207
Epoch [1 / 3],Step [600 / 38540], Loss : 11.063456535339355
Epoch [1 / 3],Step [700 / 38540], Loss : 11.055484771728516
Epoch [1 / 3],Step [800 / 38540], Loss : 11.054841995239258
Epoch [1 / 3],Step [900 / 38540], Loss : 11.05109977722168
Epoch [1 / 3],Step [1000 / 38540], Loss : 11.026481628417969
Epoch [1 / 3],Step [1100 / 38540], Loss : 10.942771911621094
Epoch [1 / 3],Step [1200 / 38540], Loss : 10.7358980178833
Epoch [1 / 3],Step [1300 / 38540], Loss : 10.889676094055176
Epoch [1 / 3],Step [1400 / 38540], Loss : 10.761174201965332
Epoch [1 / 3],Step [1500 / 38540], Loss : 10.63432502746582
Epoch [1 / 3],Step [1600 / 38540], Loss :

In [None]:
# Analogy query: the vocabulary word closest (cosine similarity) to vec("fast") - vec("faster") + vec("slow").
model.analogic_test(word2idx["fast"], word2idx["faster"], word2idx["slow"])

'slow'

In [None]:
# Analogy query: the vocabulary word closest (cosine similarity) to vec("man") - vec("woman") + vec("king").
model.analogic_test(word2idx["man"], word2idx["woman"], word2idx["king"])

'king'

In [None]:
# Ask for 10 words ranked by cosine similarity to the embedding of "work".
model.similar_words(word2idx["work"], 10)

[('skills', tensor(0.9882, device='cuda:0', grad_fn=<DivBackward0>)),
 ('their', tensor(0.9858, device='cuda:0', grad_fn=<DivBackward0>)),
 ('teaching', tensor(0.9857, device='cuda:0', grad_fn=<DivBackward0>)),
 ('adults', tensor(0.9854, device='cuda:0', grad_fn=<DivBackward0>)),
 ('person', tensor(0.9853, device='cuda:0', grad_fn=<DivBackward0>)),
 ('appropriate', tensor(0.9850, device='cuda:0', grad_fn=<DivBackward0>)),
 ('emotional', tensor(0.9849, device='cuda:0', grad_fn=<DivBackward0>)),
 ('subtle', tensor(0.9848, device='cuda:0', grad_fn=<DivBackward0>)),
 ('concepts', tensor(0.9848, device='cuda:0', grad_fn=<DivBackward0>)),
 ('intense', tensor(0.9846, device='cuda:0', grad_fn=<DivBackward0>))]