<a href="https://colab.research.google.com/github/zhiqiang00/model-by-pytorch/blob/main/Word2Vec.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PyTorch 实现 Word2Vec

In [2]:
# The PyPI distribution is named `scikit-learn`; the `sklearn` alias package is
# deprecated and its installation fails. `%pip` installs into the running kernel's environment.
%pip install scikit-learn



In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as tud

from collections import Counter
import numpy as np
import random
from sklearn.metrics.pairwise import cosine_similarity

# Seed every random number generator used in this notebook.
torch.manual_seed(1)
np.random.seed(1)
random.seed(1)

# Skip-gram / negative-sampling settings.
C = 3    # context window radius: words taken on each side of the center word
K = 15   # negative samples drawn per positive (context) word

# Vocabulary and model size.
MAX_VOCAB_SIZE = 10000  # number of vocabulary entries, including '<UNK>'
EMBEDDING_SIZE = 100    # dimensionality of each word vector

# Training settings.
batch_size = 32
epochs = 2  # NOTE(review): not read by the training cell visible in this notebook
lr = 0.2    # NOTE(review): not read by the optimizer visible in this notebook


In [4]:
# Read the whole training corpus into memory as one string.
with open('/content/drive/MyDrive/MLLearning/data/text8/text8.train.txt') as f:
  text = f.read()

# Lower-case the corpus and split it on whitespace into a list of tokens.
text = text.lower().split()

# Keep the (MAX_VOCAB_SIZE - 1) most frequent words; every remaining token is
# counted under the single '<UNK>' entry.
vocab_dict = dict(Counter(text).most_common(MAX_VOCAB_SIZE - 1))
vocab_dict['<UNK>'] = len(text) - np.sum(list(vocab_dict.values()))

# Word <-> index lookup tables, numbered in vocab_dict insertion order.
word2idx = dict(zip(vocab_dict.keys(), range(len(vocab_dict))))
idx2word = dict(enumerate(vocab_dict.keys()))

# Relative frequency of each vocabulary entry, raised to the 3/4 power; these
# values are later used as negative-sampling weights.
word_counts = np.array(list(vocab_dict.values()), dtype=np.float32)
word_freqs = (word_counts / np.sum(word_counts)) ** (3. / 4.)

# 实现 DataLoader

In [5]:
class WordEmbeddingDataset(tud.Dataset):
  '''Skip-gram dataset: one item per token position in the encoded corpus.

  Each item is a (center word, context words, negative samples) triple of
  word indices.
  '''

  def __init__(self, text, word2idx, word_freqs, window_size=None, num_negatives=None):
    '''Encode the corpus and store the negative-sampling weights.

    Args:
      text: a list of words, all text from the training dataset.
      word2idx: the dictionary from word to index; must contain '<UNK>',
        which is used for every word that is missing from the dictionary.
      word_freqs: per-word sampling weights indexed by word index. They are
        passed to torch.multinomial, so they do not need to sum to 1.
      window_size: number of context words taken on each side of the center
        word. None (default) falls back to the module-level `C`.
      num_negatives: negative samples drawn per context word. None (default)
        falls back to the module-level `K`.
    '''
    super(WordEmbeddingDataset, self).__init__()
    unk_idx = word2idx['<UNK>']
    self.text_encoded = torch.LongTensor([word2idx.get(word, unk_idx) for word in text])
    self.word2idx = word2idx
    self.word_freqs = torch.Tensor(word_freqs)
    self.window_size = window_size
    self.num_negatives = num_negatives

  def __len__(self):
    '''Return the total number of tokens, i.e. the number of items.'''
    return len(self.text_encoded)

  def __getitem__(self, idx):
    '''Return the training triple for token position `idx`.

    Returns:
      center_word: 0-dim LongTensor, the word index at position `idx`.
      pos_words: LongTensor [2 * window], the surrounding context words.
      neg_words: LongTensor [2 * window * num_negatives], words sampled from
        `word_freqs` that are guaranteed not to be any of `pos_words`.
    '''
    # Resolve lazily so the module-level C / K are still honoured when the
    # dataset was built without explicit overrides.
    window = C if self.window_size is None else self.window_size
    num_neg = K if self.num_negatives is None else self.num_negatives

    center_word = self.text_encoded[idx]

    # Positions on both sides of idx; the modulo wraps around the corpus ends
    # so the indices never go out of range.
    n_tokens = len(self.text_encoded)
    pos_indices = [i % n_tokens for i in range(idx - window, idx + window + 1) if i != idx]
    pos_words = self.text_encoded[pos_indices]

    # Zero the weight of every context word and sample once. This draws from
    # the same distribution as "resample everything until no negative collides
    # with a context word", but in bounded time even when a very frequent word
    # is in the window.
    neg_weights = self.word_freqs.clone()
    neg_weights[pos_words] = 0.0
    neg_words = torch.multinomial(neg_weights, num_neg * pos_words.shape[0], True)

    return center_word, pos_words, neg_words



