In [None]:
from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Literal, Optional, Union

import numpy as np
import pandas as pd
import requests
# NOTE: `typing` has no `Number`; the numeric alias `Number` is defined further below.

# Motor API FRED -> pd.DataFrame sucio (sin limpiar)
# Entrada: api_key, endpoint y diccionario de configuración de la serie
# Salida: DataFrame sucio

class DataLoad:
    def __init__(self, api_key="02ea49012ba021ea89f1110c48de7380", timeout=20, retries=3, backoff=1.7):
        self.api_key = api_key                    # <- corregido (antes apy_key)
        self.timeout = timeout
        self.retries = retries
        self.backoff = backoff                    # <- guardado (antes faltaba)
        self.url = "https://api.stlouisfed.org/fred/series/observations"

    def DicSerie(self, *, id: str, freq_origin: str, freq_objetivo: str,
                 agg: str = "last", transform: str | None = None, fill: str | None = None) -> dict:
        return {
            "id": id,
            "freq_origin": freq_origin,     # informativo
            "freq_objetivo": freq_objetivo, # ej. "M", "W-FRI"
            "agg": agg,                     # "last","mean",...
            "transform": transform,         # "pct_change","yoy_pct", None
            "fill": fill,                   # "ffill" o None
        }

    def request_observations(self, series_id: str, **extra_params) -> Dict[str, Any]:
        params = {
            "series_id": series_id,
            "api_key": self.api_key,        # <- corregido
            "file_type": "json",
        }
        # agrega sólo los que no son None (ej. observation_start, observation_end)
        params.update({k: v for k, v in extra_params.items() if v is not None})

        last_err = None
        for attempt in range(1, self.retries + 1):
            try:
                resp = requests.get(self.url, params=params, timeout=self.timeout)
                if 200 <= resp.status_code < 300:
                    return resp.json()
                last_err = f"HTTP {resp.status_code}: {resp.text[:200]}"
            except Exception as e:
                last_err = str(e)
            time.sleep(self.backoff * attempt)

        raise RuntimeError(f"Fallo API FRED [{series_id}]: {last_err}")

    def _to_frame(self, payload: dict) -> pd.DataFrame:
        # observations es una lista de dicts con {'date': 'YYYY-MM-DD', 'value': '...'}
        obs = payload.get("observations", [])
        df = pd.DataFrame(obs)
        if df.empty:
            return df
        df["date"] = pd.to_datetime(df["date"])
        df["value"] = pd.to_numeric(df["value"], errors="coerce")
        return df.set_index("date")[["value"]].sort_index()


    def get_one(self, cfg_or_id, *, observation_start: str | None = None, observation_end: str | None = None) -> pd.DataFrame:
        # Acepta dict (con 'id') o un string directamente
        if isinstance(cfg_or_id, dict):
            series_id = cfg_or_id["id"]
        else:
            series_id = str(cfg_or_id)

        raw = self.request_observations(
                series_id,
                observation_start=observation_start,
                observation_end=observation_end,
            )
        df = self._to_frame(raw)  # <-- devuelve df con índice 'date' y col 'value'
            # NADA de fill/resample/transform aquí
        return df.rename(columns={"value": series_id})




@dataclass
class TransformStep:
    """One step of a transform pipeline: a transform name plus its arguments."""

    # Name of the transform to apply (presumably a DataTransform method name or a
    # registry key — confirm against the pipeline dispatcher).
    fn: str
    # Arguments for `fn` (presumably passed as keyword arguments); a fresh dict per instance.
    arg: dict[str, Any] = field(default_factory=dict)

@dataclass
class SeriesSpec:
    """Declarative configuration for a single FRED series within a layer."""

    # FRED series identifier (e.g. "FEDFUNDS").
    fred_id: str
    # Optional alternative name for the series; None when not provided.
    alias: Optional[str] = field(default=None)
    # Mode for normalize_units (pp, %, bp, rebase, log); None leaves units alone.
    units: Optional[str] = field(default=None)
    # Target frequency alias.
    freq_objetivo: str = field(default="M")
    # Aggregation used when changing frequency.
    agg: Literal["last", "mean"] = field(default="last")
    # Gap-filling strategy.
    fill: Optional[Literal["ffill", "bfill", "both"]] = field(default="ffill")
    # Ordered transform pipeline; a fresh list per instance.
    steps: List[TransformStep] = field(default_factory=list)

