In [1]:
import requests
import zipfile
import os
import numpy as np
from collections import Counter
from itertools import chain
from gensim.models import KeyedVectors
# Download the Text8 dataset
def download_text8(data_dir='data'):
    """Download the Text8 archive into ``data_dir`` and extract it there.

    The archive is fetched only when ``<data_dir>/text8.zip`` does not exist
    yet; extraction is performed on every call.

    Args:
        data_dir: Directory that receives ``text8.zip`` and its extracted
            contents. Created if missing.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection failures or timeouts.
        zipfile.BadZipFile: If the file on disk is not a valid zip archive.
    """
    url = 'http://mattmahoney.net/dc/text8.zip'
    os.makedirs(data_dir, exist_ok=True)
    zip_path = os.path.join(data_dir, 'text8.zip')
    if not os.path.exists(zip_path):
        print("Downloading text8 dataset...")
        # A timeout keeps a stalled connection from hanging the run forever.
        response = requests.get(url, timeout=60)
        # Check the status BEFORE writing: otherwise an HTTP error page would
        # be stored as text8.zip and reused on every later run, because the
        # existence check above would skip the download.
        response.raise_for_status()
        with open(zip_path, 'wb') as f:
            f.write(response.content)
        print("Download complete.")
    else:
        print("Text8 dataset already downloaded.")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(data_dir)
    print("Text8 dataset extracted.")

# Save word embeddings to a file
def save_embeddings(word_embeddings, word_to_idx, vocab_size, embed_size, file_name):
    """Write embeddings as text: a ``<rows> <dim>`` header, then one word per line.

    Each data line is the word followed by the space-separated components of
    ``word_embeddings[idx]``.

    Args:
        word_embeddings: Indexable collection of vectors, addressed by the
            indices stored in ``word_to_idx``.
        word_to_idx: Mapping from word to its row index.
        vocab_size: Kept for backward compatibility. The header uses the
            number of rows actually written (``len(word_to_idx)``) so the
            count can never disagree with the file's contents.
        embed_size: Vector dimensionality written into the header.
        file_name: Path of the output file (overwritten if it exists).
    """
    with open(file_name, 'w', encoding='utf-8') as f:
        # The header must state the real number of rows: loaders that trust
        # it (e.g. gensim's load_word2vec_format) fail when the corpus had
        # fewer distinct words than the requested vocabulary size.
        f.write(f"{len(word_to_idx)} {embed_size}\n")
        for word, idx in word_to_idx.items():
            embedding = ' '.join(map(str, word_embeddings[idx]))
            f.write(f"{word} {embedding}\n")

# Build the vocabulary and the word <-> index mappings
def build_vocab(text, vocab_size=10000):
    """Map the most frequent whitespace-separated words of ``text`` to indices.

    Args:
        text: Corpus as a single string; split on whitespace.
        vocab_size: Maximum number of distinct words to keep.

    Returns:
        A ``(word_to_idx, idx_to_word)`` tuple of dicts. Index 0 is the most
        frequent word, following ``Counter.most_common`` order.
    """
    frequencies = Counter(text.split())
    word_to_idx = {}
    idx_to_word = {}
    for position, (token, _count) in enumerate(frequencies.most_common(vocab_size)):
        word_to_idx[token] = position
        idx_to_word[position] = token
    return word_to_idx, idx_to_word

# Convert text into a list of vocabulary indices
def text_to_indices(text, word_to_idx):
    """Encode ``text`` as indices, dropping words missing from ``word_to_idx``.

    Args:
        text: String that is split on whitespace.
        word_to_idx: Mapping from word to index.

    Returns:
        List of indices in the order the known words appear in ``text``.
    """
    encoded = []
    for token in text.split():
        if token in word_to_idx:
            encoded.append(word_to_idx[token])
    return encoded

# Generate Skip-gram training data
def generate_skipgram_data(indices, window_size=2):
    """Pair every position with its neighbours inside a symmetric window.

    Args:
        indices: Sequence of word indices.
        window_size: Number of neighbours taken on each side of a position.

    Returns:
        List of ``(target, context)`` tuples. For each target the contexts
        appear in ascending position order (left neighbours, then right).
    """
    total = len(indices)
    samples = []
    for center_pos, center in enumerate(indices):
        # Clamp the window to the sequence boundaries.
        start = max(0, center_pos - window_size)
        stop = min(total, center_pos + window_size + 1)
        samples.extend(
            (center, indices[pos])
            for pos in range(start, stop)
            if pos != center_pos
        )
    return samples

def softmax(x):
    """Return the softmax of ``x``, normalised along axis 0.

    The global maximum of ``x`` is subtracted before exponentiating so that
    large inputs do not overflow.
    """
    shifted = x - np.max(x)
    exponentials = np.exp(shifted)
    return exponentials / exponentials.sum(axis=0)


