# 导入包

In [2]:
!pip install torch

Collecting torch
  Downloading torch-2.5.1-cp312-cp312-win_amd64.whl.metadata (28 kB)
Collecting filelock (from torch)
  Downloading filelock-3.16.1-py3-none-any.whl.metadata (2.9 kB)
Collecting typing-extensions>=4.8.0 (from torch)
  Using cached typing_extensions-4.12.2-py3-none-any.whl.metadata (3.0 kB)
Collecting networkx (from torch)
  Downloading networkx-3.4.2-py3-none-any.whl.metadata (6.3 kB)
Collecting fsspec (from torch)
  Downloading fsspec-2024.10.0-py3-none-any.whl.metadata (11 kB)
Collecting sympy==1.13.1 (from torch)
  Using cached sympy-1.13.1-py3-none-any.whl.metadata (12 kB)
Collecting mpmath<1.4,>=1.1.0 (from sympy==1.13.1->torch)
  Using cached mpmath-1.3.0-py3-none-any.whl.metadata (8.6 kB)
Downloading torch-2.5.1-cp312-cp312-win_amd64.whl (203.0 MB)
   ---------------------------------------- 0.0/203.0 MB ? eta -:--:--
   ---------------------------------------- 0.0/203.0 MB ? eta -:--:--
   ---------------------------------------- 0.3/203.0 MB ? eta -:--:--
   -

In [3]:
import time
import numpy as np
import torch
import torch.nn  as nn
import torch.nn.functional as F


# CBOW

In [2]:
class CBOW(nn.Module): # CBOW 使用上下文的平均嵌入向量预测目标单词
    def __init__(self, vocab_size, embedding_dim=128):
        super(CBOW, self).__init__()
        self.emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.output_layer = nn.Linear(in_features=embedding_dim, out_features=vocab_size)  # 参数量：embedding_dim*vocab_size

    def forward(self, x):
        """
        前向过程
        :param x: [N,T] long
        :return:
        """
        z1 = self.emb_layer(x)  # [N,T] --> [N,T,embedding_dim]
        z2 = torch.mean(z1, dim=1)  # [N,T,embedding_dim] --> [N,embedding_dim]
        scores = self.output_layer(z2)  # [N,embedding_dim] --> [N,vocab_size]  得到的是每个样本对应各个单词类别的置信度
        return scores

## scores

In [3]:
vocab_size = 50000  # 词汇表大小，也就是单词类别数目
batch_size=16
window_size = 4     # 多个老师教一个学生

net = CBOW(vocab_size=vocab_size, embedding_dim=128) # 创建CBOW实例
x = torch.randint(vocab_size, size=(batch_size, window_size), dtype=torch.long)  # [N,T]
y = torch.randint(vocab_size, size=(batch_size,), dtype=torch.long)  # [N,]

scores = net(x)  # [N,vocab_size]
print(scores.shape)

torch.Size([16, 50000])


In [4]:
scores[0]

tensor([-0.0617, -0.4571,  0.2640,  ..., -0.0232, -0.5308,  0.0879],
       grad_fn=<SelectBackward0>)

## sigmoid & BCEWithLogitsLoss

In [5]:
# 损失：希望样本预测属于实际类别的置信度/概率要越大越好，如果可以的话，要求预测不属于实际类别的置信度越小越好
# 二分类
loss_fn = nn.BCEWithLogitsLoss() # 二元交叉熵损失函数，衡量真实标签概率分布和预测的概率分布之间的差异
y_onehot = F.one_hot(y, vocab_size).to(torch.float32)  # [N, vocab_size]
loss = loss_fn(scores, y_onehot)
print(loss)

tensor(0.7040, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>)


In [6]:
# 手动计算
prob = torch.sigmoid(scores)  # [N, vocab_size]
loss2 = -torch.mean(y_onehot * torch.log(prob + 1e-8) + (1 - y_onehot) * torch.log(1.0 - prob + 1e-8))
print(loss2)

tensor(0.7040, grad_fn=<NegBackward0>)


In [7]:
res = y_onehot * torch.log(prob + 1e-8) + (1 - y_onehot) * torch.log(1.0 - prob + 1e-8)
res.shape

torch.Size([16, 50000])

## softmax & CrossEntropyLoss

In [8]:
# 多分类
loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(scores, y)
print(loss)

tensor(10.7567, grad_fn=<NllLossBackward0>)


In [9]:
prob = torch.softmax(scores, dim=1)  # [N, vocab_size]
y_onehot = F.one_hot(y, vocab_size)  # [N, vocab_size]
loss2 = -torch.mean(torch.sum(y_onehot * torch.log(prob), dim=1))
print(loss2)

tensor(10.7567, grad_fn=<NegBackward0>)


# CBOW backward

In [10]:
import time

import torch
import torch.nn  as nn
import torch.nn.functional as F
import torch.optim as optim

