# CMB - Espectro de Potencias
---
- **Máster de Física Avanzada de la Universidad de Valencia**
- **Cosmología**

## 0. Librerías y Configuración del Sistema

### 0.1. Librerías
---
**healpy**
- Es el wrapper de Python (Cython) para HEALPix (Hierarchical Equal Area iso-Latitude Pixelization), el estándar para muestrear funciones en la esfera usado en CMB (Planck/WMAP).
- No ofrece soporte para Windows (alternativas --> astropy_healpix)
- Presenta problemas de compatiblidad con versiones de Numpy y Scipy
- Permite el Cálculo de espectros angulares $C_l$ y derivados de forma extremadametne rápida y eficiente en el consumo de recursos

In [8]:
import os
import time
from datetime import datetime
import requests

import matplotlib.pyplot as plt
import numpy as np
import scipy
import astropy.units as u
from astropy.io import fits
from scipy.signal import find_peaks
try:
    import torch
    _HAS_TORCH = True
except Exception:
    _HAS_TORCH = False

try:
    import healpy as hp
    _HAS_HEALPY = True
except:
    from joblib import Parallel, delayed
    from scipy.special import sph_harm
    from astropy_healpix import HEALPix
    _HAS_HEALPY = False


### 0.2 Chequeo del Sistema
---


In [9]:
print('\nTorch/CUDA Info:\n-----------------------')
if _HAS_TORCH:
    cuda = torch.cuda.is_available()
    print('Torch Version:', torch.__version__)
    print('Torch CUDA available:', cuda)
    if cuda:
        print("torch_cuda_version:",torch.version.cuda)
        ngpu = torch.cuda.device_count()
        for i in range(ngpu):
            print("Index:", i,"| Name:", torch.cuda.get_device_name(i),
                "| Capability:", "".join(str(x) for x in torch.cuda.get_device_capability(i)),
                "| Total Memory (GB):", "{:.2f}".format(torch.cuda.get_device_properties(i).total_memory/1024**3))
print('\nModule Version:\n-----------------------')
if _HAS_HEALPY:
    for module, name in zip([hp, np],["healpy","numpy"]):
        print(name,'version:', module.__version__)
else:
    for module, name in zip([scipy, np],["healpy","numpy"]):
        print(name,'version:', module.__version__)


Torch/CUDA Info:
-----------------------
Torch Version: 2.3.1
Torch CUDA available: True
torch_cuda_version: 12.1
Index: 0 | Name: NVIDIA GeForce GTX 1080 Ti | Capability: 61 | Total Memory (GB): 11.00

Module Version:
-----------------------
healpy version: 1.15.2
numpy version: 1.26.4


## 1. Descarga de los archivos de Datos del CMB
---
Varias Opciones de Descarga en Repositorios públicos:

**1. IRSA - NASA/IPACInfraredScienceArchive**
- Contiene Mapas completos de CMB de alta resolución con datos procedentes del telescopio Plank
- Formato de descarga: HEALPix FITS
- Mapas disponibles en el archivo:
  - I_STOKES: Mapa de intensidad (De Fluctuaciones de la Temperatura)
  - Q_STOKES y U_STOKES: Mapas de Polarización
  - TMASK y PMASK: Máscaras de la galaxia para la Temperatura y la Polarización
- Pipelines Disponibles:
  - COMMANDER, NILC, SEVEM, and SMICA (Recomendado)

### 1.1. Parámetros - Descarga de los Mapas del CMB

In [10]:
DOWNLOAD_OPTION = 'IRSA'
IRSA_PIPELINE   = 'smica'   # options: 'commander', 'nilc', 'sevem', 'smica'


### 1.2. Descarga Mapas IRSA (https://irsa.ipac.caltech.edu/)

