# Implementación Entorno de Ambulancias con Algoritmo *Tuynman*

# CONFIGURACIÓN 

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import linprog, minimize
from collections import defaultdict
import time
from itertools import product

# Para reproducibilidad
np.random.seed(42)

# Configuración de visualización
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams['figure.figsize'] = (14, 10)
plt.rcParams['font.size'] = 11

print("Imports listos")

Imports listos


## Entorno: Sistema de Despacho de Ambulancias

Replicamos el entorno establecido en SAVIA+

In [2]:
# ============================================
# AMBIENTE: Sistema de Despacho de Ambulancias
# ============================================

class AmbulanceEnvDFE:
    """
    Entorno de despacho de ambulancias para DFE.
    Idéntico al usado en SAVIA+ para comparación.
    """
    
    def __init__(self, NA=2, NB=2, zones=3, classes=2):
        # Parámetros del sistema
        self.NA = NA  # Ambulancias ALS
        self.NB = NB  # Ambulancias BLS
        self.Z = zones
        self.C = classes
        
        # Espacio de estados: (sA, sB)
        self.states = [(sa, sb) for sa in range(NA+1) for sb in range(NB+1)]
        self.S = len(self.states)
        self.state_to_idx = {s: i for i, s in enumerate(self.states)}
        
        # Espacio de acciones
        self.actions = ['ALS', 'BLS', 'NOOP']
        self.A = len(self.actions)
        
        # Tasas de llegada por zona y clase (por hora)
        self.lambda_rates = {
            (1, 'A'): 0.3, (1, 'B'): 0.6,
            (2, 'A'): 0.2, (2, 'B'): 0.4,
            (3, 'A'): 0.15, (3, 'B'): 0.25
        }
        self.lambda_total = sum(self.lambda_rates.values())  # 1.9 por hora
        
        # Tasa de servicio (por ambulancia por hora)
        self.mu = 1.2
        
        # Tasa de uniformización
        self.Lambda = self.lambda_total + self.mu * (NA + NB)
        
        # Probabilidades de señal imperfecta
        self.omega_A = 0.85  # P(real A | señal A)
        self.omega_B = 0.10  # P(real A | señal B)
        
        # Utilidades
        self.U = {
            ('A', 'ALS'): 1.00, ('A', 'BLS'): 0.85,
            ('B', 'ALS'): 0.98, ('B', 'BLS'): 1.00
        }
        
        # Parámetros de cobertura por zona
        self.rho = {
            ('ALS', 1): 0.80, ('ALS', 2): 0.60, ('ALS', 3): 0.50,
            ('BLS', 1): 0.75, ('BLS', 2): 0.65, ('BLS', 3): 0.55
        }
        self.M_coverage = 5  # Saturación de cobertura
        
        print(f"Ambiente inicializado: {self.S} estados, {self.A} acciones")
        print(f"  Configuración: NA={NA}, NB={NB}")
        print(f"  λ_total = {self.lambda_total:.2f}/h, μ = {self.mu:.2f}/h")
        print(f"  Λ (uniformización) = {self.Lambda:.2f}/h")
    
    def coverage_function(self, sq, q_type, zone):
        """
        Función de cobertura
        """
        rho_qz = self.rho[(q_type, zone)]
        effective_units = min(sq, self.M_coverage)
        return 1 - (1 - rho_qz) ** effective_units
    
    def expected_utility(self, signal, q_type):
        """
        Utilidad esperada dado señal ω y tipo de ambulancia q.
        """
        if signal == 'A':
            return (self.omega_A * self.U[('A', q_type)] + 
                    (1 - self.omega_A) * self.U[('B', q_type)])
        else:  # signal == 'B'
            return (self.omega_B * self.U[('A', q_type)] + 
                    (1 - self.omega_B) * self.U[('B', q_type)])
    
    def reward(self, s, a, signal='A', zone=1):
        """
        Recompensa por despachar ambulancia tipo a para paciente con señal en zona.
        """
        sA, sB = s
        
        if a == 'NOOP':
            return 0.0
        
        q_type = a  # 'ALS' o 'BLS'
        sq = sA if q_type == 'ALS' else sB
        
        if sq == 0:  # No hay ambulancias disponibles
            return 0.0
        
        util = self.expected_utility(signal, q_type)
        coverage = self.coverage_function(sq, q_type, zone)
        
        return util * coverage
    
    def sample(self, s, a):
        """
        sampleo del siguiente estado y recompensa dado estado s y acción a.
        """
        sA, sB = s
        
        # Determinar tipo de evento según probabilidades de uniformización
        rand = np.random.random()
        cumulative = 0.0
        
        # Probabilidad de cada tipo de evento
        event_probs = {}
        
        # ARRIVALs por zona y clase
        for zone in range(1, self.Z + 1):
            for cls in ['A', 'B']:
                event_probs[('ARRIVAL', zone, cls)] = self.lambda_rates[(zone, cls)] / self.Lambda
        
        # Completaciones de servicio
        event_probs[('ALS_DONE',)] = self.mu * (self.NA - sA) / self.Lambda
        event_probs[('BLS_DONE',)] = self.mu * (self.NB - sB) / self.Lambda
        
        # NOOP 
        event_probs[('NOOP_EVENT',)] = 1 - sum(event_probs.values())
        
        # Samplear evento
        for event, prob in event_probs.items():
            cumulative += prob
            if rand < cumulative:
                break
        
        # Procesar evento
        s_next = s
        reward = 0.0
        
        if event[0] == 'ARRIVAL':
            zone, cls = event[1], event[2]
            signal = 'A' if cls == 'A' else 'B'
            
            # Calcular recompensa
            reward = self.reward(s, a, signal, zone)
            
            # Aplicar acción
            if a == 'ALS' and sA > 0:
                s_next = (sA - 1, sB)
            elif a == 'BLS' and sB > 0:
                s_next = (sA, sB - 1)
            # Si NOOP o no hay recursos, s_next = s
            
        elif event[0] == 'ALS_DONE':
            s_next = (min(sA + 1, self.NA), sB)
            
        elif event[0] == 'BLS_DONE':
            s_next = (sA, min(sB + 1, self.NB))
        
        # NOOP_EVENT no cambia estado ni da reward
        
        return s_next, reward
    
    def is_valid_action(self, s, a):
        """Verifica si la acción es válida en el estado s."""
        sA, sB = s
        if a == 'ALS':
            return sA > 0
        elif a == 'BLS':
            return sB > 0
        else:  # NOOP
            return True

