手动实现方式

In [2]:
import numpy as np
from collections import defaultdict
import random
def sigmoid(x):
    return 1/(1+np.exp(-x))

class Item2Vec:
    def __init__(self, item_size, embedding_dim, neg_samples=5):
        """
        初始化 Item2Vec 模型。

        参数:
        - item_size: 物品表的大小（词汇表大小）。
        - embedding_dim: 词向量的维度。
        - neg_samples: 负样本的数量。
        """
        self.item_size = item_size
        self.embedding_dim = embedding_dim
        self.neg_samples = neg_samples
        
        # 初始化中心物品向量和上下文物品向量
        self.center_embeddings = np.random.randn(item_size, embedding_dim) * 0.01
        self.context_embeddings = np.random.randn(item_size, embedding_dim) * 0.01
        
    def negative_sampling_loss(self, center_idx, context_idx, neg_sample_indices):
        """
        计算负采样的损失函数。

        参数:
        - center_idx: 中心物品的索引。
        - context_idx: 目标上下文物品的索引。
        - neg_sample_indices: 负样本物品的索引列表。

        返回:
        - loss: 负采样的损失值。
        - grad_center: 中心物品向量的梯度。
        - grad_context: 目标上下文物品向量的梯度。
        - grad_neg: 负样本物品向量的梯度。
        """
        # 获取向量
        v_c = self.center_embeddings[center_idx]  # 中心物品向量
        u_o = self.context_embeddings[context_idx]  # 目标上下文物品向量
        u_neg = self.context_embeddings[neg_sample_indices]  # 负样本物品向量
        
        # 正样本项
        pos_score = np.dot(u_o, v_c)
        pos_loss = -np.log(sigmoid(pos_score))
        
        # 负样本项
        neg_scores = np.dot(u_neg, v_c)
        neg_loss = -np.sum(np.log(sigmoid(-neg_scores)))
        
        # 总损失
        loss = pos_loss + neg_loss
        
        # 计算梯度
        grad_pos = sigmoid(pos_score) - 1  # 正样本的梯度
        grad_neg = sigmoid(neg_scores)    # 负样本的梯度
        
        # 更新梯度
        grad_center = grad_pos * u_o + np.sum(grad_neg.reshape(-1, 1) * u_neg, axis=0)
        grad_context = grad_pos * v_c
        grad_neg = grad_neg.reshape(-1, 1) * v_c
        
        return loss, grad_center, grad_context, grad_neg
    
    def train(self, item_pairs, epochs=10, learning_rate=0.01):
        """
        训练 Item2Vec 模型。

        参数:
        - item_pairs: 物品对列表，每个物品对是一个元组 (center_idx, context_idx)。
        - epochs: 训练轮数。
        - learning_rate: 学习率。
        """
        for epoch in range(epochs):
            total_loss = 0
            for center_idx, context_idx in item_pairs:
                # 随机采样负样本
                neg_sample_indices = np.random.choice(self.item_size, self.neg_samples, replace=False)
                
                # 计算损失和梯度
                loss, grad_center, grad_context, grad_neg = self.negative_sampling_loss(
                    center_idx, context_idx, neg_sample_indices
                )
                
                # 更新中心物品向量
                self.center_embeddings[center_idx] -= learning_rate * grad_center
                
                # 更新目标上下文物品向量
                self.context_embeddings[context_idx] -= learning_rate * grad_context
                
                # 更新负样本物品向量
                for i, idx in enumerate(neg_sample_indices):
                    self.context_embeddings[idx] -= learning_rate * grad_neg[i]
                
                # 累加损失
                total_loss += loss
            
            # 打印每轮的损失
            print(f"Epoch {epoch + 1}, Loss: {total_loss}")
    
    def get_item_embeddings(self):
        """
        获取物品的向量表示。

        返回:
        - item_embeddings: 物品的向量表示，形状为 (item_size, embedding_dim)。
        """
        return self.center_embeddings

In [7]:
# 假设物品表的大小为 5
item_size = 5

# 假设物品对列表
item_pairs = [
    (0, 1),  # 物品 0 和物品 1 共现
    (0, 2),  # 物品 0 和物品 2 共现
    (1, 2),  # 物品 1 和物品 2 共现
    (2, 3),  # 物品 2 和物品 3 共现
    (3, 4)   # 物品 3 和物品 4 共现
]
model=Item2Vec(item_size,3,2)
model.train(item_pairs, epochs=10, learning_rate=0.03)
model.get_item_embeddings()

Epoch 1, Loss: 10.397745936735637
Epoch 2, Loss: 10.397785586755447
Epoch 3, Loss: 10.397789086255093
Epoch 4, Loss: 10.397140498903765
Epoch 5, Loss: 10.397580954268332
Epoch 6, Loss: 10.397638121158959
Epoch 7, Loss: 10.397695438841511
Epoch 8, Loss: 10.3973008814061
Epoch 9, Loss: 10.397416728367025
Epoch 10, Loss: 10.3970063986563


array([[ 0.00693019,  0.00060289, -0.02170743],
       [ 0.01652888,  0.00412437,  0.0002566 ],
       [-0.01116124, -0.01332822, -0.00472743],
       [-0.01387696,  0.00951275, -0.00366075],
       [-0.00297341,  0.00662084,  0.00080711]])

