# In [None]:
from pathlib import Path
from typing import Tuple, Optional, Iterable, Dict, List, Any
from collections import defaultdict
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use("Agg") 
import matplotlib.pyplot as plt
plt.ioff()
import matplotlib.patches as patches
from matplotlib.lines import Line2D
import gc

# --- Input / output locations ------------------------------------------------
BASE = Path(r"C:\Nuevos_excel")
CSV_PATH = BASE / "Todo_10000.csv"
SEP = ';'
OUT_DIR = BASE / "mapas_interes_recurrencia_nv400"
OUT_DIR.mkdir(parents=True, exist_ok=True)  # side effect: created as soon as this runs

# Names of the date and coordinate columns in the CSV.
DATECOL = 'fecha'
XCOL, YCOL = 'X', 'Y'

# Kernel size in grid cells; kernel_bbox() uses (k - 1)//2 cells on each side of the centre.
KERNEL_X, KERNEL_Y = 400, 400     # kernel size
MIN_PTS_KERNEL = 30               # minimum number of points inside the kernel
START_YEAR = 2019                 # rows dated before January 1st of this year are dropped
CHUNK = 1_000_000                 # rows per chunk when reading the CSV

SAVE_DPI = 170
PIL_KW = {"quality": 85, "optimize": True, "progressive": True}  # forwarded to savefig as pil_kwargs

STREAMING_SAVE = False            # not referenced in the visible part of this file
KERNEL_LIMIT = None               # when not None, only the first KERNEL_LIMIT kernel centres are scanned
MAX_BG_PTS = 250_000              # background points are randomly sampled down to this many per map

# Matplotlib rendering settings (Agg path chunking and path simplification).
plt.rcParams['agg.path.chunksize'] = 20000
plt.rcParams['path.simplify'] = True
plt.rcParams['path.simplify_threshold'] = 0.5

# Lithology: a kernel qualifies when reactive classes make up at least
# LITO_MIN_FRAC of its (reactive + limestone) points.
REACTIVE_LITS = {'intrusivo', 'skarn'}
CALIZA_LITS   = {'caliza', 'calizas', 'marmol', 'mármol', 'caliza/marmol', 'caliza/mármol'}
LITO_MIN_FRAC = 0.60

# Moisture (NMDI) and Fe3 thresholds
NMDI_PCTL = 75   # percentile for low moisture (NMDI <= P75)
FE3_PCTL  = 80   # percentile for high Fe3    (Fe3 >= P80)


# Condition B, Fe2 drop versus the previous date of the same kernel.
FE2_DROP_MIN = 0.02   # minimum absolute drop; used when FE2_DROP_PCT is None
FE2_DROP_PCT = None   # when set, minimum drop relative to the previous value (replaces FE2_DROP_MIN)

# Condition B, Fe3 rise versus the previous date of the same kernel.
FE3_RISE_MIN = 0.05   # minimum absolute rise; used when FE3_RISE_PCT is None
FE3_RISE_PCT = None   # when set, minimum rise relative to the previous value (replaces FE3_RISE_MIN)

# Optional thermal filter (kernel versus surrounding ring).
USE_TEMP = False          # enable/disable the thermal filter
TEMP_WINDOW_DAYS = 10     # max distance in days between the candidate date and the temperature date used
TEMP_DELTA_MIN = 1        # kernel mean minus ring mean must be >= this (presumably degrees C - confirm)
RING_MARGIN_CELLS = 10    # the ring extends the kernel by this many cells on every side


# Colours for recurrence counts 1..10, in order.
_RECURRENCE_PALETTE = (
    '#ff6ec7',  # 1: pink
    '#00cfff',  # 2: light blue
    '#ffa600',  # 3: orange
    '#7e3ff2',  # 4: purple
    '#00d084',  # 5: green
    '#ff4d4d',  # 6: red
    '#8dd3c7',  # 7
    '#fb8072',  # 8
    '#80b1d3',  # 9
    '#fdb462',  # 10
)
FIXED_COLOR_BY_COUNT = dict(enumerate(_RECURRENCE_PALETTE, start=1))


