
Tomado de https://github.com/enriqueav/lstm_lyrics

In [1]:
import os
import io
import codecs
import re
import numpy as np
# Parameters: change to experiment different configurations
# Number of consecutive words in each input sequence fed to the network.
SEQUENCE_LEN = 5
# Words that occur fewer times than this in the corpus are put in ignored_words.
MIN_WORD_FREQUENCY = 1
# Offset, in words, between the starts of two consecutive training sequences.
STEP = 1

def shuffle_and_split_training_set(sentences_original, next_original, percentage_test=2):
    """Shuffle sequences and their target words together, then split them.

    :param sentences_original: list of input word sequences
    :param next_original: list holding the target word of each sequence,
        in the same order as ``sentences_original``
    :param percentage_test: percentage (0-100) of samples placed in the test split
    :return: ``((x_train, y_train), (x_test, y_test))`` as plain Python lists
    """
    print('Shuffling sentences')

    # One random permutation drives both lists, so every sequence stays
    # paired with its own target word.
    order = np.random.permutation(len(sentences_original))
    shuffled_sentences = [sentences_original[pos] for pos in order]
    shuffled_next = [next_original[pos] for pos in order]

    train_fraction = 1. - (percentage_test / 100.)
    cut_index = int(len(sentences_original) * train_fraction)

    x_train = shuffled_sentences[:cut_index]
    x_test = shuffled_sentences[cut_index:]
    y_train = shuffled_next[:cut_index]
    y_test = shuffled_next[cut_index:]

    print("Size of training set = %d" % len(x_train))
    print("Size of test set = %d" % len(y_test))
    return (x_train, y_train), (x_test, y_test)

def print_vocabulary(words_file_path, words_set):
    """Write the vocabulary to a UTF-8 text file, one word per line.

    The newline token is written as-is (without an extra line break), so it
    appears in the file as a single empty line.

    :param words_file_path: destination path of the vocabulary file
    :param words_set: iterable with the vocabulary words
    """
    # The context manager guarantees the handle is closed even if a write fails.
    with codecs.open(words_file_path, 'w', encoding='utf8') as words_file:
        for w in words_set:
            words_file.write(w if w == "\n" else w + "\n")

# Location of the input corpus and names of the files this notebook writes.
path = 'D:\\Documentos\\GitHub\\freestyle_generator\\data\\'
corpus = path + "corpus.txt"
examples = "examples.txt"
vocabulary = "vocabulary.txt"

# Load the corpus in lower case. Line breaks are padded with spaces so that
# "\n" survives the split below as a token of its own; non-breaking spaces and
# the listed punctuation marks are removed.
with io.open(corpus, encoding='utf-8') as f:
    text = f.read().lower()
text = text.replace('\n', ' \n ').replace('\xa0', '')
text = re.sub('[.,:?¿¡!-]+','',text)
print('Corpus length in characters:', len(text))

# Tokenise on single spaces, keeping the newline token and dropping blanks.
text_in_words = [w for w in text.split(' ') if w == '\n' or w.strip() != '']
print('Corpus length in words:', len(text_in_words))

# Calculate word frequency
word_freq = {}
for token in text_in_words:
    if token in word_freq:
        word_freq[token] += 1
    else:
        word_freq[token] = 1

# Words rarer than MIN_WORD_FREQUENCY are left out of the vocabulary.
ignored_words = {token for token, count in word_freq.items() if count < MIN_WORD_FREQUENCY}

unique_words = set(text_in_words)
print('Unique words before ignoring:', len(unique_words))
print('Ignoring words with frequency <', MIN_WORD_FREQUENCY)
words = sorted(unique_words - ignored_words)
print('Unique words after ignoring:', len(words))
print_vocabulary(vocabulary, words)

# Two-way lookup between each word and its position in the sorted vocabulary.
word_indices = {w: i for i, w in enumerate(words)}
indices_word = dict(enumerate(words))


