## 导入包

In [1]:
import numpy as np
import torch
import torch.nn  as nn
import torch.nn.functional as F
import torch.optim as optim

## CBOW

In [3]:
class CBOW(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128, k=20, vocab_weights=None):
        super(CBOW, self).__init__()
        # 当前embedding layer和全连接中使用的是同一个w
        weight = nn.Parameter(torch.randn((vocab_size, embedding_dim), dtype=torch.float32))
        self.emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.output_layer = nn.Linear(in_features=embedding_dim, out_features=vocab_size, bias=False)
        self.emb_layer.weight = weight
        self.output_layer.weight = weight

        # 为了负采样的时候，更好的提取类别对应的参数
        self.output_emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.output_emb_layer.weight = self.output_layer.weight  # 参数覆盖
        self.k = k  # 负采样的类别数量
        self.vocab_size = vocab_size
        self.vocab_indexes = np.asarray(list(range(vocab_size)))
        self.vocab_weights = vocab_weights  # [] 或者 None

    @torch.no_grad()
    def random_negative_indexes(self, k, pos_indexes):
        """
        随机获取k个负样本的index值
        :param k: int值
        :param pos_indexes: tensor [N,]
        :return:  tensor [k+?,]
        """
        pos_ = pos_indexes.detach().numpy()
        k = k + len(pos_)
        # replace: True允许抽样的过程中存在重复的数据； False表示不允许
        samples_ = np.random.choice(self.vocab_indexes, size=k, replace=True, p=self.vocab_weights)
        neg_indexes = []
        for _lab in samples_:
            if _lab in pos_:
                continue
            neg_indexes.append(_lab)
        if len(neg_indexes) == 0:
            return self.random_negative_indexes(k - len(pos_), pos_indexes)
        else:
            return torch.tensor(neg_indexes, dtype=torch.long)

    def forward(self, x, y):
        """
        前向过程
        :param x: [N,T] long
        :param y: [N,] long 类别单词标签
        :return:
        """
        z1 = self.emb_layer(x)  # [N,T] --> [N,T,embedding_dim]
        z2 = torch.mean(z1, dim=1)  # [N,T,embedding_dim] -+-> [N,embedding_dim]

        if self.training:
            # 希望获取每个样本属于1+k个单词类别的置信度，1表示实际类别置信度，k表示"负样本"/不是实际类别的置信度
            # 获取每个样本实际类别的置信度
            pos_weights = self.output_emb_layer(y)  # [N,] -> [N,embedding_dim]
            # [N,embedding_dim]*[N,embedding_dim] --> [N,embedding_dim] --> [N,] --> [N,1]
            pos_scores = torch.reshape(torch.sum(z2 * pos_weights, dim=1), (-1, 1))  # [N, 1]
            # 获取负样本的置信度
            k = self.k
            neg_y = self.random_negative_indexes(k=k, pos_indexes=y)
            # neg_y = torch.randint(self.vocab_size, size=(k,))  # 产生k个随机数 --> NOTE: 需要修改
            neg_weights = self.output_emb_layer(neg_y)  # [k,] -> [k,embedding_dim]
            print (f"neg_weights.shape: {neg_weights.shape}")
            # [N,embedding_dim] dot [k,embedding_dim].T --> [N,k]
            neg_scores = torch.matmul(z2, neg_weights.T)  # [N, k]
            # 合并到一起
            scores = torch.cat([pos_scores, neg_scores], dim=1)  # [N,1+k]
        else:
            scores = self.output_layer(z2)  # [N,embedding_dim] --> [N,vocab_size]  得到的是每个样本对应各个单词类别的置信度
        return scores


In [4]:
vocab_size = 50000  # 词汇表大小，也就是单词类别数目
batch_size = 16
window_size = 4

net = CBOW(vocab_size=vocab_size, embedding_dim=128)

x = torch.randint(vocab_size, size=(batch_size, window_size), dtype=torch.long)  # [N,T]
y = torch.randint(vocab_size, size=(batch_size,), dtype=torch.long)  # [N,]

net.eval()
scores = net(x, y)  # [n,vocab_size]
print(scores.shape)

# 损失：希望样本预测属于实际类别的置信度要越大越好，如果可以的话，要求预测不属于实际类别的置信度越小越好
loss_fn = nn.BCEWithLogitsLoss()
y_onehot = F.one_hot(y, vocab_size).to(torch.float32)  # [n, vocab_size]
loss = loss_fn(scores, y_onehot)
print(loss)
prob = torch.sigmoid(scores)  # [n, vocab_size]
loss2 = -torch.mean(torch.sum(y_onehot * torch.log(prob + 1e-8), dim=1))  # 只更新当前样本对应类别的参数w
print(loss2)

torch.Size([16, 50000])
tensor(2.4187, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>)
tensor(2.0211, grad_fn=<NegBackward0>)


In [5]:
vocab_size = 500  # 词汇表大小，也就是单词类别数目
word_counts = np.random.randint(1, 100, size=(vocab_size,))  # 保存的是每个单词/类别出现的数量 --> 从数据集统计出来的
word_counts = np.power(word_counts, 0.75)
word_weights = word_counts / np.sum(word_counts)
batch_size = 1
window_size = 4

net = CBOW(vocab_size=vocab_size, embedding_dim=128, vocab_weights=word_weights)
opt = optim.SGD(net.parameters(), lr=0.001)

x = torch.tensor([
    [3, 5, 8, 1],
    [3, 2, 9, 1]
], dtype=torch.long)
y = torch.tensor([12, 13], dtype=torch.long)

scores = net(x, y)  # [n,vocab_size]
print(scores.shape)
print ("="*50)

y = torch.zeros_like(y)  # 当前实际类别为0
# 损失：希望样本预测属于实际类别的置信度要越大越好，如果可以的话，要求预测不属于实际类别的置信度越小越好
y_onehot = F.one_hot(y, scores.shape[1]).to(torch.float32)  # [n, vocab_size]
print (y_onehot)
loss_fn = nn.BCEWithLogitsLoss()
loss = loss_fn(scores, y_onehot)
print(loss)

prob = torch.sigmoid(scores)  # [n, 1+k]
# loss2 = -torch.mean(torch.sum(y_onehot * torch.log(prob + 1e-8), dim=1))  # 只更新当前样本对应类别的参数w
loss2 = -torch.mean(y_onehot * torch.log(prob + 1e-8) + (1 - y_onehot) * torch.log(1.0 - prob + 1e-8))
print(loss2)

opt.zero_grad()
loss2.backward()

neg_weights.shape: torch.Size([22, 128])
torch.Size([2, 23])
tensor([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0.],
        [1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0.]])
tensor(2.7845, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>)
tensor(2.7841, grad_fn=<NegBackward0>)
debug查看梯度值