def color_for_count(n: int):
    """Return the colour used to draw a kernel with recurrence count ``n``.

    Counts present in ``FIXED_COLOR_BY_COUNT`` get their fixed hex colour;
    any other count falls back to an entry of Matplotlib's ``tab20``
    colormap selected by ``n % 20``.
    """
    fixed = FIXED_COLOR_BY_COUNT.get(n)
    if fixed is not None:
        return fixed
    # Stable fallback for counts outside the fixed palette.
    fallback_cmap = plt.get_cmap('tab20')
    return fallback_cmap((n % 20) / 20.0)


def BOOL(x):
    """Interpret a raw CSV cell as a boolean flag.

    Numeric values (including NumPy scalars and Python bools) are true only
    when they equal 1, so a 0/1 column that pandas parsed as float still
    works: ``1.0`` would otherwise stringify to ``'1.0'`` and be read as
    false. NaN compares unequal to 1 and is therefore false.

    Any other value is converted to text, stripped and lower-cased, and is
    true when it is one of: true, t, 1, yes, y, si, sí.
    """
    if isinstance(x, (int, float, np.integer, np.floating)):
        return bool(x == 1)
    return str(x).strip().lower() in {'true', 't', '1', 'yes', 'y', 'si', 'sí'}

def infer_grid_step(vals: np.ndarray) -> float:
    """Estimate the grid spacing of a coordinate axis.

    Returns the median of the positive, finite gaps between consecutive
    distinct non-missing values, or 1.0 when fewer than two distinct values
    exist or no usable gap remains.
    """
    present = vals[~pd.isna(vals)]
    levels = np.unique(present)  # np.unique already returns sorted values
    if levels.size <= 1:
        return 1.0
    gaps = np.diff(levels)
    usable = gaps[(gaps > 0) & np.isfinite(gaps)]
    if usable.size == 0:
        return 1.0
    return float(np.median(usable))

def build_grid_indices(df_pts: pd.DataFrame) -> Tuple[float, float]:
    """Attach integer grid indices to ``df_pts`` and return the grid steps.

    The frame is modified in place: columns ``iX`` and ``iY`` (int32) hold
    the rounded offset of each point from the minimum X / Y, measured in
    steps inferred by ``infer_grid_step``.

    Returns:
        ``(dx, dy)``, the inferred step along X and along Y.
    """
    xs, ys = df_pts[XCOL], df_pts[YCOL]
    dx = infer_grid_step(xs.values)
    dy = infer_grid_step(ys.values)
    origin_x, origin_y = xs.min(), ys.min()
    df_pts['iX'] = np.round((xs - origin_x) / dx).astype('int32')
    df_pts['iY'] = np.round((ys - origin_y) / dy).astype('int32')
    return dx, dy

def kernel_bbox(ix0, iy0, kx, ky):
    """Return the inclusive cell window ``(x_min, x_max, y_min, y_max)``.

    The window is centred on ``(ix0, iy0)`` and reaches ``(k - 1) // 2``
    cells to each side, so an even kernel size covers one cell less than
    its nominal width.
    """
    half_x = (kx - 1) // 2
    half_y = (ky - 1) // 2
    return (ix0 - half_x, ix0 + half_x, iy0 - half_y, iy0 + half_y)

def vecinos_de(ix0, iy0, kx, ky, cell2pids: Dict[tuple, List[int]]):
    """Collect the point ids found inside the kernel centred on ``(ix0, iy0)``.

    Every cell of the window given by ``kernel_bbox`` is looked up in
    ``cell2pids``; cells without an entry contribute nothing. The result is
    a list of unique ids (order is whatever the intermediate set yields).
    """
    x_lo, x_hi, y_lo, y_hi = kernel_bbox(ix0, iy0, kx, ky)
    found = [
        pid
        for ix in range(x_lo, x_hi + 1)
        for iy in range(y_lo, y_hi + 1)
        for pid in cell2pids.get((ix, iy), ())
    ]
    return list(set(found))