# Crear instancia del ambiente
env = AmbulanceEnvDFE(NA=2, NB=2)

Ambiente inicializado: 9 estados, 3 acciones
  Configuración: NA=2, NB=2
  λ_total = 1.90/h, μ = 1.20/h
  Λ (uniformización) = 6.70/h


# EVI

In [None]:
# ============================================
# CONFIDENCE SETS PARA TRANSICIONES
# ============================================

def compute_B(n, delta, S, A):
    """
    Bound de confianza para ||p_hat - p||_1.
    
    B(n, δ) = sqrt(2*x(δ,n) / n)
    donde x(δ,n) = log(SA/δ) + (S-1)*log(e*(1 + n/(S-1)))
    
    Args:
        n: número de muestras
        delta: nivel de confianza
        
    Returns:
        B: bound tal que P(||p_hat - p||_1 <= B) >= 1 - δ
    """
    
    if n == 0:
        return np.inf
    
    x = np.log(S * A / delta) + (S - 1) * np.log(np.e * (1 + n / (S - 1)))
    B = np.sqrt(2 * x / n)
    
    return B

def build_confidence_sets(p_hat_dict, n_dict, delta, S, A):
    """
    Construye confidence sets P(s,a) para cada par estado-acción.
    
    P(s,a) = {p ∈ Σ_S : ||p - p_hat_{s,a}||_1 <= B(n_{s,a}, δ)}
    
    Args:
        p_hat_dict: diccionario {(s,a): p_hat} con estimaciones empíricas
        n_dict: diccionario {(s,a): n} con número de muestras
        delta: nivel de confianza
        
    Returns:
        P_sets: diccionario {(s,a): {'p_hat': array, 'radius': float}}
    """
    P_sets = {}
    
    for (s, a), p_hat in p_hat_dict.items():
        n = n_dict[(s, a)]
        radius = compute_B(n, delta, S, A)
        
        P_sets[(s, a)] = {
            'p_hat': p_hat,
            'radius': radius
        }
    
    return P_sets

