In [1]:
# Import the required libraries and set up the global variables
import re
from collections import Counter
import torch
from torch import nn
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
 
# Text / string preprocessing helpers, bundled as static methods
class YuChuLi:
    """Namespace of static helpers for character-level text preprocessing.

    ("YuChuLi" is pinyin for "preprocessing".)  The helpers normalize raw
    text, build character <-> index vocabularies, encode/decode sequences
    and slice an encoded sequence into fixed-size training windows.
    """

    # File preprocessing
    @staticmethod
    def textYuChuLi(text):
        """Read the file at path ``text`` and return its normalized contents.

        The file is opened with the platform default encoding and the
        contents are normalized exactly as ``stringYuChuLi`` does.

        Args:
            text: Path of the text file to read.

        Returns:
            The normalized, lower-cased file contents as one string.

        Raises:
            OSError: If the file cannot be opened (e.g. it does not exist).
        """
        with open(text) as file:
            raw = file.read()
        return YuChuLi.stringYuChuLi(raw)

    # String preprocessing
    @staticmethod
    def stringYuChuLi(string):
        """Normalize ``string`` to lower-case ASCII letters and single spaces.

        Args:
            string: The raw text.

        Returns:
            The text with every character that is neither an ASCII letter nor
            whitespace replaced by a space, every whitespace run collapsed to
            one space, and all letters lower-cased.
        """
        string = re.sub(r'[^a-zA-Z\s]', ' ', string)
        string = re.sub(r'\s+', ' ', string)
        return string.lower()

    # Count whitespace characters and word frequencies
    @staticmethod
    def conunt(string):
        """Print the whitespace count and the frequency of every word.

        Words are the whitespace-separated tokens of ``string`` and are
        printed from most to least common.  This method only prints; it
        returns ``None``.  (The misspelled name is kept for existing callers.)

        Args:
            string: The text to analyse.
        """
        spacesCount = len(re.findall(r'\s', string))
        print(f'空格：{spacesCount}个')
        for word, count in Counter(string.split()).most_common():
            print(f'{word}：{count}个')

    # Build the two-way vocabulary
    @staticmethod
    def vocabulary(string):
        """Build character -> index and index -> character dictionaries.

        Indices are assigned from 0 in order of first appearance.

        Args:
            string: The text whose distinct characters form the vocabulary.

        Returns:
            A tuple ``(charToIndexVocabulary, indexToCharVocabulary)``.
        """
        charToIndexVocabulary = {}
        indexToCharVocabulary = {}
        for char in string:
            if char not in charToIndexVocabulary:
                # The next free index equals the number of entries so far.
                index = len(charToIndexVocabulary)
                charToIndexVocabulary[char] = index
                indexToCharVocabulary[index] = char
        return charToIndexVocabulary, indexToCharVocabulary

    # Character sequence -> index sequence
    @staticmethod
    def charToIndexTransform(string, charToIndex):
        """Encode ``string`` as a list of vocabulary indices.

        Characters missing from ``charToIndex`` are skipped, so the result
        may be shorter than ``string``.

        Args:
            string: The text to encode.
            charToIndex: Mapping from character to index.

        Returns:
            The list of indices of the known characters, in order.
        """
        return [charToIndex[char] for char in string if char in charToIndex]

    # Index sequence -> character sequence
    @staticmethod
    def indexToCharTransform(index, indexToChar):
        """Decode an iterable of indices into a list of characters.

        Indices missing from ``indexToChar`` are skipped.

        Args:
            index: Iterable of vocabulary indices.
            indexToChar: Mapping from index to character.

        Returns:
            The list of characters for the known indices, in order.
        """
        return [indexToChar[item] for item in index if item in indexToChar]

    # Slice a sequence into (window, next element) samples
    @staticmethod
    def example(sequence, window):
        """Cut ``sequence`` into sliding windows, each paired with its next element.

        Sample ``i`` has feature ``sequence[i : i + window]`` and label
        ``sequence[i + window]``, the element immediately after the window.

        Args:
            sequence: The encoded sequence (any sliceable sequence).
            window: Number of elements per feature window.

        Returns:
            A tuple ``(feature, label)`` of equal-length lists.  Both are
            empty when ``sequence`` has no more than ``window`` elements.
        """
        feature, label = [], []
        for start in range(len(sequence) - window):
            end = start + window
            feature.append(sequence[start:end])
            # The target is the element right after the window; it must not
            # skip one, because generation appends its prediction there.
            label.append(sequence[end])
        return feature, label
 
# Text preprocessing
# Load and normalize the corpus from a hard-coded absolute path.
text = YuChuLi.textYuChuLi('C:\\Users\\kongbai\\study\\dataset\\book.txt')
# conunt() only prints its statistics and has no return statement, so `count` is None.
count = YuChuLi.conunt(text)
# Build the character <-> index vocabularies from the corpus and print both mappings.
charToIndexVocabulary, indexToCharVocabulary = YuChuLi.vocabulary(text)
for key, value in charToIndexVocabulary.items():
    print(f"{key}: {value}")
for key, value in indexToCharVocabulary.items():
    print(f"{key}: {value}")
# Encode the whole corpus as indices, then slice it into windows of `window`
# indices, each paired with one target index.
textIndexSequence = YuChuLi.charToIndexTransform(text, charToIndexVocabulary)
window = 5
feature, label = YuChuLi.example(textIndexSequence, window)
 