In [11]:
if DOWNLOAD_OPTION == 'IRSA':
    pipeline = 'smica'   # options: 'commander', 'nilc', 'sevem', 'smica'
    web_url    = "https://irsa.ipac.caltech.edu"
    release_url = "/data/Planck/release_3/all-sky-maps/maps/component-maps/cmb/"
    file_url   = "".join(['COM_CMB_IQU-', IRSA_PIPELINE, '_2048_R3.00_full.fits'])
    fits_url   = web_url + os.path.join(web_url, release_url)
    fits_file  = file_url
    print(fits_url)
    def download_fits(url, file):
        if os.path.exists(file):
            print(f"The file '{file}' is already downloaded.")
        else:
            print(f"Downloading '{url}' ...")
            resp = requests.get(url, stream=True)
            resp.raise_for_status()
            with open(file, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)
            print(f"Download finished: {file}")

    # Download FITS File with CMB Data
    download_fits(fits_url, fits_file)
    mapas = {'I_STOKES': 0,
             'Q_STOKES': 1,
             'U_STOKES': 2,
             'TMASK': 3,
         'PMASK': 4}

https://irsa.ipac.caltech.edu/data/Planck/release_3/all-sky-maps/maps/component-maps/cmb/
The file 'COM_CMB_IQU-smica_2048_R3.00_full.fits' is already downloaded.


### 1.3. Anáisis de la Cabecera del archivo FITS y Carga de los Mapas

In [12]:
hdul = fits.open(fits_file)
data = hdul[1].data
maps = {}
for mapa, code in mapas.items():
    maps[mapa] =  hp.read_map(fits_file, field=code)
I_map = maps['I_STOKES']
nside = 1024
print(f"Mapa cargado: nside={nside}, npix={len(I_map)}, rango de temperaturas: {I_map.min():.2f} a {I_map.max():.2f} μK")
print("\n=== Primary Header ===")
print(repr(hdul[0].header))

NameError: name 'hp' is not defined

## 2. Visualización de Lo Mapas

### 2.0. Funciones Auxiliares

In [None]:
# ------------------------------------------------------------
#  Q/U → E/B en mapa
# ------------------------------------------------------------
def qu_to_eb_maps(Q, U, lmax=None):
    """
    Convierte Q,U → mapas E,B usando armónicos de spin.
    Devuelve E,B (en mismo NSIDE que Q/U).
    """
    nside = hp.get_nside(Q)
    if lmax is None:
        lmax = 3*nside - 1
    almE, almB = hp.map2alm_spin([Q, U], spin=2, lmax=lmax)
    E = hp.alm2map(almE, nside=nside, lmax=lmax)
    B = hp.alm2map(almB, nside=nside, lmax=lmax)
    return E, B

# ------------------------------------------------------------
#  Visualizaciones moleweide y versiones enmascaradas
# ------------------------------------------------------------
def show_map(m, title="", unit="", coord='G', minmax=None, cmap='coolwarm'):
    """
    Visualiza un mapa HEALPix en proyección Mollweide.
    coord: 'G' (Galácticas), 'C' (Ecuatoriales), 'E' (Eclípticas)
    minmax: (vmin, vmax) o None
    """
    vmin, vmax = (None, None)
    if minmax is not None:
        vmin, vmax = minmax
    hp.mollview(m, coord=coord, title=title, unit=unit, min=vmin, max=vmax, cmap=cmap)
    hp.graticule()

### 2.1. I_STOKES. Mapa Intensidad (de Fluctuaciones de Temperatura CMB)

In [None]:
I = maps['I_STOKES']*1e6
I_min = maps['I_STOKES'].min()
I_max = maps['I_STOKES'].max()
show_map(I, title="CMB Temperature (I)", unit="μK", coord='G', minmax=(-400,400))


### 2.2. Q_STOKES/U_STOKES. Mapas de Polarización

In [None]:
Q = maps['Q_STOKES']*1e6
Q_min = maps['Q_STOKES'].min()
Q_max = maps['Q_STOKES'].max()
show_map(Q, title="CMB Polarization (Q)", unit="μK", coord='G', minmax=(-100,100))

U = maps['U_STOKES']*1e6
U_min = maps['U_STOKES'].min()
U_max = maps['U_STOKES'].max()
show_map(U, title="CMB Polarization (U)", unit="μK", coord='G', minmax=(-100,100))

