In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as Data
import copy

from collections import Counter
import numpy as np
import random

import scipy
# from sklearn.metrics.pairwise import cosine_similarity

In [9]:
class Process_source_data():
    """Read a raw text corpus and build the vocabulary and sampling weights.

    Attributes set by ``__init__``:
        text: list of lower-cased, whitespace-separated tokens of the corpus.
        word2id: dict mapping each vocabulary word to an integer id. Ids follow
            descending corpus frequency; ``'<UNK>'`` always gets the last id.
        id2word: inverse mapping of ``word2id``.
        word_freq: numpy array with one entry per vocabulary id holding the
            relative frequency of that word raised to the power 3/4.
    """

    def __init__(self, data_path, vocab_size=10000):
        """Load ``data_path`` and keep the ``vocab_size - 1`` most frequent words.

        Args:
            data_path: path of a UTF-8 text file.
            vocab_size: total vocabulary size, including the ``'<UNK>'`` entry.
        """
        with open(data_path, 'r', encoding='utf8') as f:
            raw_text = f.read()
        # Tokenise on whitespace after lower-casing.
        self.text = raw_text.lower().split()

        # Word -> occurrence count for the most frequent words; one slot is
        # reserved for the '<UNK>' bucket.
        vocab_counts = dict(Counter(self.text).most_common(vocab_size - 1))
        # Every remaining token is folded into '<UNK>'. The count must be taken
        # over tokens (len(self.text)), not over the characters of the raw
        # string, otherwise '<UNK>' dominates the frequency table.
        vocab_counts['<UNK>'] = len(self.text) - sum(vocab_counts.values())

        # Ids are assigned in dict insertion order (most frequent first).
        self.word2id = {word: i for i, word in enumerate(vocab_counts)}
        self.id2word = {i: word for i, word in enumerate(vocab_counts)}

        # Relative frequencies raised to 3/4. The result is not re-normalised
        # to sum to 1.
        word_counts = np.asarray(list(vocab_counts.values()))
        self.word_freq = (word_counts / np.sum(word_counts)) ** (3. / 4.)
        

In [10]:
class Skip_gram_Dataset(Data.Dataset):
    """Dataset that yields skip-gram training triples for an id-encoded corpus.

    The constructor encodes the token list with ``word2id`` (unknown tokens map
    to ``word2id['<UNK>']``). Indexing position ``idx`` returns the center word,
    its ``2 * C`` context words and ``K`` negative samples per context word.
    """

    def __init__(self, text, word2id, word_freq, C=3, K=15, batch_size=32):
        # NOTE: `batch_size` is accepted but not used inside this class.
        super().__init__()
        self.C = C  # context window radius (words taken on each side)
        self.K = K  # negative samples drawn per context word

        self.text_encoded = torch.tensor(
            [word2id.get(word, word2id['<UNK>']) for word in text],
            dtype=torch.long,
        )
        self.word_freq = torch.tensor(word_freq)

    def __len__(self):
        return len(self.text_encoded)

    def __getitem__(self, idx):
        '''
        Build the training example for corpus position ``idx``.

        Returns a tuple of
        - the center word id,
        - the ids of the C words before and C words after it (positions wrap
          around the ends of the corpus),
        - K * (2 * C) negative word ids sampled with replacement.
        '''
        n_tokens = len(self.text_encoded)
        window = [*range(idx - self.C, idx), *range(idx + 1, idx + self.C + 1)]
        # Modulo keeps positions valid at the corpus boundaries.
        context_positions = [pos % n_tokens for pos in window]

        center_word = self.text_encoded[idx]
        pos_words = self.text_encoded[context_positions]

        # Work on a private copy of the weights so zeroing entries does not
        # leak into later samples.
        sampling_weights = self.word_freq.clone()
        sampling_weights[pos_words] = 0     # never sample a context word
        sampling_weights[center_word] = 0   # never sample the center word

        # torch.multinomial returns indices drawn proportionally to the weights.
        n_negatives = self.K * pos_words.shape[0]
        neg_words = torch.multinomial(sampling_weights, n_negatives, True)

        return center_word, pos_words, neg_words

