In [54]:
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
import torch.nn.functional as F

import jieba
from tqdm import tqdm

import re
import time 

In [8]:
# 1. 获取词表
def build_vocab():
    file_name='raw_data/jaychou_lyrics.txt'

    # TODO 1.清洗文本
    # 初始化一个列表，用于存储清洗后的句子
    clean_sentences = []
    for line in open(file_name,'r',encoding='utf-8'):
        line = line.replace('〖韩语Rap译文〗', '')
        # 去除 除了 中文、英文、数字、部分标点符号外 的其他字符
        line = re.sub(r'[^\u4e00-\u9fa5 a-zA-Z0-9!?,]', '', line)
        # 连续空格替换成1个
        line = re.sub(r'[ ]{2,}', '', line)
        line = line.strip()  # 去除行首和行尾的空白字符和换行符
        if len(line) <= 1:  # 如果行的长度小于等于1，则跳过该行
            continue
        if line not in clean_sentences:  # 如果行不在clean_sentences中，则添加
            clean_sentences.append(line)  # 将清洗后的行添加到clean_sentences列表中
    
    # TODO 2. 语料分词
    # TODO 初始化两个列表: index_to_word用于存储词汇表, all_sentences用于存储所有分词后的句子
    index_to_word, all_sentences = [], []
    for line in clean_sentences:  # 遍历清洗后的句子
        words = jieba.lcut(line)  # 使用jieba对句子进行分词，返回词汇列表
        # print("words: ", words)
        all_sentences.append(words)  # 将分词结果添加到all_sentences列表中
        # print("all_sentences: ", all_sentences)
        for word in words:  # 遍历每个词
            if word not in index_to_word:  # 如果词不在词汇表中
                # print("word: ", word)
                index_to_word.append(word)  # 将词添加到词汇表中
                # print("index_to_word: ", index_to_word)
    # 词到索引映射
    word_to_index = {word: idx for idx, word in enumerate(index_to_word)}  # 创建词到索引的映射字典
    # print("word_to_index_start: ", word_to_index)
    # 词的数量
    word_count = len(index_to_word)  # 计算词汇表中词的数量
    # 句子索引表示
    corpus_idx = []  # 初始化一个列表，用于存储整个语料的索引表示
    for sentence in all_sentences:  # 遍历每个分词后的句子
        temp = []  # 初始化一个临时列表，用于存储句子的索引
        for word in sentence:  # 遍历句子中的每个词
            temp.append(word_to_index[word])  # 将词转换为索引并添加到临时列表中
        # 在每行歌词之间添加空格隔开
        temp.append(word_to_index[' '])  # 在每个句子末尾添加空格的索引作为分隔符
        # print("temp: ", temp)
        # TODO extend()是逐个添加, 区别于extend()
        corpus_idx.extend(temp)  # 将句子的索引表示添加到corpus_idx列表中
        # print("corpus_idx: ", corpus_idx)
    # TODO 返回构建的词汇表、索引映射、词数、语料索引
    return index_to_word, word_to_index, word_count, corpus_idx
index_to_word, word_to_index, word_count, corpus_idx = build_vocab()