In [6]:
# Wrap the tokenized corpus in the skip-gram dataset and iterate over it in
# shuffled mini-batches of `batch_size` items.
dataset = WordEmbeddingDataset(text=text, word2idx=word2idx, word_freqs=word_freqs)
dataloader = tud.DataLoader(dataset, batch_size=batch_size, shuffle=True)

# 定义 PyTorch 模型

In [7]:
class EmbeddingModel(nn.Module):
  '''Skip-gram model with negative sampling and separate input/output embedding tables.'''

  def __init__(self, vocab_size, embed_size):
    '''Create the two embedding tables.

    Args:
      vocab_size: number of rows in each embedding table.
      embed_size: dimensionality of each embedding vector.
    '''
    super(EmbeddingModel, self).__init__()

    self.vocab_size = vocab_size
    self.embed_size = embed_size

    # in_embed is looked up for center words, out_embed for context/negative words.
    self.in_embed = nn.Embedding(self.vocab_size, self.embed_size)
    self.out_embed = nn.Embedding(self.vocab_size, self.embed_size)

  def forward(self, input_labels, pos_labels, neg_labels):
    '''Compute the negative-sampling loss for a batch.

    Args:
      input_labels: center words, [batch_size]
      pos_labels: positive words, [batch_size, (window_size * 2)]
      neg_labels: negative words, [batch_size, (window_size * 2 * K)]

    Returns:
      loss, [batch_size]
    '''
    input_embedding = self.in_embed(input_labels)  # [batch_size, embed_size]
    pos_embedding = self.out_embed(pos_labels)     # [batch_size, (window * 2), embed_size]
    neg_embedding = self.out_embed(neg_labels)     # [batch_size, (window * 2 * K), embed_size]

    # Add a trailing axis so bmm yields one dot product per context word.
    input_embedding = input_embedding.unsqueeze(2)  # [batch_size, embed_size, 1]

    pos_dot = torch.bmm(pos_embedding, input_embedding)  # [batch_size, (window * 2), 1]
    pos_dot = pos_dot.squeeze(2)                         # [batch_size, (window * 2)]

    # The center vector is negated so logsigmoid scores the negatives as sigmoid(-x).
    neg_dot = torch.bmm(neg_embedding, -input_embedding)  # [batch_size, (window * 2 * K), 1]
    neg_dot = neg_dot.squeeze(2)                          # [batch_size, (window * 2 * K)]

    log_pos = F.logsigmoid(pos_dot).sum(1)
    log_neg = F.logsigmoid(neg_dot).sum(1)

    # logsigmoid outputs are all negative, so the sum is negated to get a loss to minimize.
    loss = log_pos + log_neg

    return -loss

  def input_embedding(self):
    '''Return the input (center-word) embedding table as a NumPy array of shape [vocab_size, embed_size].'''
    # detach() must be *called*; cpu() keeps this working when the model lives on a GPU.
    return self.in_embed.weight.detach().cpu().numpy()

# Build the skip-gram model over the full vocabulary and optimize all of its
# parameters with Adam at a fixed learning rate of 1e-3.
model = EmbeddingModel(vocab_size=MAX_VOCAB_SIZE, embed_size=EMBEDDING_SIZE)
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

# 模型训练

In [None]:
# Train for a single pass over the corpus.
# NOTE(review): the module-level `epochs` constant is not used here; the pass
# count is hard-coded to 1.
for e in range(1):
  for i, (input_labels, pos_labels, neg_labels) in enumerate(dataloader):
    input_labels = input_labels.long()
    pos_labels = pos_labels.long()
    neg_labels = neg_labels.long()

    optimizer.zero_grad()
    # The model returns one loss per example; average it over the batch.
    loss = model(input_labels, pos_labels, neg_labels).mean()
    loss.backward()

    optimizer.step()

    # Report progress every 100 batches.
    if i%100 == 0:
      print('epoch', e, 'iteration', i, loss.item())

# Export the trained input embeddings as a NumPy array for the nearest-neighbour lookup.
embedding_weights = model.in_embed.weight.detach().cpu().numpy()
# state_dict() returns the parameters to serialize; load_state_dict() is the
# loader and cannot be called without an argument.
torch.save(model.state_dict(), "embedding-{}.th".format(EMBEDDING_SIZE))

# 词向量应用

In [None]:
def find_nearest(word):
  '''Return the 10 vocabulary words closest to `word` by cosine distance.

  Reads the module-level `word2idx`, `idx2word` and `embedding_weights`
  (a 2-D NumPy array with one row per word index). Results are ordered from
  nearest to farthest, so the query word itself normally comes first.

  Raises:
    KeyError: if `word` is not a key of `word2idx`.
  '''
  index = word2idx[word]
  embedding = embedding_weights[index]
  # Cosine distance to every row at once: 1 - (E . v) / (|E_row| * |v|).
  # Uses NumPy only, so no scipy import is required.
  norms = np.linalg.norm(embedding_weights, axis=1) * np.linalg.norm(embedding)
  cos_dis = 1.0 - (embedding_weights @ embedding) / norms
  return [idx2word[i] for i in cos_dis.argsort()[:10]]

# Print the nearest neighbours of a few sample query words.
for query in ("two", "america", "computer"):
  print(query, find_nearest(query))