### 2.3. E/B Polarization

In [None]:
E, B = qu_to_eb_maps(Q, U)
show_map(E, title="E-mode (from Q/U)", unit="K", coord='G', minmax=(-100,100))
show_map(B, title="B-mode (from Q/U)", unit="K", coord='G', minmax=(-100,100))

### 2.3. TMASK/PMASK. Máscaras de los Mapas de Intensidad y Polarización

In [None]:
f_sky = np.mean(maps['TMASK'])  # Fracción de cielo no enmascarada
print(f"Fracción de cielo No enmascarada (Temperatura): {f_sky:.3f}")
show_map(maps['TMASK'], title="TMASK", unit="", coord='G', cmap='viridis')
f_sky = np.mean(maps['PMASK'])  # Fracción de cielo no enmascarada
print(f"Fracción de cielo No enmascarada (Polarización): {f_sky:.3f}")
show_map(maps['PMASK'], title="PMASK", unit="", coord='G', cmap='viridis')

## 3 Espectro de Potencias del CMB
---

**1. Descomposición en Armónicos Esféricos:**

El mapa se expande en términos de armónicos esféricos $ Y_{\ell m}(\hat{n}) $:
$$\Delta T(\hat{n}) = \sum_{\ell=0}^{\infty} \sum_{m=-\ell}^{\ell} a_{\ell m} Y_{\ell m}(\hat{n})$$
Los coeficientes $ a_{\ell m} $ se obtienen mediante la transformada inversa (proyección sobre los armónicos esféricos):
$$a_{\ell m} = \int \Delta T(\hat{n}) Y_{\ell m}^*(\hat{n}) \, d\Omega$$
donde $ d\Omega = \sin\theta \, d\theta \, d\phi $ es el elemento de área sólida, y $ * $ denota el complejo conjugado. En la práctica, para mapas discretos (como los en formato HEALPix), esta integral se aproxima como una suma sobre los píxeles:
$$a_{\ell m} \approx \sum_{i=1}^{N_{\text{pix}}} \Delta T_i \, Y_{\ell m}^*(\theta_i, \phi_i) \, \Delta \Omega_i$$
En HEALPix, todos los píxeles tienen el mismo área $ \Delta \Omega_i = 4\pi / N_{\text{pix}} $, donde $ N_{\text{pix}} = 12 \times n_{\text{side}}^2 $ y $ n_{\text{side}} $ es el parámetro de resolución.

**2. Cálculo del Espectro de Potencia $ C_\ell $:**
El espectro de potencia es el promedio de los cuadrados de los coeficientes $ a_{\ell m} $ sobre los $ 2\ell + 1 $ valores de $ m $:
$$C_\ell = \frac{1}{2\ell + 1} \sum_{m=-\ell}^{\ell} |a_{\ell m}|^2$$
Esto asume isotropía estadística (el CMB es homogéneo en promedio). En la práctica, se corrige por efectos como el ruido, la resolución del haz (beam) y máscaras (regiones excluidas del cielo).


**3. Consideraciones Adicionales:**
- **Límite en $ \ell $:** El $ \ell_{\max} $ depende de la resolución del mapa; típicamente $ \ell_{\max} \approx 3 \times n_{\text{side}} $.
- **Normalización**: Normalmente se multiplica por factores como $ \ell(\ell+1)/2\pi $ para graficar el espectro en forma de $ D_\ell $, que resalta las oscilaciones acústicas.
- **Ruido y Correcciones**: En datos reales, se usa estimadores como el de máxima verosimilitud o pseudo-$ C_\ell $ para manejar ruido y cobertura incompleta del cielo.

### 3.1. Cálcula los coeficientes $C_l$ utilizando librería healpy
---
- Función **anafast** de la librería healpy