def compute_temp_any_l8l9(df_: pd.DataFrame) -> pd.Series:
    """Combine the L9 and L8 temperature columns into one float32 series.

    For each sensor whose ``Temp_C_*`` column exists, a reading is kept only
    where the matching ``inside_*`` flag is True (a missing flag column
    counts as True everywhere) and the temperature is above -80.0. The
    result is the row-wise mean of the readings that survive, NaN where
    none does, and all-NaN when neither temperature column is present.
    """
    sensor_columns = (('Temp_C_L9', 'inside_L9'), ('Temp_C_L8', 'inside_L8'))

    readings = []
    for temp_col, flag_col in sensor_columns:
        if temp_col not in df_.columns:
            continue
        inside = df_.get(flag_col, True)
        temps = pd.to_numeric(df_[temp_col], errors='coerce')
        usable = (inside == True) & (temps > -80.0)
        kept = np.where(usable, temps, np.nan)
        readings.append(pd.Series(kept, index=df_.index, dtype='float32'))

    if not readings:
        return pd.Series(np.nan, index=df_.index, dtype='float32')

    combined = pd.concat(readings, axis=1).mean(axis=1, skipna=True).astype('float32')
    # Final guard against sentinel-like values at or below -80.
    return combined.mask(combined < -80.0)

def nearest_mean_temp(df_region: pd.DataFrame, target_date: pd.Timestamp, max_days: int) -> float:
    """Return the region's mean temperature on the date closest to the target.

    Rows with a missing date or temperature are ignored and ``Temp_any`` is
    averaged per normalised date. Only dates within ``max_days`` days of
    ``target_date`` (inclusive, before or after) are considered; among
    them the one with the smallest absolute distance wins, the earlier
    date winning a tie. Returns NaN when no date qualifies.
    """
    obs = df_region[['fecha_norm', 'Temp_any']].dropna()
    obs = obs.assign(fecha_norm=pd.to_datetime(obs['fecha_norm']).dt.normalize())
    daily_mean = obs.groupby('fecha_norm')['Temp_any'].mean()
    if daily_mean.empty:
        return np.nan

    days = pd.DatetimeIndex(daily_mean.index)
    offsets = days - pd.Timestamp(target_date)  # TimedeltaIndex
    window = pd.Timedelta(days=max_days)
    in_window = (offsets >= -window) & (offsets <= window)
    if not in_window.any():
        return np.nan

    # Compare absolute distances on the raw integer representation.
    abs_distance = np.abs(offsets[in_window].asi8)
    nearest_pos = int(np.argmin(abs_distance))
    nearest_day = days[in_window][nearest_pos]
    return float(daily_mean.loc[nearest_day])

# Columns read from the CSV (after stripping whitespace from the header names);
# every other column is skipped by the `usecols` filter of the reader.
NEEDED_COLS = [
    'punto_id', DATECOL, XCOL, YCOL,
    'inside_S2',
    'B2_S2','B3_S2','B4_S2','B8A_S2','B11_S2','B12_S2',
    'litologia',
    'inside_L8','inside_L9','Temp_C_L8','Temp_C_L9'
]
# Target dtype for each numeric column; values are coerced with
# pd.to_numeric(errors='coerce') before the cast, so bad cells become NaN.
DTYPES = {
    XCOL:'float32', YCOL:'float32',
    'B2_S2':'float32','B3_S2':'float32','B4_S2':'float32',
    'B8A_S2':'float32','B11_S2':'float32','B12_S2':'float32',
    'Temp_C_L8':'float32','Temp_C_L9':'float32'
}

# ---------------------------------------------------------------------------
# Chunked load of the CSV: per chunk, coerce dtypes, derive the Sentinel-2
# indices (Fe3, Fe2, NMDI) and the combined Landsat temperature, and keep only
# the columns the rest of the script needs.
# ---------------------------------------------------------------------------
rows = []
need = set(NEEDED_COLS)

