In [2]:

from tqdm.auto import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, TensorDataset


In [3]:
def init_weights(model):
    """Re-initialise every non-embedding parameter of ``model`` in place.

    Each parameter whose qualified name does not contain the substring
    ``"embedding"`` is overwritten with samples drawn uniformly from
    [-0.1, 0.1]. Parameters whose name contains ``"embedding"`` are left
    exactly as they were.
    """
    non_embedding_params = (
        tensor
        for qualified_name, tensor in model.named_parameters()
        if "embedding" not in qualified_name
    )
    for tensor in non_embedding_params:
        torch.nn.init.uniform_(tensor, a=-0.1, b=0.1)


In [4]:
# Select the most frequent tokens of the corpus (999 by default).
def find_max1000(corpus_path='ChineseCorpus199801.txt', top_n=999, encoding='gbk'):
    """Return the ``top_n`` most frequent whitespace-separated tokens of a corpus.

    Args:
        corpus_path: Path of the text file to read.
        top_n: Maximum number of tokens to return. The default of 999 matches
            what this function has always returned.
        encoding: Text encoding used to decode the file.

    Returns:
        A new list of tokens ordered by descending frequency. Tokens with equal
        counts keep the order in which they first appear in the file. If the
        corpus holds fewer than ``top_n`` distinct tokens, all of them are
        returned.

    Raises:
        ValueError: If ``top_n`` is negative.
    """
    from collections import Counter

    if top_n < 0:
        raise ValueError("top_n must be non-negative")

    # The context manager guarantees the handle is released; the previous
    # implementation referenced ``f.close`` without calling it.
    with open(corpus_path, encoding=encoding) as corpus:
        tokens = corpus.read().split()

    counts = Counter(tokens)
    # sorted() is stable, so ties stay in first-seen order.
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    # Slicing (rather than indexing 0..998) tolerates small corpora.
    return [word for word, _ in ranked[:top_n]]


# Rewrite the corpus as '1998.txt': every line loses its first token, is wrapped
# in <START>/<END>, and every token outside the frequent-word list becomes <UNK>.
max_words = find_max1000()
# Set for constant-time membership tests; max_words itself stays a list.
frequent_words = set(max_words)

processed_text = []

with open('ChineseCorpus199801.txt', encoding='gbk') as f1:
    # Written explicitly as gbk: the next cell reads this file back with
    # encoding='gbk', so relying on the platform default could mismatch.
    with open('1998.txt', 'w', encoding='gbk') as f2:
        for line in f1:
            # words[1:] skips the first whitespace-separated token of the line.
            words = line.split()[1:]
            processed_text = ['<START>']
            processed_text.extend(
                word if word in frequent_words else "<UNK>" for word in words
            )
            processed_text.append('<END>')
            f2.write(" ".join(processed_text))
            f2.write("\n")
            processed_text = []



In [5]:
import random

def split_data(text, train_ratio=0.9, random_seed=42):
    """Split a newline-separated corpus into a training part and a test part.

    The text is cut on ``'\\n'``, the resulting lines are shuffled after
    seeding the module-level ``random`` generator with ``random_seed``, and the
    first ``int(len(lines) * train_ratio)`` lines form the training part.

    Returns:
        A ``(train_lines, test_lines)`` pair of lists.
    """
    random.seed(random_seed)
    lines = text.split('\n')
    random.shuffle(lines)
    cut = int(len(lines) * train_ratio)
    return lines[:cut], lines[cut:]

# Load the preprocessed corpus and divide it into training and test lines.
with open('1998.txt', 'r', encoding='gbk') as f:
    text = f.read()
train_data, test_data = split_data(text)

# Store each part in its own file, one line per entry, without a trailing newline.
for out_path, out_lines in (('train.txt', train_data), ('test.txt', test_data)):
    with open(out_path, 'w', encoding='gbk') as f:
        f.write('\n'.join(out_lines))


