# Avance 3 — Baseline (Equipo 20)
**Proyecto:** Detección / clasificación de anillos en galaxias (imágenes FITS + variables tabulares)  
**Objetivo del notebook:** Construir un **modelo baseline** reproducible que sirva como **marco de referencia** para iteraciones futuras.

---

## Checklist del feedback (Semana 2) que se atiende aquí
- ✅ Probar **colores vs escala de grises** y combinaciones de contrastes (muy rojo/muy azul).  
- ✅ Usar **color index maps** (p. ej. ND-like: (g-r)/(g+r), (r-z)/(r+z), (g-z)/(g+z)).  
- ✅ Experimentar con **preprocesamientos** (asinh, clipping, reducción de tamaño).  
- ✅ **Rebalanceo de clases** (class_weight + oversampling opcional).  
- ✅ Bloque preparado para **integrar un 2º dataset** (si llega).  
- ✅ Leer archivos **FITS** y analizar.  
- ✅ Excluir **anillos nucleares y pseudoanillos** (limitación del dataset).

---

## Requerimientos del entregable (Avance 3)
- 3.1 Definir **medidas de calidad** del modelo.
- 3.2 Proveer un **baseline** para evaluar viabilidad y mejorar modelos más avanzados.


In [None]:
# === (0) Setup ===
# Si estás en Google Colab, descomenta estas líneas:
# !pip -q install astropy photutils scikit-image imbalanced-learn

import os
import numpy as np
import pandas as pd
from collections import Counter

import matplotlib.pyplot as plt

from astropy.io import fits
from astropy.visualization import make_lupton_rgb

from sklearn.model_selection import train_test_split, StratifiedKFold, cross_validate
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    f1_score,
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay,
)
from sklearn.dummy import DummyClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.inspection import permutation_importance

# Opcional (solo si tienes imbalanced-learn):
try:
    from imblearn.over_sampling import RandomOverSampler
    IMB_AVAILABLE = True
except Exception:
    IMB_AVAILABLE = False

RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)


## 1) Configuración de rutas y archivos
Ajusta `DATA_DIR`, `IMAGES_DIR` y el nombre del catálogo si tu repo usa otras rutas.


In [None]:
# === (1) Paths ===
DATA_DIR = "./Dataset"
IMAGES_DIR = "./Images"
CATALOG_FITS = os.path.join(DATA_DIR, "dataset.fits")

# Segundo dataset (placeholder)
DATASET2_DIR = "./Dataset2"
CATALOG2_FITS = os.path.join(DATASET2_DIR, "dataset2.fits")

print("CATALOG_FITS exists:", os.path.exists(CATALOG_FITS))
print("IMAGES_DIR exists:", os.path.exists(IMAGES_DIR))


## 2) Carga del catálogo FITS (tabular)


In [None]:
def fits_table_to_df(fits_path: str, ext: int = 1) -> pd.DataFrame:
    hdul = fits.open(fits_path)
    try:
        data = hdul[ext].data
        df = pd.DataFrame(np.array(data).byteswap().newbyteorder())  # endian-safe
    finally:
        hdul.close()
    return df

df = fits_table_to_df(CATALOG_FITS, ext=1)
df.head(), df.shape


### 2.1 Mapeo y redefinición de la variable objetivo (4 clases)
Se descartan **nuclear** y **pseudo** (feedback).


In [None]:
ANILLOS_MAP = {0:"Sin", 2:"Nuclear", 4:"Interno", 8:"Externo", 12:"Interno+Externo", 16:"Pseudo"}

def to_4class(anillos_value: int):
    if anillos_value in (2, 16):
        return None
    if anillos_value == 0:
        return 0
    if anillos_value == 4:
        return 1
    if anillos_value == 8:
        return 2
    if anillos_value == 12:
        return 3
    return None

df["anillos_label"] = df["anillos"].map(ANILLOS_MAP).fillna("Desconocido")
df["ring_class"] = df["anillos"].apply(to_4class)

print("Distribución original anillos:\n", df["anillos_label"].value_counts())
print("\nDistribución ring_class (incluye NaN):\n", df["ring_class"].value_counts(dropna=False))


### 2.2 Filtro de outliers en `z` (±2σ)