# Convert the preprocessed samples to tensors and create the batch iterator
batch_size = 64
shuffle = True
# Both tensors are created directly on the selected device.
featureTensor = torch.tensor(feature, device=device)
labelTensor = torch.tensor(label, device=device)
tensorDataset = torch.utils.data.TensorDataset(featureTensor, labelTensor)
dataloader = torch.utils.data.DataLoader(
    tensorDataset,
    batch_size=batch_size,
    shuffle=shuffle,
)
 
# Model definition
class Net(nn.Module):
    """Sequence classifier: embedding -> bidirectional LSTM -> 3-layer MLP.

    Args:
        input_size: Number of distinct input indices (embedding table rows).
        hidden_size: Embedding width and per-direction LSTM hidden width.
        dropout: Dropout probability applied to the LSTM features.
        output_size: Number of output classes (width of the returned logits).
    """

    def __init__(self, input_size, hidden_size, dropout, output_size):
        super().__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        # batch_first=True is required: forward() passes (batch, seq, feature)
        # tensors and selects the last step with x[:, -1, :].  With the default
        # (seq, batch, feature) layout the LSTM would recur over the samples
        # of a batch instead of over the positions of each window.
        self.bilstm = nn.LSTM(
            hidden_size, hidden_size, bidirectional=True, batch_first=True
        )
        self.dropout = nn.Dropout(dropout)
        # The bidirectional LSTM concatenates both directions: 2 * hidden_size.
        self.fc1 = nn.Linear(hidden_size * 2, hidden_size // 2)
        self.fc2 = nn.Linear(hidden_size // 2, hidden_size // 4)
        self.fc3 = nn.Linear(hidden_size // 4, output_size)

    def forward(self, x):
        """Return class logits for a batch of index sequences.

        Args:
            x: Integer tensor of shape (batch, seq_len) holding input indices.

        Returns:
            Tensor of shape (batch, output_size) with unnormalized scores.
        """
        x = self.embedding(x)
        x, _ = self.bilstm(x)
        # Keep only the features at the last sequence position.
        x = x[:, -1, :]
        x = self.dropout(x)
        x = self.fc1(x)
        x = nn.functional.elu(x)
        x = self.fc2(x)
        x = nn.functional.elu(x)
        x = self.fc3(x)
        return x
 
# Set the model hyper-parameters and instantiate the model
# The vocabulary size serves as both the embedding table size and the number
# of output classes.
output_size = len(charToIndexVocabulary)
input_size = output_size
hidden_size = 32
dropout = 0.4
network = Net(
    input_size=input_size,
    hidden_size=hidden_size,
    dropout=dropout,
    output_size=output_size,
).to(device)

# Define the loss function and the optimizer with their parameters
lr = 0.005
Loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(network.parameters(), lr=lr)
 
# Train the model
network.train()
num_epochs = 2
for epoch in range(num_epochs):
    # Stays None if the dataloader yields no batch, so the epoch summary
    # below is skipped instead of failing on an undefined name.
    loss = None
    # enumerate(start=1) makes `i` the true 1-based batch number.  The loop
    # variable is `labels` so it does not overwrite the global `label` list.
    for i, (features, labels) in enumerate(dataloader, start=1):
        output = network(features)
        loss = Loss(output, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if i % 50 == 0:
            # .item() prints the scalar loss rather than the tensor repr.
            print(f'第{epoch + 1}轮，第{i}个损失：{loss.item()}')
    if loss is not None:
        print(f'第{epoch + 1}轮最后一次损失：{loss.item()}')
 
# Model prediction
# Greedy autoregressive generation: repeatedly feed the last `window` indices
# to the model and append the arg-max index to the sequence.
network.eval()
window = 5
num_steps = 100
string = YuChuLi.stringYuChuLi('LiLing_KongBai')
stringIndexSequence = YuChuLi.charToIndexTransform(string, charToIndexVocabulary)
with torch.no_grad():
    for i in range(num_steps):
        # A batch of one: the most recent (at most `window`) indices.
        example = torch.tensor(stringIndexSequence[-window:]).unsqueeze(0)
        example = example.to(device)
        new_output = network(example)
        predict_index = new_output.argmax(dim=1, keepdim=True)
        stringIndexSequence.append(predict_index.item())
# Decode the seed plus the generated indices back into text.
predict_char = ''.join(
    YuChuLi.indexToCharTransform(stringIndexSequence, indexToCharVocabulary)
)
print(predict_char)
 
# Manual single-step prediction
string2 = 'LiLing_KongBai'
string2 = YuChuLi.stringYuChuLi(string2)
stringIndexSequence2 = YuChuLi.charToIndexTransform(string2, charToIndexVocabulary)
# Make inference mode explicit so this cell does not depend on an earlier
# cell having called eval(); eval() is idempotent.
network.eval()
# Inference needs no gradients; this matches the generation loop, which also
# runs under torch.no_grad().
with torch.no_grad():
    example2 = torch.tensor(stringIndexSequence2[-window :]).unsqueeze(0).to(device)
    output2 = network(example2)
predict_index2 = output2.argmax(dim=1, keepdim=True)
stringIndexSequence2.append(predict_index2.item())
# Decode the seed plus the single predicted index back into text.
predict_char2 = YuChuLi.indexToCharTransform(stringIndexSequence2, indexToCharVocabulary)
predict_char2 = ''.join(predict_char2)
print(predict_char2)

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\kongbai\\study\\dataset\\book.txt'