Corpus length in characters: 2436558
Corpus length in words: 525131
Unique words before ignoring: 29292
Ignoring words with frequency < 1
Unique words after ignoring: 29292


Entrenamiento y generación del Dataset

In [2]:
corpus = path + "corpus_fs.txt"

# Same normalisation as for the vocabulary corpus: lower case, newline kept as
# its own token, non-breaking spaces and punctuation removed.
with io.open(corpus, encoding='utf-8') as f:
    text = f.read().lower()
text = text.replace('\n', ' \n ').replace('\xa0', '')
text = re.sub('[.,:?¿¡!-]+','',text)

text_in_words = [w for w in text.split(' ') if w == '\n' or w.strip() != '']

# cut the text in semi-redundant sequences of SEQUENCE_LEN words
# NOTE(review): only ignored_words is checked here; a word of this corpus that
# is missing from word_indices is not filtered out — confirm that this corpus
# shares the vocabulary built above.
sentences = []
next_words = []
ignored = 0
for start in range(0, len(text_in_words) - SEQUENCE_LEN, STEP):
    # SEQUENCE_LEN input words followed by the word to predict.
    window = text_in_words[start: start + SEQUENCE_LEN + 1]
    # Only add the sequences where no word is in ignored_words
    if ignored_words.isdisjoint(window):
        sentences.append(window[:SEQUENCE_LEN])
        next_words.append(window[SEQUENCE_LEN])
    else:
        ignored += 1
print('Total sequences:', ignored + len(sentences))
print('Ignored sequences:', ignored)
print('Remaining sequences:', len(sentences))

# x, y, x_test, y_test
train_split, test_split = shuffle_and_split_training_set(sentences, next_words)
sentences, next_words = train_split
sentences_test, next_words_test = test_split

Total sequences: 83399
Ignored sequences: 0
Remaining sequences: 83399
Shuffling sentences
Size of training set = 81731
Size of test set = 1668


In [3]:
"""
Example script to train a network to generate text with the style of a given corpus
--By word--
It is recommended to run this script on GPU, as recurrent
networks are quite computationally intensive.
Based on
https://github.com/keras-team/keras/blob/master/examples/lstm_text_generation.py
20 epochs should be enough to get decent results.
Uses data generator to avoid loading all the test set into memory.
Saves the weights and model every epoch.
"""

from __future__ import print_function
from keras.callbacks import LambdaCallback, ModelCheckpoint, EarlyStopping
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, LSTM, Bidirectional
import numpy as np
import sys
import json
import os
import matplotlib.pyplot as plt

# Number of samples per batch; used below both as the generator batch size and
# to derive steps_per_epoch / validation_steps for model.fit.
BATCH_SIZE = 512


# Data generator for fit and evaluate
def generator(sentence_list, next_word_list, batch_size):
    """Yield one-hot encoded ``(x, y)`` batches forever, cycling over the samples.

    Relies on the notebook globals ``SEQUENCE_LEN``, ``words`` and
    ``word_indices``.

    :param sentence_list: list of word sequences (each a list of words)
    :param next_word_list: target word of each sequence, in the same order
    :param batch_size: number of samples per yielded batch
    :return: generator of ``(x, y)`` boolean arrays with shapes
        ``(batch_size, SEQUENCE_LEN, len(words))`` and ``(batch_size, len(words))``
    """
    total = len(sentence_list)
    vocab_size = len(words)
    index = 0
    while True:
        # The builtin bool is used as dtype: the np.bool alias is deprecated on
        # NumPy 1.20-1.23 and missing on 1.24-1.26.
        x = np.zeros((batch_size, SEQUENCE_LEN, vocab_size), dtype=bool)
        y = np.zeros((batch_size, vocab_size), dtype=bool)
        for i in range(batch_size):
            # Wrap around so the generator never runs out of samples.
            pos = index % total
            for t, w in enumerate(sentence_list[pos]):
                x[i, t, word_indices[w]] = 1
            y[i, word_indices[next_word_list[pos]]] = 1
            index = index + 1
        yield x, y