@dataclass
class LayerSpec:
    """Declarative configuration for a layer: a named group of series plus layer-level options."""

    # Layer name.
    name: str
    # Per-series configurations, in order.
    series: List[SeriesSpec]
    # Layer target frequency alias.
    target_freq: str = field(default="M")
    # Aggregation for layer-level resampling.
    resample_agg: Literal["last", "mean"] = field(default="last")
    # Join mode presumably used to merge the per-series frames.
    how_merge: Literal["outer", "inner", "asof"] = field(default="outer")
    # Layer-level default fill strategy.
    default_fill: Optional[Literal["ffill", "bfill", "both"]] = field(default="ffill")
    # Derived-series steps; a fresh list per instance.
    derived: List[TransformStep] = field(default_factory=list)
    # Post-processing steps; a fresh list per instance.
    post: List[TransformStep] = field(default_factory=list)

# Input: raw (dirty) DataFrame
# Output: cleaned DataFrame — filled, resampled (if necessary), with derived series, z-scores and means

# Type alias for scalar numeric values: Python int/float or any NumPy scalar number.
# NOTE(review): not referenced by any code visible in this chunk; it also replaces the
# (nonexistent) `typing.Number` requested in the import line above — confirm intent.
Number = Union[int, float, np.number]


class DataTransform:
    """
    Cleaning / transformation helpers for time-series ``pd.Series``.

    Every method returns a new Series and leaves its input untouched. Missing
    values (NaN) propagate through the underlying pandas operations rather than
    raising. ``dropna`` and ``ma_weeks_to_month`` change the index; the other
    methods keep the input's index.
    """

    # --- CLEANING ------------------------------------------------------------

    def maybe_fill(self, s: pd.Series, fill: Optional[Literal["ffill", "bfill", "both"]] = None) -> pd.Series:
        """
        Forward/backward fill ``s`` according to ``fill``.

        Args:
            fill: ``None`` (no-op), ``"ffill"``, ``"bfill"`` or ``"both"``
                (ffill first, then bfill to cover leading gaps).

        Raises:
            ValueError: for any other ``fill`` value.
        """
        if fill is None:
            return s
        if fill == "ffill":
            return s.ffill()
        if fill == "bfill":
            return s.bfill()
        if fill == "both":
            return s.ffill().bfill()
        raise ValueError(f"Fill no soportado: {fill}")

    def dropna(self, s: pd.Series) -> pd.Series:
        """Drop missing values (the returned index may be shorter)."""
        return s.dropna()

    def clip(self, s: pd.Series, clip_lower: Optional[float] = None, clip_upper: Optional[float] = None) -> pd.Series:
        """Clip values to ``[clip_lower, clip_upper]``; ``None`` leaves that side unbounded."""
        return s.clip(lower=clip_lower, upper=clip_upper)

    def winsorize(
        self,
        s: pd.Series,
        percentile_low: float = 0.01,
        percentile_up: float = 0.99,
    ) -> pd.Series:
        """Clip outliers to the ``percentile_low`` / ``percentile_up`` quantiles of ``s``."""
        if s.empty:
            return s
        lo = s.quantile(percentile_low)
        hi = s.quantile(percentile_up)
        return s.clip(lower=lo, upper=hi)

    def normalize_units(self, s: pd.Series, units: str) -> pd.Series:
        """
        Convert units according to ``units`` (case-insensitive, surrounding spaces ignored):

        - percent_from_decimal : 0.052 -> 5.2
        - decimal_from_percent : 5.2 -> 0.052
        - pp_from_decimal      : 0.052 -> 5.2
        - pp_from_percent      : 5.2 -> 5.2
        - bp_from_percent      : 5.2 -> 520
        - bp_from_decimal      : 0.052 -> 520
        - percent_from_bp      : 520 -> 5.2
        - decimal_from_bp      : 520 -> 0.052
        - scale:<factor>       : multiply by factor (e.g. "scale:1e-6")
        - rebase:<base>[@YYYY-MM-DD] : rescale so the first non-null value
          (or the value at the given date) equals ``base``
        - log                  : natural log (values <= 0 become NaN)
        - level/identity/none  : identity (also for ``None`` / empty string)

        ``None`` or empty ``s`` is returned unchanged.

        Raises:
            ValueError: for an unknown mode or a malformed ``scale:`` / ``rebase:`` spec.
        """
        if s is None or s.empty:
            return s

        key = (units or "").strip().lower()

        # Rates and points
        if key == "percent_from_decimal":
            return s * 100.0
        if key == "decimal_from_percent":
            return s / 100.0
        if key == "pp_from_decimal":
            return s * 100.0
        if key == "pp_from_percent":
            return s
        if key == "bp_from_percent":
            return s * 100.0
        if key == "bp_from_decimal":
            return s * 10000.0
        if key == "percent_from_bp":
            return s / 100.0
        if key == "decimal_from_bp":
            return s / 10000.0

        # Generic scaling
        if key.startswith("scale:"):
            try:
                factor = float(key.split("scale:", 1)[1])
            except ValueError as err:
                raise ValueError(f"scale:<factor> inválido en '{units}'") from err
            return s * factor

        # Index rebasing
        if key.startswith("rebase:"):
            try:
                payload = key.split("rebase:", 1)[1]
                if "@" in payload:
                    base_str, date_str = payload.split("@", 1)
                    base_val = float(base_str)
                    ref_date = pd.to_datetime(date_str)
                    ref_val = s.loc[ref_date]
                else:
                    base_val = float(payload)
                    ref_val = s.dropna().iloc[0]
            except KeyError as err:
                raise ValueError(f"No existe dato en la fecha indicada en '{units}'") from err
            except Exception as err:
                raise ValueError("Usa 'rebase:<base>' o 'rebase:<base>@YYYY-MM-DD'") from err
            return (s / ref_val) * base_val

        if key == "log":
            return np.log(s.where(s > 0))

        if key in {"level", "identity", "none", ""}:
            return s

        raise ValueError(f"Modo de normalize_units no soportado: '{units}'")

    def log(self, s: pd.Series) -> pd.Series:
        """Natural log; values <= 0 become NaN instead of -inf / warnings."""
        return np.log(s.where(s > 0))

    # --- CHANGES -------------------------------------------------------------

    def diff(self, s: pd.Series, periods: int = 1) -> pd.Series:
        """Simple difference: ``s - s.shift(periods)``."""
        return s.diff(periods)

    def pct_change(self, s: pd.Series, periods: int = 1) -> pd.Series:
        """Percent change over ``periods`` observations, as a decimal (0.05 == 5%)."""
        return s.pct_change(periods=periods)

    def mom(self, s: pd.Series) -> pd.Series:
        """Shortcut for ``pct_change(1)`` (period over period)."""
        return s.pct_change(1)

    def yoy(self, s: pd.Series) -> pd.Series:
        """Change over 12 observations (year over year for monthly data)."""
        return s.pct_change(12)

    def qoq_annualized(self, s: pd.Series, compounded: bool = True) -> pd.Series:
        """
        Change over 3 observations (a quarter for monthly data), annualized:

        - compounded=True  -> ``(1 + pct_change(3))**4 - 1``
        - compounded=False -> ``pct_change(3) * 4`` (linear approximation)
        """
        qoq = s.pct_change(3)
        if compounded:
            return (1.0 + qoq) ** 4 - 1.0
        return qoq * 4.0

    def rolling_pct_change(self, s: pd.Series, window: int) -> pd.Series:
        """Cumulative change over a moving window: ``s / s.shift(window) - 1``."""
        return s / s.shift(window) - 1.0

    # --- TREND / SMOOTHING ---------------------------------------------------

    def rolling_mean(self, s: pd.Series, window: int, min_periods: Optional[int] = None) -> pd.Series:
        """Rolling mean; ``min_periods`` defaults to ``window`` (an explicit 0 is honoured)."""
        mp = window if min_periods is None else min_periods
        return s.rolling(window, min_periods=mp).mean()

    def rolling_std(self, s: pd.Series, window: int, min_periods: Optional[int] = None) -> pd.Series:
        """Rolling sample std; ``min_periods`` defaults to ``window`` (an explicit 0 is honoured)."""
        mp = window if min_periods is None else min_periods
        return s.rolling(window, min_periods=mp).std()

    def ema(self, s: pd.Series, span: int, adjust: bool = False) -> pd.Series:
        """Exponential moving average via ``ewm(span=span)``."""
        return s.ewm(span=span, adjust=adjust).mean()

    def slope(
        self,
        s: pd.Series,
        window: int,
        method: Literal["diff", "ols"] = "ols",
        min_periods: Optional[int] = None,
        as_annualized: bool = False,
        periods_per_year: Optional[int] = None,
    ) -> pd.Series:
        """
        Local slope over a moving window.

        - method="diff": ``s - s.shift(window)`` (fast; not divided by window length)
        - method="ols" : OLS beta of y on x = 0..n-1 for each window, where n is the
          window's actual length (shorter at the start when ``min_periods < window``).
          Windows containing NaN, or with fewer than 2 points, give NaN.

        With ``as_annualized=True`` and ``periods_per_year`` set, the slope is
        multiplied by ``periods_per_year``.

        Raises:
            ValueError: if ``method`` is not "diff" or "ols".
        """
        mp = window if min_periods is None else min_periods
        if method == "diff":
            out = s - s.shift(window)
        elif method == "ols":
            # Centered regressor for a full window, computed once (centering improves stability).
            x_full = np.arange(window, dtype=float)
            x_full -= x_full.mean()

            def _beta(y: np.ndarray) -> float:
                n = len(y)
                if n < 2 or np.isnan(y).any():
                    return np.nan
                # Partial windows are shorter than `window`; build a matching regressor
                # (using x_full here would fail to broadcast).
                x = x_full if n == window else np.arange(n, dtype=float) - (n - 1) / 2.0
                var_x = float((x ** 2).sum())
                if var_x == 0:
                    return np.nan
                return float((x * (y - y.mean())).sum()) / var_x

            out = s.rolling(window, min_periods=mp).apply(_beta, raw=True)
        else:
            raise ValueError("method debe ser 'diff' u 'ols'")

        if as_annualized and periods_per_year:
            out = out * periods_per_year
        return out

    # --- STANDARDIZATION -----------------------------------------------------

    def zscore(self, s: pd.Series, window: Optional[int] = None) -> pd.Series:
        """
        Z-score.

        - window=None -> ``(s - mean) / std`` over the whole series
        - window=k    -> rolling z-score with a k-observation window
        """
        if window is None:
            return (s - s.mean()) / s.std()
        roll = s.rolling(window)
        return (s - roll.mean()) / roll.std()

    def percentile_rank(self, s: pd.Series, window: int) -> pd.Series:
        """
        Empirical percentile rank (0–1) of each value within its trailing window.

        Uses ``rank(pct=True)`` on each full window and keeps the last element's rank.
        """
        return s.rolling(window, min_periods=window).apply(
            lambda w: w.rank(pct=True).iloc[-1], raw=False
        )

    def rebase_index(self, s: pd.Series, base: float = 100.0, ref: Optional[pd.Timestamp] = None) -> pd.Series:
        """
        Rescale ``s`` so the reference value equals ``base``.

        The reference is ``s.loc[ref]`` when ``ref`` is given, otherwise the first
        non-null value. An empty or all-NaN series is returned unchanged.
        """
        if s.empty:
            return s
        if ref is not None:
            ref_val = s.loc[ref]
        else:
            non_null = s.dropna()
            if non_null.empty:
                # Nothing to anchor on: the series stays all-NaN.
                return s
            ref_val = non_null.iloc[0]
        return (s / ref_val) * base

    # --- INTRA-SERIES COMPOSITION --------------------------------------------

    def delta_vs_min(self, s: pd.Series, window: int) -> pd.Series:
        """Difference from the rolling minimum over ``window`` observations."""
        return s - s.rolling(window).min()

    def delta_vs_max(self, s: pd.Series, window: int) -> pd.Series:
        """Difference from the rolling maximum over ``window`` observations."""
        return s - s.rolling(window).max()

    def cumulative_sum(self, s: pd.Series) -> pd.Series:
        """Running total (useful for flows)."""
        return s.cumsum()

    def annualize_from_periods(
        self,
        s_returns: pd.Series,
        periods_per_year: int,
        compounded: bool = True,
    ) -> pd.Series:
        """
        Annualize a per-period rate/return.

        - compounded=True  -> ``(1 + r)**periods_per_year - 1``
        - compounded=False -> ``r * periods_per_year``
        """
        if compounded:
            return (1.0 + s_returns) ** periods_per_year - 1.0
        return s_returns * periods_per_year

    # --- MACRO-SPECIFIC ------------------------------------------------------

    def ma_weeks_to_month(self, s_weekly_ma: pd.Series) -> pd.Series:
        """
        Convert a weekly series (e.g. a 4-week MA) to month-end frequency by
        taking the last value of each month. Requires a DatetimeIndex.
        """
        # NOTE: pandas >= 2.2 deprecates the "M" alias in favour of "ME"; "M" is kept
        # here for compatibility with older pandas versions.
        return s_weekly_ma.resample("M").last()

    def annualized_growth(self, s: pd.Series, window: int, periods_per_year: int, compounded: bool = True) -> pd.Series:
        """
        Annualized growth over ``window`` observations.

        - compounded=True  -> ``(s / s.shift(window))**(periods_per_year/window) - 1``
        - compounded=False -> ``(s / s.shift(window) - 1) * (periods_per_year/window)``
        """
        ratio = s / s.shift(window)
        if compounded:
            return ratio ** (periods_per_year / window) - 1.0
        return (ratio - 1.0) * (periods_per_year / window)

    def flag_threshold(
        self,
        s: pd.Series,
        threshold: float,
        op: Literal["ge", "gt", "le", "lt", "eq"] = "ge",
    ) -> pd.Series:
        """
        Return 1 where ``s <op> threshold`` holds, else 0 (NaN compares as 0).

        op: ge (>=), gt (>), le (<=), lt (<), eq (==)

        Raises:
            ValueError: for any other ``op``.
        """
        if op not in ("ge", "gt", "le", "lt", "eq"):
            raise ValueError("op debe ser uno de {'ge','gt','le','lt','eq'}")
        # Series.ge/gt/le/lt/eq are the method forms of the comparison operators;
        # only the requested comparison is evaluated.
        return getattr(s, op)(threshold).astype(int)

    def flag_persistence(
        self,
        cond_series: pd.Series,
        n_periods: int,
    ) -> pd.Series:
        """
        Return 1 where a boolean/0-1 condition has held for the last ``n_periods``
        consecutive observations, else 0. Missing values count as "not met".
        """
        # fillna first: casting NaN to int would raise.
        x = cond_series.fillna(0).astype(int)
        return (x.rolling(n_periods).sum() >= n_periods).astype(int)

        