# The date column is deliberately NOT passed as ``parse_dates``: header names
# are matched after stripping whitespace, and ``parse_dates`` would raise if
# the raw header were e.g. " fecha". Dates are parsed explicitly below.
reader = pd.read_csv(
    CSV_PATH, sep=SEP,
    usecols=lambda c: c.strip() in need,
    chunksize=CHUNK, engine="python", on_bad_lines="skip",
)
for i, ch in enumerate(reader, 1):
    ch.columns = ch.columns.str.strip()

    # Identifier and coordinates are mandatory; fail with an explicit message
    # instead of an anonymous KeyError further down.
    missing_required = [c for c in ('punto_id', XCOL, YCOL) if c not in ch.columns]
    if missing_required:
        raise KeyError(f"Faltan columnas obligatorias en el CSV: {missing_required}")

    # Numeric columns: unparseable cells become NaN, then cast to the compact dtype.
    for c, dt in DTYPES.items():
        if c in ch.columns:
            ch[c] = pd.to_numeric(ch[c], errors='coerce').astype(dt)

    # Flag columns that were not parsed as bool are normalised through BOOL().
    for b in ['inside_S2','inside_L8','inside_L9']:
        if b in ch.columns and ch[b].dtype != bool:
            ch[b] = ch[b].map(BOOL)

    # Point ids: rounded to the nearest integer, nullable so bad ids stay <NA>.
    ch['punto_id'] = pd.to_numeric(ch['punto_id'], errors='coerce')
    ch['punto_id'] = np.rint(ch['punto_id']).astype('Int64')

    if DATECOL not in ch.columns:
        print(f"[WARN] Chunk {i} sin columna {DATECOL}; se omite.")
        continue
    ch['fecha_norm'] = pd.to_datetime(ch[DATECOL], errors='coerce').dt.normalize()

    if {'inside_S2','B2_S2','B3_S2','B4_S2','B8A_S2','B11_S2','B12_S2'}.issubset(ch.columns):
        mS2 = (ch['inside_S2'] == True)
        # A band value of exactly 0 is treated as missing.
        B2  = ch['B2_S2'].replace(0, np.nan)
        B3  = ch['B3_S2'].replace(0, np.nan)
        B4  = ch['B4_S2'].replace(0, np.nan)
        B8A = ch['B8A_S2'].replace(0, np.nan)
        B11 = ch['B11_S2'].replace(0, np.nan)
        B12 = ch['B12_S2'].replace(0, np.nan)

        ch['Ferric_minor_S2'] = np.where(mS2, B4 / B2, np.nan).astype('float32')                  # Fe3
        ch['Ferrous_S2']      = np.where(mS2, (B3 + B11) / (B4 + B8A), np.nan).astype('float32')  # Fe2
        denom = (B8A + (B11 - B12))
        numer = (B8A - (B11 - B12))
        # NMDI is only defined where the denominator is finite and not ~0.
        nmdi = np.where(mS2 & np.isfinite(denom) & (np.abs(denom) > 1e-9), numer / denom, np.nan)
        ch['NMDI_S2'] = pd.Series(nmdi, index=ch.index, dtype='float32')
    else:
        # Without the full set of Sentinel-2 columns no index can be derived.
        ch['Ferric_minor_S2'] = np.nan
        ch['Ferrous_S2'] = np.nan
        ch['NMDI_S2'] = np.nan

    ch['Temp_any'] = compute_temp_any_l8l9(ch)

    if 'litologia' in ch.columns:
        ch['litologia'] = ch['litologia'].astype('category')

    keep = ['punto_id','fecha_norm', XCOL, YCOL,
            'Ferric_minor_S2','Ferrous_S2','NMDI_S2',
            'inside_S2','litologia',
            'inside_L8','inside_L9','Temp_C_L8','Temp_C_L9','Temp_any']
    # reindex() returns a new frame and fills optional columns that are absent
    # from the CSV with NaN, matching the "column may be missing" guards above
    # (plain ch[keep] would raise KeyError in that case).
    rows.append(ch.reindex(columns=keep))
    print(f"[LOAD] chunk {i} ({len(ch)})")
    if i % 5 == 0:
        gc.collect()