print("Funciones de confidence sets definidas")

Funciones de confidence sets definidas


In [4]:
# ============================================
# EXTENDED VALUE ITERATION (EVI-SSP)
# ============================================

def optimistic_bellman(v, c, s, a, P_set, goal_state):
    """
    Operador de Bellman extendido (optimista).
    """
    if s == goal_state:
        return 0.0, None
    
    p_hat = P_set['p_hat']
    radius = P_set['radius']
    
    if radius >= 1.0:
        return c + np.dot(p_hat, v), p_hat
    
    # Optimización: mover masa hacia estados con menor v
    sorted_indices = np.argsort(v)
    
    p_opt = p_hat.copy()
    budget = radius
    
    for i_low in sorted_indices:
        if budget <= 0:
            break
        for i_high in reversed(sorted_indices):
            if i_low >= i_high or budget <= 0:
                break
            
            transfer = min(p_opt[i_high], budget / 2)
            p_opt[i_high] -= transfer
            p_opt[i_low] += transfer
            budget -= 2 * transfer
    
    p_opt = np.maximum(p_opt, 0)
    p_opt = p_opt / p_opt.sum()
    
    value = c + np.dot(p_opt, v)
    
    return value, p_opt


def EVI_SSP(env, cost_func, goal_state, P_sets, mu_VI=0.5, max_iter=3000, verbose=False):
    """
    Extended Value Iteration para SSP-MDP.
    """
    S = env.S
    A = env.A
    
    v = np.zeros(S)
    v_prev = np.zeros(S)
    
    p_tilde = {}
    pi_tilde = {}
    
    for iteration in range(max_iter):
        v_prev = v.copy()
        
        for s_idx, s in enumerate(env.states):
            if s_idx == goal_state:
                v[s_idx] = 0.0
                continue
            
            min_value = np.inf
            best_a_idx = None
            best_p = None
            
            for a_idx, a in enumerate(env.actions):
                if not env.is_valid_action(s, a):
                    continue
                
                c = cost_func(s, a)
                P_set = P_sets.get((s, a), {'p_hat': np.ones(S)/S, 'radius': 1.0})
                
                value, p_opt = optimistic_bellman(v_prev, c, s_idx, a_idx, P_set, goal_state)
                
                if value < min_value:
                    min_value = value
                    best_a_idx = a_idx
                    best_p = p_opt
            
            v[s_idx] = min_value
            if best_a_idx is not None:
                pi_tilde[s] = env.actions[best_a_idx]
                p_tilde[(s, env.actions[best_a_idx])] = best_p
        
        # Convergencia
        diff = np.max(np.abs(v - v_prev))
        
        if verbose and iteration % 100 == 0:
            print(f"      iter {iteration}: diff={diff:.6f}, max_v={np.max(v):.2f}")
        
        if diff <= mu_VI:
            if verbose:
                print(f"       Convergió en {iteration} iters")
            break
    
    if iteration >= max_iter - 1:
        print(f"     EVI-SSP NO convergió en {max_iter} iters (goal={goal_state})")
        print(f"         diff final = {diff:.6f} > mu_VI = {mu_VI}")
    
    return v, p_tilde, pi_tilde

print(" Extended Value Iteration (EVI-SSP) implementado")

 Extended Value Iteration (EVI-SSP) implementado


# Aplicación Algortimo 2

In [5]:
# ============================================
# ALGORITHM 2: DIAMETER ESTIMATION
# ============================================