class CBOW:  # CBOW model
    """Two-layer softmax model that predicts a target word from a context index.

    ``W1`` (vocab_size x embed_size) holds the input embeddings and ``W2``
    (embed_size x vocab_size) the output weights. Each training step uses
    one ``(target, context)`` pair.
    """

    def __init__(self, vocab_size, embed_size, learning_rate=0.01):
        """Store the hyper-parameters and draw both weight matrices from N(0, 1)."""
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.learning_rate = learning_rate
        self.W1 = np.random.randn(vocab_size, embed_size)
        self.W2 = np.random.randn(embed_size, vocab_size)

    def forward(self, context):
        """Run the forward pass for ``context``.

        Stores the one-hot encoding of ``context`` on ``self.context_one_hot``;
        ``backward`` reads it from there.

        Returns:
            ``(y_pred, h, u)``: softmax probabilities, hidden vector, and
            pre-softmax scores.
        """
        self.context_one_hot = np.zeros(self.vocab_size)
        self.context_one_hot[context] = 1
        h = np.dot(self.W1.T, self.context_one_hot)
        u = np.dot(self.W2.T, h)
        y_pred = softmax(u)
        return y_pred, h, u

    def backward(self, context, target, y_pred, h, learning_rate):
        """Apply one SGD step for the cross-entropy loss on ``target``.

        Args:
            context: Unused here; the context comes from
                ``self.context_one_hot`` as set by the last ``forward`` call.
            target: Index of the word that should have been predicted.
            y_pred: Probabilities returned by ``forward``. Not modified.
            h: Hidden vector returned by ``forward``.
            learning_rate: Step size applied to both ``W1`` and ``W2``.
        """
        # Work on a copy so the caller's prediction array is left intact.
        dL_du = np.array(y_pred, dtype=float)
        dL_du[target] -= 1

        # Both gradients are computed from the pre-update W2.
        dL_dW2 = np.outer(h, dL_du)
        dL_dW1 = np.outer(self.context_one_hot, np.dot(self.W2, dL_du))

        # Use the same step size for both matrices.
        self.W2 -= learning_rate * dL_dW2
        self.W1 -= learning_rate * dL_dW1

    def train(self, pairs, epochs=5):
        """Train on ``(target, context)`` pairs, printing a loss figure per epoch."""
        for epoch in range(epochs):
            total_loss = 0
            for target, context in pairs:
                y_pred, h, u = self.forward(context)
                loss = -np.log(y_pred[target])
                total_loss += loss
                self.backward(context, target, y_pred, h, self.learning_rate)
            # NOTE(review): 10000 is a fixed scaling constant, not len(pairs);
            # confirm whether a per-pair mean was intended.
            print(f'cbow Epoch {epoch + 1}, Loss: {total_loss/10000}')

    def get_embedding(self, word_idx):
        """Return the row of ``W1`` for ``word_idx``."""
        return self.W1[word_idx]

    def save_embeddings(self, file_name):
        """Save ``W1`` with ``np.save``.

        ``np.save`` appends ``.npy`` to a string path lacking that suffix, so
        pass the full ``.npy`` name to ``load_embeddings`` afterwards.
        """
        np.save(file_name, self.W1)

    def load_embeddings(self, file_name):
        """Replace ``W1`` with the array stored in ``file_name``."""
        self.W1 = np.load(file_name)
    