In [None]:
def compute_cmb_spectrum(
    I_map,# tmask, pmask
    lmax=None,
    map_units="uK",          # "uK" o "K"
    ):
    """
    Calcula C_l y muestra D_l = l(l+1)C_l/(2pi), con binning y detección de picos.
    Parámetros:
    - I_map: mapa HEALPix (np.ndarray), p.ej. leído con hp.read_map(...)
    - lmax: multipolo máximo; por defecto, 3*nside-1
    - map_units: "uK" si el mapa está en micro-Kelvin, "K" si en Kelvin
    """
    #unidades
    unit_scale = 1.0 if map_units.lower() == "uk" else (1e6)**2

    # Inferir nside y lmax por defecto
    nside = hp.get_nside(I_map)
    if lmax is None:
        lmax = 3 * nside - 1

    # Cálculo de C_l (Pseudo-Cl si hay máscara; para máscaras severas considera pipelines tipo MASTER)
    cls = hp.anafast(I_map, lmax=lmax)
    print(cls.shape, unit_scale)
    cls_scale = cls * unit_scale  # devuelve Cl en unidades de K^2 por consistencia física

    # Cálculo de D_l
    ell = np.arange(len(cls))
    Dl = ell * (ell + 1) * cls / (2 * np.pi) * unit_scale

    results = {
        "lmax": lmax,
        "nside": nside,
        "unit_scale": unit_scale,
        "ell": ell,
        "Cl": cls_scale,
        "Dl": Dl
    }
    return results


In [None]:
I_mask = hp.remove_dipole(hp.remove_monopole(I)) * maps['TMASK']
nside  = hp.get_nside(I)
lmax   = 3*nside-1

print('Tamaño de los datos:', I.shape)
print('Parámetro NSIDE:', nside)
print('Multipolo Máximo:', lmax)

results = compute_cmb_spectrum(I_mask,
                           lmax=None,                 # por defecto 3*nside-1
                          )
print('Número de Multipolos calculado:', len(results['Cl']))

### 3.2. Promedia los valores $D_l$ en bins uniformes

In [None]:
def bin_spectrum(results, dL=20):
    """Promedia Dl en bins uniformes de tamaño dL (ignora ell<2)."""
    ell = results["ell"]
    Dl =results["Dl"]
    m = (ell >= 2)
    eb, Db = [], []
    L = np.arange(2, ell[m].max()+1, dL)
    for a in L:
        sel = (ell >= a) & (ell < a + dL)
        if np.any(sel):
            eb.append(int(np.round(ell[sel].mean())))
            Db.append(Dl[sel].mean())

    results["ell_binned"] = np.asarray(eb)
    results["Dl_binned"] = np.asarray(Db)
    results["dL"] = dL
    return results

In [None]:
results = bin_spectrum(results, dL=20)
print('Tamaño de array D_L Binned:', results["Dl_binned"].shape, len(results["ell_binned"]))


### 3.3. Encuentra los Picos Acústicos

In [None]:
def quadratic_subpeak(x, y):
    """
    Ajuste cuadrático local y = ax^2 + bx + c para refinar el máximo.
    Devuelve x_peak (sub-entero), y_peak. Requiere len>=3.
    """
    A = np.vstack([x**2, x, np.ones_like(x)]).T
    a, b, c = np.linalg.lstsq(A, y, rcond=None)[0]
    if a >= 0:  # parábola abierta hacia arriba → devuelve máximo discreto
        i = np.argmax(y)
        return float(x[i]), float(y[i])
    x_peak = -b/(2*a)
    y_peak = a*x_peak**2 + b*x_peak + c
    return float(x_peak), float(y_peak)

def find_peak_in_window(ell, Dl, Lmin, Lmax, refine_width=8):
    """
    Busca el pico global en [Lmin,Lmax] y lo refina con ajuste cuadrático
    usando una ventanita ±refine_width en multipolos.
    """
    ell = np.asarray(ell, dtype=float).ravel()
    Dl  = np.asarray(Dl,  dtype=float).ravel()
    sel = (ell >= Lmin) & (ell <= Lmax)
    if not np.any(sel):
        return None, None
    i0 = np.argmax(Dl[sel])
    idxs = np.where(sel)[0]
    i_peak = idxs[i0]
    # Ventana local para refinamiento
    i1 = max(0, i_peak - refine_width)
    i2 = min(len(ell), i_peak + refine_width + 1)
    xloc = ell[i1:i2].astype(float)
    yloc = Dl[i1:i2].astype(float)
    x_peak, y_peak = quadratic_subpeak(xloc, yloc)
    return int(np.round(x_peak)), int(np.round(y_peak))


