<a href="https://colab.research.google.com/github/orlandxrf/curso-dl/blob/main/notebooks/8e_LSTM_TextGeneration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Genereación de Texto usando una LSTM
Para esta tarea se utilizó el libro [Memorias de un vigilante by José S. Alvarez](https://www.gutenberg.org/ebooks/19543).

[Libros en idioma español del proyecto Gutenberg](https://www.gutenberg.org/browse/languages/es)

In [None]:
# Definir las bibliotecas a usar
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F

In [None]:
# cargar el libro
import os

URL = 'https://raw.githubusercontent.com/orlandxrf/curso-dl/main/data/memoriasDeUnVigilante.txt'
data_folder = 'data'
filepath = os.path.join(data_folder, 'memoriasDeUnVigilante.txt')
print (f"filepath:\t{filepath}")

# crear carpeta para almacenar el conjunto de datos
! mkdir {data_folder}

# descargar conjunto de datos y alamcenar
! wget -nc {URL} -O {filepath}

# comprobar descarga
! ls -lh data/*

filepath:	data/memoriasDeUnVigilante.txt
mkdir: cannot create directory ‘data’: File exists
File ‘data/memoriasDeUnVigilante.txt’ already there; not retrieving.
-rw-r--r-- 1 root root 128K Mar 15 01:50 data/memoriasDeUnVigilante.txt


In [None]:
# leer el libro
text = ""
with open('data/memoriasDeUnVigilante.txt', 'r') as f:
    for line in f:
        if len(line.strip()) > 0: # evitar leer líneas en blanco
            text += line
f.close()

In [None]:
print (text[0:1000])
print (f"\nLongitud del texto:\t{len(text):,}")

I
DOS PALABRAS
No abrigo la esperanza de que mis recuerdos lleguen a constituir un
libro interesante; los he escrito en mis ratos de ocio y no tengo
pretensiones de filósofo, ni de literato.
No obstante, creo que nadie que me lea perderá su tiempo, pues, por lo
menos, se distraerá con casos y cosas que quizás habrá mirado sin ver y
que yo en el curso de mi vida me vi obligado a observar en razón de mi
temperamento o de mis necesidades.
II
EN LOS UMBRALES DE LA VIDA
Mi nacimiento fue como el de tantos, un acontecimiento natural, de esos
que con abrumadora monotonía y constante regularidad se producen
diariamente en los ranchos de nuestras campañas desiertas.
Para mi padre, fui seguramente una boca más que alimentar, para mi
madre, una preocupación que se sumaba a las ocho iguales que ya tenía, y
para los perros de la casa y para los pajaritos del monte que nos
rodeaba, una promesa segura de cascotazos y mortificaciones que
comenzaría a cumplirse dentro de los tres años de la fecha y dur

In [None]:
# crear vocabularios
chars = tuple(set(text))

print (f"Longitud del vocabulario:\t{len(chars)}")
print (f"Vocabulario:\t{chars}")

itos = dict(enumerate(chars))
stoi = {ch: ii for ii, ch in itos.items()}

print (f"\nLongitud stoi:\t{len(stoi)}")
print (f"Longitud itos:\t{len(itos)}")

Longitud del vocabulario:	95
Vocabulario:	('o', '0', 'Y', ',', '«', 'k', '4', 'x', 'á', 'R', '&', 'f', '_', '*', '"', 'M', 'ú', 's', 'T', 'v', 'Á', '?', 'L', 'B', 'Ñ', 'c', 'P', 'a', 'W', 'N', '[', ' ', '\n', 'É', '¡', '6', ')', 'h', 'Q', 'Ó', 'J', 'ñ', 'U', 'º', 'S', 'l', '3', 't', '¿', 'ü', '(', 'D', ';', 'I', 'Z', 'm', 'V', '5', 'F', 'd', 'O', 'ó', 'w', '9', '.', 'z', '2', 'p', 'X', 'r', 'n', 'H', 'i', 'E', 'g', 'é', ':', 'ª', 'C', '1', 'u', '!', '7', 'q', 'A', 'b', '»', 'y', 'G', 'e', '8', 'í', 'j', ']', '-')

Longitud stoi:	95
Longitud itos:	95


In [None]:
# Codificar el texto, transformar los caracteres a sus índices
encoded_text = np.array([stoi[ch] for ch in text])

print (f"Longitud del texto codificado: {len(encoded_text):,}")

Longitud del texto codificado: 126,820


In [None]:
# función para hacer el one-hot-encode
def oneHotEncode(arr, n_labels):
    
    # inicializar la matriz en ceros
    one_hot = np.zeros((arr.size, n_labels), dtype=np.float32)
    
    # rellenar los elementos apropiados con unos
    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    
    # finalmente hacer reshaping para volver a la matriz original
    one_hot = one_hot.reshape((*arr.shape, n_labels))
    
    return one_hot

In [None]:
# verificar la función
sequence = encoded_text[15:101] 
print (f"secuencia codificada:\t{sequence}\n")

words = ''.join([itos[item] for item in sequence])
print (f"secuencia decodificada:\t{words}")

one_hot = oneHotEncode(sequence, len(chars))

tmp_chars = tuple(set(words))
print (f"\ntmp_chars:\t{tmp_chars}\tLongitud:\t{len(tmp_chars)}")
print (f"\nLongitud de caracteres en la secuencia:\t{len(words)}")

tmp = one_hot.tolist()

print (f"\none_hot shape:\t{one_hot.shape}\n")

for i, item in enumerate(one_hot.tolist()):
  if i==4: break
  print (f"{i}\t{item}")

print (f"\n' ':{stoi[' ']}\t'a':{stoi['a']}")

secuencia codificada:	[29  0 31 27 85 69 72 74  0 31 45 27 31 89 17 67 89 69 27 70 65 27 31 59
 89 31 83 80 89 31 55 72 17 31 69 89 25 80 89 69 59  0 17 31 45 45 89 74
 80 89 70 31 27 31 25  0 70 17 47 72 47 80 72 69 31 80 70 32 45 72 85 69
  0 31 72 70 47 89 69 89 17 27 70 47 89 52]

secuencia decodificada:	No abrigo la esperanza de que mis recuerdos lleguen a constituir un
libro interesante;

tmp_chars:	('o', 'r', 't', 'n', 'i', 'c', 'g', 'a', 'N', 'u', ' ', '\n', 'q', 'b', ';', 'm', 'd', 'e', 's', 'z', 'p', 'l')	Longitud:	22

Longitud de caracteres en la secuencia:	86

one_hot shape:	(86, 95)

0	[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0


En la siguiente Figura se ilustra el objetivo al que se quiere llegar en este notebook. 

![Forward](https://drive.google.com/uc?id=1NIo-6yufX5WLHziaAKmyunl0SXpvgdci)

Se desea predecir el siguiente carácter dado que se tiene el anterior.

## Definir los mini-batches

La división del conjunto de entrenamiento y validación se realiza antes de obtener los mini batches. Una vez hecho esto.

Se calcula cúal será el tamaño total del mini batch (batch size * sequence length)

Después se puede conocer cuantos mini batches se pueden obtener dada la longitud de la matriz de entrada entre el tamaño total del mini batch (matrix input length // batch size total)

Una vez que se definió el tamaño del conjunto de entrenamiento, se obtienen los valores para las "etiquetas" y:

Dado un mini batch de 10 (batches) x 10 (longitud de la secuencia)

<pre>
x:[
 [64 57 61 16 54 71 94 15 26 15]
 [28 73 28 71 43 17 14 73 28 73]
 [19 43 17 14  0 71 28  4 71 19]
 [28 19 14 28 71 28 37 73 43 71]
 [44 37 43 29 57  6 17 28 71 14]
 [88 28 42 71 19 43 71 17 38 19]
 [ 4 71 20 37 17 71 75  6 43 71]
 [71 44 43 71 28  4 78  6 37 43]
 [18 28 71  6 17 28 71 30 28 27]
 [38 44 28 71 73 43 28 14 28 44]
]

x.shape: (10, 10
</pre>

Obtener los valores para y. Como se pretende predecir los valores (caracteres) siguientes y será formado de la siguiente manera:

<pre>
y:[
 [57 61 16 54 71 94 15 26 15 32]
 [73 28 71 43 17 14 73 28 73 71]
 [43 17 14  0 71 28  4 71 19 43]
 [19 14 28 71 28 37 73 43 71 44]
 [37 43 29 57  6 17 28 71 14 28]
 [28 42 71 19 43 71 17 38 19 71]
 [71 20 37 17 71 75  6 43 71  4]
 [44 43 71 28  4 78  6 37 43 17]
 [28 71  6 17 28 71 30 28 27 28]
 [44 28 71 73 43 28 14 28 44 28]
]

y.shape: (10, 10)
</pre>


In [None]:
# definir una función para obtener las secuencias
def get_batches(arr, batch_size, seq_length):
    """crea un generador que devuelve lotes de tamaño batch_size x seq_length de una matriz dada
    arr: matriz de la que desea hacer lotes
    batch_size: tamaño del lote, el número de secuencias por lote
    seq_length: número de caracteres codificados en una secuencia
    """
    
    # obtener el tamaño total del batch
    batch_size_total = batch_size * seq_length

    # número total de lotes que podemos hacer
    n_batches = len(arr)//batch_size_total
    
    # solo mantener los caracteres suficientes para hacer lotes completos
    arr = arr[:n_batches * batch_size_total]
    
    # reshaping en filas del tamaño de lote
    arr = arr.reshape((batch_size, -1))
    
    # iterar a través de la matriz, una secuencia a la vez
    for n in range(0, arr.shape[1], seq_length):
        # las características (entrada)
        x = arr[:, n:n+seq_length]

        # los objetivos, DESPLAZADOS POR UNO
        y = np.zeros_like(x)
        try:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, n+seq_length]
        except IndexError:
            y[:, :-1], y[:, -1] = x[:, 1:], arr[:, 0]
        yield x, y

In [None]:
# # crear el conjunto de entrenamiento y validación

# val_frac = 0.1
# val_idx = int( len(encoded_text) * (1-val_frac) )
# print (f"encoded_text:\t{len(encoded_text):,}") # 100%
# print (f"val_idx:\t{val_idx:,}") # 10 %

# train_data, val_data = encoded_text[:val_idx], encoded_text[val_idx:]

# for x, y in get_batches(val_data, batch_size=10, seq_length=10):
#   print (x.shape, y.shape)
#   print (x.tolist())
#   print (y.tolist())
#   break

# res = get_batches(train_data, batch_size=10, seq_length=10)
# print (f"\n{res}")

In [None]:
# verificar si tenemos cudas disponibles
train_on_gpu = torch.cuda.is_available()
if(train_on_gpu):
    print('Se puede entrenar con GPU!')
else: 
    print('No hay GPU disponible, entrenar en CPU')

No hay GPU disponible, entrenar en CPU


## Definir el modelo de la LSTM

In [None]:
class LSTMtextGeneration(nn.Module):
    
    def __init__(self, tokens, n_hidden=256, n_layers=2, drop_prob=0.5, lr=0.001):
        super().__init__()

        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        
        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        # capa lstm
        self.lstm=nn.LSTM(len(self.chars), n_hidden, n_layers, dropout=drop_prob, batch_first=True)
        
        # capa dropout
        self.dropout=nn.Dropout(drop_prob)
        
        # capa totalmente conectada FC
        self.fc=nn.Linear(n_hidden,len(self.chars))
    
    def forward(self, x, hidden):
        # obtenger la salida y el nuevo estado oculto del lstm
        # x --> [10, 10, 95]
        r_output, hidden = self.lstm(x, hidden) # r_output --> [10, 10, 512]
        
        # pasar la salida por la capa Dropout
        out = self.dropout(r_output) # out --> [10, 10, 512]
      

        # hacer reshaping
        out = out.contiguous().view(-1, self.n_hidden) # out --> [100, 512]
        
        # pasar la salida a la capa FC
        out = self.fc(out)

        return out, hidden
    
    
    def init_hidden(self, batch_size):
        """ Inicializar el estado oculto """
        
        # crear dos nuevos tensores con tamaños n_layers x batch_size x n_hidden
        weight = next(self.parameters()).data
        
        if (train_on_gpu):
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        else:
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_())
        
        return hidden

### Funcion para el entrenamiento


In [None]:
def train(model, data, epochs=10, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    """ 
    model: instancia de la red LSTMtextGeneration
    data: datos de texto para entrenar la red
    epochs: número de épocas a entrenar
    batch_size: número de minisecuencias por minilote, también conocido como tamaño del lote
    seq_length: número de secuencias de caracteres por mini-lote
    lr: taza de aprendizaje
    clip: recorte de degradado
    val_frac: fracción de datos que se espera para la validación
    print_every: número de pasos para la pérdida de entrenamiento y validación de impresión
    """

    model.train()
    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    
    # crear datos de entrenamiento y validación
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]

    # print (f"conjunto de entrenamiento: {len(data):,}\t{data.shape}")
    # print (f"conjunto de validación: {len(val_data):,}\t{val_data.shape}")
        
    if(train_on_gpu):
        model.cuda()
    
    counter = 0
    n_chars = len(model.chars)
    loss_epoch_train, loss_epoch_val = [], []

    for e in range(epochs):
        # inicializar estado oculto
        h = model.init_hidden(batch_size)
        
        # obtener los mini batches del conjunto de entrenamiento y validación
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1
            
            # pasar vectores a one-hot encode
            x = oneHotEncode(x, n_chars)

            # convertir los datos a tensores
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
            
            # si hay cuda disponible definir los datos así
            if (train_on_gpu): inputs, targets = inputs.cuda(), targets.cuda()
            
            # crear nuevas variables para el estado oculto, de lo contrario, se retrodecería a través de todo el historial de entrenamiento
            h = tuple([each.data for each in h]) # n_layers --> 2, [2, 10, 512]

            # resetear gradientes acumulados cero
            model.zero_grad()
            
            # obtener la salida del modelo
            output, h = model(inputs, h)
            
            # calcular la pérdida
            loss = criterion(output, targets.view(batch_size*seq_length).long())

            # retropropagación 
            loss.backward()
            
            # `clip_grad_norm` ayuda a prevenir el problema del gradiente explosivo en RNN / LSTM
            nn.utils.clip_grad_norm_(model.parameters(), clip)
            
            # actualizar los pesos
            optimizer.step()
            
            # estadísticas de las perdidas
            if counter % print_every == 0: # verificar cada cuanto se imprimie la información
                
                # obtener la pérdida del conjunto de validación
                val_h = model.init_hidden(batch_size) # obtener estados ocultos

                val_losses = [] # almacenar perdidas
                
                model.eval()

                # no utilizar cálculos del gradiente
                with torch.no_grad():
                
                    # iterar con el conjunto de validación
                    for x, y in get_batches(val_data, batch_size, seq_length):

                        # obtener datos con one-hot encode
                        x = oneHotEncode(x, n_chars)

                        # transformar datos a tensores
                        x, y = torch.from_numpy(x), torch.from_numpy(y)
                        
                        # crear nuevas variables para el estado oculto, de lo contrario, se retrodecería a través de todo el historial de entrenamiento
                        val_h = tuple([each.data for each in val_h])
                        
                        inputs, targets = x, y

                        # si hay cuda disponible definir los datos así
                        if(train_on_gpu): inputs, targets = inputs.cuda(), targets.cuda()

                        # obtener la salida del modelo
                        output, val_h = model(inputs, val_h)
                        
                        # calcular la pérdida
                        val_loss = criterion(output, targets.view(batch_size*seq_length).long())
                    
                        # almacenar las pérdidas
                        val_losses.append(val_loss.item())
                
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))

**Nota**: Se deben de crear nuevas variables para el estado oculto `h`, de lo contrario, se retrodecería a través de todo el historial de entrenamiento.

El problema se encuentra en el ciclo de entrenamiento: `backward()`  está tratando de propagarse hacia atrás hasta el comienzo del tiempo, lo que funciona para el primer lote pero no para el segundo porque el grafo del primer lote se ha descartado. **esto genera un error**.<br>
<br>




### Realizar entrenamiento

In [None]:
n_hidden = 512
n_layers = 2
model = LSTMtextGeneration(chars, n_hidden, n_layers)
print(model)
batch_size = 256
seq_length = 10
n_epochs =  10

# entrenar el modelo
train(model, encoded_text, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)

LSTMtextGeneration(
  (lstm): LSTM(95, 512, num_layers=2, batch_first=True, dropout=0.5)
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=512, out_features=95, bias=True)
)
Epoch: 1/10... Step: 10... Loss: 3.2262... Val Loss: 3.2620
Epoch: 1/10... Step: 20... Loss: 3.1583... Val Loss: 3.1877
Epoch: 1/10... Step: 30... Loss: 3.1047... Val Loss: 3.1656
Epoch: 1/10... Step: 40... Loss: 3.0880... Val Loss: 3.1580
Epoch: 2/10... Step: 50... Loss: 3.0999... Val Loss: 3.1523
Epoch: 2/10... Step: 60... Loss: 3.0808... Val Loss: 3.1464
Epoch: 2/10... Step: 70... Loss: 3.1188... Val Loss: 3.1405
Epoch: 2/10... Step: 80... Loss: 3.0448... Val Loss: 3.1303
Epoch: 3/10... Step: 90... Loss: 3.1424... Val Loss: 3.1058
Epoch: 3/10... Step: 100... Loss: 3.0486... Val Loss: 3.0530
Epoch: 3/10... Step: 110... Loss: 2.9201... Val Loss: 3.0018
Epoch: 3/10... Step: 120... Loss: 2.8110... Val Loss: 2.9102
Epoch: 3/10... Step: 130... Loss: 2.7563... Val Loss: 2.8687
Epoch: 4/10... Step: 1

### Definir función para predecir


In [None]:
def predict(model, char, h=None, top_k=None):
      """
      dado un carácter, predecir el siguiente carácter.
      devuelve el carácter predicho y el estado oculto
      """
      
      # codificar el carácter a su índice
      x = np.array([[model.char2int[char]]])

      # usar one-hot encode para el carácter
      x = oneHotEncode(x, len(model.chars))

      # convertir los datos a tensores
      inputs = torch.from_numpy(x)
      
      # si hay cuda disponible definir los datos así
      if(train_on_gpu): inputs = inputs.cuda()
      
      # separar el estado oculto de los estados h. (detach)
      h = tuple([each.data for each in h])

      # obtener la salida del modelo
      out, h = model(inputs, h)

      # obtener las probabilidades del carácter
      p = F.softmax(out, dim=1).data

      # calcular los datos en modo cpu
      if(train_on_gpu): p = p.cpu()
      
      # obtener los mejores caracteres predichos
      if top_k is None:
          top_ch = np.arange(len(model.chars))
      else:
          p, top_ch = p.topk(top_k)
          top_ch = top_ch.numpy().squeeze()
      
      # seleccionar el próximo carácter probable con algún elemento de aleatoriedad
      p = p.numpy().squeeze()
      char = np.random.choice(top_ch, p=p/p.sum())
      
      # devolver el valor codificado del carácter predicho y el estado oculto
      return model.int2char[char], h

### Definir funcion para generar texto

In [None]:
def sample(model, size, prime='el', top_k=None):
    """
    generar texto a partir de una semilla
    """

    if(train_on_gpu): model.cuda()
    else: model.cpu()
    
    model.eval() # evaluar el modelo
    
    # obtener los caracteres de la semilla
    chars = [ch for ch in prime]

    # inicializar el estado oculto
    h = model.init_hidden(1)
    
    # predecir los siguientes caracteres
    for ch in prime:
        char, h = predict(model, ch, h, top_k=top_k)
    
    # almacenar caracteres predichos
    chars.append(char)
    
    # pasar el carácter anterior y obtén uno nuevo
    for ii in range(size):
        char, h = predict(model, chars[-1], h, top_k=top_k)
        chars.append(char)

    return ''.join(chars)

### Generar Texto

In [None]:
print(sample(model, 200, prime='hoy', top_k=2))

hoy entran en el cuento en el prentera de la menteran de la conta en el pale cono el preciaran en las prentos y lo conos del pontera dela caros, en el prente en lo comoses, y los
conos los companto en el 


In [None]:
print(sample(model, 200, prime='hoy', top_k=3))

hoy y a se pora de curate de el
como espera la compenteron en
los comistan lo mino de los cuarto, de el minterones, estero el compante el mariante en el misición delos diente las parento de la malerante, 


In [None]:
print(sample(model, 200, prime='a menudo', top_k=2))

a menudo de lo contaran en la minto en las cara en la caro de estre en la minos del como del camo pare de el como de esteronto,
el el compara las pare lo compre en en las caro el como en la misira del cono en 


In [None]:
print(sample(model, 200, prime='noche', top_k=3))

noches a cuela en lo
menco entras,
pare el mistan en las cuarantes, es con el miesidar a por ala para la cuancon es corronen en
pantentena en el canes cara el mesoro do las promesta a la puento, y al cuesta


## Ejercicio

*   Utilizar un conjunto de datos diferente (libro en español) pude ser del proyecto Gutenberg
*   Una opción puede ser fusionar el nuevo conjunto de datos con el utilizado en este notebook.