if not rows:
    raise SystemExit("No se pudo leer ningún chunk válido.")
df = pd.concat(rows, ignore_index=True)
del rows; gc.collect()

# Keep only observations dated on or after January 1st of START_YEAR.
first_day = pd.Timestamp(year=START_YEAR, month=1, day=1)
df = df.loc[df['fecha_norm'] >= first_day]
if df.empty:
    raise SystemExit("No hay datos después de START_YEAR.")

# Rows usable for the global thresholds: inside the S2 footprint with both
# NMDI and Fe3 available.
s2_inside = df['inside_S2'] == True
has_indices = df['NMDI_S2'].notna() & df['Ferric_minor_S2'].notna()
df_valid = df.loc[s2_inside & has_indices].copy()

def q(arr, p, fb):
    """Return the NaN-ignoring ``p``-th percentile of ``arr`` as a float.

    An empty array yields the fallback ``fb`` unchanged.
    """
    if not arr.size:
        return fb
    return float(np.nanpercentile(arr, p))

# Global thresholds taken from the valid S2 rows; the second argument of q()
# is the percentile and the third the fallback used when there is no data.
nmdi_values = df_valid['NMDI_S2'].to_numpy('float64')
fe3_values = df_valid['Ferric_minor_S2'].to_numpy('float64')
NMDI_Pth = q(nmdi_values, NMDI_PCTL, 0.91)
FE3_Pth = q(fe3_values, FE3_PCTL, 1.20)

# Write the thresholds and run parameters next to the maps for traceability.
report_parts = [
    "Umbrales (inside_S2==True)\n",
    f"NMDI P{NMDI_PCTL}: {NMDI_Pth:.4f}  |  Fe³⁺ P{FE3_PCTL}: {FE3_Pth:.4f}\n",
    "Parámetros Cond. B (Fe2↓ y Fe3↑):\n",
    f"  FE2_DROP_MIN: {FE2_DROP_MIN} | FE2_DROP_PCT: {FE2_DROP_PCT}\n",
    f"  FE3_RISE_MIN: {FE3_RISE_MIN} | FE3_RISE_PCT: {FE3_RISE_PCT}\n",
    "Filtro térmico:\n",
    f"  USE_TEMP: {USE_TEMP} | TEMP_WINDOW_DAYS: {TEMP_WINDOW_DAYS} | ",
    f"TEMP_DELTA_MIN: {TEMP_DELTA_MIN} | RING_MARGIN_CELLS: {RING_MARGIN_CELLS}\n",
]
report_path = OUT_DIR / "_umbrales_y_parametros.txt"
report_path.write_text("".join(report_parts), encoding="utf-8")

# One row per point id with its coordinates (first complete row wins).
point_rows = df[['punto_id', XCOL, YCOL]].dropna()
df_pts = point_rows.drop_duplicates('punto_id').reset_index(drop=True)

# Add the integer cell indices iX / iY and propagate them to every observation.
dx, dy = build_grid_indices(df_pts)
df = df.merge(df_pts[['punto_id','iX','iY']], on='punto_id', how='left')

# One centre per cell (avoids evaluating the same kernel more than once).
indexed_rows = df[['iX','iY','punto_id']].dropna()
df_centros = indexed_rows.drop_duplicates(subset=['iX','iY'], keep='first')
centros_sorted = df_centros.astype({'punto_id':'int64'}).sort_values(['iX','iY'])
centros = centros_sorted.values.tolist()

# Lookup: (iX, iY) cell -> list of the point ids located in that cell.
cell2pids = df_pts.groupby(['iX','iY'])['punto_id'].apply(list).to_dict()

# Output folder for the per-date maps.
MAP_DIR = OUT_DIR / "mapas_interes_por_fecha"
MAP_DIR.mkdir(parents=True, exist_ok=True)

def _dominant_label(labels):
    """Return the most frequent label of a group (NaN for an empty group)."""
    return labels.mode().iloc[0] if not labels.empty else np.nan