class SkipGram:
    """Two-layer softmax model that predicts a context word from a target word.

    ``W1`` (vocab_size x embed_size) holds the input embeddings and ``W2``
    (embed_size x vocab_size) the output weights.
    """

    def __init__(self, vocab_size, embed_size, learning_rate=0.01):
        """Store the hyper-parameters and draw both weight matrices from N(0, 1)."""
        self.vocab_size = vocab_size
        self.embed_size = embed_size
        self.learning_rate = learning_rate
        self.W1 = np.random.randn(vocab_size, embed_size)
        self.W2 = np.random.randn(embed_size, vocab_size)

    def train(self, pairs, epochs=5, batch_size=1000):
        """Train for ``epochs`` passes over ``pairs`` in shuffled batches.

        Args:
            pairs: Sequence of ``(target, context)`` index pairs. Left
                unmodified; each epoch shuffles a private copy.
            epochs: Number of passes over the data.
            batch_size: Number of pairs handed to ``train_batch`` at a time.
        """
        for epoch in range(epochs):
            # Shuffle a copy: the caller may reuse the same list elsewhere.
            shuffled = list(pairs)
            np.random.shuffle(shuffled)
            loss = 0
            for i in range(0, len(shuffled), batch_size):
                batch_pairs = shuffled[i:i + batch_size]
                batch_loss = self.train_batch(batch_pairs)
                loss += batch_loss

                if (i // batch_size) % 10 == 0:
                    # Mean over the pairs actually in this batch, so the
                    # figure stays a per-pair loss for any batch_size.
                    print(f'SkipGram Batch {i // batch_size + 1}, Loss: {batch_loss/len(batch_pairs)}')
            # NOTE(review): 10000 is a fixed scaling constant, not len(pairs);
            # confirm whether a per-pair mean was intended.
            print(f'SkipGram Epoch: {epoch}, Loss: {loss/10000}')

    def train_batch(self, batch_pairs):
        """Apply one SGD step per pair and return the summed cross-entropy loss."""
        batch_loss = 0
        for target, context in batch_pairs:
            target_one_hot = np.zeros(self.vocab_size)
            target_one_hot[target] = 1

            h = np.dot(self.W1.T, target_one_hot)
            u = np.dot(self.W2.T, h)
            y_hat = softmax(u)  # predicted distribution

            e = y_hat.copy()
            e[context] -= 1  # gradient of the loss w.r.t. the scores u

            # Both gradients are computed before either matrix is updated.
            dW2 = np.outer(h, e)
            dW1 = np.outer(target_one_hot, np.dot(self.W2, e))

            self.W1 -= self.learning_rate * dW1
            self.W2 -= self.learning_rate * dW2

            batch_loss += -np.log(y_hat[context])
        return batch_loss

    def get_embedding(self, word_idx=None):
        """Return the embedding matrix, or one row of it.

        Args:
            word_idx: When ``None`` (default) the whole ``W1`` matrix is
                returned; otherwise the row ``W1[word_idx]``, matching
                ``CBOW.get_embedding``.
        """
        if word_idx is None:
            return self.W1
        return self.W1[word_idx]

    def save_embeddings(self, file_name):
        """Save ``W1`` with ``np.save``.

        ``np.save`` appends ``.npy`` to a string path lacking that suffix, so
        pass the full ``.npy`` name to ``load_embeddings`` afterwards.
        """
        np.save(file_name, self.W1)

    def load_embeddings(self, file_name):
        """Replace ``W1`` with the array stored in ``file_name``."""
        self.W1 = np.load(file_name)




# Download and preprocess the data
download_text8()
with open('data/text8', 'r') as f:
    text = f.read()



vocab_size = 10000
word_to_idx, idx_to_word = build_vocab(text, vocab_size) # build the vocabulary
indices = text_to_indices(text, word_to_idx)  # convert the text to indices
# Only the first 10000 indices are turned into training pairs.
pairs = generate_skipgram_data(indices[:10000])

print(f"Vocabulary size: {vocab_size}")
print(f"Number of training pairs: {len(pairs)}")


# Train the models
embed_size = 100
learning_rate = 0.01
epochs = 10
batch_size = 1000

cbow = CBOW(vocab_size, embed_size, learning_rate) # train the CBOW model
cbow.train(pairs, epochs=epochs)

skip_gram = SkipGram(vocab_size, embed_size, learning_rate) # train the SkipGram model
skip_gram.train(pairs, epochs=epochs, batch_size=batch_size)

word_embeddings = skip_gram.get_embedding() # fetch the embedding matrix W1
save_embeddings(word_embeddings, word_to_idx, vocab_size, embed_size,'word_embeddings.txt')  # save the embeddings

loaded_embeddings = KeyedVectors.load_word2vec_format('word_embeddings.txt', binary=False) # load the embeddings back

# Look up the embedding of a single word
print("is的词向量:", loaded_embeddings['is'])



Text8 dataset already downloaded.
Text8 dataset extracted.
Vocabulary size: 10000
Number of training pairs: 39994
cbow Epoch 1, Loss: 99.85436055895724
cbow Epoch 2, Loss: 70.25942900866258
cbow Epoch 3, Loss: 57.58860725396015
cbow Epoch 4, Loss: 49.53454342138034
cbow Epoch 5, Loss: 43.82359591462081
cbow Epoch 6, Loss: 39.5166662275801
cbow Epoch 7, Loss: 36.137674544674006
cbow Epoch 8, Loss: 33.40568355017717
cbow Epoch 9, Loss: 31.146844948857957
cbow Epoch 10, Loss: 29.253593067845692
SkipGram Batch 1, Loss: 36.43420736401264
SkipGram Batch 11, Loss: 26.990795532322693
SkipGram Batch 21, Loss: 23.504581049992495
SkipGram Batch 31, Loss: 21.040331318632532
SkipGram Epoch: 0, Loss: 100.04570930013863
SkipGram Batch 1, Loss: 19.04839011856509
SkipGram Batch 11, Loss: 17.941533480461324
SkipGram Batch 21, Loss: 17.97567983730538
SkipGram Batch 31, Loss: 17.4098148953555
SkipGram Epoch: 1, Loss: 70.29445440020517
SkipGram Batch 1, Loss: 14.844484805978466
SkipGram Batch 11, Loss: 15.