In [None]:
df0 = df.dropna(subset=["ring_class"]).copy()

z_mean = df0["z"].mean()
z_std = df0["z"].std()
lower, upper = z_mean - 2*z_std, z_mean + 2*z_std

df1 = df0[(df0["z"] >= lower) & (df0["z"] <= upper)].copy()

print("Rows after target filter:", len(df0))
print("Rows after z ±2σ filter:", len(df1))


## 3) Ingeniería de características tabulares


In [None]:
df1["z_log1p"] = np.log1p(df1["z"].clip(lower=0))
df1["z_bin"] = pd.qcut(df1["z"], q=3, labels=["low","mid","high"])

ra_rad = np.deg2rad(df1["ra"])
dec_rad = np.deg2rad(df1["dec"])
df1["ra_sin"] = np.sin(ra_rad)
df1["ra_cos"] = np.cos(ra_rad)
df1["dec_sin"] = np.sin(dec_rad)
df1["dec_cos"] = np.cos(dec_rad)

df1[["objID","ra","dec","z","z_log1p","ra_sin","ra_cos","dec_sin","dec_cos","ring_class"]].head()


## 4) Lectura de imágenes FITS y features visuales (baseline)
Probamos **gray**, **idx** (color index maps) y **mix** (gray + idx), con **reducción de tamaño**.


In [None]:
def safe_div(a, b, eps=1e-6):
    return a / (b + eps)

def asinh_norm(x, clip_percentile=99.5):
    x = np.nan_to_num(x, nan=0.0, posinf=0.0, neginf=0.0)
    hi = np.percentile(np.abs(x), clip_percentile)
    if hi <= 0:
        return np.zeros_like(x, dtype=np.float32)
    x = np.clip(x, -hi, hi)
    y = np.arcsinh(x)
    y = y - y.min()
    if y.max() > 0:
        y = y / y.max()
    return y.astype(np.float32)

def read_grz_from_fits(fits_path: str):
    hdul = fits.open(fits_path)
    try:
        data = hdul[0].data
        if data is None:
            data = hdul[1].data
        data = np.array(data)
        if data.ndim == 3:
            g = data[0].astype(np.float32)
            r = data[1].astype(np.float32)
            z = data[2].astype(np.float32)
            return g, r, z
        raise ValueError(f"Formato inesperado: ndim={data.ndim}")
    finally:
        hdul.close()

def lupton_rgb(g, r, z, Q=10, stretch=0.5):
    return make_lupton_rgb(r, g, z, Q=Q, stretch=stretch)

def grayscale_from_rgb(rgb_uint8):
    rgb = rgb_uint8.astype(np.float32) / 255.0
    gray = 0.2989*rgb[...,0] + 0.5870*rgb[...,1] + 0.1140*rgb[...,2]
    return gray.astype(np.float32)

def resize_nearest(img, out_h=64, out_w=64):
    in_h, in_w = img.shape[:2]
    y_idx = (np.linspace(0, in_h-1, out_h)).astype(int)
    x_idx = (np.linspace(0, in_w-1, out_w)).astype(int)
    if img.ndim == 2:
        return img[np.ix_(y_idx, x_idx)]
    return img[np.ix_(y_idx, x_idx, np.arange(img.shape[2]))]

def extract_image_features(fits_path: str, size=64, mode="mix"):
    g, r, z = read_grz_from_fits(fits_path)

    idx_gr = safe_div((g - r), (g + r))
    idx_rz = safe_div((r - z), (r + z))
    idx_gz = safe_div((g - z), (g + z))

    g_n = asinh_norm(g); r_n = asinh_norm(r); z_n = asinh_norm(z)
    idx_gr_n = asinh_norm(idx_gr); idx_rz_n = asinh_norm(idx_rz); idx_gz_n = asinh_norm(idx_gz)

    rgb = lupton_rgb(g_n, r_n, z_n)  # uint8
    rgb_rs = resize_nearest(rgb, size, size)

    gray = grayscale_from_rgb(rgb_rs)
    gray_rs = resize_nearest(gray, size, size)

    idx_stack = np.stack([idx_gr_n, idx_rz_n, idx_gz_n], axis=-1)
    idx_rs = resize_nearest(idx_stack, size, size)

    if mode == "rgb":
        return (rgb_rs.astype(np.float32) / 255.0).reshape(-1)
    if mode == "gray":
        return gray_rs.reshape(-1)
    if mode == "idx":
        return idx_rs.reshape(-1)
    if mode == "mix":
        return np.concatenate([gray_rs.reshape(-1), idx_rs.reshape(-1)], axis=0)

    raise ValueError("mode must be one of: rgb, gray, idx, mix")

