<a href="https://colab.research.google.com/github/mcstllns/DeepLearning_2025/blob/main/PRACTICA_06_RNN_y_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<font color="darkorange" size="10"><b>06. RNN y LSTM</b></font>

Miguel A. Castellanos

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset

Vamos a generar secuencias que provienen de un seno y vamos a intentar predecir el siguiente punto. No es una tarea complicada así que debería funcionar sin demasiada dimensionalidad.

En este ejemplo vamos a ilustrar las aquitecturas many to one, many to many soncronizado y many to many no sincronizado


<font color="darkorange" size="6"><b>Many to One</b></font>

In [None]:
# Generar datos senoidales con fase aleatoria
# La fase es importante para que estén desplazadas en x

# Si frequency es un numero genera todas las secuencias a esa frecuencia, si no
# Las frecuencias son aleatorias entre 0.5 y 3.0
def generate_sine_data(seq_length=10, num_samples=1000, frequency=None):
    sequences = []
    targets = []

    for i in range(num_samples):
        phase_shift = np.random.uniform(0, 2 * np.pi)  # Fase aleatoria
        current_frequency = frequency if frequency is not None else np.random.uniform(0.5, 3.0)  # Frecuencia fija o aleatoria

        x_values = np.linspace(0, seq_length, seq_length) * (2 * np.pi * current_frequency / seq_length)  # Ajuste de la frecuencia
        sine_wave = np.sin(x_values + phase_shift)

        sequences.append(sine_wave)
        targets.append(np.sin((seq_length * (2 * np.pi * current_frequency / seq_length)) + phase_shift))

    return np.array(sequences), np.array(targets)


# Crear dataset
seq_length = 100     # numero de elementos en la secuencia
num_samples = 1000  # numero de secuencias
X, y = generate_sine_data(seq_length, num_samples)

In [None]:
print(X.shape, y.shape)

In [None]:
# Generar una matriz de 4x4 plots eligiendo aleatoriamente las secuencias

fig, axes = plt.subplots(3, 3, figsize=(8, 8))
random_indices = np.random.randint(0, num_samples, size=(3, 3))

for i in range(3):
    for j in range(3):
        idx = random_indices[i, j]
        axes[i, j].plot(range(100), X[idx,])
        axes[i, j].scatter(100, y[idx,], color='orange')
        axes[i, j].set_title(f"Ejemplo {idx}")

plt.tight_layout()
plt.show()

In [None]:
# Creamos los tensores y el dataloader

X_tensor = torch.tensor(X, dtype=torch.float32).unsqueeze(-1)  # (samples, seq_length, 1)
y_tensor = torch.tensor(y, dtype=torch.float32).unsqueeze(-1)  # (samples, 1)

dataset = TensorDataset(X_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)


In [None]:
# Definimos la red RNN
# Ojo, fijate que no usamos sequential, ahora tenemos que definir mejor el forward

# fijate que contiene el out, es una matriz de
# out: Este es el tensor de salida de la RNN. Dependiendo de la implementación de la RNN, out tiene la forma [batch_size, seq_len, hidden_size]:


# Definir la RNN
class RNN(nn.Module):
    def __init__(self, input_size=1, hidden_size=1, num_layers=1):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        out, _ = self.rnn(x)
        out = self.fc(out[:, -1, :])  # Tomar la última salida de la secuencia
        return out

El parámetro batch_first = true indica que el batch está contruido con la siguiente estructura [casos_del_batch, timestamps, variables_en_cada_timestamp]

Fíjate en la definición de la red anterior:

* input_size=1   # dimesiones de x
* hidden_size=50 # neuronas en la RNN
* num_layers=1   # numero de layers en stack


Al ser una RNN con más de una neurona hemos introducido una capa de integración Z, es donde pone

```
self.fc = nn.Linear(hidden_size, 1)
```

Para cada uno de los pasos la red va a propagar toda la activación de sus neuronas al siguiente paso, pero además, con esas activaciones va a realizar una integración Z que va a ser el pronóstico yt de la red en ese paso temporal

Como el objetivo es ***many to one*** lo que nos interesa es utilizar unicamente el último elemento de las predicciones de la RNN y entrenar la red con él.

La RNN hace todos los pronósticos yt pero nosotros solo utilizamnos el último para el entrenamiento

La sintaxis de python
```
out[:, -1, :]
```

es un modo avanzado de decirle que devuelva el último elemento de la segunda dimensión de la matriz out, justo con el que queremos entrenar

Si quisiéramos quedarnos con los último 5 elementos sería

