<a href="https://colab.research.google.com/github/sharmer156/rnn_lyrics/blob/master/test/RNN01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import numpy as np
import tensorflow as tf  #这里用的tf2
import os

In [4]:
read_file = 'j.txt'  # path to the UTF-8 encoded lyrics corpus
# Read through a context manager so the file handle is closed immediately
# instead of staying open until the object happens to be garbage collected.
with open(read_file, 'rb') as corpus_file:
  text = corpus_file.read().decode(encoding='utf-8')

#清理文本中的特殊符号
def clean_text(text):
  """Strip surrounding whitespace from ``text`` and delete unwanted symbols.

  The symbols removed everywhere in the text are: ASCII space, ideographic
  space (U+3000), byte-order mark (U+FEFF), ASCII parentheses, carriage
  return and the full-width colon. Newlines inside the text are kept.
  """
  unwanted = ' \u3000\ufeff()\r：'
  deletion_table = str.maketrans('', '', unwanted)
  # Strip first, exactly as before, then drop every unwanted character.
  return text.strip().translate(deletion_table)

# Clean the raw corpus, then collect its distinct characters in sorted order.
after_clean = clean_text(text)
vocab = sorted(set(after_clean))
# NOTE(review): the original author recorded 33042 characters in total and a
# vocabulary of 2422 distinct characters here, while the model summary printed
# later in this notebook shows 2581 -- these figures may be stale; confirm.

# Lookup tables: character -> integer id, and integer id -> character.
char2idx = dict(zip(vocab, range(len(vocab))))
idx2char = np.array(vocab)
# The whole cleaned corpus encoded as a 1-D array of character ids.
text_as_int = np.array([char2idx[ch] for ch in after_clean])

In [5]:
seq_length = 20  # number of input characters per training example
# Every example consumes seq_length + 1 characters (the input plus the target
# shifted by one step), so that is the divisor for the number of examples.
# Possible improvement noted by the author: split into sentences and pad.
examples_per_epoch = len(after_clean) // (seq_length + 1)
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)
# Cut the id stream into chunks of seq_length + 1; drop the short final chunk.
sequences = char_dataset.batch(seq_length + 1, drop_remainder=True)

In [6]:
def split_input_target(chunk):
  """Split ``chunk`` into an (input, target) pair offset by one position.

  The input is everything except the last element and the target is
  everything except the first, so target[t] is the element following input[t].
  """
  return chunk[:-1], chunk[1:]

# Turn every chunk of `sequences` into an (input, target) pair.
dataset = sequences.map(split_input_target)

In [7]:
# Decode and print the first (input, target) pair as a quick visual check.
for input_example, target_example in dataset.take(1):
  labelled_examples = (
      ('Input data: ', input_example),
      ('Target data:', target_example),
  )
  for label, example in labelled_examples:
    print(label, repr(''.join(idx2char[example.numpy()])))

Input data:  '想要有直升机\n想要和你飞到宇宙去\n想要和'
Target data: '要有直升机\n想要和你飞到宇宙去\n想要和你'


In [8]:
# NOTE(review): the original author estimated 1652 // 64, i.e. about 25
# training steps per epoch -- confirm against the actual number of sequences.
BATCH_SIZE = 64       # sequences per training batch
# Buffer size handed to dataset.shuffle below.
BUFFER_SIZE = 2000
# Shuffle the (input, target) pairs, then group them into batches;
# drop_remainder=True discards a final batch smaller than BATCH_SIZE.
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
vocab_size = len(vocab)    # vocabulary size (embedding parameter)
embedding_dim = 300
rnn_units = 1024
# Bare expression so the notebook displays the dataset's shapes and dtypes.
dataset

<BatchDataset shapes: ((64, 20), (64, 20)), types: (tf.int64, tf.int64)>

In [9]:
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
  """Assemble the character model: Embedding -> stateful GRU -> Dense.

  Args:
    vocab_size: number of distinct characters; sizes both the embedding
      table and the final Dense layer.
    embedding_dim: width of each character embedding vector.
    rnn_units: number of units in the GRU layer.
    batch_size: fixed batch size baked into the model's input shape.

  Returns:
    A ``tf.keras.Sequential`` model whose Dense layer is created without an
    activation argument.
  """
  embedding = tf.keras.layers.Embedding(
      vocab_size,
      embedding_dim,
      batch_input_shape=[batch_size, None])
  recurrent = tf.keras.layers.GRU(
      rnn_units,
      return_sequences=True,
      stateful=True,
      recurrent_initializer='glorot_uniform')
  projection = tf.keras.layers.Dense(vocab_size)
  return tf.keras.Sequential([embedding, recurrent, projection])

