数据集:'./datasets/jaychou_lyrics.txt.zip'

In [1]:
# Load the dataset (read the lyrics text out of the local zip archive)
import zipfile

def load_dataset(file):
    """Read the first member of a zip archive as UTF-8 text on a single line.

    Args:
        file: Path or file-like object accepted by ``zipfile.ZipFile``.

    Returns:
        The decoded text of the archive's first member, with every carriage
        return and every line feed replaced by one space each.
    """
    with zipfile.ZipFile(file) as archive:
        first_member = archive.namelist()[0]
        raw_bytes = archive.read(first_member)
    text = raw_bytes.decode('utf-8')
    # Flatten the corpus: each CR and each LF character becomes one space.
    return text.translate({ord('\r'): ' ', ord('\n'): ' '})

In [2]:
# Path of the zipped lyrics corpus, relative to the working directory.
file = './datasets/jaychou_lyrics.txt.zip'
# Read the whole corpus with load_dataset (defined in an earlier cell).
dataset = load_dataset(file)
# Report the corpus length and preview its first 100 characters.
print('字符集长度:',len(dataset))
print(dataset[:100])

字符集长度: 63282
想要有直升机 想要和你飞到宇宙去 想要和你融化在一起 融化在宇宙里 我每天每天每天在想想想想著你 这样的甜蜜 让我开始乡相信命运 感谢地心引力 让我碰到你 漂亮的让我面红的可爱女人 温柔的让我心疼的可


In [3]:
# 建立字符索引
# Build the character vocabulary. Sorting the unique characters makes the
# index assignment deterministic: iterating a bare set of strings can yield a
# different order in every interpreter session (string hash randomization),
# which would silently change every character's index between runs.
index_to_word = sorted(set(dataset))
# Inverse lookup: character -> position in index_to_word.
word_to_index = {word: index for index, word in enumerate(index_to_word)}
vocab_size = len(index_to_word)

In [4]:
def get_corpus_indices(words,word_to_index):
    """Translate every item of ``words`` into its integer index.

    Args:
        words: Iterable of vocabulary items (for example the characters of a
            string).
        word_to_index: Lookup table from item to index, accessed with ``[]``.

    Returns:
        A list of indices in the same order as ``words``.
    """
    indices = []
    for token in words:
        indices.append(word_to_index[token])
    return indices

In [5]:
from mxnet import nd

# 将索引转换成one-hot编码
def to_onehot(X,size):
    """One-hot encode an index array, producing one output array per slice of ``X.T``.

    Iterates over ``X.T`` and applies ``nd.one_hot`` with depth ``size`` to
    each slice; for a 2x2 input and ``size=10`` this yields a list of two
    2x10 arrays.
    """
    encoded = []
    for column in X.T:
        encoded.append(nd.one_hot(column, size))
    return encoded

In [32]:
# Sanity check: one-hot encode a 2x2 index matrix with depth 10 and print the resulting list of arrays.
print(to_onehot(nd.array([[2,5],[1,3]]),10))

[
[[0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]]
<NDArray 2x10 @cpu(0)>, 
[[0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]]
<NDArray 2x10 @cpu(0)>]


In [6]:
# 初始化模型参数
# 只含一个隐藏层
import mxnet as mn 
# Input and output widths both equal the vocabulary size; the single hidden
# layer has 256 units.
num_input,num_hidden,num_output = vocab_size,256,vocab_size


def _select_ctx():
    """Return the GPU context when it is usable, otherwise fall back to the CPU."""
    try:
        gpu_ctx = mn.gpu()
        # Building a context object never fails by itself; allocating a tiny
        # array on it (and waiting for it) is what reveals a missing GPU or a
        # CPU-only MXNet build.
        nd.zeros((1,), ctx=gpu_ctx).wait_to_read()
        return gpu_ctx
    except mn.base.MXNetError:
        return mn.cpu()


ctx = _select_ctx()

def get_params():
    """Create the trainable parameters of the single-hidden-layer RNN.

    Reads the module-level ``num_input``, ``num_hidden``, ``num_output`` and
    ``ctx``.

    Returns:
        ``[w_xh, w_hh, b_h, w_ho, b_o]`` with gradient buffers attached, in
        the order in which ``rnn`` unpacks them.
    """
    def _normal(shape):
        # Small random weights: identical (all-ones) weights would make every
        # hidden unit compute the same value and receive the same gradient.
        return nd.random.normal(scale=0.01,shape=shape,ctx=ctx)

    # Hidden-layer parameters
    w_xh = _normal((num_input,num_hidden))
    w_hh = _normal((num_hidden,num_hidden))
    b_h = nd.zeros(num_hidden,ctx=ctx)

    # Output-layer parameters
    w_ho = _normal((num_hidden,num_output))
    # Zero bias, consistent with b_h.
    b_o = nd.zeros(num_output,ctx=ctx)

    params = [w_xh,w_hh,b_h,w_ho,b_o]
    for param in params:
        # Allocate gradient storage so autograd can fill param.grad.
        param.attach_grad()
    return params