In [None]:
results['peaks'] = []
range_peaks = [(150, 300), (400, 650), (700, 900)]
for rang in range_peaks:
    results['peaks'].append(find_peak_in_window(results['ell_binned'], results['Dl_binned'], rang[0], rang[1]))
print('Número total de picos Encontrados:', len(results['peaks']))
for ix, (peak, value) in enumerate(results['peaks']):
    print(ix,'- Peak in Multipole:', peak, ' (Value = ',"{:.2f}".format(value),')')


### 3.4. Visualización del Espectro

In [None]:
def plot_cmb_spectrum(
    results,
    annotate_peaks=True,
    show=True,
    lmax_plot=None
):
    """
    Parámetros:
    - apply_gaussian_smooth: suavizado extra en l-space (sobre D_l binned)
    - smooth_sigma_l: sigma del kernel gaussiano en ell
    - annotate_peaks: anotar picos detectados
    - use_torch_smoothing: si True y hay torch, suaviza con conv1d gaussiana en GPU/CPU
    """

    # ---- Detección de picos en el D_l binned/suavizado ----
    ell = results['ell']
    Dl = results['Dl']
    ell_binned = results['ell_binned']
    Dl_binned = results['Dl_binned']
    ell_peaks = [ell[peak[0]] for peak in results['peaks']]
    Dl_peaks = [Dl[peak[1]] for peak in results['peaks']]
    print(ell_peaks, Dl_peaks)
    if lmax_plot is None:
        lmax_plot = len(ell)

    # ---- Gráfica ----
    plt.figure(figsize=(9, 5.5))
    # Curva de alta resolución (gris claro)
    plt.plot(ell, Dl, lw=1, alpha=0.4, label=r"$D_\ell$ (no bin)")
    # Curva binned
    plt.plot(ell_binned, Dl_binned, lw=1.6, label=fr"$D_\ell$ binned ($\Delta\ell={results['dL']}$)")
    # Picos
    if annotate_peaks and len(ell_peaks) > 0:
     #  plt.scatter(ell_peaks, Dl_peaks, s=35, zorder=5, label="Picos detectados")
        # Anotar algunos picos destacados (primeros 4–5 típicos)
        order = np.argsort(Dl_peaks)[::-1]
        for k in order[:5]:
            lp, Dp = int(ell_peaks[k]), Dl_peaks[k]
            plt.axvline(ell_peaks[k], linestyle="dotted", color='black', linewidth=1.0)

            plt.annotate(fr"$\ell\!\approx\!{lp}$",
                         xy=(lp, Dp),
                         xytext=(lp+25, Dp*1.05),
                         arrowprops=dict(arrowstyle="->", lw=1, alpha=0.8),
                         fontsize=10)

    plt.xlim(2, lmax_plot)
    plt.xlabel(r"Multipolo $\ell$", fontsize=13)
    plt.ylabel(r"$D_\ell=\dfrac{\ell(\ell+1)}{2\pi}C_\ell\;[\mu\mathrm{K}^2]$", fontsize=13)
    plt.title("Espectro de Potencias del CMB (picos acústicos)", fontsize=14)
    plt.grid(alpha=0.25)
    plt.legend()
    plt.tight_layout()
    if show:
        plt.show()


In [None]:
plot_out = plot_cmb_spectrum(
    results,
    annotate_peaks=True,
    lmax_plot = 1800
)

## 4. Análisis del Espectro - Observables Geométricos
---

Los observables geométricos principales que se pueden extraer del espectro incluyen:

- **Horizonte de sonido** $ r_s(z_*) $: La distancia comóvil que viaja una onda acústica hasta la recombinación.
$$ r_s(z_*) = \int_0^{t_*} c_s dt / a(t) $$
donde: $ z_* \approx 1090 $ es el redshift de recombinación.
$$ c_s = \frac{c}{\sqrt{3(1 + R)}} $$ es la velocidad del sonido en el plasma, $$ R = \frac{3\rho_b}{4\rho_\gamma} $$ es la ratio bariones-fotones, y $ a(t) $ el factor de escala.
- **Distancia angular al último scattering** $ D_A $: Mide la curvatura y expansión del universo.
  
$$ D_A(z_*) = \frac{1}{1+z_*} \int_0^{z_*} \frac{c \, dz}{H(z)} $$
$$ H(z)=H_0\sqrt{\Omega_m(1+z)^3+\Omega_r(1+z)^4+\Omega_k(1+z)^2+\Omega_\Lambda}$$
en un universo plano (para universos curvos, incluye factor de curvatura).
- **Escala acústica angular** $ \theta_A $ (o $ \theta_* $): Representa el ángulo subtendido por el horizonte acústico en la superficie de último scattering (z ≈ 1090). Es un observable robusto que mide la ratio entre el horizonte de sonido y la distancia angular.
$$\theta_A = \frac{r_s(z_*)}{D_A(z_*)}$$

- **Multipolo acústico** $ \ell_A $: Relacionado con $ \theta_A $ por $ \ell_A = \pi / \theta_A $, y corresponde aproximadamente a la posición del primer pico o al espaciado promedio de los picos.
$$\ell_A = \pi \frac{D_A(z_*)}{r_s(z_*)}$$
En modelos ΛCDM estándar, $ \ell_A \approx 301 $ (de mediciones de Planck).
- **Chequeo de desfase:** recoge desplazamientos por bariones y neutrinos. Utilizado como diagnóstico de la solución. Su valor debería estar en el rango $[0.25, 0.3]$

$$\ell_1 \;\approx\; \ell_A\,\bigl(1 - \phi_1\bigr) \quad\Rightarrow\quad \widehat{\phi}_1 \approx 1-\frac{\ell_1}{\ell_A}$$

In [None]:
def acoustic_scale_from_peaks(results):
    ell, Dl = results['ell'], results['Dl']
    # Espaciados y ℓ_A
    d12 = results['peaks'][1][0] - results['peaks'][0][0]
    d23 = results['peaks'][2][0] - results['peaks'][1][0]
    lA  = 0.5*(d12 + d23)         # estimador simple
    theta_star = np.pi / lA       # en radianes

    # Desfase del 1er pico (diagnóstico)
    phi1_hat = 1.0 - (results['peaks'][0][0] / lA)

    results["d12"] = d12
    results["d23"] = d23
    results["lA"]= lA
    results["theta_star_rad"]= theta_star
    results["theta_star_arcmin"]= theta_star * (180/np.pi) * 60.0
    results["phi1_hat"] = phi1_hat
    results["DA_over_rs"] = lA/np.pi    # D_A / r_s

    return results

# =========================
# EJEMPLO DE USO:
# Suponiendo que ya tienes ell, Dl (por ejemplo, de hp.anafast)
results = acoustic_scale_from_peaks(results)
print(f"Posición del primer pico:    l_1 ≈ {results['peaks'][0][0]}")
print(f"Multipolo Acústico:       Δℓ≈ℓ_A ≈ {results['lA']:.1f} ")
print(f"Escala Acúsica Angular:      θ_* ≈ {results['theta_star_arcmin']:.2f} arcmin")
print(f"Dist. Ang US/Hor. Sonoro:D_A/r_s ≈ {results['DA_over_rs']:.3f}")
print(f"Chequeo de Desfase:           φ1 ≈ {results['phi1_hat']:.3f}")
if (results['phi1_hat']<0.25 or results['phi1_hat']>0.30):
    print('ATENCIÓN: Chequeo de desfase fuera de rango [0.25, 0.3]. Comprobar: beam, máscara, ruido, smoothing excesivo...')