# Point id -> most frequent lithology label (stripped, lower-cased text).
lit_known = df[['punto_id', 'litologia']].dropna()
lit_known = lit_known.assign(
    litologia=lit_known['litologia'].astype(str).str.strip().str.lower()
)
lit_map = lit_known.groupby('punto_id')['litologia'].agg(_dominant_label).to_dict()
def kernel_is_reactive_vs_caliza(pids, min_frac=LITO_MIN_FRAC):
    """Tell whether reactive lithologies dominate limestone within a kernel.

    Each point id is looked up in ``lit_map``; only non-empty text labels
    other than ``'nan'`` count. The kernel qualifies when the points in
    ``REACTIVE_LITS`` make up at least ``min_frac`` of the points that are
    in ``REACTIVE_LITS`` or ``CALIZA_LITS``. Returns False when ``pids`` is
    empty or no point belongs to either group.
    """
    if not pids:
        return False

    reactive = 0
    limestone = 0
    for pid in pids:
        label = lit_map.get(int(pid))
        if not (isinstance(label, str) and label and label != 'nan'):
            continue  # unknown lithology: ignored
        if label in REACTIVE_LITS:
            reactive += 1
        if label in CALIZA_LITS:
            limestone += 1

    total = reactive + limestone
    if total == 0:
        return False
    return (reactive / total) >= min_frac

# ---------------------------------------------------------------------------
# Kernel scan: for every cell centre, evaluate conditions A and B per date and
# record the dates on which both hold.
#   candidates_by_kernel[(iX, iY)][date] = (x0, y0, x1, y1)  # kernel rectangle
# ---------------------------------------------------------------------------
candidates_by_kernel: Dict[tuple, Dict[pd.Timestamp, tuple]] = defaultdict(dict)

# Kernel half-widths in cells are identical for every centre: compute once.
hx, hy = (KERNEL_X - 1)//2, (KERNEL_Y - 1)//2