print("Image feature functions ready.")


### 4.2 Unir catálogo con imágenes locales


In [None]:
df1["objID_str"] = df1["objID"].astype(str).str.replace(".0", "", regex=False)

def find_img_path(objid_str):
    candidate = os.path.join(IMAGES_DIR, f"{objid_str}.fits")
    return candidate if os.path.exists(candidate) else None

df1["img_path"] = df1["objID_str"].apply(find_img_path)
print("Con imagen:", df1["img_path"].notna().sum(), "/", len(df1))

df_img = df1.dropna(subset=["img_path"]).copy()
df_img[["objID_str","img_path","ring_class"]].head()


## 5) Métricas y modelos baseline
**Métricas principales (clases desbalanceadas):** Balanced Accuracy y F1-macro.


In [None]:
TARGET = "ring_class"
TAB_FEATURES = ["z", "z_log1p", "ra_sin", "ra_cos", "dec_sin", "dec_cos"]

df_mod = df_img.dropna(subset=TAB_FEATURES + [TARGET]).copy()
X_tab = df_mod[TAB_FEATURES].astype(float)
y = df_mod[TARGET].astype(int)

X_train_tab, X_test_tab, y_train, y_test, df_train, df_test = train_test_split(
    X_tab, y, df_mod, test_size=0.2, random_state=RANDOM_STATE, stratify=y
)

print("Train class dist:", Counter(y_train))
print("Test  class dist:", Counter(y_test))


In [None]:
def eval_model(name, model, X_train, y_train, X_test, y_test):
    model.fit(X_train, y_train)
    pred_tr = model.predict(X_train)
    pred_te = model.predict(X_test)
    return {
        "model": name,
        "acc_train": accuracy_score(y_train, pred_tr),
        "acc_test": accuracy_score(y_test, pred_te),
        "bal_acc_train": balanced_accuracy_score(y_train, pred_tr),
        "bal_acc_test": balanced_accuracy_score(y_test, pred_te),
        "f1m_train": f1_score(y_train, pred_tr, average="macro"),
        "f1m_test": f1_score(y_test, pred_te, average="macro"),
    }, pred_te


## 6) Baseline A: Tabular-only


In [None]:
models_tab = []

dummy_mf = DummyClassifier(strategy="most_frequent", random_state=RANDOM_STATE)
dummy_strat = DummyClassifier(strategy="stratified", random_state=RANDOM_STATE)

logreg_tab = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(
        max_iter=2000,
        class_weight="balanced",
        random_state=RANDOM_STATE
    ))
])

rf_tab = RandomForestClassifier(
    n_estimators=400,
    class_weight="balanced_subsample",
    random_state=RANDOM_STATE,
    n_jobs=-1
)

for name, m in [
    ("Dummy-most_frequent (tab)", dummy_mf),
    ("Dummy-stratified (tab)", dummy_strat),
    ("LogReg-balanced (tab)", logreg_tab),
    ("RF-balanced (tab)", rf_tab),
]:
    res, _ = eval_model(name, m, X_train_tab, y_train, X_test_tab, y_test)
    models_tab.append(res)

pd.DataFrame(models_tab).sort_values("bal_acc_test", ascending=False)


## 7) Baseline B: Imagen + (opcional) Tabular
En esta versión usamos solo la imagen (pixeles) para simplificar.  
En iteración futura pueden concatenar `X_tab` a la matriz de imagen.


In [None]:
def build_image_matrix(df_subset: pd.DataFrame, size=64, mode="mix"):
    feats = []
    ok = []
    for i, p in enumerate(df_subset["img_path"].tolist()):
        try:
            feats.append(extract_image_features(p, size=size, mode=mode))
            ok.append(i)
        except Exception:
            continue
    X = np.vstack(feats) if len(feats) else np.empty((0,0))
    return X, df_subset.iloc[ok].copy()

SIZE = 64