In [11]:
class CBOW(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128):
        super(CBOW, self).__init__()
        # 当前embedding layer和全连接中使用的是同一个w
        weight = nn.Parameter(torch.randn((vocab_size, embedding_dim), dtype=torch.float32))
        self.emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.output_layer = nn.Linear(in_features=embedding_dim, out_features=vocab_size, bias=False)
        self.emb_layer.weight = weight
        self.output_layer.weight = weight

    def forward(self, x):
        """
        前向过程
        :param x: [batch_size,window_size] long
        :return:
        """
        z1 = self.emb_layer(x)  # [batch_size,window_size] --> [batch_size,window_size,embedding_dim]
        z2 = torch.mean(z1, dim=1)  # [batch_size,window_size,embedding_dim] --> [batch_size,embedding_dim]
        scores = self.output_layer(z2)  # [batch_size,embedding_dim] --> [batch_size,vocab_size]  得到的是每个样本对应各个单词类别的置信度
        return scores

In [12]:
vocab_size = 500  # 词汇表大小，也就是单词类别数目
batch_size = 1
window_size = 4

net = CBOW(vocab_size=vocab_size, embedding_dim=128)
opt = optim.SGD(net.parameters(), lr=0.1)
for para in net.parameters():
    print(para.shape)
    print (para)

torch.Size([500, 128])
Parameter containing:
tensor([[-0.0259,  0.8893, -1.3472,  ..., -1.6672, -0.4592,  1.2228],
        [ 0.4772,  0.5928,  0.7466,  ...,  1.5107, -0.0737,  0.5404],
        [ 1.9349, -0.1386,  0.7627,  ...,  0.3867,  1.0012, -0.0150],
        ...,
        [ 0.7504,  1.1152,  0.7625,  ...,  0.9711, -1.1843, -0.7822],
        [-0.0624, -0.1084,  0.0724,  ...,  1.5384,  0.2212, -0.8455],
        [-1.1093, -0.4221, -0.3100,  ..., -0.0775,  0.6637, -0.6289]],
       requires_grad=True)


In [13]:
for para in net.parameters():
    print(para.shape)
    print (para)

torch.Size([500, 128])
Parameter containing:
tensor([[-0.0259,  0.8893, -1.3472,  ..., -1.6672, -0.4592,  1.2228],
        [ 0.4772,  0.5928,  0.7466,  ...,  1.5107, -0.0737,  0.5404],
        [ 1.9349, -0.1386,  0.7627,  ...,  0.3867,  1.0012, -0.0150],
        ...,
        [ 0.7504,  1.1152,  0.7625,  ...,  0.9711, -1.1843, -0.7822],
        [-0.0624, -0.1084,  0.0724,  ...,  1.5384,  0.2212, -0.8455],
        [-1.1093, -0.4221, -0.3100,  ..., -0.0775,  0.6637, -0.6289]],
       requires_grad=True)


In [14]:
para[0][0]

tensor(-0.0259, grad_fn=<SelectBackward0>)

In [15]:
x = torch.tensor([[3, 5, 8, 1]], dtype=torch.long)
y = torch.tensor([3], dtype=torch.long)

scores = net(x)  # [n,vocab_size] 调用CBOW模型
print(scores.shape)

torch.Size([1, 500])


In [16]:
# 损失：希望样本预测属于实际类别的置信度要越大越好，如果可以的话，要求预测不属于实际类别的置信度越小越好
y_onehot = F.one_hot(y, vocab_size).to(torch.float32)  # [n, vocab_size]
loss_fn = nn.BCEWithLogitsLoss()
loss = loss_fn(scores, y_onehot)
print(loss)

tensor(2.5456, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>)


In [17]:
prob = torch.sigmoid(scores)  # [n, vocab_size]
# loss2 = -torch.mean(torch.sum(y_onehot * torch.log(prob + 1e-8), dim=1))  # 只更新当前样本对应类别的参数w
loss2 = -torch.mean(y_onehot * torch.log(prob + 1e-8) + (1 - y_onehot) * torch.log(1.0 - prob + 1e-8))
print(loss2)

tensor(2.4594, grad_fn=<NegBackward0>)


In [18]:
opt.zero_grad() # 清零梯度
loss2.backward() # 反向传播计算梯度
opt.step() # 根据计算出的梯度进行更新

In [19]:
print("debug查看梯度值")
for name, param in net.named_parameters():
    if param.requires_grad:
        print(name, param.grad)

debug查看梯度值
emb_layer.weight tensor([[ 1.8522e-03,  2.9960e-04,  3.0032e-04,  ..., -7.6257e-04,
          1.0018e-05, -9.0817e-04],
        [ 2.5770e-02,  8.0428e-03,  3.9451e-04,  ..., -5.5539e-03,
          3.3564e-03, -1.4061e-02],
        [ 2.8880e-04,  4.6715e-05,  4.6828e-05,  ..., -1.1890e-04,
          1.5621e-06, -1.4161e-04],
        ...,
        [ 1.5724e-03,  2.5435e-04,  2.5497e-04,  ..., -6.4740e-04,
          8.5053e-06, -7.7101e-04],
        [ 2.1395e-07,  3.4608e-08,  3.4692e-08,  ..., -8.8088e-08,
          1.1573e-09, -1.0491e-07],
        [ 1.4155e-04,  2.2896e-05,  2.2952e-05,  ..., -5.8278e-05,
          7.6563e-07, -6.9405e-05]])