for idx, (ix0, iy0, pid0) in enumerate(centros, 1):
    if KERNEL_LIMIT is not None and idx > int(KERNEL_LIMIT):
        break

    # Fixed-size kernel (never expanded to reach the minimum point count).
    vec_in = vecinos_de(int(ix0), int(iy0), KERNEL_X, KERNEL_Y, cell2pids)
    if len(vec_in) < MIN_PTS_KERNEL:
        continue

    if not kernel_is_reactive_vs_caliza(vec_in):
        continue

    # Observations of the kernel's points inside the S2 footprint with all
    # three indices available.
    in_kernel = df['punto_id'].isin(vec_in)
    kdf = df[in_kernel & (df['inside_S2'] == True)]
    kdf = kdf[kdf[['Ferric_minor_S2','Ferrous_S2','NMDI_S2']].notna().all(axis=1)]
    if kdf.empty:
        continue

    # Kernel-wide mean of each index per date, in chronological order.
    per_date = (
        kdf.groupby('fecha_norm', as_index=False)
           .agg(Ferric_mean=('Ferric_minor_S2','mean'),
                NMDI_mean=('NMDI_S2','mean'),
                Ferrous_mean=('Ferrous_S2','mean'))
           .sort_values('fecha_norm')
           .reset_index(drop=True)
    )
    if per_date.empty:
        continue

    # Condition A: high Fe3 and low NMDI on the same date.
    per_date['cond_A'] = (per_date['Ferric_mean'] >= FE3_Pth) & (per_date['NMDI_mean'] <= NMDI_Pth)

    # Condition B compares each date with the previous available date.
    per_date['Ferrous_prev'] = per_date['Ferrous_mean'].shift(1)
    per_date['Ferric_prev']  = per_date['Ferric_mean'].shift(1)

    fe2_drop = per_date['Ferrous_prev'] - per_date['Ferrous_mean']
    if FE2_DROP_PCT is not None:
        cond_fe2 = per_date['Ferrous_prev'].notna() & (fe2_drop / per_date['Ferrous_prev'] >= FE2_DROP_PCT)
    else:
        cond_fe2 = per_date['Ferrous_prev'].notna() & (fe2_drop >= FE2_DROP_MIN)

    fe3_rise = per_date['Ferric_mean'] - per_date['Ferric_prev']
    if FE3_RISE_PCT is not None:
        cond_fe3 = per_date['Ferric_prev'].notna() & (fe3_rise / per_date['Ferric_prev'] >= FE3_RISE_PCT)
    else:
        cond_fe3 = per_date['Ferric_prev'].notna() & (fe3_rise >= FE3_RISE_MIN)

    per_date['cond_B'] = (per_date['Ferric_mean'] >= FE3_Pth) & cond_fe2 & cond_fe3

    # Dates on which both conditions hold; nothing else to do without any.
    hit_dates = per_date.loc[per_date['cond_A'] & per_date['cond_B'], 'fecha_norm']
    if hit_dates.empty:
        continue

    if USE_TEMP:
        # The ring and both temperature slices depend only on the kernel, not
        # on the date, so they are built once per kernel.
        vec_ring_all = vecinos_de(int(ix0), int(iy0),
                                  KERNEL_X + 2*RING_MARGIN_CELLS,
                                  KERNEL_Y + 2*RING_MARGIN_CELLS,
                                  cell2pids)
        vec_ring = list(set(vec_ring_all) - set(vec_in))
        if not vec_ring:
            continue  # no surrounding points: the thermal test cannot pass
        kdf_temp = df.loc[in_kernel, ['fecha_norm','Temp_any']]
        rdf_temp = df.loc[df['punto_id'].isin(vec_ring), ['fecha_norm','Temp_any']]

    # Kernel rectangle in map coordinates, centred on the centre point.
    is_center = df_pts['punto_id'] == pid0
    cx0 = df_pts.loc[is_center, XCOL].iloc[0]
    cy0 = df_pts.loc[is_center, YCOL].iloc[0]
    x0r, x1r = cx0 - hx*dx, cx0 + hx*dx
    y0r, y1r = cy0 - hy*dy, cy0 + hy*dy

    key = (int(ix0), int(iy0))

    for d in hit_dates:
        d = pd.Timestamp(d).normalize()

        if USE_TEMP:
            # Kernel must be warmer than its ring by at least TEMP_DELTA_MIN,
            # using the closest temperature date within the allowed window.
            tk = nearest_mean_temp(kdf_temp, d, TEMP_WINDOW_DAYS)
            tr = nearest_mean_temp(rdf_temp, d, TEMP_WINDOW_DAYS)
            if np.isnan(tk) or np.isnan(tr) or (tk - tr) < TEMP_DELTA_MIN:
                continue

        candidates_by_kernel[key][d] = (x0r, y0r, x1r, y1r)

# Recurrence = number of distinct dates on which a kernel met A and B.
recurrence_by_kernel: Dict[tuple, int] = {
    kernel: len(hits) for kernel, hits in candidates_by_kernel.items() if hits
}

# Regroup by date, keeping only kernels that recur on more than two dates.
# NOTE(review): the cut-off of 2 is hard-coded here; confirm it is intended,
# since FIXED_COLOR_BY_COUNT also defines colours for counts 1 and 2.
interest_by_date: Dict[pd.Timestamp, List[Dict[str, Any]]] = defaultdict(list)
for kernel, n_dates in recurrence_by_kernel.items():
    if n_dates <= 2:
        continue
    for day, (x0r, y0r, x1r, y1r) in candidates_by_kernel[kernel].items():
        zone = {
            'key': kernel,
            'x0': x0r, 'y0': y0r, 'x1': x1r, 'y1': y1r,
            'count': n_dates
        }
        interest_by_date[day].append(zone)

# ---------------------------------------------------------------------------
# One JPEG per date: grey background of that date's S2 points plus one
# rectangle per retained kernel, coloured by the kernel's total recurrence.
# ---------------------------------------------------------------------------
if not interest_by_date:
    print("[INFO] No hay fechas con zonas A∩B para exportar.")