pytorch实现方式：
- 使用 PyTorch 实现 Item2Vec 模型，并利用 PyTorch 的自动微分功能来计算梯度。

In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
class Item2Vec(nn.Module):
    def __init__(self, item_size, embedding_dim):
        """
        初始化 Item2Vec 模型。

        参数:
        - item_size: 物品表的大小（词汇表大小）
        - embedding_dim: 词向量的维度。
        """
        super(Item2Vec, self).__init__()
        self.item_size = item_size
        self.embedding_dim = embedding_dim
        
        # 定义中心物品和上下文物品的嵌入层
        self.center_embeddings = nn.Embedding(item_size, embedding_dim)
        self.context_embeddings = nn.Embedding(item_size, embedding_dim)
        
    def forward(self, center_idx, context_idx, neg_sample_indices):
        """
        前向传播。

        参数:
        - center_idx: 中心物品的索引。
        - context_idx: 目标上下文物品的索引。
        - neg_sample_indices: 负样本物品的索引列表。

        返回:
        - loss: 负采样的损失值。
        """
        # 获取中心物品和上下文物品的嵌入向量
        v_c = self.center_embeddings(center_idx)  # 中心物品向量
        u_o = self.context_embeddings(context_idx)  # 目标上下文物品向量
        u_neg = self.context_embeddings(neg_sample_indices)  # 负样本物品向量
        
        # 正样本项
        pos_score = torch.matmul(u_o, v_c.t())
        pos_loss = -torch.log(torch.sigmoid(pos_score))
        
        # 负样本项
        neg_scores = torch.matmul(u_neg, v_c.t())
        neg_loss = -torch.sum(torch.log(torch.sigmoid(-neg_scores)))
        
        # 总损失
        loss = pos_loss + neg_loss
        return loss

In [18]:
# 假设物品表的大小为 5
item_size = 5

# 假设词向量维度为 3
embedding_dim = 3

# 初始化模型
model = Item2Vec(item_size, embedding_dim)

# 定义优化器
optimizer = optim.SGD(model.parameters(), lr=0.01)

# 假设物品对列表
item_pairs = [
    (0, 1),  # 物品 0 和物品 1 共现
    (0, 2),  # 物品 0 和物品 2 共现
    (1, 2),  # 物品 1 和物品 2 共现
    (2, 3),  # 物品 2 和物品 3 共现
    (3, 4)   # 物品 3 和物品 4 共现
]

# 训练模型
epochs = 10
for epoch in range(epochs):
    total_loss = 0
    for center_idx, context_idx in item_pairs:
        # 随机采样负样本
        neg_sample_indices = np.random.choice(item_size, 2, replace=False)
        neg_sample_indices = torch.tensor(neg_sample_indices, dtype=torch.long)
        
        # 将索引转换为 PyTorch 张量
        center_idx = torch.tensor(center_idx, dtype=torch.long)
        context_idx = torch.tensor(context_idx, dtype=torch.long)
        
        # 清零梯度
        optimizer.zero_grad()
        
        # 前向传播
        loss = model(center_idx, context_idx, neg_sample_indices)
        
        # 反向传播
        loss.backward()
        
        # 更新参数
        optimizer.step()
        
        # 累加损失
        total_loss += loss.item()
    
    # 打印每轮的损失
    print(f"Epoch {epoch + 1}, Loss: {total_loss}")

Epoch 1, Loss: 16.910038948059082
Epoch 2, Loss: 11.8334242105484
Epoch 3, Loss: 15.881965637207031
Epoch 4, Loss: 13.787595987319946
Epoch 5, Loss: 11.726338863372803
Epoch 6, Loss: 13.863561511039734
Epoch 7, Loss: 12.330938816070557
Epoch 8, Loss: 10.64801561832428
Epoch 9, Loss: 12.697281002998352
Epoch 10, Loss: 13.352895379066467


In [19]:
# 获取物品的向量表示
item_embeddings = model.center_embeddings.weight.data
print("物品的向量表示:")
print(item_embeddings)

物品的向量表示:
tensor([[-0.8770,  1.8270, -0.0632],
        [ 1.0472,  0.2641, -1.1805],
        [-0.1065,  2.0799, -2.3516],
        [ 0.3148, -0.5068, -1.4442],
        [-0.1261,  0.0168,  2.0256]])


# 小知识点

1. 关于pytorch的backword

PyTorch 采用动态计算图来跟踪所有涉及 `requires_grad=True` 的张量上的操作。在进行前向传播时，PyTorch 会记录每一个操作，构建一个计算图。当调用 `backward()` 方法时，PyTorch 会从损失值这个**标量**开始，沿着计算图反向传播，根据链式法则自动计算每个可学习参数（即 `requires_grad=True` 的张量）关于损失的梯度。

In [24]:
import torch

# 创建一个需要求导的张量
x = torch.tensor([2.0], requires_grad=True)
# 定义一个简单的函数
y = x ** 2
# 计算损失（这里简单将 y 作为损失）
loss = y
# 调用 backward 方法计算梯度
loss.backward()
# 查看梯度
print(x.data,x.grad)  

tensor([2.]) tensor([4.])