In [13]:
# Custom dataset of (context, target) samples
class TextDataset(Dataset):
    """N-gram style dataset built from a whitespace-tokenised text file.

    Every line of the file is split on whitespace. For each position ``i`` at
    or after ``context_size``, one sample is produced: the indices of the
    ``context_size`` preceding tokens as the context and the index of token
    ``i`` as the target.

    Attributes:
        vocab: List of base words followed by '<UNK>', '<START>' and '<END>'.
        word_to_idx: Mapping from each vocabulary entry to its list position.
        data: List of ``(context_indices, target_index)`` tuples.
    """

    def __init__(self, file_path, context_size=2, vocab=None):
        """Read ``file_path`` (gbk-encoded) and build all samples.

        Args:
            file_path: Path of the text file, one sentence per line.
            context_size: Number of preceding tokens used as context.
            vocab: Optional iterable of base words (without the three special
                tokens). When omitted, ``find_max1000()`` supplies the words.

        Raises:
            KeyError: If a token used in a sample is not in the vocabulary.
        """
        # Copy a caller-supplied vocabulary so it is never mutated.
        base_words = find_max1000() if vocab is None else list(vocab)
        self.vocab = base_words + ['<UNK>', '<START>', '<END>']
        self.word_to_idx = {word: i for i, word in enumerate(self.vocab)}
        self.data = []
        # The context manager closes the file; it used to stay open.
        with open(file_path, 'r', encoding='gbk') as f:
            for sentence in f:
                tokens = sentence.split()
                # Lines with no token after the context yield no sample.
                if len(tokens) <= context_size:
                    continue
                indices = [self.word_to_idx[token] for token in tokens]
                for i in range(context_size, len(indices)):
                    self.data.append((indices[i - context_size:i], indices[i]))

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, i):
        """Return the ``(context_indices, target_index)`` tuple at position ``i``."""
        return self.data[i]

    def collate_fn(self, examples):
        """Stack individual samples into ``(inputs, targets)`` long tensors."""
        inputs = torch.tensor([ex[0] for ex in examples], dtype=torch.long)
        targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
        return (inputs, targets)

    
class LSTM(nn.Module):
    """Next-word predictor: embeddings -> single-step LSTM -> linear -> log-softmax.

    The embeddings of the ``context_size`` input words are concatenated into
    one vector, which is fed to the LSTM as a sequence of length one.
    """

    def __init__(self, vocab_size, embedding_dim, context_size, hidden_dim):
        """Create the layers and initialise the non-embedding weights.

        Args:
            vocab_size: Number of rows of the embedding table and size of the
                output distribution.
            embedding_dim: Width of each word embedding.
            context_size: Number of context words per sample.
            hidden_dim: Width of the LSTM hidden state.
        """
        super(LSTM, self).__init__()
        # Kept on the instance so forward() does not depend on module-level
        # globals that happen to carry the same names.
        self.embedding_dim = embedding_dim
        self.context_size = context_size
        self.hidden_dim = hidden_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim * context_size, hidden_dim, batch_first=True)
        self.linear = nn.Linear(hidden_dim, vocab_size)
        self.activate = nn.Tanh()
        init_weights(self)

    def forward(self, inputs):
        """Return log-probabilities over the vocabulary for each sample.

        Args:
            inputs: Long tensor of word indices whose first dimension is the
                batch; the remaining entries per sample are the context words.

        Returns:
            Tensor of shape ``(batch, vocab_size)`` holding log-softmax scores.
        """
        batch_size = inputs.shape[0]
        embeds = self.embeddings(inputs)
        # Concatenate the context embeddings of each sample into one vector,
        # presented to the LSTM as a one-step sequence.
        lstm_input = embeds.view(
            batch_size, -1, self.embedding_dim * self.context_size
        )
        # No explicit initial state: nn.LSTM then starts from zeros created on
        # the input's device, so the model also runs on CUDA.
        lstm_out, _ = self.lstm(lstm_input)
        output = self.linear(lstm_out[:, -1, :])
        return F.log_softmax(output, dim=1)
    
# Hyperparameters
vocab_size = 1002
learning_rate = 0.001
embedding_dim = 64
context_size = 2
hidden_dim = 128
batch_size = 64
num_epoch = 10

# Datasets and loaders
trainset = TextDataset('train.txt')
train_loader = DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=trainset.collate_fn,
)
testset = TextDataset('test.txt')
test_loader = DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=testset.collate_fn,
)