In [20]:
param.grad.shape

torch.Size([500, 128])

# SkipGram

In [21]:
import time

import torch
import torch.nn  as nn
import torch.nn.functional as F

In [22]:
class SkipGram(nn.Module):
    def __init__(self, vocab_size, embedding_dim=128):
        super(SkipGram, self).__init__()
        self.emb_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.output_layer = nn.Linear(in_features=embedding_dim, out_features=vocab_size)

    def forward(self, x):
        """
        前向过程
        :param x: [N,1] long
        :return:
        """
        z1 = self.emb_layer(x)  # [N,1] --> [N,1,embedding_dim]
        z2 = z1[:, 0, :]  # [N,1,embedding_dim] --> [N,embedding_dim]
        scores = self.output_layer(z2)  # [N,embedding_dim] --> [N,vocab_size]  得到的是每个样本对应各个单词类别的置信度
        return scores

In [23]:
vocab_size = 50000  # 词汇表大小，也就是单词类别数目
batch_size = 16
window_size = 4  # 一个老师教多个学生

net = SkipGram(vocab_size=vocab_size, embedding_dim=128)

x = torch.randint(vocab_size, size=(batch_size, 1), dtype=torch.long)  # [N,1]
y = torch.randint(vocab_size, size=(batch_size, window_size), dtype=torch.long)  # [N,T]

In [24]:
print (y[0])
for i in range(window_size):
    y[0, i] = 2 + i
    y[1, i] = 0 + i
print (f"y shape is {y.shape}")
print (y[0])

tensor([14688, 12791, 39657, 42172])
y shape is torch.Size([16, 4])
tensor([2, 3, 4, 5])


In [25]:
scores = net(x)  # [N,vocab_size] N个样本每个样本在vocab_size个类别上的置信度
print(scores.shape)

torch.Size([16, 50000])


## BCEWithLogitsLoss

y降维

In [26]:
loss_fn = nn.BCEWithLogitsLoss()
y_onehot = F.one_hot(y, vocab_size).to(torch.float32)  # [N, T, vocab_size] 把y标签转成one-hot编码形式
y_onehot_2 = torch.sum(y_onehot, dim=1)  # [N, T, vocab_size] -> [N, vocab_size] 将每个样本多个标签合并成一个向量
loss = loss_fn(scores, y_onehot_2)   # [N, vocab_size]
print(loss) 

tensor(0.7317, grad_fn=<BinaryCrossEntropyWithLogitsBackward0>)


## sigmoid

In [27]:
ori_prob = torch.sigmoid(scores)  # [N, vocab_size]
loss3 = y_onehot_2 * torch.log(ori_prob) + (1 - y_onehot_2) * torch.log(1.0 - ori_prob)
print(loss3[0].detach().numpy()[:20])
loss3 = -torch.mean(loss3)
print(loss3)

[-0.675784   -1.052479   -0.41711083 -0.6477477  -0.9155155  -0.4729706
 -0.5049713  -0.5614689  -0.4660441  -0.7060538  -0.42633927 -1.081919
 -0.47972113 -1.0223006  -0.7881447  -0.46608683 -0.5866476  -1.1128715
 -0.9320852  -0.7289149 ]
tensor(0.7317, grad_fn=<NegBackward0>)


## unsqueeze & tile

x升维

In [28]:
prob = torch.unsqueeze(ori_prob, dim=1)  # [N, vocab_size] -> [N, 1, vocab_size] 增加维度
prob = torch.tile(prob, [1, y.shape[1], 1])  # [N, 1, vocab_size] -> [N, T, vocab_size]
loss2 = y_onehot * torch.log(prob) + (1 - y_onehot) * torch.log(1.0 - prob)  # [N,T,vocab_size] # 计算对数似然函数
loss2 = torch.mean(loss2, dim=1)  # [N,T,vocab_size] -> [N, vocab_size]
print(loss2[0].detach().numpy()[:20]) # detach从计算图中分离张量，numpy转化为数组
loss2 = -torch.mean(loss2)
print(loss2)

[-0.675784   -1.052479   -0.9110676  -0.7174666  -0.6123859  -0.8501705
 -0.5049713  -0.5614689  -0.4660441  -0.7060538  -0.42633927 -1.081919
 -0.47972113 -1.0223006  -0.7881447  -0.46608683 -0.5866476  -1.1128715
 -0.9320852  -0.7289149 ]
tensor(0.7317, grad_fn=<NegBackward0>)