def get_model(dropout=0.2):
    """Build the (uncompiled) bidirectional-LSTM next-word model.

    Relies on the notebook globals ``SEQUENCE_LEN`` and ``words`` for the
    input shape and the size of the output layer.

    :param dropout: rate of the Dropout layer placed after the recurrent
        layer; when it is not greater than 0 the Dropout layer is left out
    :return: a Sequential model ending in a softmax over the vocabulary
    """
    print('Build model...')
    vocab_size = len(words)

    # Layers are created in the order in which they are stacked.
    stack = [Bidirectional(LSTM(256), input_shape=(SEQUENCE_LEN, vocab_size))]
    if dropout > 0:
        stack.append(Dropout(dropout))
    stack.append(Dense(vocab_size))
    stack.append(Activation('softmax'))

    model = Sequential()
    for layer in stack:
        model.add(layer)
    return model

def get_custom_model(dropout=0.2):
    """Build the (uncompiled) functional-API variant of the next-word model.

    Relies on the notebook globals ``SEQUENCE_LEN`` and ``words`` for the
    input shape and the size of the output layer.

    :param dropout: rate of the Dropout layer placed after the recurrent
        layer; when it is not greater than 0 the Dropout layer is left out
    :return: a Keras Model named "custom_model" ending in a softmax over the
        vocabulary
    """
    # Imported here because the notebook's import cell only brings in
    # Sequential and the layer classes.
    from keras.layers import Input
    from keras.models import Model

    print('Build custom model...')
    inputs = Input(shape=(SEQUENCE_LEN, len(words)))
    x = Bidirectional(LSTM(128))(inputs)
    if dropout > 0:
        x = Dropout(dropout)(x)
    x = Dense(len(words))(x)
    outputs = Activation('softmax')(x)
    return Model(inputs=inputs, outputs=outputs, name="custom_model")



# Functions from keras-team/keras/blob/master/examples/lstm_text_generation.py
def sample(preds, temperature=1.0):
    """Draw one index from a probability array, re-weighted by ``temperature``.

    The probabilities are moved to log space, divided by ``temperature``,
    exponentiated and re-normalised; one multinomial draw then selects the index.

    :param preds: array-like of probabilities (e.g. a softmax output)
    :param temperature: divisor applied in log space
    :return: the sampled index
    """
    log_probs = np.log(np.asarray(preds).astype('float64'))
    weights = np.exp(log_probs / temperature)
    distribution = weights / np.sum(weights)
    draw = np.random.multinomial(1, distribution, 1)
    return np.argmax(draw)


def on_epoch_end(epoch, logs):
    """Write sample generated text to ``examples_file`` at the end of an epoch.

    A seed sequence is picked at random among the training and test sequences;
    for each diversity value 50 words are generated from it with the global
    ``model`` and appended to the examples file.

    :param epoch: index of the epoch that just finished (written to the file)
    :param logs: metrics dictionary passed by Keras; not used
    """
    examples_file.write('\n----- Generating text after Epoch: %d\n' % epoch)

    # Randomly pick a seed sequence among train + test. The two lists are
    # indexed directly instead of being concatenated, which would copy every
    # sequence reference on each epoch.
    n_train = len(sentences)
    seed_index = np.random.randint(n_train + len(sentences_test))
    if seed_index < n_train:
        seed = sentences[seed_index]
    else:
        seed = sentences_test[seed_index - n_train]

    vocab_size = len(words)
    for diversity in [0.3, 0.4, 0.5, 0.6, 0.7]:
        # Work on a copy so the stored sequence is never modified.
        sentence = list(seed)
        examples_file.write('----- Diversity:' + str(diversity) + '\n')
        examples_file.write('----- Generating with seed:\n"' + ' '.join(sentence) + '"\n')
        examples_file.write(' '.join(sentence))

        for _ in range(50):
            # One-hot encode the current window of words.
            x_pred = np.zeros((1, SEQUENCE_LEN, vocab_size))
            for t, word in enumerate(sentence):
                x_pred[0, t, word_indices[word]] = 1.

            preds = model.predict(x_pred, verbose=0)[0]
            next_index = sample(preds, diversity)
            next_word = indices_word[next_index]

            # Slide the window: drop the oldest word, append the new one.
            sentence = sentence[1:] + [next_word]

            examples_file.write(" "+next_word)
        examples_file.write('\n')
    examples_file.write('='*80 + '\n')
    examples_file.flush()