In [7]:
# 定义模型
def rnn(inputs,state,params):
    """Run the single-hidden-layer RNN over a sequence of time steps.

    Args:
        inputs: Iterable of per-time-step input arrays (for example the list
            returned by ``to_onehot``).
        state: Hidden state, either the 1-tuple ``(H,)`` that this function
            returns or the bare array ``H``.
        params: ``[w_xh, w_hh, b_h, w_ho, b_o]`` as built by ``get_params``.

    Returns:
        ``(outputs, (H,))``: the list of per-step output arrays and the final
        hidden state wrapped in a 1-tuple.
    """
    w_xh,w_hh,b_h,w_ho,b_o = params
    # The state travels as a 1-tuple (see the return value); unwrap it so the
    # matrix products below receive the array itself.
    H = state[0] if isinstance(state,(tuple,list)) else state
    outputs = []
    for x in inputs:
        # New hidden state from the current input and the previous state.
        H = nd.relu(nd.dot(x,w_xh) + nd.dot(H,w_hh) + b_h)
        # Project the hidden state of this step onto the output layer.
        output = nd.dot(H, w_ho) + b_o
        outputs.append(output)
    return outputs, (H,)

In [8]:
# 定义预测函数
def predict_rnn(prefix,num_chars,rnn,params,init_rnn_state,
               num_hidden,vocab_size,ctx,idx_to_char,char_to_idx):
    """Generate text: feed ``prefix`` through the RNN, then extend it greedily.

    Args:
        prefix: Non-empty seed string; its characters must be keys of
            ``char_to_idx``.
        num_chars: Number of characters to generate after the prefix.
        rnn: Model function called as ``rnn(inputs, state, params)``.
        params: Model parameters passed through to ``rnn``.
        init_rnn_state: Called as ``init_rnn_state(1, num_hidden, ctx)`` to
            build the initial hidden state for a batch of one.
        num_hidden: Hidden size passed to ``init_rnn_state``.
        vocab_size: Depth of the one-hot encoding.
        ctx: Device context on which the input arrays are created.
        idx_to_char: Sequence mapping an index back to its character.
        char_to_idx: Mapping from character to index.

    Returns:
        The prefix followed by ``num_chars`` predicted characters, as a string.
    """
    state = init_rnn_state(1,num_hidden,ctx)
    output = [char_to_idx[prefix[0]]]

    for t in range(num_chars + len(prefix)-1):
        # The previous step's output is the input of the current step.
        X = to_onehot(nd.array([output[-1]], ctx=ctx),vocab_size)
        # Compute the output and the updated hidden state.
        (Y,state) = rnn(X,state,params)
        # The next input is the next prefix character while the prefix lasts,
        # and the highest-scoring predicted character afterwards.
        if t < len(prefix) - 1:
            output.append(char_to_idx[prefix[t+1]])
        else:
            output.append(int(Y[0].argmax(axis=1).asscalar()))
    # Hand the generated sequence back to the caller as text; without this
    # the function computed the prediction and then discarded it.
    return ''.join(idx_to_char[i] for i in output)

In [18]:
# 随机采样
import random
def data_iter_random(corpus_indices,batch_size,num_steps,ctx=None):
    """Yield minibatches of randomly ordered, non-overlapping windows.

    Args:
        corpus_indices: Sliceable sequence of indices.
        batch_size: Number of windows per minibatch.
        num_steps: Length of every window.
        ctx: Device context forwarded to ``nd.array``.

    Yields:
        ``(X, Y)`` pairs of arrays built from ``batch_size`` windows each,
        where every window in ``Y`` starts one position after the matching
        window in ``X``.
    """
    # One element is held back so that every window has a label window
    # shifted by one position.
    window_count = (len(corpus_indices) - 1) // num_steps
    batches_per_epoch = window_count // batch_size

    window_order = list(range(window_count))
    random.shuffle(window_order)

    for batch_no in range(batches_per_epoch):
        first = batch_no * batch_size
        starts = [w * num_steps for w in window_order[first:first + batch_size]]
        X = [corpus_indices[s:s + num_steps] for s in starts]
        Y = [corpus_indices[s + 1:s + 1 + num_steps] for s in starts]
        yield nd.array(X,ctx),nd.array(Y,ctx)