[0, 1, 2, 39, 0, 3, 4, 5, 6, 7, 39, 0, 3, 4, 8, 9, 10, 39, 8, 9, 6, 11, 39, 12, 13, 13, 13, 9, 14, 14, 15, 4, 39, 16, 17, 18, 39, 19, 12, 20, 21, 22, 23, 39, 24, 25, 39, 19, 12, 26, 4, 39, 27, 17, 19, 12, 28, 17, 29, 30, 39, 31, 17, 19, 12, 32, 17, 29, 30, 39, 33, 17, 19, 12, 34, 17, 29, 30, 39, 35, 17, 19, 12, 36, 17, 29, 30, 39, 37, 38, 39, 40, 41, 37, 42, 39, 43, 44, 39, 45, 46, 47, 39, 48, 40, 39, 12, 0, 49, 12, 17, 50, 51, 39, 52, 9, 53, 39, 54, 55, 56, 39, 52, 49, 57, 39, 58, 59, 39, 19, 12, 60, 39, 61, 62, 39, 63, 4, 64, 65, 17, 66, 67, 39, 68, 69, 70, 39, 71, 72, 73, 39, 4, 74, 12, 17, 75, 39, 19, 76, 17, 77, 39, 78, 79, 80, 39, 81, 17, 82, 39, 4, 17, 83, 39, 84, 85, 39, 19, 12, 86, 87, 88, 89, 90, 39, 49, 91, 92, 93, 94, 39, 95, 66, 96, 39, 97, 98, 99, 100, 101, 39, 12, 102, 103, 71, 104, 39, 105, 106, 107, 39, 108, 39, 108, 39, 109, 110, 111, 112, 113, 114, 39, 115, 116, 39, 117, 118, 119, 120, 39, 121, 109, 110, 111, 112, 113, 114, 39, 115, 116, 39, 117, 118, 119, 120, 39, 1

In [7]:
# 处理歌词数据集
class LyricsDataset:
    def __init__(self, corpus_idx, num_chars):  # 初始化方法，构造函数
        # 语料数据
        self.corpus_idx = corpus_idx  # 将传入的语料索引列表存储为类的属性
        # 语料长度
        self.num_chars = num_chars  # 将每个输入序列的长度（字符数）存储为类的属性
        # 词的数量
        self.word_count = len(self.corpus_idx)  # 计算语料索引列表的长度，即词汇总数
        # 句子数量
        #self.number = self.word_count // self.num_chars  # 计算可以提取的样本序列的数量，每个样本长度为 num_chars
        self.number = max(0, self.word_count - self.num_chars)

    def __len__(self):  # 定义返回数据集大小的方法
        return self.number  # 返回可以提取的样本序列数量

    # TODO 后续通过DataLoader()加载这里的数据
    def __getitem__(self, idx):  # 定义获取数据集中某个样本的方法
        # 修正索引值到: [0, self.word_count - 1]
        start = min(max(idx, 0), self.word_count - self.num_chars - 1)  # 限制起始索引，确保有效的样本提取范围
        x = self.corpus_idx[start: start + self.num_chars]  # 获取从起始索引开始的 num_chars 长度的序列作为输入
        y = self.corpus_idx[start + 1: start + 1 + self.num_chars]  # 获取从起始索引+1开始的 num_chars 长度的序列作为目标
        return torch.tensor(x), torch.tensor(y)  # 返回输入和目标序列作为张量

In [101]:
class TextGenerator(nn.Module):
    def __init__(self,vocab_size, batch_size):
        print("vocab_size: ", vocab_size)
        super(TextGenerator, self).__init__()
        self.embedding=nn.Embedding(vocab_size, 128)
        self.rnn=nn.LSTM(128, 128, 1)
        self.out=nn.Linear(128, vocab_size)
        self.dropout=nn.Dropout(0.2)
        self.batch_size=batch_size
        
    def forward(self, inputs, hidden):
        embed=self.embedding(inputs)
        # embed=self.dropout(embed)
        # print("embed: ", embed.shape)
        output,hidden=self.rnn(embed.transpose(0,1), hidden)
        output=self.dropout(output)
        output=self.out(output.transpose(0,1))
        return output,hidden
    
    def init_hidden(self):
        h0 = torch.ones(1, self.batch_size, 128).to('cuda')
        c0 = torch.ones(1, self.batch_size, 128).to('cuda')
        return (h0,c0)

In [106]:
def train(epoch,train_log):
    device='cuda'
    index_to_word, word_to_index, word_count, corpus_idx = build_vocab()
    dataset=LyricsDataset(corpus_idx, 20)
    model=TextGenerator(len(index_to_word),10).to( device)
    criterion=nn.CrossEntropyLoss()
    optimizer=optim.AdamW(model.parameters(), lr=0.01)
    file=open(train_log,'w')
    for epoch_idx in range(epoch):
        lyrics_generator=DataLoader(dataset, batch_size=10, shuffle=True)
        start=time.time()
        iter_num=0
        total_loss=0.0
        for x,y in tqdm(lyrics_generator):
            hidden=model.init_hidden()
            x,y=x.to(device),y.to(device)
            # print(x.shape, y.shape)
            y_pred,hidden=model(x, hidden)
            # print("y_pred: ", y_pred.shape)
            loss=criterion(y_pred.view(-1, y_pred.shape[-1]), y.view(-1,))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss+=loss.item()
            iter_num+=1
        message='epoch %3s loss: %.5f time %.2f' % \
                  (epoch_idx + 1,  # 当前训练轮数
                   total_loss / iter_num,  # 平均损失
                   time.time() - start)  # 本轮训练时间
        print(message)
        file.write(message + '\n')
    file.close()
    torch.save(model.state_dict(), 'model/lyrics_model.pth')
train(5, 'train_log.txt') 

  0%|          | 0/2365 [00:00<?, ?it/s]

vocab_size:  5744


100%|██████████| 2365/2365 [00:22<00:00, 106.78it/s]
  1%|          | 13/2365 [00:00<00:18, 126.35it/s]

epoch   1 loss: 2.69632 time 22.17


100%|██████████| 2365/2365 [00:22<00:00, 106.75it/s]
  0%|          | 10/2365 [00:00<00:26, 89.20it/s]

epoch   2 loss: 0.95911 time 22.20


100%|██████████| 2365/2365 [00:28<00:00, 81.86it/s]
  0%|          | 10/2365 [00:00<00:24, 97.33it/s]

epoch   3 loss: 0.77868 time 28.95


100%|██████████| 2365/2365 [00:29<00:00, 81.20it/s] 
  0%|          | 11/2365 [00:00<00:22, 104.48it/s]

epoch   4 loss: 0.71140 time 29.16


100%|██████████| 2365/2365 [00:23<00:00, 100.75it/s]

epoch   5 loss: 0.66880 time 23.54





In [115]:
def predict(start_word, sentence_length, model_path):
    # torch.manual_seed(42)
    # torch.cuda.manual_seed(42)
    with torch.no_grad():
        device = 'cuda'
        # 构建词典，返回索引到词，词到索引，词汇数量
        index_to_word, word_to_index, word_count, _ = build_vocab()
        # 构建文本生成模型实例，词汇表大小为 word_count
        model = TextGenerator(vocab_size=word_count,batch_size=1).to(device)
        # 加载训练好的模型参数
        model.load_state_dict(torch.load(model_path))
        # model.eval()
        # 初始化隐藏状态
        hidden = model.init_hidden()
        try:
            # 将起始词转换为词索引
            word_idx = word_to_index[start_word]
        except:
            print("该词不在词典中, 请重新输入")
            return
        # 用于存储生成的句子（词索引序列）
        generate_sentence = [word_idx]
        # 生成长度为 sentence_length 的句子
        for _ in range(sentence_length):
            # 前向传播，获取模型输出和隐藏状态
            output, hidden = model(torch.tensor([[word_idx]]).to(device), hidden)
            # print("output: ", output)
    
            # 获取输出中概率最大的词的索引，更新wordidx
            word_idx = torch.argmax(output).item()
            # 将该词索引添加到生成的句子中
            generate_sentence.append(word_idx)
        # 将生成的词索引序列转换为实际词并打印
        for idx in generate_sentence:
            print(index_to_word[idx], end='')
        print()
predict('分手', 100, 'model/lyrics_model.pth')

vocab_size:  5744
分手的眼泪 看我的重要性 彩色的大卷發紅鼻子最滑稽的步伐 妈妈桑 茶道 有超多导演跟编剧 只说了台词一句 而他们配了八百个语气 操控着我的情绪 那就咆哮吧 让每个人睡可以 运气从来就不在我这里 有没有口罩一个给我 释怀说了太多要离开就会上瘾 扯你就会上瘾 扯你已经不是个傻瓜 说谎就要付出代价