# Build the network and resume from previously saved weights when available.
model = get_model()
if os.path.isfile('weights.h5'):
    print('Weights loaded')
    model.load_weights('weights.h5')
model.compile(loss='categorical_crossentropy', optimizer="adam", metrics=['accuracy'])

file_path = "./checkpoints/LSTM_LYRICS-epoch{epoch:03d}-words%d-sequence%d-minfreq%d-" \
            "loss{loss:.4f}-acc{accuracy:.4f}-val_loss{val_loss:.4f}-val_acc{val_accuracy:.4f}" % \
            (len(words), SEQUENCE_LEN, MIN_WORD_FREQUENCY)

# The metric compiled as 'accuracy' is logged as 'accuracy' / 'val_accuracy'
# (the names file_path and the plot below already use), so that is the
# quantity the callbacks must monitor.
checkpoint = ModelCheckpoint(file_path, monitor='val_accuracy', save_best_only=True)
print_callback = LambdaCallback(on_epoch_end=on_epoch_end)
early_stopping = EarlyStopping(monitor='val_accuracy', patience=5)
callbacks_list = [checkpoint, print_callback, early_stopping]

# on_epoch_end writes to this global handle. UTF-8 is requested explicitly
# because the vocabulary contains non-ASCII words, and the handle is closed
# even when training aborts.
examples_file = io.open(examples, "w", encoding="utf-8")
try:
    history = model.fit(generator(sentences, next_words, BATCH_SIZE),
                        steps_per_epoch=int(len(sentences)/BATCH_SIZE) + 1,
                        epochs=10,
                        #callbacks=callbacks_list,
                        validation_data=generator(sentences_test, next_words_test, BATCH_SIZE),
                        validation_steps=int(len(sentences_test)/BATCH_SIZE) + 1)
finally:
    examples_file.close()

# Keep the per-epoch metrics next to the weights for later inspection.
with io.open('history.json', 'w') as history_file:
    json.dump(history.history, history_file)

model.save_weights("weights.h5")
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()


Build model...
Weights loaded
Epoch 1/10
  1/160 [..............................] - ETA: 20:28 - loss: 0.7117 - accuracy: 0.8711

InternalError:    Failed to call ThenRnnBackward with model config: [rnn_mode, rnn_input_mode, rnn_direction_mode]: 2, 0, 0 , [num_layers, input_size, num_units, dir_count, max_seq_length, batch_size, cell_num_units]: [1, 29292, 256, 1, 5, 512, 256] 
	 [[{{node gradients/CudnnRNN_grad/CudnnRNNBackprop}}]]
	 [[Adam/gradients/PartitionedCall_1]] [Op:__inference_train_function_5476]

Function call stack:
train_function -> train_function -> train_function


In [30]:
# Persist the current model weights to weights.h5 (the file the training cell
# loads at start-up when it exists).
model.save_weights('weights.h5')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Generación

In [6]:
"""
Script to generate text from an already trained network (with lstm_train.py)
--By word--
It is necessary to at least provide the trained model and the vocabulary file
(generated also by lstm_train.py).
"""


import argparse
import numpy as np
import re
from keras.models import load_model