def compute_N(delta, eta, S, A):
    """
    Número de muestras necesario para que B(N, δ) <= η.
    
    De la definición de B(n, δ):
    B(n, δ) = sqrt(2*x(δ,n) / n) <= η
    
    Resolvemos para n (aproximación conservadora).
    """
    # Bound conservador: x(δ, n) <= log(SA/δ) + (S-1)*log(e*(1 + n))
    # Queremos: sqrt(2*[log(SA/δ) + (S-1)*log(e*(1+n))] / n) <= η
    
    # Aproximación: usar bound superior
    # n >= C * (log(SA/δ) + S*log(n)) / η^2
    
    # Método iterativo
    n = 1
    for _ in range(100):  # Iteraciones de punto fijo
        x_n = np.log(S * A / delta) + (S - 1) * np.log(np.e * (1 + n / (S - 1)))
        n_new = int(np.ceil(2 * x_n / (eta ** 2)))
        
        if abs(n_new - n) < 10:
            break
        n = n_new
    
    return max(n, 1)


def estimate_diameter(env, delta, epsilon=1.0, verbose=True):
    """
    Algoritmo 2: Diameter Estimation.
    
    Estima D̂ tal que D ≤ D̂ ≤ 4D.
    
    Args:
        env: ambiente
        delta: nivel de confianza
        epsilon: precisión relativa (default=1 para bound 4D)
        verbose: imprimir progreso
        
    Returns:
        D_hat: estimación del diámetro
        samples_used: número total de samples
        iteration_data: información de cada iteración del doubling trick
    """
    if verbose:
        print("\n" + "="*60)
        print("ALGORITMO 2: DIAMETER ESTIMATION")
        print("="*60)
    
    W = 0.5
    v_infty = 1.0
    iteration = 0
    samples_total = 0
    iteration_data = []
    
    S = env.S
    A = env.A
    
    while v_infty > W:
        iteration += 1
        W = 2 * W
        eta = epsilon / (4 * W)
        
        if verbose:
            print(f"\nIteración {iteration}:")
            print(f"  W = {W:.3f}, η = {eta:.6f}")
        
        # Número de muestras por (s, a)
        N = compute_N(delta, eta, S, A)
        
        if verbose:
            print(f"  N (samples por (s,a)) = {N:,}")
        
        # Samplear N veces cada (s, a)
        p_hat_dict = {}
        n_dict = {}
        
        for s in env.states:
            for a in env.actions:
                if not env.is_valid_action(s, a):
                    continue
                
                # Contador de transiciones
                transitions = np.zeros(S)
                
                for _ in range(N):
                    s_next, _ = env.sample(s, a)
                    s_next_idx = env.state_to_idx[s_next]
                    transitions[s_next_idx] += 1
                
                # Estimación empírica
                p_hat = transitions / N
                p_hat_dict[(s, a)] = p_hat
                n_dict[(s, a)] = N
        
        samples_iter = S * A * N  # Solo contamos los (s,a) válidos aprox
        samples_total += samples_iter
        
        # Construir confidence sets
        P_sets = build_confidence_sets(p_hat_dict, n_dict, delta, S=env.S, A=env.A)
        
        # Resolver SSP para cada par (s, s')
        v_infty = 0.0
        
        if verbose:
            print(f"  Resolviendo {S} SSP-MDPs (uno por cada goal state)...")
        
        for s_goal_idx, s_goal in enumerate(env.states):
            # Definir función de costo: c(s, a) = 1 para todo (s, a)
            def cost_func(s, a):
                return 1.0
            
            # Resolver SSP con goal = s_goal
            mu_VI = min(1.0, epsilon / 2)  # Precisión de VI. CAMBIO DE PARÁMETRO POR PROBLEMA DE CONVERGENCIA
            # Verbose solo para los primeros 3 goals
            verbose_evi = verbose and (s_goal_idx < 3)
            if verbose_evi:
                print(f"    - Resolviendo SSP con goal={s_goal}")
            v_tilde, _, _ = EVI_SSP(env, cost_func, s_goal_idx, P_sets, mu_VI, verbose=verbose_evi)
            
            # Actualizar v_infty
            for s_idx, s in enumerate(env.states):
                if s_idx != s_goal_idx:
                    v_infty = max(v_infty, v_tilde[s_idx])
        
        if verbose:
            print(f"  ṽ_∞ = {v_infty:.6f}")
        
        iteration_data.append({
            'iteration': iteration,
            'W': W,
            'eta': eta,
            'N': N,
            'samples': samples_iter,
            'v_infty': v_infty
        })
    
    # Computar D_hat
    D_hat = (1 + eta * v_infty / 2) * v_infty
    
    if verbose:
        print(f"\n{'='*60}")
        print(f" Convergencia alcanzada en iteración {iteration}")
        print(f"  D̂ = {D_hat:.6f}")
        print(f"  Total samples: {samples_total:,}")
        print(f"{'='*60}\n")
    
    return D_hat, samples_total, iteration_data

