In [1]:
# ----------------------
# 步骤1：文本清洗与分词
# ----------------------

In [2]:
import numpy as np

# 语料库路径
corpus_path = r'C:\Users\20030421a\Desktop\text8.txt'

# 参数设置
min_count = 5  # 低频词过滤阈值

# 初始化变量
word_freq = {}  # 统计每个词的频率
id2word = []    # 索引到词的映射，如 [UNK, "he", "is", ...]
word2id = {}    # 词到索引的映射，如 {"he":1, "is":2, ...}

def preprocess_text(file_path):
    """
    读取语料库文件，清洗文本并分词
    :param file_path: 语料库文件路径
    :return: 分词后的词列表
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read().strip()  # 读取文件内容并去除首尾空白
    # 分词（英文按空格分割）
    tokenized_text = text.split()
    # 选取前 1% 的数据
    tokenized_text = tokenized_text[:int(len(tokenized_text) * 0.01)]
    # 转为小写
    tokenized_text = [word.lower() for word in tokenized_text]
    return tokenized_text

In [3]:
import time
# 记录代码开始执行的时间
start_time = time.time()

In [4]:
# 调用函数，获取分词后的词列表
tokenized_text = preprocess_text(corpus_path)#列表
print(f"分词后的词列表（前10个）：{tokenized_text[:10]}")

分词后的词列表（前10个）：['anarchism', 'originated', 'as', 'a', 'term', 'of', 'abuse', 'first', 'used', 'against']


In [9]:
# ----------------------
# 步骤2：统计词频并过滤低频词
# ----------------------

In [5]:
def build_word_freq(tokenized_text):
    """
    统计词频并过滤低频词
    :param tokenized_text: 分词后的词列表
    :return: 过滤后的词频字典
    """
    word_freq = {}
    for word in tokenized_text:
        word_freq[word] = word_freq.get(word, 0) + 1
    # 过滤低频词
    word_freq = {word: freq for word, freq in word_freq.items() if freq >= min_count}
    return word_freq

In [6]:
# 调用函数，获取词频字典
word_freq = build_word_freq(tokenized_text)
print(f"词频字典（前10项）：{list(word_freq.items())[:10]}")

词频字典（前10项）：[('anarchism', 104), ('originated', 9), ('as', 1414), ('a', 3074), ('term', 68), ('of', 6071), ('abuse', 14), ('first', 237), ('used', 190), ('against', 77)]


In [12]:
# ----------------------
# 步骤3：构建词汇表
# ----------------------

In [13]:
def build_vocab(word_freq):
    """
    构建词汇表（词到ID的映射和ID到词的映射）
    :param word_freq: 词频字典
    :return: id2word, word2id
    """
    id2word = ["[UNK]"]  # ID=0 保留给未知词
    word2id = {"[UNK]": 0}
    idx = 1
    for word, freq in word_freq.items():
        word2id[word] = idx
        id2word.append(word)
        idx += 1
    return id2word, word2id

In [14]:
# 调用函数，获取词汇表
id2word, word2id = build_vocab(word_freq)

In [16]:
#高频词采样
def subsample_words(tokenized_text, t=1e-5):
    new_text = []
    for word in tokenized_text:
        freq = word_freq.get(word, 0)
        prob = 1 - np.sqrt(t / (freq + 1e-5))  # 防止除以0
        if np.random.rand() > prob:  # 以概率prob保留该词
            new_text.append(word)
    return new_text

In [17]:
window_size = 5         # 最大窗口
train_data = []         # 存储 (中心词id, [上下文id列表])

In [18]:
for i in range(len(tokenized_text)):
    center_word = tokenized_text[i]
    center_id = word2id.get(center_word, 0)  # 未知词映射为0
    if center_id == 0:  # 跳过未知词作为中心词
        continue
    # 随机生成当前窗口大小
    curr_window = np.random.randint(1, window_size+1)
    context_ids = []
    # 取左右各curr_window个词
    for j in range(max(0, i - curr_window), min(len(tokenized_text), i + curr_window + 1)):
        if j != i:  # 排除中心词自己
            context_word = tokenized_text[j]
            context_id = word2id.get(context_word, 0)
            if context_id != 0:  # 忽略未知词
                context_ids.append(context_id)
    if len(context_ids) > 0:
        train_data.append( (center_id, context_ids) )

In [20]:
# 将每个上下文词单独拆分为正样本
expanded_train_data = []
for center_id, context_ids in train_data:
    for context_id in context_ids:
        expanded_train_data.append( (center_id, context_id) )

In [22]:
# 根据词频计算负采样概率分布（幂律分布，论文中建议使用3/4次方）
word_counts = np.array([word_freq.get(word, 0) for word in id2word], dtype=np.float32)
word_counts[0] = 0  # 排除未知词[UNK]
word_probs = word_counts ** 0.75
word_probs /= word_probs.sum()  # 归一化为概率分布

In [23]:
def get_negative_samples(context_id, num_neg_samples):
    """
    根据概率分布生成负样本（排除正样本词）
    :param context_id: 正样本的上下文词ID（避免采样到该词）
    :param num_neg_samples: 负样本数量
    :return: 负样本ID列表
    """
    neg_samples = []
    while len(neg_samples) < num_neg_samples:
        # 按分布采样，排除context_id
        sampled_id = np.random.choice(len(word_probs), p=word_probs)
        if sampled_id != context_id and sampled_id != 0:  # 排除未知词和正样本
            neg_samples.append(sampled_id)
    return neg_samples

In [24]:
# CBOW + Negative Sampling
vocab_size = len(id2word)   # 词汇表大小
embedding_dim = 100         # 词向量维度
neg_samples = 5             # 每个正样本对应的负样本数
# 输入向量（上下文词）和输出向量（中心词）
input_embeddings = np.random.randn(vocab_size, embedding_dim) * 0.01  # 上下文词向量矩阵
output_embeddings = np.random.randn(vocab_size, embedding_dim) * 0.01 # 中心词向量矩阵


In [25]:
def cbow_loss(center_id, context_id, neg_samples):
    """
    计算CBOW模型的负采样损失
    :param center_id: 中心词ID
    :param context_id: 上下文词ID（正样本）
    :param neg_samples: 负样本ID列表
    :return: 损失值，中心词梯度，上下文词梯度，负样本梯度
    """
    # 获取输入向量（上下文词向量）
    h = input_embeddings[context_id]  # [embedding_dim, ]
    
    # 正样本得分（中心词向量）
    pos_out = output_embeddings[center_id]  # [embedding_dim, ]
    pos_score = 1 / (1 + np.exp(-np.dot(pos_out, h)))  # Sigmoid(点积)
    pos_loss = -np.log(pos_score)  # 正样本损失
    
    # 负样本得分
    neg_loss = 0
    neg_grads = []
    for neg_id in neg_samples:
        neg_out = output_embeddings[neg_id]
        neg_score = 1 / (1 + np.exp(np.dot(neg_out, h)))  # Sigmoid(-点积)
        neg_loss += -np.log(neg_score)
        neg_grads.append(neg_out * (neg_score - 1))  # 负样本梯度
    
    total_loss = pos_loss + neg_loss
    
    # 计算梯度
    # 正样本梯度
    grad_pos_out = (pos_score - 1) * h  # dL/d(pos_out)
    grad_h_pos = (pos_score - 1) * pos_out  # dL/dh（来自正样本）
    
    # 负样本梯度（累加到h的梯度）
    grad_h_neg = np.zeros_like(h)
    for neg_grad in neg_grads:
        grad_h_neg += neg_grad
    grad_h = grad_h_pos + grad_h_neg  # 总梯度
    
    return total_loss, grad_h, grad_pos_out, neg_grads

In [26]:
learning_rate = 0.02
num_epochs = 5
neg_samples_per_pos = 5  # 每个正样本对应的负样本数
batch_size = 128  # 小批量训练（可选）

In [27]:
import numpy as np

# 数值稳定函数定义
def safe_sigmoid(x):
    """数值稳定的sigmoid函数"""
    x_clipped = np.clip(x, -100, 100)  # 防止exp溢出
    return 1 / (1 + np.exp(-x_clipped))

def safe_log(x, eps=1e-8):
    """数值稳定的对数函数"""
    return np.log(np.clip(x, eps, 1.0 - eps))

# 修改后的损失函数
def cbow_loss(center_id, context_id, neg_samples):
    # 获取输入向量（上下文词向量）
    h = input_embeddings[context_id]  # [embedding_dim, ]
    
    # 正样本得分（中心词向量）
    pos_out = output_embeddings[center_id]  # [embedding_dim, ]
    dot_pos = np.dot(pos_out, h)
    pos_score = safe_sigmoid(dot_pos)  # 使用安全sigmoid
    pos_loss = -safe_log(pos_score)     # 使用安全log
    
    # 负样本得分
    neg_loss = 0
    neg_grads = []
    for neg_id in neg_samples:
        neg_out = output_embeddings[neg_id]
        dot_neg = np.dot(neg_out, h)
        neg_score = safe_sigmoid(-dot_neg)  # 注意负号
        neg_loss += -safe_log(neg_score)
        # 计算梯度时保持数值稳定
        grad_factor = (neg_score - 1) * h
        grad_factor = np.clip(grad_factor, -5.0, 5.0)  # 梯度裁剪
        neg_grads.append(grad_factor)
    
    total_loss = pos_loss + neg_loss
    
    # 计算梯度（加入梯度裁剪）
    grad_pos_out = np.clip((pos_score - 1) * h, -5.0, 5.0)
    grad_h_pos = np.clip((pos_score - 1) * pos_out, -5.0, 5.0)
    
    grad_h_neg = np.zeros_like(h)
    for grad in neg_grads:
        grad_h_neg += grad
    grad_h = grad_h_pos + grad_h_neg
    
    return total_loss, grad_h, grad_pos_out, neg_grads

# 训练循环修改版
for epoch in range(num_epochs):
    np.random.shuffle(expanded_train_data)
    total_loss = 0
    samples_processed = 0
    
    # 加入进度打印
    print_interval = max(1, len(expanded_train_data) // 10)
    
    for idx, (center_id, context_id) in enumerate(expanded_train_data):
        # 生成负样本
        neg_samples = get_negative_samples(context_id, neg_samples_per_pos)
        
        # 计算损失和梯度
        loss, grad_h, grad_pos_out, neg_grads = cbow_loss(center_id, context_id, neg_samples)
        total_loss += loss
        
        # 梯度裁剪
        grad_h = np.clip(grad_h, -5.0, 5.0)
        grad_pos_out = np.clip(grad_pos_out, -5.0, 5.0)
        neg_grads = [np.clip(g, -5.0, 5.0) for g in neg_grads]
        
        # 更新参数
        input_embeddings[context_id] -= learning_rate * grad_h
        output_embeddings[center_id] -= learning_rate * grad_pos_out
        for i, neg_id in enumerate(neg_samples):
            output_embeddings[neg_id] -= learning_rate * neg_grads[i]
        
        # 进度打印
        samples_processed += 1
        if (idx + 1) % print_interval == 0:
            avg_loss = total_loss / samples_processed
            print(f"Epoch {epoch} | Progress: {(idx+1)/len(expanded_train_data)*100:.1f}% | Current Loss: {avg_loss:.4f}")
    
    # 学习率衰减调整
    learning_rate = max(0.0001, learning_rate * 0.95)  # 更激进的衰减
    
    # 每个epoch结束后打印
    avg_loss = total_loss / len(expanded_train_data)
    print(f"Epoch {epoch} Completed | Avg Loss: {avg_loss:.4f} | Learning Rate: {learning_rate:.6f}")
    print("=" * 60)
    
    # 每个epoch保存检查点
    np.save(f"word_vectors_epoch{epoch}.npy", input_embeddings)

Epoch 0 | Progress: 10.0% | Current Loss: 59.7244
Epoch 0 | Progress: 20.0% | Current Loss: 73.4698
Epoch 0 | Progress: 30.0% | Current Loss: 79.3511
Epoch 0 | Progress: 40.0% | Current Loss: 82.4859
Epoch 0 | Progress: 50.0% | Current Loss: 84.3998
Epoch 0 | Progress: 60.0% | Current Loss: 85.6818
Epoch 0 | Progress: 70.0% | Current Loss: 86.5989
Epoch 0 | Progress: 80.0% | Current Loss: 87.2870
Epoch 0 | Progress: 90.0% | Current Loss: 87.8221
Epoch 0 | Progress: 100.0% | Current Loss: 88.2503
Epoch 0 Completed | Avg Loss: 88.2503 | Learning Rate: 0.019000
Epoch 1 | Progress: 10.0% | Current Loss: 92.1034
Epoch 1 | Progress: 20.0% | Current Loss: 92.1034
Epoch 1 | Progress: 30.0% | Current Loss: 92.1034
Epoch 1 | Progress: 40.0% | Current Loss: 92.1034
Epoch 1 | Progress: 50.0% | Current Loss: 92.1034
Epoch 1 | Progress: 60.0% | Current Loss: 92.1034
Epoch 1 | Progress: 70.0% | Current Loss: 92.1034
Epoch 1 | Progress: 80.0% | Current Loss: 92.1034
Epoch 1 | Progress: 90.0% | Current

In [28]:
# 保存输入向量（上下文词向量）或输出向量（通常使用输入向量）
np.save("word_vectors.npy", input_embeddings)

In [29]:
# 记录代码结束执行的时间
end_time = time.time()

In [37]:
end_time

1739369273.8297653

In [30]:
def cosine_similarity(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

def find_most_similar(word, top_k=5):
    if word not in word2id:
        return []
    word_id = word2id[word]
    word_vec = input_embeddings[word_id]
    similarities = []
    for idx, w in enumerate(id2word):
        if idx == 0 or idx == word_id:
            continue
        sim = cosine_similarity(word_vec, input_embeddings[idx])
        similarities.append( (w, sim) )
    similarities.sort(key=lambda x: -x[1])
    return similarities[:top_k]

# 测试相似词
test_word = "king"
similar_words = find_most_similar(test_word)
print(f"与 '{test_word}' 最相似的词：")
for word, sim in similar_words:
    print(f"{word}: {sim:.3f}")

与 'king' 最相似的词：
indeed: 0.922
hector: 0.906
origin: 0.903
rich: 0.890
differences: 0.885


In [31]:
def analogy(a, b, c, top_k=3):
    # 计算 vec(a) - vec(b) + vec(c)
    a_id = word2id.get(a, 0)
    b_id = word2id.get(b, 0)
    c_id = word2id.get(c, 0)
    if a_id == 0 or b_id == 0 or c_id == 0:
        return []
    vec = input_embeddings[a_id] - input_embeddings[b_id] + input_embeddings[c_id]
    similarities = []
    for idx, w in enumerate(id2word):
        if idx == 0 or idx in [a_id, b_id, c_id]:
            continue
        sim = cosine_similarity(vec, input_embeddings[idx])
        similarities.append( (w, sim) )
    similarities.sort(key=lambda x: -x[1])
    return similarities[:top_k]

# 测试类比任务（king - man + woman = queen）
result = analogy("man", "woman", "king")
print("类比任务结果：man : woman :: king : ?")
for word, sim in result:
    print(f"{word}: {sim:.3f}")

类比任务结果：man : woman :: king : ?
verbal: 0.902
africans: 0.875
leader: 0.873