else:
    for d in sorted(interest_by_date.keys()):
        rects = interest_by_date[d]
        if not rects:
            continue
        out_img = MAP_DIR / f"map_interes_rec_{pd.Timestamp(d):%Y%m%d}.jpg"

        # Background: points observed inside the S2 footprint on this date,
        # randomly sub-sampled (fixed seed) when there are too many.
        pts_ids = df.loc[(df['fecha_norm'] == d) & (df['inside_S2'] == True), 'punto_id']
        pts_coords = df_pts[df_pts['punto_id'].isin(pts_ids)].copy()
        if len(pts_coords) > MAX_BG_PTS:
            pts_coords = pts_coords.sample(MAX_BG_PTS, random_state=0)

        fig, ax = plt.subplots(figsize=(14, 12))
        try:
            # View extent starts from the rectangles so none of them is clipped
            # (add_patch does not rescale the axes by itself), then grows to
            # include the background points.
            xmin = min(zone['x0'] for zone in rects)
            xmax = max(zone['x1'] for zone in rects)
            ymin = min(zone['y0'] for zone in rects)
            ymax = max(zone['y1'] for zone in rects)
            if not pts_coords.empty:
                ax.scatter(pts_coords[XCOL], pts_coords[YCOL], s=2, color='#bdbdbd', alpha=0.6, zorder=0)
                xmin = min(xmin, pts_coords[XCOL].min())
                xmax = max(xmax, pts_coords[XCOL].max())
                ymin = min(ymin, pts_coords[YCOL].min())
                ymax = max(ymax, pts_coords[YCOL].max())
            # 5% margin; the span is never smaller than one grid step.
            rx = max(xmax - xmin, dx)
            ry = max(ymax - ymin, dy)
            ax.set_xlim(xmin - 0.05*rx, xmax + 0.05*rx)
            ax.set_ylim(ymin - 0.05*ry, ymax + 0.05*ry)

            counts_present = []
            for zone in rects:
                c = int(zone['count'])
                counts_present.append(c)
                rect = patches.Rectangle(
                    (zone['x0'], zone['y0']),
                    zone['x1'] - zone['x0'], zone['y1'] - zone['y0'],
                    fill=False, lw=2.0, ec=color_for_count(c), alpha=0.95
                )
                ax.add_patch(rect)

            ax.set_aspect('equal', adjustable='box')
            ax.set_xticks([]); ax.set_yticks([])

            # Legend: one entry per recurrence value present on this map.
            uniq_counts = sorted(set(counts_present))
            legend_elems = [Line2D([0], [0], color=color_for_count(c), lw=2,
                            label=f"Recurrencia = {c}") for c in uniq_counts]
            ax.legend(handles=legend_elems, loc='upper left', bbox_to_anchor=(1.02, 1.0),
                      borderaxespad=0., frameon=True, title="A∩B (Fe³⁺ alto, NMDI bajo, Fe²⁺↓ y Fe³⁺↑)")

            ax.set_title(
                f"Zonas A∩B coloreadas por recurrencia total del kernel (sin expansión)\n"
                f"(inside_S2==True; Lito Intrusivo/Skarn ≥60% vs Caliza) · {pd.Timestamp(d):%Y-%m-%d}",
                fontsize=11
            )

            try:
                try:
                    fig.savefig(out_img, dpi=SAVE_DPI, facecolor='white',
                                bbox_inches='tight', pil_kwargs=PIL_KW)
                except TypeError:
                    # Matplotlib versions whose savefig rejects pil_kwargs.
                    fig.savefig(out_img, dpi=SAVE_DPI, facecolor='white', bbox_inches='tight')
            except Exception as e:
                # Best effort: report the failure and continue with the next date.
                print(f"[ERROR savefig] {out_img}: {e}")
            else:
                print(f"[SAVE] {out_img} (rects={len(rects)})")
        finally:
            # Always release the figure, even if drawing or saving failed.
            plt.close(fig)

print(f"[OK] Mapas por fecha → {MAP_DIR}")

# Cleanup: drop the references to the large objects and run a collection pass.
del df, df_pts, cell2pids
gc.collect()