print(" Algoritmo 2 (Diameter Estimation) implementado")

 Algoritmo 2 (Diameter Estimation) implementado


# Aplicación Algoritmo 3

In [6]:
# ============================================
# ALGORITMO 3: ZUREK-CHEN 
# ============================================

def value_iteration_discounted(env, gamma, p_hat_dict, r_tilde_dict, max_iter=10000, tol=1e-8):
    """
    Value Iteration para MDP descontado.
    
    Resuelve: V(s) = max_a [r(s,a) + \gamma * Σ_s' p(s'|s,a) V(s')]
    
    Args:
        env: ambiente
        gamma: discount factor
        p_hat_dict: transiciones empíricas {(s,a): p_array}
        r_tilde_dict: recompensas perturbadas {(s,a): float}
        max_iter: máximo número de iteraciones
        tol: tolerancia para convergencia
        
    Returns:
        V: función de valor óptima
        Q: función Q óptima
        pi: política óptima {s: a}
    """
    S = env.S
    A = env.A
    
    # Inicializar V = 0
    V = np.zeros(S)
    Q = np.zeros((S, A))
    
    for iteration in range(max_iter):
        V_prev = V.copy()
        
        # Actualizar Q-values
        for s_idx, s in enumerate(env.states):
            for a_idx, a in enumerate(env.actions):
                if not env.is_valid_action(s, a):
                    Q[s_idx, a_idx] = -np.inf
                    continue
                
                r = r_tilde_dict.get((s, a), 0.0)
                p = p_hat_dict.get((s, a), np.ones(S) / S)
                
                Q[s_idx, a_idx] = r + gamma * np.dot(p, V_prev)
        
        # Actualizar V
        V = np.max(Q, axis=1)
        
        # Convergencia
        if np.max(np.abs(V - V_prev)) < tol:
            break
    
    # Política óptima
    pi = {}
    for s_idx, s in enumerate(env.states):
        best_a_idx = np.argmax(Q[s_idx, :])
        pi[s] = env.actions[best_a_idx]
    
    return V, Q, pi