# Build the training model (batch size BATCH_SIZE) and print its layer summary.
model = build_model(len(vocab), embedding_dim, rnn_units, BATCH_SIZE)

model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
embedding (Embedding)        (64, None, 300)           774300    
_________________________________________________________________
gru (GRU)                    (64, None, 1024)          4073472   
_________________________________________________________________
dense (Dense)                (64, None, 2581)          2645525   
Total params: 7,493,297
Trainable params: 7,493,297
Non-trainable params: 0
_________________________________________________________________


In [10]:
def loss(labels, logits):
  """Sparse categorical cross-entropy between integer labels and raw logits.

  ``from_logits=True`` is passed because ``logits`` are unnormalised scores
  rather than probabilities.
  """
  crossentropy = tf.keras.losses.sparse_categorical_crossentropy
  return crossentropy(labels, logits, from_logits=True)

# Compile with the Adam optimizer and the custom loss defined above.
model.compile(optimizer='adam', loss=loss)

# Take one training batch and inspect the untrained model's output on it.
for input_example_batch, target_example_batch in dataset.take(1):
  example_batch_predictions = model(input_example_batch)
  print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

# The statements below reuse the loop variables from the single iteration above.
# Sample one character id per time step from the first sequence's logits.
# NOTE(review): sampled_indices is not used again in the visible cells.
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices,axis=-1).numpy()
# Loss of the untrained model on this batch; the mean is printed below.
example_batch_loss  = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss: ", example_batch_loss.numpy().mean())

(64, 20, 2581) # (batch_size, sequence_length, vocab_size)
Prediction shape:  (64, 20, 2581)  # (batch_size, sequence_length, vocab_size)
scalar_loss:  7.8555055


In [11]:
# Directory in which the checkpoints are saved.
checkpoint_dir = 'training_checkpoints'
# File name of each checkpoint; "{epoch}" is a placeholder in the path template.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Callback passed to model.fit; save_weights_only=True stores the weights only.
checkpoint_callback=tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

In [13]:
# Number of passes over `dataset`.
EPOCHS=50
# Train the model; the checkpoint callback saves weights as training proceeds.
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [14]:
# Rebuild the network with batch size 1 for generation and restore the most
# recent training checkpoint into it.
weight = tf.train.latest_checkpoint(checkpoint_dir)
if weight is None:
  # latest_checkpoint returns None when the directory holds no checkpoint;
  # fail here with a clear message instead of deep inside load_weights.
  raise FileNotFoundError(
      "No checkpoint found in '{}'; run the training cell first.".format(checkpoint_dir))
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
model.load_weights(weight)
model.build(tf.TensorShape([1, None]))

In [15]:
def generate_text(model, start_string, num_generate=19, temperature=1.0):
  """Generate text by sampling characters that continue ``start_string``.

  Args:
    model: stateful model with batch size 1 that maps a batch of character
      ids to per-step logits over the vocabulary.
    start_string: non-empty seed text; every character must be a key of
      ``char2idx`` (an unknown character raises KeyError).
    num_generate: number of characters to sample after the seed.
    temperature: positive divisor applied to the logits before sampling.
      Values below 1 make the output more predictable, values above 1 make
      it more surprising.

  Returns:
    ``start_string`` followed by the sampled characters.

  Raises:
    ValueError: if ``start_string`` is empty or ``temperature`` is not positive.
  """
  if not start_string:
    raise ValueError('start_string must contain at least one character')
  if temperature <= 0:
    raise ValueError('temperature must be positive')

  # Encode the seed as a (1, len(start_string)) batch of character ids.
  input_eval = tf.expand_dims([char2idx[s] for s in start_string], 0)

  # Sampled characters, in generation order.
  text_generated = []

  # Clear any recurrent state left over from a previous call.
  model.reset_states()
  for _ in range(num_generate):
    predictions = model(input_eval)
    predictions = tf.squeeze(predictions, 0)  # drop the batch dimension

    # Scale the logits, sample one id per time step, keep the last step's draw.
    predictions = predictions / temperature
    predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()

    # Feed only the sampled character back in; the model's retained state
    # carries the context of everything seen so far.
    input_eval = tf.expand_dims([predicted_id], 0)
    text_generated.append(idx2char[predicted_id])

  return start_string + ''.join(text_generated)