class LayerBuilder:
    """Build a layer (a DataFrame with one column per configured series) from a LayerSpec."""

    def __init__(self, loader: DataLoad, T: DataTransform):
        """
        Args:
            loader: object exposing ``get_one(fred_id)`` that returns a DataFrame
                with a column named after the series id.
            T: transform helper used for unit normalization and filling, and as the
                fallback source of pipeline steps (looked up by method name).
        """
        self.loader = loader
        self.T = T
        # Optional step overrides: name -> callable(series, **kwargs) -> series.
        # Consulted before falling back to a method of `T` with the same name.
        self.step_registry = {}

    def build(self, spec: LayerSpec) -> pd.DataFrame:
        """
        Download, clean and merge every series of ``spec``.

        For each series: load -> normalize units (if ``units``) -> resample to
        ``freq_objetivo`` with ``agg`` -> fill -> apply ``steps`` in order. The
        column is named ``alias`` (or ``fred_id``). Columns are joined with
        ``spec.how_merge`` ("outer" or "inner").

        Returns:
            The merged DataFrame; empty when ``spec.series`` is empty.

        Raises:
            NotImplementedError: for ``how_merge="asof"`` or non-empty ``derived`` /
                ``post`` — these are not implemented yet and are rejected rather
                than silently ignored.
        """
        # Validate before any network call.
        if spec.how_merge not in ("outer", "inner"):
            raise NotImplementedError(f"how_merge no soportado todavía: {spec.how_merge}")
        if spec.derived or spec.post:
            raise NotImplementedError("LayerSpec.derived / LayerSpec.post no soportados todavía")
        # TODO(review): target_freq, resample_agg and default_fill are not applied yet.

        columns = []
        for s in spec.series:
            df_raw = self.loader.get_one(s.fred_id)
            ser = df_raw[s.fred_id]
            if s.units:
                ser = self.T.normalize_units(ser, s.units)
            ser = self._to_target_freq(ser, s.freq_objetivo, s.agg)
            ser = self.T.maybe_fill(ser, s.fill)
            for step in s.steps:
                ser = self._apply_step(ser, step)
            columns.append(ser.rename(s.alias or s.fred_id))

        if not columns:
            return pd.DataFrame()
        return pd.concat(columns, axis=1, join=spec.how_merge)

    def _to_target_freq(self, s: pd.Series, freq: str, agg: str) -> pd.Series:
        """Resample ``s`` to ``freq``, reducing each bucket with ``Resampler.<agg>()`` (e.g. "last", "mean")."""
        return getattr(s.resample(freq), agg)()

    def _apply_step(self, s: pd.Series, step) -> pd.Series:
        """Apply one pipeline step: ``fn(s, **step.arg)`` with ``fn`` from the registry or ``T``."""
        fn = self.step_registry.get(step.fn) or getattr(self.T, step.fn, None)
        if fn is None:
            raise ValueError(f"Step no soportado: {step.fn}")
        return fn(s, **step.arg)

            



 



        

    



    

    

        

        




  return s.pct_change()


               SP500
date                
2025-08-08  0.024277
2025-08-15  0.009445
2025-08-22  0.002653
2025-08-29 -0.001028
2025-09-05  0.000000
            FEDFUNDS
date                
2025-04-30      4.33
2025-05-31      4.33
2025-06-30      4.33
2025-07-31      4.33
2025-08-31      4.33


  return getattr(s.resample(freq), agg)()