In [13]:
# 相邻采样
def data_iter_consecutive(corpus_indices,batch_size,num_steps,ctx=None):
    """Yield minibatches whose windows are adjacent across successive batches.

    The corpus is laid out as a ``(batch_size, nums_in_batch)`` matrix and
    walked left to right in steps of ``num_steps`` columns.

    Args:
        corpus_indices: Sliceable sequence of indices.
        batch_size: Number of rows (parallel sequences) per minibatch.
        num_steps: Number of columns (time steps) per minibatch.
        ctx: Device context on which the matrix is created.

    Yields:
        ``(X, Y)`` pairs of shape ``(batch_size, num_steps)``, where ``Y`` is
        ``X`` shifted one column to the right.
    """
    nums_in_batch = (len(corpus_indices) - 1) // batch_size
    if nums_in_batch < 1:
        # Not enough data for even a single column per row: nothing to yield.
        return
    # Keep exactly batch_size * nums_in_batch elements so the reshape is
    # size-preserving instead of relying on reshape to drop the tail.
    usable = corpus_indices[:batch_size * nums_in_batch]
    corpus_indices_batch_matrix = nd.array(usable,ctx=ctx).reshape((batch_size,nums_in_batch))
    # Y reaches one column beyond X, so the last column can only serve as a
    # label; without the "- 1" the final Y would be one column short whenever
    # nums_in_batch is a multiple of num_steps.
    epoch_size = (nums_in_batch - 1) // num_steps
    for epoch in range(epoch_size):
        start = epoch * num_steps
        X = corpus_indices_batch_matrix[:,start:start + num_steps]
        Y = corpus_indices_batch_matrix[:,start + 1:start + num_steps + 1]
        yield X,Y

In [19]:
# Test both samplers (random and consecutive) on a toy sequence
# Toy corpus 0..29, so the printed windows are easy to verify by eye.
seq = list(range(30))
print('dataset_iter_random test:')
# Random sampling: batch_size=2, num_steps=6, arrays created on the CPU.
dataset_iter_random = data_iter_random(seq,2,6,ctx=mn.cpu())
for x,y in dataset_iter_random:
    print('X:',x)
    print('Y:',y)

print('dataset_iter_consecutive test:')
# Consecutive sampling: batch_size=2, num_steps=5, arrays created on the CPU.
dataset_iter_consecutive = data_iter_consecutive(seq,2,5,ctx=mn.cpu())
for x,y in dataset_iter_consecutive:
    print('X:',x)
    print('Y:',y)

dataset_iter_random test:
X: 
[[18. 19. 20. 21. 22. 23.]
 [12. 13. 14. 15. 16. 17.]]
<NDArray 2x6 @cpu(0)>
Y: 
[[19. 20. 21. 22. 23. 24.]
 [13. 14. 15. 16. 17. 18.]]
<NDArray 2x6 @cpu(0)>
X: 
[[ 6.  7.  8.  9. 10. 11.]
 [ 0.  1.  2.  3.  4.  5.]]
<NDArray 2x6 @cpu(0)>
Y: 
[[ 7.  8.  9. 10. 11. 12.]
 [ 1.  2.  3.  4.  5.  6.]]
<NDArray 2x6 @cpu(0)>
dataset_iter_consecutive test:
X: 
[[ 0.  1.  2.  3.  4.]
 [14. 15. 16. 17. 18.]]
<NDArray 2x5 @cpu(0)>
Y: 
[[ 1.  2.  3.  4.  5.]
 [15. 16. 17. 18. 19.]]
<NDArray 2x5 @cpu(0)>
X: 
[[ 5.  6.  7.  8.  9.]
 [19. 20. 21. 22. 23.]]
<NDArray 2x5 @cpu(0)>
Y: 
[[ 6.  7.  8.  9. 10.]
 [20. 21. 22. 23. 24.]]
<NDArray 2x5 @cpu(0)>


In [None]:
from  mxnet.gluon import loss as gloss
from mxnet import autograd
# 模型训练
def train_and_predict_rnn(rnn,get_params,init_rnn_state,num_hidden,vocab_size,
                         ctx,corplus_indices,idx_to_char,char_to_idx,is_random_iter,
                         num_epochs,num_steps,,batch_size,learning_rate,clipping_theta,
                         pred_period,pred_len,prefixes):
    # 数据采样
    if is_random_iter:
        data_iter_fn = data_iter_random
    else:
        data_iter_fn = data_iter_consecutive
    # 获取模型参数
    params = get_params()
    # 损失函数
    loss= gloss.SoftmaxCrossEntropyLoss()
    
    for epoch in range(num_epochs):
        if not is_random_iter: # 若使用相邻采样，在epoch开始时初始化隐藏状态
            state = init_rnn_state(batch_size,num_hidden,ctx)  #上一次的输出
        
        data_iter = data_iter_fn(corplus_indices,batch_size,num_steps,ctx)
        for X,Y in data_iter:
            if is_random_iter: # 如果是随机采样，在每个小批量更新前初始化隐藏状态
                state = init_rnn_state(batch_size,num_hidden,ctx)  #上一次的输出
            else: # 否则需要使用detach从计算图分离隐藏状态
                for s in state:
                    s.detach()
            with autograd.record():
                inputs = to_onehot(X,vocab_size)
                # outputs有num_steps个形状为(batch_size,vocab_size)的矩阵
                (outputs,state) = rnn(inputs,state,params)
                # 连接之后形状为(num_steps * batch_size, vocab_size)
                outputs = nd.concat(*outputs,dim=0)
                # y的形状是(batch_size,num_steps),转置后变成长度为batch_size * num_step的向量，
                # 这样跟输出的行一一对应
                y = Y.T.reshape((-1,))
                l = loss(putputs,y).mean()
            l.backward()
            