```
out[:, -5:, :]
```

Y si quisiéramos quedarnos con los elementos de 2 al 5

```
out[:, 2:5, :]
```

**Ojo, esto controla lo que devuelve el forward del modelo, luego tienes que hacer coindir con la dimensinalidad de y para que pueda calcularse el Loss**

In [None]:
# Inicializar modelo, función de pérdida y optimizador
# Fijate que esto es igual que en MLP

model = RNN()
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)


In [None]:
# Entrenar la RNN
epochs = 20
for epoch in range(epochs):
    for batch_X, batch_y in dataloader:
        optimizer.zero_grad()
        predictions = model(batch_X)
        loss = loss_fn(predictions, batch_y)
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

In [None]:
# Evaluación del modelo
# vamos a generar 100 nuevos datos que provienen del mismo generador y ver cuanto acertamos
model.eval()
with torch.no_grad():
    test_X,_ = generate_sine_data(seq_length, 100)  # Generar datos de prueba
    test_X_tensor = torch.tensor(test_X, dtype=torch.float32).unsqueeze(-1)
    test_y = model(test_X_tensor).squeeze().numpy()


In [None]:
# Generar una matriz de 3x3 plots eligiendo aleatoriamente las secuencias

fig, axes = plt.subplots(3, 3, figsize=(8, 8))
random_indices = np.random.randint(0, 100, size=(3, 3))

for i in range(3):
    for j in range(3):
        idx = random_indices[i, j]
        axes[i, j].plot(range(100), test_X[idx,])
        axes[i, j].scatter(100, test_y[idx,], color='orange')
        axes[i, j].set_title(f"Test {idx}")

plt.tight_layout()
plt.show()

Con una inspección visual es suficiente, si quieres añadir el cálculo del MSE o alguna otra métrica final puedes hacerlo

<font color="darkorange" size="6"><b>Many to Many sincronizado</b></font>

Usando el mismo generador de senos vamos a predecir todos los puntos de la secuencia.

In [None]:
# Definir la RNN
class RNN(nn.Module):
    def __init__(self, input_size=1, hidden_size=1, num_layers=1):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        out, _ = self.rnn(x)
        out = self.fc(out[:, :, :])  # Devolvemos toda la secuencia
        return out

In [None]:
# Inicializar modelo, función de pérdida y optimizador
model = RNN()
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)


In [None]:
# Entrenar la RNN
epochs = 20
for epoch in range(epochs):
    for batch_X,_ in dataloader:
        optimizer.zero_grad()
        predictions = model(batch_X)
        loss = loss_fn(predictions, batch_X) # fijate que usamos como batch_y el propio batch_x
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

In [None]:
# Evaluación del modelo
model.eval()
with torch.no_grad():
    test_X, test_y = generate_sine_data(seq_length, 100)  # Generar datos de prueba
    test_X_tensor = torch.tensor(test_X, dtype=torch.float32).unsqueeze(-1)
    test_y = model(test_X_tensor).squeeze().numpy()


In [None]:
# Generar una matriz de 3x3 plots eligiendo aleatoriamente las secuencias

fig, axes = plt.subplots(3, 3, figsize=(8, 8))
random_indices = np.random.randint(0, 100, size=(3, 3))

for i in range(3):
    for j in range(3):
        idx = random_indices[i, j]
        axes[i, j].plot(range(100), test_X[idx,])
        axes[i, j].scatter(range(100), test_y[idx,], color='orange')
        axes[i, j].set_title(f"Test {idx}")

plt.tight_layout()
plt.show()

<font color="darkorange" size="6"><b>Many to Many No Sincronizado</b></font>

Usando el mismo generador de senos vamos a predecir solo una parte de los puntos de la secuencia.

In [None]:
# Definir la RNN
class RNN(nn.Module):
    def __init__(self, input_size=1, hidden_size=1, num_layers=1):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        out, _ = self.rnn(x)
        out = self.fc(out[:, -5:, :])  # Tomar los últimos 5 puntos de la secuencia
        return out

In [None]:
# Inicializar modelo, función de pérdida y optimizador
model = RNN()
loss_fn = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)


In [None]:
# Entrenar la RNN
epochs = 20
for epoch in range(epochs):
    for batch_X, _ in dataloader:
        optimizer.zero_grad()
        predictions = model(batch_X)
        loss = loss_fn(predictions, batch_X[:,-5:,:])
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