Xtr_gray, dftr_gray = build_image_matrix(df_train, size=SIZE, mode="gray")
Xte_gray, dfte_gray = build_image_matrix(df_test,  size=SIZE, mode="gray")

Xtr_idx,  dftr_idx  = build_image_matrix(df_train, size=SIZE, mode="idx")
Xte_idx,  dfte_idx  = build_image_matrix(df_test,  size=SIZE, mode="idx")

Xtr_mix,  dftr_mix  = build_image_matrix(df_train, size=SIZE, mode="mix")
Xte_mix,  dfte_mix  = build_image_matrix(df_test,  size=SIZE, mode="mix")

ytr_gray = dftr_gray[TARGET].astype(int).to_numpy()
yte_gray = dfte_gray[TARGET].astype(int).to_numpy()

ytr_idx  = dftr_idx[TARGET].astype(int).to_numpy()
yte_idx  = dfte_idx[TARGET].astype(int).to_numpy()

ytr_mix  = dftr_mix[TARGET].astype(int).to_numpy()
yte_mix  = dfte_mix[TARGET].astype(int).to_numpy()

print("mix shapes:", Xtr_mix.shape, Xte_mix.shape)
print("Train class dist (mix):", Counter(ytr_mix))


In [None]:
# Rebalanceo: class_weight + oversampling opcional
def maybe_oversample(X, y):
    if not IMB_AVAILABLE:
        return X, y, False
    ros = RandomOverSampler(random_state=RANDOM_STATE)
    X2, y2 = ros.fit_resample(X, y)
    return X2, y2, True

logreg_img = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(
        max_iter=2000,
        class_weight="balanced",
        random_state=RANDOM_STATE
    ))
])

rf_img = RandomForestClassifier(
    n_estimators=400,
    class_weight="balanced_subsample",
    random_state=RANDOM_STATE,
    n_jobs=-1
)


In [None]:
def run_suite(Xtr, ytr, Xte, yte, label):
    rows = []
    preds = {}

    dummy = DummyClassifier(strategy="stratified", random_state=RANDOM_STATE)
    res, p = eval_model(f"Dummy-stratified ({label})", dummy, Xtr, ytr, Xte, yte)
    rows.append(res); preds[res["model"]] = p

    res, p = eval_model(f"LogReg-balanced ({label})", logreg_img, Xtr, ytr, Xte, yte)
    rows.append(res); preds[res["model"]] = p

    res, p = eval_model(f"RF-balanced ({label})", rf_img, Xtr, ytr, Xte, yte)
    rows.append(res); preds[res["model"]] = p

    return pd.DataFrame(rows).sort_values("bal_acc_test", ascending=False), preds

res_gray, _ = run_suite(Xtr_gray, ytr_gray, Xte_gray, yte_gray, f"gray@{SIZE}")
res_idx,  _ = run_suite(Xtr_idx,  ytr_idx,  Xte_idx,  yte_idx,  f"idx@{SIZE}")
res_mix,  _ = run_suite(Xtr_mix,  ytr_mix,  Xte_mix,  yte_mix,  f"mix@{SIZE}")

res_gray, res_idx, res_mix


## 8) Reporte visual del baseline final (mix)


In [None]:
def show_report(model, Xtr, ytr, Xte, yte, title):
    model.fit(Xtr, ytr)
    pred_te = model.predict(Xte)

    print(title)
    print("\nBalanced Acc:", balanced_accuracy_score(yte, pred_te))
    print("F1-macro    :", f1_score(yte, pred_te, average="macro"))
    print("\nClassification report:\n", classification_report(yte, pred_te, digits=3))

    cm = confusion_matrix(yte, pred_te)
    disp = ConfusionMatrixDisplay(cm)
    disp.plot(values_format="d")
    plt.title(title)
    plt.show()

# Oversampling solo si está disponible
if IMB_AVAILABLE:
    Xtr_use, ytr_use, used = (*maybe_oversample(Xtr_mix, ytr_mix),)
    Xtr_use, ytr_use, used = Xtr_use[0], Xtr_use[1], Xtr_use[2]
else:
    Xtr_use, ytr_use, used = Xtr_mix, ytr_mix, False