# Loss function, model and optimizer
nll_loss = nn.NLLLoss()
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = LSTM(vocab_size, embedding_dim, context_size, hidden_dim)
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training: one pass over train_loader per epoch; the summed batch losses of
# each epoch are printed and collected in total_losses.
model.train()
total_losses = []
for epoch in range(num_epoch):
    total_loss = 0
    progress = tqdm(train_loader, desc=f"Training Epoch{epoch}")
    for batch in progress:
        inputs, targets = (tensor.to(device) for tensor in batch)
        optimizer.zero_grad()
        log_probs = model(inputs)
        loss = nll_loss(log_probs, targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Loss: {total_loss:.2f} \n")
    total_losses.append(total_loss)


Training Epoch0:   0%|          | 0/15823 [00:00<?, ?it/s]

Loss: 57532.82 



In [14]:
# Extract the learned embedding table as a NumPy array.
# .cpu() is required before .numpy() when the model lives on a CUDA device;
# on a CPU model it is a no-op.
word_vectors = model.embeddings.weight.detach().cpu().numpy()
print(word_vectors)


[[ 0.8005089   0.7636297   1.3433093  ...  0.12945569 -0.81985676
   0.47024605]
 [-0.06748337  1.8085841   0.12195534 ... -0.8769795   0.62915456
   0.56159675]
 [-0.67598194  0.3784667   0.02282851 ...  1.4043441   0.81880236
   0.9835822 ]
 ...
 [ 0.382231   -0.0586349   0.2640177  ...  0.42253777 -0.15539789
   0.5761976 ]
 [ 0.5990552  -1.0728542   0.03272067 ... -0.2821767  -0.22998019
  -0.55269676]
 [-1.5738677  -0.28990644  0.8938042  ... -0.34965748  0.13180919
   0.08301569]]


In [15]:


# Measure the model on the test set so that results can be compared across models.
# Switch to evaluation mode first; gradients are not needed for this pass.
model.eval()
with torch.no_grad():
    total_loss = 0
    for batch in tqdm(test_loader, desc=f"Testing Epoch1"):
        inputs, targets = [x.to(device) for x in batch]
        log_probs = model(inputs)
        loss = nll_loss(log_probs, targets)
        total_loss += loss.item()
    # Mean of the per-batch losses (each batch counts equally).
    avg_loss = total_loss / len(test_loader)
    print("Average loss on test data: ", avg_loss)


Testing Epoch1:   0%|          | 0/1701 [00:00<?, ?it/s]

Average loss on test data:  3.436887391099924


In [9]:
# For 20 randomly chosen frequent words, write the ten most similar words
# (by cosine similarity of their embeddings) to 'similarity_lstm.txt'.
import numpy as np
import random


def cosine_similarity(v1, v2):
    """Return the cosine similarity of the vectors ``v1`` and ``v2``."""
    return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))


def _nearest_neighbours(vectors, query_idx, top_k=10):
    """Return up to ``top_k`` row indices of ``vectors`` most similar to row ``query_idx``.

    The query row itself is excluded explicitly instead of assuming it ranks
    first, and indices are ranked directly so duplicate scores cannot map two
    results to the same index.
    """
    query = vectors[query_idx]
    scores = [cosine_similarity(query, vector) for vector in vectors]
    candidates = [idx for idx in range(len(vectors)) if idx != query_idx]
    candidates.sort(key=lambda idx: scores[idx], reverse=True)
    return candidates[:top_k]


# Only rows that correspond to an entry of max_words are considered, which
# leaves out the special tokens appended after the frequent words.
num_words = min(len(max_words), len(word_vectors))
candidate_vectors = word_vectors[:num_words]
random_number = random.sample(range(num_words), min(20, num_words))

with open('similarity_lstm.txt', 'w', encoding='utf-8') as f1:
    for query_idx in random_number:
        max_indices = _nearest_neighbours(candidate_vectors, query_idx)
        f1.write("和"+str(max_words[query_idx])+"最接近的十个词是：\n")
        for neighbour_idx in max_indices:
            f1.write("    "+str(max_words[neighbour_idx])+'\n')

