# Umbrella Sampling y WHAM: Visualización avanzada
Enfocamos el cuaderno en reproducir y extender la metodología del tutorial de OpenMM sobre umbrella sampling para disociación proteína-ligando, integrando los objetivos del Módulo 6 (energías libres y análisis de interacción) y añadiendo herramientas de visualización avanzada para inspeccionar ventanas, histogramas y el perfil de energía libre resultante tras aplicar WHAM.

## Contexto y objetivos
- Basado en el tutorial oficial de OpenMM sobre umbrella sampling (`https://openmm.github.io/openmm-cookbook/latest/notebooks/tutorials/umbrella_sampling.html`).
- Aplica el flujo descrito en el Módulo 6: preparar el complejo proteína-ligando, definir la CV como distancia centro de masa, ejecutar ventanas (5–25 Å) y analizar con WHAM.
- Objetivo: ofrecer tooling práctico para (1) ejecutar ventanas via `UmbrellaSamplingCalculator`, (2) generar histogramas y PMF con WHAM (vía `pymbar` si está disponible) y (3) visualizar métricas clave para evaluar convergencia y calidad de la muestra.

In [None]:
import json
import math
import importlib
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

try:
    from umbrella_sampling_calculator import create_umbrella_calculator
    UMBRELLA_AVAILABLE = True
except ImportError:
    UMBRELLA_AVAILABLE = False

try:
    import pymbar
    MBAR_AVAILABLE = True
except ImportError:
    MBAR_AVAILABLE = False

sns.set_context("talk")
sns.set_style("whitegrid")

In [None]:
@dataclass
class UmbrellaWindow:
    center: float
    force_constant: float
    cv_values: np.ndarray
    histogram_counts: np.ndarray
    histogram_edges: np.ndarray

    @property
    def mean_cv(self) -> float:
        return float(np.mean(self.cv_values))

    @property
    def std_cv(self) -> float:
        return float(np.std(self.cv_values))


def _synthetic_window(center: float, force_constant: float, n_samples: int, noise: float = 0.6) -> UmbrellaWindow:
    """Genera datos artificiales con solapamiento controlado para visualización."""
    sigma = max(0.2, math.sqrt(1.0 / force_constant))
    samples = np.random.normal(loc=center, scale=sigma + noise, size=n_samples)
    hist, edges = np.histogram(samples, bins=80, density=True)
    return UmbrellaWindow(center=center, force_constant=force_constant, cv_values=samples, histogram_counts=hist, histogram_edges=edges)


def load_umbrella_dataset(results_dir: Path) -> Tuple[List[UmbrellaWindow], Dict]:
    """Carga resultados generados por UmbrellaSamplingCalculator o produce dataset sintético."""
    windows: List[UmbrellaWindow] = []
    metadata: Dict = {}

    if results_dir.exists():
        metadata_path = results_dir / "umbrella_metadata.json"
        if metadata_path.exists():
            metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
        else:
            metadata = {}

        # Buscar archivos de histogramas guardados
        for hist_file in sorted(results_dir.glob("cv_histogram_center_*.dat")):
            center = float(hist_file.stem.split("_")[-1])
            hist_data = np.loadtxt(hist_file)
            counts = hist_data[:, 1]
            edges = np.concatenate([hist_data[:, 0], [hist_data[-1, 0] + (hist_data[1, 0] - hist_data[0, 0])]])

            series_file = results_dir / f"cv_timeseries_center_{center:.2f}.dat"
            if series_file.exists():
                cv_values = np.loadtxt(series_file)
            else:
                cv_values = np.repeat(center, counts.size)

            force_constant = metadata.get("force_constant", 10.0)
            windows.append(UmbrellaWindow(center=center, force_constant=force_constant, cv_values=cv_values, histogram_counts=counts, histogram_edges=edges))

    if not windows:
        # Generar dataset sintético similar al workflow 5-25 Å con 40 ventanas
        centers = np.linspace(5.0, 25.0, 40)
        metadata = {
            "window_centers": centers.tolist(),
            "force_constant": 12.0,
            "temperature": 300.0,
            "simulation_time_ps": 2000.0,
            "synthetic": True
        }
        for center in centers:
            windows.append(_synthetic_window(center=center, force_constant=metadata["force_constant"], n_samples=4000))

    return windows, metadata