def validate_seed(vocabulary, seed):
    """Report, word by word, whether the seed only uses vocabulary words.

    :param vocabulary: collection of known words
    :param seed: seed text, with words separated by single spaces
    :return: True when every seed word is in the vocabulary, False otherwise
    """
    print("\nValidating that all the words in the seed are part of the vocabulary: ")
    all_known = True
    for token in seed.split(" "):
        known = token in vocabulary
        print(token, end="")
        print(" ✓ in vocabulary" if known else " ✗ NOT in vocabulary")
        all_known = all_known and known
    return all_known


# Functions from keras-team/keras/blob/master/examples/lstm_text_generation.py
def sample(preds, temperature=1.0):
    """Draw one index from a probability array, re-weighted by ``temperature``.

    The probabilities are moved to log space, divided by ``temperature``,
    exponentiated and re-normalised; one multinomial draw then selects the index.

    :param preds: array-like of probabilities (e.g. a softmax output)
    :param temperature: divisor applied in log space
    :return: the sampled index
    """
    scaled_logs = np.log(np.asarray(preds).astype('float64')) / temperature
    unnormalised = np.exp(scaled_logs)
    probabilities = unnormalised / np.sum(unnormalised)
    one_hot_draw = np.random.multinomial(1, probabilities, 1)
    return np.argmax(one_hot_draw)

def rhyme_score(preds, previous_syllables):
    """Boost, in place, the score of vocabulary entries that rhyme with a text.

    Every entry ``preds[i]`` is moved towards 1.0 by a factor that depends on
    how ``indices_word[i]`` (notebook global) compares with
    ``previous_syllables``: 0.8 when both are equal, 0.7 when their vowels
    match, 0 (no change) otherwise.

    :param preds: mutable sequence of scores, one per vocabulary index; it is
        modified in place
    :param previous_syllables: text to rhyme with; should be the last
        syllable, or the last two syllables concatenated
    :return: the same ``preds`` object, after the update
    """
    vowels = ['a','e','i','o','u','á', 'é', 'í', 'ó', 'ú']

    def only_vowels(syllable):
        # Keep the vowels of the syllable, in their original order.
        return "".join(ch for ch in syllable if ch in vowels)

    previous_vowels = only_vowels(previous_syllables)

    def rima(candidate):
        # Rhyme strength between the previous text and a candidate entry.
        if previous_syllables == candidate:
            return 0.8
        if previous_vowels == only_vowels(candidate):
            return 0.7
        return 0

    for index in range(len(preds)):
        #if(preds[index]) > 0.8: # TODO: restrict to true syllable candidates
        preds[index] += (1.0 - preds[index]) * rima(indices_word[index])
    return preds