title = f"Baseline final: LogReg mix@{SIZE} ({'ROS' if used else 'class_weight'})"
show_report(logreg_img, Xtr_use, ytr_use, Xte_mix, yte_mix, title)


## 9) Importancia de características (Permutation Importance)


In [None]:
logreg_img.fit(Xtr_use, ytr_use)

sample_n = min(300, Xte_mix.shape[0])
X_imp = Xte_mix[:sample_n]
y_imp = yte_mix[:sample_n]

perm = permutation_importance(
    logreg_img, X_imp, y_imp,
    scoring="balanced_accuracy",
    n_repeats=10,
    random_state=RANDOM_STATE
)

importances = perm.importances_mean
top_idx = np.argsort(importances)[::-1][:20]

pd.DataFrame({
    "feature_index": top_idx,
    "importance_mean": importances[top_idx]
})


## 10) Evidencia de sub/sobreajuste: Validación cruzada


In [None]:
def cv_scores(model, X, y, label):
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    scoring = {"bal_acc":"balanced_accuracy", "f1_macro":"f1_macro", "acc":"accuracy"}
    scores = cross_validate(model, X, y, cv=cv, scoring=scoring, return_train_score=True)
    out = {
        "label": label,
        "train_bal_acc_mean": np.mean(scores["train_bal_acc"]),
        "val_bal_acc_mean": np.mean(scores["test_bal_acc"]),
        "train_f1m_mean": np.mean(scores["train_f1_macro"]),
        "val_f1m_mean": np.mean(scores["test_f1_macro"]),
    }
    return out

cv_out = cv_scores(logreg_img, Xtr_mix, ytr_mix, f"LogReg mix@{SIZE} (class_weight)")
pd.DataFrame([cv_out])


## 11) Desempeño mínimo
Comparación contra Dummy-stratified.


In [None]:
dummy = DummyClassifier(strategy="stratified", random_state=RANDOM_STATE)
dummy.fit(Xtr_mix, ytr_mix)
pred_dummy = dummy.predict(Xte_mix)

min_bal = balanced_accuracy_score(yte_mix, pred_dummy)
min_f1m = f1_score(yte_mix, pred_dummy, average="macro")

logreg_img.fit(Xtr_use, ytr_use)
pred_lr = logreg_img.predict(Xte_mix)

bal_lr = balanced_accuracy_score(yte_mix, pred_lr)
f1m_lr = f1_score(yte_mix, pred_lr, average="macro")

print("Dummy Balanced Acc:", min_bal, " Dummy F1-macro:", min_f1m)
print("LR    Balanced Acc:", bal_lr, " LR    F1-macro:", f1m_lr)
print("Δ Balanced Acc:", bal_lr - min_bal)
print("Δ F1-macro    :", f1m_lr - min_f1m)


## 12) Bloque preparado para integrar un 2º dataset


In [None]:
def load_and_prepare_catalog(fits_path: str) -> pd.DataFrame:
    df_ = fits_table_to_df(fits_path, ext=1)
    df_["ring_class"] = df_["anillos"].apply(to_4class)
    df_ = df_.dropna(subset=["ring_class"]).copy()

    z_mean_ = df_["z"].mean()
    z_std_ = df_["z"].std()
    lower_, upper_ = z_mean_ - 2*z_std_, z_mean_ + 2*z_std_
    df_ = df_[(df_["z"] >= lower_) & (df_["z"] <= upper_)].copy()

    df_["z_log1p"] = np.log1p(df_["z"].clip(lower=0))
    ra_rad_ = np.deg2rad(df_["ra"])
    dec_rad_ = np.deg2rad(df_["dec"])
    df_["ra_sin"] = np.sin(ra_rad_)
    df_["ra_cos"] = np.cos(ra_rad_)
    df_["dec_sin"] = np.sin(dec_rad_)
    df_["dec_cos"] = np.cos(dec_rad_)

    df_["objID_str"] = df_["objID"].astype(str).str.replace(".0", "", regex=False)
    return df_

if os.path.exists(CATALOG2_FITS):
    df2 = load_and_prepare_catalog(CATALOG2_FITS)
    print("Dataset2 loaded:", df2.shape)
    print("Class dist ds2:", Counter(df2["ring_class"].astype(int)))
else:
    print("Dataset2 no encontrado aún. (OK)")