In [11]:
# --- Data ---------------------------------------------------------------
data_path = 'text8/text8.train.txt'  # corpus file handed to My_data_loader
batch_size = 32

# --- Model --------------------------------------------------------------
MAX_VOCAB_SIZE, EMBEDDING_SIZE = 10000, 100

# --- Optimisation -------------------------------------------------------
# NOTE(review): in the cells visible in this notebook the optimizer is built
# with a literal lr=1e-3 and the training loop iterates range(1), so these two
# values are currently not consumed anywhere.
lr = 0.2
epochs = 2

In [12]:
class My_data_loader():
    """Build the skip-gram dataset for a corpus file and wrap it in a DataLoader.

    Attributes:
        word2id: word -> id mapping produced by Process_source_data.
        id2word: id -> word mapping produced by Process_source_data.
        dataset: the Skip_gram_Dataset built from the corpus.
        loader: torch DataLoader iterating over ``dataset``.
    """

    def __init__(self, data_path, batch_size, shuffle=True, vocab_size=10000):
        """
        Args:
            data_path: path of the corpus text file.
            batch_size: number of samples per batch.
            shuffle: whether the DataLoader shuffles the samples.
            vocab_size: vocabulary size forwarded to Process_source_data; the
                default equals that class's own default.
        """
        process_data = Process_source_data(data_path, vocab_size)
        # Keep the vocabulary mappings reachable. The ids fed to the model are
        # only meaningful together with these dictionaries, e.g. to translate
        # trained embedding rows back to words.
        self.word2id = process_data.word2id
        self.id2word = process_data.id2word
        self.dataset = Skip_gram_Dataset(
            process_data.text, process_data.word2id, process_data.word_freq
        )
        self.loader = Data.DataLoader(self.dataset, batch_size=batch_size, shuffle=shuffle)

In [13]:
# Reads the corpus and builds the dataset + DataLoader (shuffle left at its default).
my_loader = My_data_loader(data_path=data_path, batch_size=batch_size)

In [14]:
class Embedding_Model(nn.Module):
    """Skip-gram model trained with negative sampling.

    Holds two embedding tables of shape (vocab_size, embed_size): ``in_embed``
    for center words and ``out_embed`` for context / negative words.
    """

    def __init__(self, vocab_size, embed_size):
        super().__init__()
        self.in_embed = nn.Embedding(vocab_size, embed_size)   # center-word vectors
        self.out_embed = nn.Embedding(vocab_size, embed_size)  # context-word vectors

    def forward(self, input_labels, pos_labels, neg_labels):
        """Return the negative-sampling loss for every sample in the batch.

        Args:
            input_labels: center word ids, shape [batch].
            pos_labels: context word ids, shape [batch, n_pos].
            neg_labels: negative word ids, shape [batch, n_neg].

        Returns:
            Tensor of shape [batch]: ``-(sum log sigmoid(pos) + sum log sigmoid(-neg))``.
        """
        input_embedding = self.in_embed(input_labels)   # [batch, embed_size]
        input_embedding = input_embedding.unsqueeze(2)  # [batch, embed_size, 1]
        pos_embedding = self.out_embed(pos_labels)      # [batch, n_pos, embed_size]
        neg_embedding = self.out_embed(neg_labels)      # [batch, n_neg, embed_size]

        # A center word and its context words co-occur, so sigmoid(pos_dot)
        # should be pushed towards 1.
        pos_dot = torch.bmm(pos_embedding, input_embedding)  # [batch, n_pos, 1]
        pos_dot = pos_dot.squeeze(2)                         # [batch, n_pos]

        # A center word and a noise word should not co-occur, so the sigmoid of
        # their dot product should go to 0. Negating the center vector flips
        # the sign of the dot product, which lets both terms share the same
        # "push logsigmoid towards 0" objective.
        neg_dot = torch.bmm(neg_embedding, -input_embedding)  # [batch, n_neg, 1]
        neg_dot = neg_dot.squeeze(2)                          # [batch, n_neg]

        # logsigmoid is always <= 0 and approaches 0 as sigmoid approaches 1.
        log_pos = F.logsigmoid(pos_dot).sum(1)
        log_neg = F.logsigmoid(neg_dot).sum(1)

        loss = log_pos + log_neg

        # Negate so that minimising the returned value maximises the log-likelihood.
        return -loss

    def input_embedding(self):
        """Return the center-word embedding matrix as a numpy array.

        ``.cpu()`` makes this work when the model lives on another device;
        for a CPU model it is a no-op.
        """
        return self.in_embed.weight.detach().cpu().numpy()