def generate_text(model, indices_word, word_indices, seed,
                  sequence_length, diversity, quantity):
    """
    Similar to lstm_train::on_epoch_end
    Used to generate text using a trained model

    Whenever the model emits a line break, the token right before it is
    upper-cased and, if an earlier line ending was recorded, replaced by a
    token sampled from the previous prediction after rhyme_score() has
    boosted the entries that rhyme with that earlier ending.

    :param model: the trained Keras model (with model.load)
    :param indices_word: a dictionary pointing to the words
    :param word_indices: a dictionary pointing from each word to its index
    :param seed: a string to be used as seed (already validated and padded)
    :param sequence_length: how many words are given to the model to generate
    :param diversity: is the "temperature" of the sample function (usually between 0.1 and 2)
    :param quantity: quantity of words to generate
    :return: the list of generated tokens (the seed is not included)
    """
    sentence = seed.split(" ")
    print("----- Generating text")
    print('----- Diversity:' + str(diversity))
    print('----- Generating with seed:\n"' + seed)

    print()
    print(seed)

    # The input width comes from the mapping received as argument rather than
    # from a notebook global, so the function works with what it is given.
    vocabulary_size = len(word_indices)

    prev_preds = []
    text = []
    last_syllables = []
    elegidas_text = []
    for _ in range(quantity):
        # One-hot encode the current window of words.
        x_pred = np.zeros((1, sequence_length, vocabulary_size))
        for t, word in enumerate(sentence):
            x_pred[0, t, word_indices[word]] = 1.

        # preds holds the softmax output for the next token.
        preds = model.predict(x_pred, verbose=0)[0]

        next_index = sample(preds, diversity)
        next_word = indices_word[next_index]

        if next_word == '\n' and len(text) > 0:
            # End of a line: mark its last token and, when possible, redefine
            # it using prev_preds (the prediction that produced it).
            text[-1] = text[-1].upper()

            if text[-1] != '\n':
                silaba_reemplazada = text[-1]
                # Replace only when a non-empty previous ending is available
                # and the token is not the '\\' marker (presumably a
                # word-boundary token — TODO confirm).
                if last_syllables and last_syllables[-1] and text[-1] != '\\':
                    # NOTE(review): sampled with the default temperature, not
                    # with `diversity` — confirm this is intended.
                    changed_prev_index = sample(
                        rhyme_score(prev_preds, last_syllables[-1][-1]))
                    new_word = indices_word[changed_prev_index]
                    text[-1] = new_word
                    sentence[-1] = new_word
                    elegidas_text.append((new_word, silaba_reemplazada))

                # Remember the ending of this line, without '\\' markers.
                last_syllables.append([w for w in sentence[-2:] if w != '\\'])
            else:
                # An empty line resets the rhyme memory.
                last_syllables = []

        # Slide the window: drop the oldest word, append the new one.
        sentence = sentence[1:]
        sentence.append(next_word)
        text.append(next_word)
        prev_preds = preds

    print("Pares de silabas reemplazadas, silabas originales:")
    print(elegidas_text)
    print()
    return text

vocabulary_file = "vocabulary.txt"
#model_file = args.network
seed = "el barrio cuenta que dices"
sequence_length = SEQUENCE_LEN #args.sequence_length
diversity = 0.5#args.diversity
quantity = 5000 #args.quantity

#if not vocabulary_file or not model_file:
#    print('\033[91mERROR: At least --vocabulary and --network are needed\033[0m')
#    exit(0)

#model = load_model(model_file)
# `model` is the network built (and trained) by the training cell above.
print("\nSummary of the Network: ")
model.summary()

# Read the vocabulary back; the context manager closes the file handle.
with open(vocabulary_file, "r", encoding='utf-8') as vocabulary_handle:
    vocabulary = vocabulary_handle.readlines()
# remove the \n at the end of the word, except for the \n word itself
vocabulary = [re.sub(r'(\S+)\s+', r'\1', w) for w in vocabulary]
vocabulary = sorted(set(vocabulary))

word_indices = dict((c, i) for i, c in enumerate(vocabulary))
indices_word = dict((i, c) for i, c in enumerate(vocabulary))

if validate_seed(vocabulary, seed):
    print("\nSeed is correct.\n")
    # repeat the seed in case is not long enough, and take only the last elements
    seed = " ".join((((seed+" ")*sequence_length)+seed).split(" ")[-sequence_length:])
    text = generate_text(
        model, indices_word, word_indices, seed, sequence_length, diversity, quantity
    )
    printword = None
    # UTF-8 is requested explicitly: the vocabulary contains non-ASCII words
    # and the platform default encoding may not be able to represent them.
    with open('generated2.txt', "w", encoding='utf-8') as text_file:
        for word in text:
            # The '\\' token is written as a plain space.
            printword = word+" " if word != '\\' else ' '
            #print(printword)
            text_file.write(printword)
    # Echo the last token written, as before; skipped when nothing was
    # generated (the name would otherwise be undefined).
    if printword is not None:
        print(printword)
else:
    print('\033[91mERROR: Please fix the seed string\033[0m')