In [None]:
# Evaluación del modelo
model.eval()
with torch.no_grad():
    test_X, test_y = generate_sine_data(seq_length, 100)  # Generar datos de prueba
    test_X_tensor = torch.tensor(test_X, dtype=torch.float32).unsqueeze(-1)
    test_y = model(test_X_tensor).squeeze().numpy()


In [None]:
# Generar una matriz de 3x3 plots eligiendo aleatoriamente las secuencias

fig, axes = plt.subplots(3, 3, figsize=(8, 8))
random_indices = np.random.randint(0, 100, size=(3, 3))

for i in range(3):
    for j in range(3):
        idx = random_indices[i, j]
        axes[i, j].plot(range(100), test_X[idx,])
        axes[i, j].scatter(range(95,100), test_y[idx,], color='orange')
        axes[i, j].set_title(f"Test {idx}")

plt.tight_layout()
plt.show()

<font color="darkorange" size="6"><b>One to Many</b></font>

Para calcular un One to Many la forma más sencilla de implementarlo es repetir la entrada tantos pasos temporales como necesites, es decir, si quieres una salida de 10 pasos creas un vector de 10 veces el numero x repetido

Tienes dos estrategias para implementarlo:

- Creas tus X y batch de esa manera (la más sencilla)
- Haces que en el cálculo del forward se expanda el valor como un vector

```
def forward(self, x):
  # Primero necesitamos expandir la secuencia de entrada a la longitud deseada
  # La entrada x tiene forma [batch_size, 1, input_size] (una secuencia de 1 valor)
  x = x.repeat(1, self.seq_len, 1)  # Repetimos la entrada a lo largo de la longitud de secuencia
```


<font color="darkorange" size="6"><b>LSTM y GRU</b></font>

La implementación de las funciones LSTM y GRU es directa, simplemente sustituir la función RNN por estas funciones:

```
# Capa LSTM
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
```

Con el parámetro hidden_size se controla la dimensionalidad oculta, esto determina el tamaño de la celda oculta y por tanto también el número de neuronas necesarias para las puertas forget, input y output.

```
# Capa GRU
self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
```

En la capa GRU la dimensionalidad oculta determina el número de neuronas pero no de la celda de memoria porque no existe.




<font color="darkorange" size="6"><b>Múltiples variables, layers apilados y bidireccional</b></font>

Esta explicación vale para las funciones de RNN, LSTM y GRU

Introducir múltiples variables es directo, simplemente tienes que construir tu matriz de datos haciendo que la tercera dimensión contenga las covariables. Es decir si vas a tener 100 casos, en 10 pasos de tiempo y 2 variables debes acabar con una matriz de dimensiones:


[100, 10, 2] es decir [casos_del_batch, timestamps, variables_en_cada_timestamp]


Para introducir layer apilados simplemente cambia el parámetro num_layers de 1 al valor que quieras, recuerda que eso puede incrementar significativamente el tiempo de ejecución


Para hacer que la red tenga un aprendizaje bidireccional simplemente se introduce en la función el parámetro bidirectional=True, por ejemplo:

```
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
```


<font color="darkorange" size="5"><b>Ejercicio 01</b></font>

El objetivo es ser capaces de "hackear" el generador aleatorio de python.

Fíjate en el siguiente código. La función va a generar semillas aleatorias y en función de esas semillas va a producir una secuencia aleatoria.

El objetivo es, conociendo la semilla ser capaz de producir la misma secuencia, si haces esto, aunque la "fórmula generadora" de python sea secreta tu la habrás "descubierto"

Tarea: Construye un modelo secuencial de la forma que sea para descubir la ecuación generadora de python



In [None]:
import numpy as np

def generate_random_vectors(num_vectors = 1000, num_seq = 10):
    sequences = []
    targets = []

    # Generar vectores de 10 números aleatorios con semillas diferentes
    seeds = []
    vectors = []

    for i in range(num_vectors):
        # Generar una semilla aleatoria para cada vector
        seed = np.random.randint(0, 2**32)
        seeds.append(seed)
        np.random.seed(seed)  # Establecer la semilla para la generación de números aleatorios
        vector = np.random.rand(num_seq)  # Generar el vector de 10 números aleatorios
        vectors.append(vector)  # Almacenar la semilla y el vector
    return seeds, vectors


X, y = generate_random_vectors()



<font color="darkorange" size="5"><b>Ejercicio 02</b></font>

Usando redes RNN y LSTM pon un ejemplo en el que se compruebe como en las RNN los estados inciales dejan de influir en la predicción final si la secuencia es larga y como eso se resuelve usando una LSTM