In [None]:
async def run_umbrella_workflow(structure_file: Path,
                                 protein_atoms: List[int],
                                 ligand_atoms: List[int],
                                 centers: np.ndarray,
                                 force_constant: float,
                                 simulation_time_ps: float,
                                 output_dir: Path) -> Tuple[List[UmbrellaWindow], Dict]:
    """Ejecuta umbrella sampling basado en UmbrellaSamplingCalculator si está disponible."""
    if not UMBRELLA_AVAILABLE:
        raise RuntimeError("UmbrellaSamplingCalculator no está disponible en el entorno actual")

    calculator = create_umbrella_calculator({
        "platform": "CPU",
        "temperature": 300.0,
        "force_field": "amber19-all.xml",
        "implicit_solvent": False
    })

    cv_force = {
        "type": "distance",
        "atoms": [0, 1],
        "params": {}
    }

    # Emplear CustomCentroidBondForce cuando se proporcionen grupos
    if protein_atoms and ligand_atoms:
        cv_force = {
            "type": "distance",
            "atoms": [protein_atoms[0], ligand_atoms[0]],
            "params": {}
        }

    results = await calculator.run_full_umbrella_sampling(
        structure_file=str(structure_file),
        cv_config=cv_force,
        window_centers=centers.tolist(),
        force_constant=force_constant,
        simulation_time_ps=simulation_time_ps,
        temperature=300.0,
        output_dir=str(output_dir)
    )

    windows, _ = load_umbrella_dataset(output_dir)
    metadata = results["metadata"]
    metadata.update({"force_constant": force_constant, "simulation_time_ps": simulation_time_ps})
    return windows, metadata

In [None]:
def compute_pmf(windows: List[UmbrellaWindow], temperature: float = 300.0) -> pd.DataFrame:
    """Calcula el perfil de energía libre con WHAM/MBAR cuando está disponible."""
    k_B = 0.0019872041  # kcal/mol/K
    beta = 1.0 / (k_B * temperature)

    bins = np.linspace(min(w.center for w in windows) - 0.5, max(w.center for w in windows) + 0.5, 300)
    bin_centers = 0.5 * (bins[:-1] + bins[1:])

    if MBAR_AVAILABLE:
        # Construir u_kn (energías reducidas) y N_k (samples por ventana)
        N_k = np.array([w.cv_values.size for w in windows])
        K = len(windows)
        u_kn = np.zeros((K, bin_centers.size))
        for k, window in enumerate(windows):
            kappa = window.force_constant
            cv_values = window.cv_values
            for n, cv in enumerate(bin_centers):
                u_kn[k, n] = beta * 0.5 * kappa * (cv - window.center) ** 2
        mbar = pymbar.MBAR(u_kn, N_k)
        f_i, df_i = mbar.computePMF(u_kn, bin_centers, nbins=bin_centers.size)
        pmf = f_i - f_i.min()
        uncertainty = df_i
    else:
        # Estimación log-probabilidad agregando histogramas normalizados
        combined_counts = np.zeros(bin_centers.size)
        for window in windows:
            hist, _ = np.histogram(window.cv_values, bins=bins, density=False)
            combined_counts += hist
        combined_counts[combined_counts == 0] = 1e-10
        pmf = -np.log(combined_counts)
        pmf -= pmf.min()
        uncertainty = np.full_like(pmf, fill_value=np.std(pmf) * 0.1)

    return pd.DataFrame({
        "cv": bin_centers,
        "pmf": pmf,
        "uncertainty": uncertainty
    })

