In [None]:
import random
import torch
import gc
from typing import Dict, List, Tuple
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

class LSTM_PreciosBitcoin(torch.nn.Module):
    """Red LSTM para predicción de precios de Bitcoin."""

    def __init__(
            self,
            entrada: int,
            oculto: int,
            capas: int,
            dropout: float
        ) -> None:
        super(LSTM_PreciosBitcoin, self).__init__()
        self.lstm = torch.nn.LSTM(
            input_size=entrada,
            hidden_size=oculto,
            num_layers=capas,
            dropout=dropout,
            batch_first=True
        )
        self.fc = torch.nn.Linear(oculto, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        out, _ = self.lstm(x)
        return self.fc(out[:, -1, :])

def crear_secuencias(
        datos: np.ndarray,
        longitud_secuencia: int
    ) -> Tuple[np.ndarray, np.ndarray]:
    """Prepara secuencias deslizantes para entrenamiento LSTM."""
    X, y = [], []
    for i in range(len(datos) - longitud_secuencia):
        X.append(datos[i:i+longitud_secuencia])
        y.append(datos[i+longitud_secuencia])
    return np.array(X), np.array(y)

def dividir_conjuntos(
        X: np.ndarray,
        y: np.ndarray,
        prueba: float = 0.2,
        mezclar: bool = False
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Divide los datos en conjuntos de entrenamiento y validación."""
    return train_test_split(X, y, test_size=prueba, shuffle=mezclar)

def entrenar_modelo(
        modelo: torch.nn.Module,
        X_ent: torch.Tensor,
        y_ent: torch.Tensor,
        X_val: torch.Tensor,
        y_val: torch.Tensor,
        epocas: int = 2,
        lr: float = 0.001
    ) -> float:
    """Entrena el modelo LSTM y evalúa en conjunto de validación."""
    criterio = torch.nn.MSELoss()
    optimizador = torch.optim.Adam(modelo.parameters(), lr=lr)

    for _ in range(epocas):
        modelo.train()
        optimizador.zero_grad()
        salida = modelo(X_ent)
        perdida = criterio(salida, y_ent)
        perdida.backward()
        optimizador.step()

    modelo.eval()
    with torch.no_grad():
        preds = modelo(X_val)
        perdida_val = criterio(preds, y_val).item()
    return perdida_val

def algoritmo_genetico_lstm(
        serie_temporal: torch.Tensor,
        tamano_poblacion: int = 4,
        generaciones: int = 2,
        max_muestras: int = 100_000
    ) -> Dict[str, float]:
    """Implementa un algoritmo genético para optimizar hiperparámetros de una red LSTM.

    Parámetros:
        serie_temporal (torch.Tensor): Serie temporal de entrada para entrenamiento
        tamano_poblacion (int): Número de individuos por generación (default 4)
        generaciones (int): Número de iteraciones evolutivas (default 2)
        max_muestras (int): Límite de muestras para evitar saturación de RAM (default 100,000)

    Retorna:
        Dict[str, float]: Mejor conjunto de hiperparámetros encontrado
    """

    def crear_individuo() -> Dict[str, float]:
        """Genera un individuo con hiperparámetros aleatorios."""
        individuo = {
            'longitud_secuencia': random.randint(10, 50),
            'tamano_capa_oculta': random.choice([16, 32, 64, 128]),
            'num_capas': random.choice([1, 2, 3]),
            'dropout': random.uniform(0.0, 0.5),
            'tasa_aprendizaje': random.uniform(0.0005, 0.01)
        }
        if individuo['num_capas'] == 1:
            individuo['dropout'] = 0.0
        return individuo

    def evaluar_aptitud(individuo: Dict[str, float]) -> Tuple[float, Dict[str, float]]:
        """Evalúa un individuo mediante el entrenamiento del modelo LSTM."""
        try:
            # Preparación de datos
            muestra = serie_temporal[:max_muestras]
            X, y = crear_secuencias(muestra, individuo['longitud_secuencia'])

            if len(X) < 500:
                return float('inf'), individuo  # Penaliza secuencias muy cortas

            # División train/validation
            X_ent, X_val, y_ent, y_val = dividir_conjuntos(X, y, prueba=0.2, mezclar=False)

            # Conversión a tensores
            X_ent = torch.tensor(X_ent, dtype=torch.float32).unsqueeze(-1)
            y_ent = torch.tensor(y_ent, dtype=torch.float32).view(-1, 1)
            X_val = torch.tensor(X_val, dtype=torch.float32).unsqueeze(-1)
            y_val = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)

            # Entrenamiento del modelo
            modelo = LSTM_PreciosBitcoin(
                entrada=1,
                oculto=individuo['tamano_capa_oculta'],
                capas=individuo['num_capas'],
                dropout=individuo['dropout']
            )

            perdida = entrenar_modelo(
                modelo=modelo,
                X_ent=X_ent,
                y_ent=y_ent,
                X_val=X_val,
                y_val=y_val,
                epocas=2,
                lr=individuo['tasa_aprendizaje']
            )

            return perdida, individuo

        except Exception as e:
            print(f"⚠️ Error en evaluación: {str(e)}")
            return float('inf'), individuo

        finally:
            # Liberación de memoria
            if 'modelo' in locals():
                del modelo
            if 'X_ent' in locals():
                del X_ent, y_ent, X_val, y_val
            torch.cuda.empty_cache()
            gc.collect()

    def cruzar_individuos(
            padre1: Dict[str, float],
            padre2: Dict[str, float]
        ) -> Dict[str, float]:
        """Realiza cruce y mutación para crear un nuevo individuo."""
        hijo = {
            'longitud_secuencia': random.choice([padre1['longitud_secuencia'], padre2['longitud_secuencia']]),
            'tamano_capa_oculta': random.choice([padre1['tamano_capa_oculta'], padre2['tamano_capa_oculta']]),
            'num_capas': random.choice([padre1['num_capas'], padre2['num_capas']]),
            'dropout': max(0.0, min(0.5, (padre1['dropout'] + padre2['dropout']) / 2 + random.uniform(-0.05, 0.05))),
            'tasa_aprendizaje': max(0.0001, min(0.01, (padre1['tasa_aprendizaje'] + padre2['tasa_aprendizaje']) / 2 + random.uniform(-0.001, 0.001)))
        }

        if hijo['num_capas'] == 1:
            hijo['dropout'] = 0.0

        return hijo

    # Inicialización de población
    poblacion = [crear_individuo() for _ in range(tamano_poblacion)]

    # Ciclo evolutivo
    for generacion in range(1, generaciones + 1):
        # Evaluación de aptitud
        resultados = []
        for individuo in poblacion:
            perdida, ind = evaluar_aptitud(individuo)
            if perdida < float('inf'):  # Solo considerar individuos válidos
                resultados.append((perdida, ind))

        # Validación de resultados
        if len(resultados) < 2:
            print("⚠️ Advertencia: Insuficientes individuos válidos")
            break

        # Selección de mejores
        resultados.sort(key=lambda x: x[0])
        mejores = resultados[:max(2, tamano_poblacion // 2)]
        nueva_poblacion = [ind for _, ind in mejores]

        # Reproducción
        while len(nueva_poblacion) < tamano_poblacion:
            if len(nueva_poblacion) >= 2:
                padre1, padre2 = random.sample(nueva_poblacion, 2)
            else:
                padre1 = padre2 = nueva_poblacion[0]

            nuevo_individuo = cruzar_individuos(padre1, padre2)
            nueva_poblacion.append(nuevo_individuo)

        poblacion = nueva_poblacion
        print(f"🧬 Generación {generacion}/{generaciones} | Mejor pérdida: {mejores[0][0]:.6f}")

    return resultados[0][1] if resultados else crear_individuo()

def cargar_datos(ruta: str) -> torch.Tensor:
    """Carga y preprocesa los datos de precios de Bitcoin."""
    df = pd.read_csv(ruta)
    df['Date'] = pd.to_datetime(df['Timestamp'], unit='s')
    df = df.set_index('Date')[['Close']].copy()
    df.dropna(inplace=True)

    escalador = MinMaxScaler()
    cierre_escalado = escalador.fit_transform(df[['Close']].values).flatten()

    return torch.tensor(cierre_escalado, dtype=torch.float32)

def entrenar_modelo_final(
        serie_temporal: torch.Tensor,
        mejores_hiperparametros: Dict[str, float],
        epocas: int = 20
    ) -> LSTM_PreciosBitcoin:
    """Entrena el modelo final con los mejores hiperparámetros encontrados."""
    longitud_secuencia = mejores_hiperparametros.get('longitud_secuencia', 20)
    X, y = crear_secuencias(serie_temporal.numpy(), longitud_secuencia)

    X_ent, X_val, y_ent, y_val = dividir_conjuntos(X, y, prueba=0.2, mezclar=False)

    X_ent = torch.tensor(X_ent, dtype=torch.float32).unsqueeze(-1)
    y_ent = torch.tensor(y_ent, dtype=torch.float32).view(-1, 1)
    X_val = torch.tensor(X_val, dtype=torch.float32).unsqueeze(-1)
    y_val = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)

    modelo = LSTM_PreciosBitcoin(
        entrada=1,
        oculto=mejores_hiperparametros.get('tamano_capa_oculta', 32),
        capas=mejores_hiperparametros.get('num_capas', 3),
        dropout=mejores_hiperparametros.get('dropout', 0.265)
    )

    entrenar_modelo(
        modelo=modelo,
        X_ent=X_ent,
        y_ent=y_ent,
        X_val=X_val,
        y_val=y_val,
        epocas=epocas,
        lr=mejores_hiperparametros.get('tasa_aprendizaje', 0.0015)
    )

    return modelo

if __name__ == "__main__":
    # Carga y preparación de datos
    datos = cargar_datos("/content/btcusd_1-min_data.csv")

    # Optimización de hiperparámetros
    mejores_hiperparametros = algoritmo_genetico_lstm(
        serie_temporal=datos,
        tamano_poblacion=4,
        generaciones=2
    )
    print("🔍 Mejores hiperparámetros encontrados:", mejores_hiperparametros)

    # Entrenamiento del modelo final
    modelo_final = entrenar_modelo_final(datos, mejores_hiperparametros)
    print("✅ Modelo final entrenado exitosamente")