# Exploraci√≥n: DQN para Control de Tanque

Este notebook permite explorar y probar el agente DQN de forma interactiva antes del entrenamiento completo.

## Objetivos:
- Verificar que el simulador funciona correctamente
- Probar agentes en un entorno sencillo
- Ajustar hiperpar√°metros interactivamente
- Visualizar resultados inmediatamente


## Imports y Setup

In [11]:
import sys
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import time
from IPython.display import clear_output

# Agregar paths para importar m√≥dulos del proyecto
sys.path.append('../..')

from simulations.tanque_nivel.tanque_simulator import TankLevelSimulator
from agent_valeria.DQN.dqn_agent import DQN_Agent
from environment.universal_pid_env import UniversalPIDControlEnv

# Configurar matplotlib para notebooks
%matplotlib inline
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (12, 8)

print("Imports completados")

Imports completados


In [12]:
## Crear Entorno y Agente DQN

# Crear simulador h√≠brido de tanque
tank = TankLevelSimulator(
    tank_area=2.0,           # m¬≤ - √Årea del tanque
    max_height=5.0,          # m - Altura m√°xima
    max_inflow=10.0,         # L/s - Caudal m√°ximo
    outflow_coeff=2.0,       # Coeficiente de salida
    dt=1.0,                  # s - Paso de tiempo
    noise_level=0.02,        # Ruido en medici√≥n
    dead_band=0.1,           # m - Banda muerta (del Universal)
    max_episode_steps=200    # L√≠mite de episodio (del Universal)
)
print("Simulador h√≠brido de tanque creado")
print(f"F√≠sica real del tanque + funcionalidades universales")

TypeError: TankLevelSimulator.__init__() got an unexpected keyword argument 'dead_band'

In [None]:
# Crear agente DQN
agent = DQN_Agent(
    state_dim=6,           # [level, setpoint, error, prev_error, integral, derivative]
    lr=0.001,             # Learning rate
    gamma=0.99,           # Discount factor
    epsilon_start=1.0,    # Exploraci√≥n inicial
    epsilon_end=0.01,     # Exploraci√≥n final
    epsilon_decay=0.995,  # Decaimiento
    memory_size=1000,     # Buffer peque√±o para pruebas
    batch_size=32,
    target_update_freq=50, # Actualizar target m√°s frecuente
    device='cpu'
)

print("Agente DQN creado")
print(f"  Acciones discretas: {agent.n_actions}")
print(f"  Epsilon inicial: {agent.get_epsilon():.3f}")
print(f"  Memoria disponible: {len(agent.memory)}")

Espacio de acciones PID creado:
  Kp: [ 0.1  3.4  6.7 10. ]
  Ki: [0.01       1.67333333 3.33666667 5.        ]
  Kd: [1.00000000e-03 6.67333333e-01 1.33366667e+00 2.00000000e+00]
  Total acciones: 64
üîÑ Red objetivo actualizada (step 0)
 Agente DQN creado:
   Estados: 6
   Acciones: 64
   Learning rate: 0.001
   Gamma: 0.99
Agente DQN creado
  Acciones discretas: 64
  Epsilon inicial: 1.000
  Memoria disponible: 0


In [None]:
# Probar selecci√≥n de acciones sin entrenar
print(" Probando selecci√≥n de acciones (agente no entrenado)...")

# Estado de prueba
test_state = np.array([2.0, 3.0, 1.0, 0.8, 5.0, 0.2])  # [level, setpoint, error, prev_error, integral, derivative]
print(f"Estado de prueba: {test_state}")

# Probar varias acciones
for i in range(5):
    pid_params = agent.select_action(test_state, training=True)
    action_idx = agent.get_last_action_index()
    
    print(f"  Acci√≥n {i+1}: √çndice={action_idx:2d}, PID=[{pid_params[0]:.2f}, {pid_params[1]:.2f}, {pid_params[2]:.2f}]")

print(f"\nüé≤ Epsilon actual: {agent.get_epsilon():.3f} (alta exploraci√≥n = acciones aleatorias)")

 Probando selecci√≥n de acciones (agente no entrenado)...
Estado de prueba: [2.  3.  1.  0.8 5.  0.2]
  Acci√≥n 1: √çndice= 9, PID=[0.10, 3.34, 0.67]
  Acci√≥n 2: √çndice=25, PID=[3.40, 3.34, 0.67]
  Acci√≥n 3: √çndice= 5, PID=[0.10, 1.67, 0.67]
  Acci√≥n 4: √çndice=19, PID=[3.40, 0.01, 2.00]
  Acci√≥n 5: √çndice= 6, PID=[0.10, 1.67, 1.33]