def zurek_chen_policy(env, epsilon, H_upper, delta, verbose=True):
    """
    Algoritmo 3: Zurek-Chen.
    
    Encuentra política ε-optimal usando upper bound H_upper en H.
        
    """
    if verbose:
        print("\n" + "="*60)
        print("ALGORITMO: ZUREK-CHEN ")
        print("="*60)
        print(f"Input: ε={epsilon}, H̄={H_upper:.3f}, δ={delta}")
    
    S = env.S
    A = env.A
    
    # Paso 1: Elegir discount factor
    gamma = 1 - epsilon / (12 * H_upper)
    
    if verbose:
        print(f"\nPaso 1: Discount factor")
        print(f"  γ = 1 - ε/(12H̄) = {gamma:.8f}")
    
    # Paso 2: Número de samples por (s, a)
    n = int(np.ceil(
        144 * H_upper / (epsilon ** 2) * np.log(12 * S * A / (delta * epsilon))
    ))
    
    if verbose:
        print(f"\nPaso 2: Uniform sampling")
        print(f"  n (samples por (s,a)) = {n:,}")
        print(f"  Total samples = {S * A * n:,}")
    
    # Paso 3: Samplear n veces cada (s, a)
    p_hat_dict = {}
    r_samples_dict = {}
    
    for s in env.states:
        for a in env.actions:
            if not env.is_valid_action(s, a):
                continue
            
            transitions = np.zeros(S)
            rewards = []
            
            for _ in range(n):
                s_next, r = env.sample(s, a)
                s_next_idx = env.state_to_idx[s_next]
                transitions[s_next_idx] += 1
                rewards.append(r)
            
            p_hat_dict[(s, a)] = transitions / n
            r_samples_dict[(s, a)] = np.mean(rewards)
    
    # Paso 4: Reward perturbation
    r_tilde_dict = {}
    np.random.seed(42)  # Para reproducibilidad
    
    for s in env.states:
        for a in env.actions:
            if not env.is_valid_action(s, a):
                continue
            
            r_base = r_samples_dict.get((s, a), 0.0)
            X_sa = np.random.uniform(0, epsilon / 72)
            r_tilde_dict[(s, a)] = r_base + X_sa
    
    if verbose:
        print(f"\nPaso 3-4: Sampling & perturbation completados")
    
    # Paso 5: Resolver MDP descontado empírico
    if verbose:
        print(f"\nPaso 5: Value Iteration (discounted MDP)")
    
    V, Q, pi = value_iteration_discounted(env, gamma, p_hat_dict, r_tilde_dict)
    
    samples_used = S * A * n
    
    metadata = {
        'gamma': gamma,
        'n': n,
        'V': V,
        'Q': Q
    }
    
    if verbose:
        print(f"\n{'='*60}")
        print(f" Política encontrada")
        print(f"  Samples usados: {samples_used:,}")
        print(f"{'='*60}\n")
    
    return pi, samples_used, metadata