In [17]:
def generate_text(model, start_string, num_generate=19, temperature=1.0):
  """Generate text by sampling characters that continue ``start_string``.

  Args:
    model: stateful model with batch size 1 that maps a batch of character
      ids to per-step logits over the vocabulary.
    start_string: non-empty seed text; every character must be a key of
      ``char2idx`` (an unknown character raises KeyError).
    num_generate: number of characters to sample after the seed.
    temperature: positive divisor applied to the logits before sampling.
      Values below 1 make the output more predictable, values above 1 make
      it more surprising.

  Returns:
    ``start_string`` followed by the sampled characters.

  Raises:
    ValueError: if ``start_string`` is empty or ``temperature`` is not positive.
  """
  if not start_string:
    raise ValueError('start_string must contain at least one character')
  if temperature <= 0:
    raise ValueError('temperature must be positive')

  # Encode the seed as a (1, len(start_string)) batch of character ids.
  input_eval = tf.expand_dims([char2idx[s] for s in start_string], 0)

  # Sampled characters, in generation order.
  text_generated = []

  # Clear any recurrent state left over from a previous call.
  model.reset_states()
  for _ in range(num_generate):
    predictions = model(input_eval)
    predictions = tf.squeeze(predictions, 0)  # drop the batch dimension

    # Scale the logits, sample one id per time step, keep the last step's draw.
    predictions = predictions / temperature
    predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()

    # Feed only the sampled character back in; the model's retained state
    # carries the context of everything seen so far.
    input_eval = tf.expand_dims([predicted_id], 0)
    text_generated.append(idx2char[predicted_id])

  return start_string + ''.join(text_generated)

In [18]:
# Print one sample generated from a two-character seed.
print(generate_text(model, start_string=u'烟雨'))

烟雨之间崩坏了最后放弃抵抗
我仰望着夕阳



In [16]:
# Print one sample generated from a different two-character seed.
print(generate_text(model, start_string=u'灯光'))

灯光机
我告诉你们八卦是会过去的
新闻是一


In [21]:
def generate_text(model, start_string, num_generate=100, temperature=1.0):
  """Generate a longer text by sampling characters that continue ``start_string``.

  Args:
    model: stateful model with batch size 1 that maps a batch of character
      ids to per-step logits over the vocabulary.
    start_string: non-empty seed text; every character must be a key of
      ``char2idx`` (an unknown character raises KeyError).
    num_generate: number of characters to sample after the seed.
    temperature: positive divisor applied to the logits before sampling.
      Values below 1 make the output more predictable, values above 1 make
      it more surprising.

  Returns:
    ``start_string`` followed by the sampled characters.

  Raises:
    ValueError: if ``start_string`` is empty or ``temperature`` is not positive.
  """
  if not start_string:
    raise ValueError('start_string must contain at least one character')
  if temperature <= 0:
    raise ValueError('temperature must be positive')

  # Encode the seed as a (1, len(start_string)) batch of character ids.
  input_eval = tf.expand_dims([char2idx[s] for s in start_string], 0)

  # Sampled characters, in generation order.
  text_generated = []

  # Clear any recurrent state left over from a previous call.
  model.reset_states()
  for _ in range(num_generate):
    predictions = model(input_eval)
    predictions = tf.squeeze(predictions, 0)  # drop the batch dimension

    # Scale the logits, sample one id per time step, keep the last step's draw.
    predictions = predictions / temperature
    predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()

    # Feed only the sampled character back in; the model's retained state
    # carries the context of everything seen so far.
    input_eval = tf.expand_dims([predicted_id], 0)
    text_generated.append(idx2char[predicted_id])

  return start_string + ''.join(text_generated)

In [22]:
# Print a sample from the generate_text version defined in the preceding cell.
print(generate_text(model, start_string=u'烟雨'))

烟雨纷纷雨袅aYaHei
哦跑到哪儿去啦
面店旁的小水沟留着我的天气冷的让人生闷气
火气我沮丧的悲哀
只為永恒的乐曲存在醒过来
鐘逆时鐘而过滤了你和我沦落面成美
沉在盒子里的世界才能完美
他们猜随便猜不重