üé≤ Epsilon actual: 1.000 (alta exploraci√≥n = acciones aleatorias)


In [None]:
def train_episode(agent, tank, max_steps=50):
    """
    Entrenar un episodio con simulador h√≠brido
    """
    obs, info = tank.reset()
    total_reward = 0
    
    for step in range(max_steps):
        # Acci√≥n PID del agente
        action = agent.select_action(obs, training=True)
        next_obs, reward, done, truncated, info = tank.step(action)
        
        agent.store_experience(obs, action, reward, next_obs, done)
        agent.update()
        
        total_reward += reward
        obs = next_obs
        
        if done or truncated:
            break
    
    return total_reward

print("Funci√≥n de entrenamiento h√≠brida lista")

In [None]:
# Entrenar episodio de prueba
print("Entrenando episodio de prueba con simulador h√≠brido...")
print(f"Memoria inicial: {len(agent.memory)} experiencias")
print(f"Epsilon inicial: {agent.get_epsilon():.3f}")

total_reward = train_episode(agent, tank, max_steps=50)

print(f"Episodio completado:")
print(f"  Recompensa total: {total_reward:.2f}")
print(f"  Memoria final: {len(agent.memory)} experiencias")
print(f"  Epsilon final: {agent.get_epsilon():.3f}")
print(f"  F√≠sica del tanque: √Årea={tank.tank_area}m¬≤, Altura={tank.max_height}m")

In [None]:
# Esta celda ser√° reemplazada por la celda 11 que usa el tanque h√≠brido

print("Usar la celda siguiente que entrena con simulador h√≠brido...")

In [None]:
# Esta celda ser√° reemplazada por la celda 12 que visualiza el h√≠brido

print("Usar la celda siguiente que visualiza resultados del tanque h√≠brido...")

In [None]:
# Entrenar m√∫ltiples episodios con tanque h√≠brido
NUM_EPISODES = 10

print(f"Entrenando {NUM_EPISODES} episodios con simulador h√≠brido...")
print(f"Cada episodio simula f√≠sica real del tanque")
print(f"Ecuaci√≥n: A * dh/dt = Qin - Qout (donde Qout = C * ‚àöh)")

results = []
for episode in range(NUM_EPISODES):
    total_reward = train_episode(agent, tank, max_steps=30)
    
    tank_info = tank.get_tank_info()
    
    results.append({
        'episode': episode + 1,
        'total_reward': total_reward,
        'epsilon': agent.get_epsilon(),
        'memory_size': len(agent.memory),
        'final_level': tank_info['level'],
        'final_error': tank_info['error']
    })
    
    print(f"Episodio {episode+1}: Reward={total_reward:.2f}, "
          f"Nivel final={tank_info['level']:.2f}m, Error={tank_info['error']:.3f}m")

print("Entrenamiento completado")

In [None]:
# Visualizar resultados del entrenamiento h√≠brido
df_results = pd.DataFrame(results)

fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 8))

# Recompensa por episodio
ax1.plot(df_results['episode'], df_results['total_reward'], 'bo-', linewidth=2)
ax1.set_title('Evoluci√≥n de la Recompensa')
ax1.set_xlabel('Episodio')
ax1.set_ylabel('Recompensa Total')
ax1.grid(True, alpha=0.3)

# Epsilon decay
ax2.plot(df_results['episode'], df_results['epsilon'], 'ro-', linewidth=2)
ax2.set_title('Decaimiento de Epsilon')
ax2.set_xlabel('Episodio')
ax2.set_ylabel('Epsilon')
ax2.grid(True, alpha=0.3)

# Nivel final por episodio
ax3.plot(df_results['episode'], df_results['final_level'], 'go-', linewidth=2)
ax3.axhline(y=tank.setpoint, color='r', linestyle='--', label='Setpoint')
ax3.set_title('Nivel Final por Episodio')
ax3.set_xlabel('Episodio')
ax3.set_ylabel('Nivel (m)')
ax3.legend()
ax3.grid(True, alpha=0.3)

# Error final por episodio
ax4.plot(df_results['episode'], np.abs(df_results['final_error']), 'mo-', linewidth=2)
ax4.axhline(y=tank.dead_band, color='g', linestyle='--', label='Banda muerta')
ax4.set_title('Error Final por Episodio')
ax4.set_xlabel('Episodio')
ax4.set_ylabel('|Error| (m)')
ax4.legend()
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"Recompensa inicial: {df_results['total_reward'].iloc[0]:.2f}")
print(f"Recompensa final: {df_results['total_reward'].iloc[-1]:.2f}")
print(f"Error final promedio: {np.abs(df_results['final_error']).mean():.3f}m")
print(f"Banda muerta del tanque: {tank.dead_band}m")