In [None]:
def plot_umbrella_diagnostics(windows: List[UmbrellaWindow], pmf_df: pd.DataFrame, title_suffix: str = "") -> None:
    """Genera paneles de diagnóstico inspirados en el tutorial del OpenMM Cookbook."""
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))

    # (1) Mapas de calor de histogramas (solapamiento)
    ax = axes[0, 0]
    heatmap_data = []
    heatmap_centers = []
    bins = None
    for window in windows:
        hist = window.histogram_counts
        edges = window.histogram_edges
        centers = 0.5 * (edges[:-1] + edges[1:])
        if bins is None or len(centers) > len(bins):
            bins = centers
        heatmap_data.append(np.interp(bins, centers, hist, left=0.0, right=0.0))
        heatmap_centers.append(window.center)
    sns.heatmap(np.array(heatmap_data), cmap="viridis", ax=ax, cbar_kws={"label": "P(CV)"})
    ax.set_ylabel("Ventana")
    ax.set_xlabel("CV (Å)")
    ax.set_title(f"Solapamiento de histogramas {title_suffix}")
    ax.set_yticks(np.linspace(0.5, len(windows) - 0.5, 5))
    ax.set_yticklabels([f"{w.center:.1f}" for w in windows[::max(1, len(windows)//5)]])

    # (2) Medidas de convergencia por ventana
    ax = axes[0, 1]
    means = [w.mean_cv for w in windows]
    stds = [w.std_cv for w in windows]
    centers = [w.center for w in windows]
    ax.errorbar(centers, means, yerr=stds, fmt="o", ecolor="gray", capsize=4)
    ax.plot(centers, centers, linestyle="--", color="black", alpha=0.5)
    ax.set_xlabel("Centro programado (Å)")
    ax.set_ylabel("Mean CV ± std (Å)")
    ax.set_title("Seguimiento de la CV medida vs target")

    # (3) PMF
    ax = axes[1, 0]
    ax.plot(pmf_df["cv"], pmf_df["pmf"], color="#1f77b4", linewidth=2)
    ax.fill_between(pmf_df["cv"], pmf_df["pmf"] - pmf_df["uncertainty"], pmf_df["pmf"] + pmf_df["uncertainty"], alpha=0.3)
    ax.set_xlabel("CV (Å)")
    ax.set_ylabel("ΔG (kcal/mol)")
    ax.set_title("Perfil de energía libre (PMF)")

    # (4) Distribución acumulada por ventana
    ax = axes[1, 1]
    for window in windows[::max(1, len(windows)//8)]:
        sorted_cv = np.sort(window.cv_values)
        cumulative = np.linspace(0, 1, sorted_cv.size)
        ax.plot(sorted_cv, cumulative, label=f"ξ₀={window.center:.1f} Å")
    ax.set_xlabel("CV (Å)")
    ax.set_ylabel("CDF")
    ax.set_title("Distribuciones acumuladas selectas")
    ax.legend(loc="lower right", fontsize=9)

    plt.tight_layout()
    plt.show()

In [None]:
results_dir = Path("umbrella_results")
windows, metadata = load_umbrella_dataset(results_dir)
pmf_df = compute_pmf(windows, temperature=metadata.get("temperature", 300.0))

print("Ventanas cargadas:", len(windows))
print("Metadatos:", metadata)
plot_umbrella_diagnostics(windows, pmf_df, title_suffix="(dataset sintético)" if metadata.get("synthetic") else "")

### Interpretación y próximos pasos
- **Energías libres de unión (Módulo 6.1):** el PMF obtenido permite extraer ΔG de disociación identificando los mínimos y la barrera entre estados enlazado/no enlazado.
- **Pérdidas en modelos de proteínas (Módulo 6.2):** las curvas de PMF ayudan a validar si los estados que el modelo ML considera relevantes coinciden con mínimos energéticos físicos; se puede correlacionar con distogram loss o RMSD loss.
- **Identificación de hotspots (Módulo 6.3):** el panel de hist solapados facilita localizar ventanas donde residuos críticos restringen el ligando; combinar con cálculos per-residue (MM/PBSA) y análisis de aguas estructuradas.
- **Integración experimental:** si se generan datos reales con `run_umbrella_workflow`, repetir el análisis y exportar `pmf_df` para introducirlo en pipelines de diseño racional o validación experimental.