# Model and optimizer; Adam updates both embedding tables with a fixed 1e-3 step size.
model = Embedding_Model(vocab_size=MAX_VOCAB_SIZE, embed_size=EMBEDDING_SIZE)
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
        

In [17]:
# Sanity check: list every registered parameter and flag the embedding tables.
for name, param in model.named_parameters():
    print(name)
    is_embedding = 'embed' in name
    if is_embedding:
        print('yes')

in_embed.weight
yes
out_embed.weight
yes


In [1]:
# Training loop.
# NOTE(review): this runs a single pass over the loader; the `epochs` constant
# from the configuration cell is not used here.
for e in range(1):
    for i, batch in enumerate(my_loader.loader):
        # Each batch is (center ids, context ids, negative ids); cast all to int64.
        input_labels, pos_labels, neg_labels = (t.long() for t in batch)

        optimizer.zero_grad()
        per_sample_loss = model(input_labels, pos_labels, neg_labels)
        loss = per_sample_loss.mean()  # average the per-sample losses over the batch
        loss.backward()

        optimizer.step()

        # Report progress every 100 iterations only.
        if i % 100 != 0:
            continue
        print('epoch', e, 'iteration', i, loss.item())

In [5]:
def find_nearest(word, top_k=10):
    """Return the ``top_k`` vocabulary words closest to ``word`` by cosine distance.

    Reads three module-level names that must be defined before the call:
    ``word2idx`` (word -> row index), ``idx2word`` (row index -> word) and
    ``embedding_weights`` (2-D array with one embedding per row).

    Args:
        word: query word; must be a key of ``word2idx``.
        top_k: number of neighbours to return (default 10). The query word
            itself is normally the first entry because its distance is 0.

    Returns:
        List of words ordered by increasing cosine distance.

    Raises:
        KeyError: if ``word`` is not in the vocabulary.
    """
    # Imported here because a bare `import scipy` does not guarantee that the
    # `scipy.spatial` submodule is loaded on older SciPy releases.
    from scipy.spatial.distance import cdist

    if word not in word2idx:
        raise KeyError('{!r} is not in the vocabulary'.format(word))

    vectors = np.asarray(embedding_weights)
    query = vectors[word2idx[word]]
    # One vectorised call instead of a Python-level loop over every row.
    cos_dis = cdist(vectors, query[np.newaxis, :], metric='cosine').ravel()
    nearest = cos_dis.argsort()[:top_k]
    return [idx2word[int(i)] for i in nearest]

In [None]:
# find_nearest reads the module-level names word2idx, idx2word and
# embedding_weights, none of which is assigned in the cells above, so define
# them here to keep the notebook runnable from a fresh kernel.
# The corpus is processed again because the loader object does not keep its
# Process_source_data instance; the call mirrors the one made inside
# My_data_loader so the ids match those used for training.
vocab = Process_source_data(data_path)
word2idx, idx2word = vocab.word2id, vocab.id2word
embedding_weights = model.input_embedding()

for word in ["two", "america", "computer"]:
    print(word, find_nearest(word))