print(" Algoritmo 3 (Zurek-Chen) implementado")

 Algoritmo 3 (Zurek-Chen) implementado


  """


# Aplicación Algoritmo Principal

In [7]:
# ============================================
# ALGORITMO 1: DFE 
# ============================================

def DFE(env, epsilon, delta, verbose=True):
    """
    Diameter Free Exploration (Algorithm 1, página 6).
    
    Algoritmo principal que NO requiere conocimiento previo del MDP.
    
    Args:
        env: ambiente
        epsilon: precisión deseada (ε-optimality)
        delta: nivel de confianza
        verbose: imprimir progreso
        
    Returns:
        pi: política ε-optimal
        D_hat: estimación del diámetro
        samples_total: número total de samples
        results: diccionario con información detallada
    """
    if verbose:
        print("\n" + "="*70)
        print(" "*15 + "DIAMETER FREE EXPLORATION (DFE)")
        print("="*70)
    
    start_time = time.time()
    
    # ============================================
    # PASO 1: ESTIMAR DIÁMETRO
    # ============================================
    if verbose:
        print("\n[PASO 1/2] Estimando el diámetro ...")
        print("-" * 70)
    
    D_hat, samples_diameter, diameter_data = estimate_diameter(
        env, 
        delta=delta/2,  # Union bound
        epsilon=1.0,
        verbose=verbose
    )
    
    # ============================================
    # PASO 2: ENCONTRAR POLÍTICA USANDO D̂
    # ============================================
    if verbose:
        print("\n[PASO 2/2] Encontrar una política óptima usando  D̂ como upper bound en H...")
        print("-" * 70)
    
    pi, samples_policy, policy_metadata = zurek_chen_policy(
        env,
        epsilon=epsilon,
        H_upper=D_hat,  # Usar D̂ como upper bound de H
        delta=delta/2,  # Union bound
        verbose=verbose
    )
    
    # ============================================
    # RESUMEN
    # ============================================
    samples_total = samples_diameter + samples_policy
    elapsed_time = time.time() - start_time
    
    if verbose:
        print("\n" + "="*70)
        print(" "*25 + "DFE RESULTADOS")
        print("="*70)
        print(f"  Diámetro estimado: D̂ = {D_hat:.6f}")
        print(f"  Samples (diameter estimation): {samples_diameter:,}")
        print(f"  Samples (policy identification): {samples_policy:,}")
        print(f"  TOTAL SAMPLES: {samples_total:,}")
        print(f"  Time elapsed: {elapsed_time:.2f} seconds")
        print("="*70)
    
    results = {
        'D_hat': D_hat,
        'samples_diameter': samples_diameter,
        'samples_policy': samples_policy,
        'samples_total': samples_total,
        'diameter_data': diameter_data,
        'policy_metadata': policy_metadata,
        'elapsed_time': elapsed_time,
        'epsilon': epsilon,
        'delta': delta
    }
    
    return pi, D_hat, samples_total, results

print(" Algoritmo DFE implementado")

 Algoritmo DFE implementado


## Ejecución DFE

In [8]:
# ============================================
# EJECUCIÓN DEL DFE
# ============================================

# Parámetros
epsilon = 0.5
delta = 0.3

print("Configuración del experimento:")
print(f"  Estados: {env.S}")
print(f"  Acciones: {env.A}")
print(f"  ε = {epsilon}")
print(f"  δ = {delta}")
print(f"\n{'='*70}")
print("Iniciando DFE...")
print(f"{'='*70}\n")

# Ejecutar DFE
pi_dfe, D_hat, samples_dfe, results_dfe = DFE(env, epsilon, delta, verbose=True)

Configuración del experimento:
  Estados: 9
  Acciones: 3
  ε = 0.5
  δ = 0.3

Iniciando DFE...


               DIAMETER FREE EXPLORATION (DFE)

[PASO 1/2] Estimando el diámetro ...
----------------------------------------------------------------------

ALGORITMO 2: DIAMETER ESTIMATION

Iteración 1:
  W = 1.000, η = 0.250000
  N (samples por (s,a)) = 1,804
  Resolviendo 9 SSP-MDPs (uno por cada goal state)...
    - Resolviendo SSP con goal=(0, 0)
      iter 0: diff=1.000000, max_v=1.00
       Convergió en 5 iters
    - Resolviendo SSP con goal=(0, 1)
      iter 0: diff=1.000000, max_v=1.00
       Convergió en 5 iters
    - Resolviendo SSP con goal=(0, 2)
      iter 0: diff=1.000000, max_v=1.00
       Convergió en 6 iters
  ṽ_∞ = 9.622526

Iteración 2:
  W = 2.000, η = 0.125000
  N (samples por (s,a)) = 8,866
  Resolviendo 9 SSP-MDPs (uno por cada goal state)...
    - Resolviendo SSP con goal=(0, 0)
      iter 0: diff=1.000000, max_v=1.00
       Convergió en 9 iters
    - Resolviendo S

## Visualización de la Política DFE



In [9]:
# ============================================
# VISUALIZACIÓN: POLÍTICA DFE OUTPUT
# ============================================

import numpy as np

# Crear matriz de política desde el diccionario
NA = env.NA
NB = env.NB

policy_grid_dfe = np.empty((NA + 1, NB + 1), dtype=object)

for s in env.states:
    sA, sB = s
    action = pi_dfe[s]
    
    # Abreviar acciones
    if action == 'ALS':
        symbol = 'ALS'
    elif action == 'BLS':
        symbol = 'BLS'
    else:
        symbol = 'NOOP'
    
    policy_grid_dfe[sA, sB] = symbol

# Imprimir tabla bonita
print("\n POLÍTICA ÓPTIMA DFE COMPLETA:")
print("=" * 70)
print("\n        Ambulancias BLS libres")
print("      ", end="")
for sB in range(NB + 1):
    print(f"  {sB}  ", end="")
print("\n    +" + "-----" * (NB + 1))

for sA in range(NA + 1):
    # Label "ALS" solo en la fila del medio
    if sA == NA // 2:
        print(f" ALS ", end="")
    else:
        print(f"     ", end="")
    
    print(f"{sA} |", end="")
    
    for sB in range(NB + 1):
        print(f" {policy_grid_dfe[sA, sB]:^4}", end="")
    print()

print("\n" + "=" * 70)


 POLÍTICA ÓPTIMA DFE COMPLETA:

        Ambulancias BLS libres
        0    1    2  
    +---------------
     0 | NOOP BLS  BLS 
 ALS 1 | ALS  ALS  BLS 
     2 | ALS  ALS  ALS 



## AVG REWARD del algoritmo


In [10]:
# ============================================
# AVERAGE REWARD DE POLÍTICA DFE
# ============================================

def simulate_policy(env, pi, num_steps=100000, verbose=True):
    """
    Simula una política en el ambiente para estimar el average reward.
    
    Args:
        env: ambiente
        pi: política {s: a}
        num_steps: número de pasos de simulación
        verbose: imprimir resultados
        
    Returns:
        metrics: diccionario con métricas de desempeño
    """
    # Estado inicial (recursos máximos)
    s = (env.NA, env.NB)
    
    # Contadores
    total_reward = 0.0
    event_counts = defaultdict(int)
    
    for step in range(num_steps):
        # Obtener acción de la política
        a = pi[s]
        
        # Samplear transición
        s_next, reward = env.sample(s, a)
        
        # Acumular reward
        total_reward += reward
        
        # Registrar tipo de evento (para diagnóstico)
        # Inferir evento basado en cambio de estado
        sA, sB = s
        sA_next, sB_next = s_next
        
        if sA_next < sA:  # ALS fue despachado
            event_counts['ALS_DISPATCH'] += 1
        elif sB_next < sB:  # BLS fue despachado
            event_counts['BLS_DISPATCH'] += 1
        elif sA_next > sA:  # ALS regresó
            event_counts['ALS_DONE'] += 1
        elif sB_next > sB:  # BLS regresó
            event_counts['BLS_DONE'] += 1
        else:  # Estado no cambió
            event_counts['NOOP_OR_ARRIVAL_LOST'] += 1
        
        s = s_next
    
    # Calcular métricas
    avg_reward_per_step = total_reward / num_steps
    avg_reward_per_hour = avg_reward_per_step * env.Lambda
    
    # Estimar tasa de arrivals atendidos
    arrivals_served = event_counts['ALS_DISPATCH'] + event_counts['BLS_DISPATCH']
    arrival_rate_observed = arrivals_served / num_steps * env.Lambda
    
    # Reward por ARRIVAL
    if arrivals_served > 0:
        avg_reward_per_arrival = total_reward / arrivals_served
    else:
        avg_reward_per_arrival = 0.0
    
    metrics = {
        'avg_reward_per_step': avg_reward_per_step,
        'avg_reward_per_hour': avg_reward_per_hour,
        'avg_reward_per_arrival': avg_reward_per_arrival,
        'total_reward': total_reward,
        'arrivals_served': arrivals_served,
        'arrival_rate_observed': arrival_rate_observed,
        'event_counts': dict(event_counts),
        'num_steps': num_steps
    }
    
    if verbose:
        print("\n" + "="*70)
        print(" "*20 + "Simulación")
        print("="*70)
        print(f"Pasos simulados: {num_steps:,}")
        print(f"\nAverage Reward:")
        print(f"  Por paso (uniformizado):  {avg_reward_per_step:.6f}")
        print(f"  Por hora:                {avg_reward_per_hour:.6f}")
    
    return metrics

# Evaluar política DFE
print("Evaluando política DFE mediante simulación...")
metrics_dfe = simulate_policy(env, pi_dfe, num_steps=100000, verbose=True)

Evaluando política DFE mediante simulación...

                    Simulación
Pasos simulados: 100,000

Average Reward:
  Por paso (uniformizado):  0.208234
  Por hora:                1.395171
