# Limpieza del dataset

In [2]:
# cleaning.py
from __future__ import annotations
import pandas as pd
import numpy as np
from pathlib import Path
from typing import List, Optional, Tuple, Dict

TARGET_CANDIDATES = ("species", "target", "class")

def to_snake(s: str) -> str:
    return (
        s.strip()
         .lower()
         .replace(" ", "_")
         .replace("-", "_")
         .replace(".", "_")
    )

def detect_target(df: pd.DataFrame, candidates: Tuple[str, ...]) -> Optional[str]:
    for c in df.columns:
        if c in candidates:
            return c
    # fallback: última columna si parece categórica
    last = df.columns[-1]
    if df[last].dtype == "object" or "class" in last:
        return last
    return None

def attempt_numeric_coercion(df: pd.DataFrame) -> Tuple[pd.DataFrame, List[str], List[str]]:
    """Intenta convertir columnas con números 'sucios' a float; conserva texto si no aplica."""
    num_cols, cat_cols = [], []
    for c in df.columns:
        if pd.api.types.is_numeric_dtype(df[c]):
            num_cols.append(c)
            continue
        # intentar coerción si hay muchos valores numéricos
        coerced = pd.to_numeric(df[c], errors="coerce")
        if coerced.notna().sum() > 0 and coerced.notna().sum() >= 0.8 * len(df):
            df[c] = coerced
            num_cols.append(c)
        else:
            cat_cols.append(c)
    return df, num_cols, cat_cols

def cap_iqr(s: pd.Series, k: float = 1.5) -> pd.Series:
    q1 = s.quantile(0.25)
    q3 = s.quantile(0.75)
    iqr = q3 - q1
    low = q1 - k * iqr
    high = q3 + k * iqr
    return s.clip(low, high)

def impute_train(
    df: pd.DataFrame,
    target_col: Optional[str],
    num_cols: List[str],
    cat_cols: List[str],
) -> pd.DataFrame:
    # numéricas
    if target_col and target_col in df.columns:
        # asegurar que el target quede como texto (no numeric encoding aún)
        if not pd.api.types.is_object_dtype(df[target_col]):
            df[target_col] = df[target_col].astype(str)
        for c in num_cols:
            if c == target_col:
                continue
            if df[c].isna().any():
                med_by_class = df.groupby(target_col)[c].transform(lambda s: s.fillna(s.median()))
                df[c] = np.where(df[c].isna(), med_by_class, df[c])
                if df[c].isna().any():
                    df[c] = df[c].fillna(df[c].median())
    else:
        for c in num_cols:
            df[c] = df[c].fillna(df[c].median())
    # categóricas
    for c in cat_cols:
        if c == target_col:
            continue
        if df[c].isna().any():
            mode = df[c].mode(dropna=True)
            df[c] = df[c].fillna(mode.iloc[0] if len(mode) else "missing")
    return df

def clean_train(
    input_csv: str | Path,
    output_csv: str | Path,
    report_md: str | Path,
) -> Dict:
    input_csv, output_csv, report_md = Path(input_csv), Path(output_csv), Path(report_md)
    df = pd.read_csv(input_csv)

    # normalizar headers
    df.columns = [to_snake(c) for c in df.columns]

    # detectar objetivo
    target_col = detect_target(df, TARGET_CANDIDATES)

    # métricas iniciales
    shape_before = df.shape
    dtypes_before = df.dtypes.astype(str).to_dict()
    miss_before = df.isna().sum().to_dict()
    dup_before = int(df.duplicated().sum())

    # duplicados
    df = df.drop_duplicates().reset_index(drop=True)

    # coerción numérica
    df, num_cols, cat_cols = attempt_numeric_coercion(df)

    # imputación (train-only)
    df = impute_train(df, target_col, num_cols, cat_cols)

    # capping IQR en numéricas (no toca target)
    for c in num_cols:
        if c != target_col:
            df[c] = cap_iqr(df[c])

    # métricas finales
    shape_after = df.shape
    dup_after = int(df.duplicated().sum())
    miss_after = df.isna().sum().to_dict()

    # guardar
    df.to_csv(output_csv, index=False)

    # informe
    lines = []
    lines.append("# Informe de limpieza\n")
    lines.append(f"- **Archivo origen:** {input_csv.name}")
    lines.append(f"- **Filas x columnas (antes):** {shape_before[0]} x {shape_before[1]}")
    lines.append(f"- **Filas x columnas (después):** {shape_after[0]} x {shape_after[1]}")
    lines.append(f"- **Duplicados (antes → después):** {dup_before} → {dup_after}")
    lines.append(f"- **Tipos (antes):** {dtypes_before}")
    lines.append(f"- **Faltantes (antes):** {miss_before}")
    lines.append(f"- **Faltantes (después):** {miss_after}")
    lines.append(f"- **Target detectado:** {target_col or '(no detectado)'}")
    lines.append(f"- **Numéricas:** {', '.join([c for c in num_cols if c != target_col]) or '(ninguna)'}")
    lines.append(f"- **Categóricas:** {', '.join([c for c in cat_cols if c != target_col]) or '(ninguna)'}")
    report_md.write_text("\n".join(lines), encoding="utf-8")

    return {
        "target": target_col,
        "num_cols": num_cols,
        "cat_cols": cat_cols,
        "shape_before": shape_before,
        "shape_after": shape_after,
        "duplicates_before": dup_before,
        "duplicates_after": dup_after,
        "missing_before": miss_before,
        "missing_after": miss_after,
        "output": str(output_csv),
        "report": str(report_md),
    }

if __name__ == "__main__":
    # Ejemplo de uso:
    OUT = clean_train(
        input_csv="iris_train.csv",
        output_csv="iris_train_clean.csv",
        report_md="cleaning_report_iris_train.md",
    )
    print("Limpieza completa:", OUT)


Limpieza completa: {'target': 'target', 'num_cols': ['sepal_length_(cm)', 'sepal_width_(cm)', 'petal_length_(cm)', 'petal_width_(cm)', 'target'], 'cat_cols': [], 'shape_before': (125, 5), 'shape_after': (119, 5), 'duplicates_before': 6, 'duplicates_after': 0, 'missing_before': {'sepal_length_(cm)': 2, 'sepal_width_(cm)': 2, 'petal_length_(cm)': 1, 'petal_width_(cm)': 5, 'target': 0}, 'missing_after': {'sepal_length_(cm)': 0, 'sepal_width_(cm)': 0, 'petal_length_(cm)': 0, 'petal_width_(cm)': 0, 'target': 0}, 'output': 'iris_train_clean.csv', 'report': 'cleaning_report_iris_train.md'}


random-forest-api/
  app/
    main.py              # TODO: FastAPI/Flask con /health, /info, /predict
    requirements.txt
  model/
    rf_custom.py         # TODO: SimpleRandomForest (implementación propia)
    rf_sklearn.py        # TODO: Wrapper de RandomForestClassifier
    model.pkl            # placeholder vacío
  notebooks/
  render.yaml
  README.md


In [3]:
# TODO: implementar SimpleRandomForest (se llenará desde la API)
class SimpleRandomForest:
    def __init__(self, n_estimators=100, max_depth=None, random_state=42):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.random_state = random_state
        self._fitted = False

    def fit(self, X, y):
        """TODO: entrenar bosque propio"""
        raise NotImplementedError("Implementar en el equipo de API")

    def predict(self, X):
        """TODO: predicción"""
        if not self._fitted:
            raise RuntimeError("El modelo no está entrenado")
        raise NotImplementedError("Implementar en el equipo de API")

    def predict_proba(self, X):
        """TODO: probas (si aplica)"""
        if not self._fitted:
            raise RuntimeError("El modelo no está entrenado")
        raise NotImplementedError("Implementar en el equipo de API")

# TODO: implementar wrapper de RandomForestClassifier de sklearn
class SKRandomForest:
    def __init__(self, **kwargs):
        self.params = kwargs
        self.model = None  # se inicializa en fit

    def fit(self, X, y):
        """TODO: from sklearn.ensemble import RandomForestClassifier"""
        raise NotImplementedError("Implementar en el equipo de API")

    def predict(self, X):
        if self.model is None:
            raise RuntimeError("El modelo no está entrenado")
        raise NotImplementedError("Implementar en el equipo de API")

    def predict_proba(self, X):
        if self.model is None:
            raise RuntimeError("El modelo no está entrenado")
        raise NotImplementedError("Implementar en el equipo de API")


In [4]:
# simple_random_forest.py
# -*- coding: utf-8 -*-
"""
Random Forest minimal (bagging + max_features por split) usando árboles de scikit-learn.
Implementamos manualmente:
  - bootstrap de ejemplos por árbol
  - agregación de predicciones (mayoría / promedio)

Escogemos la vía más SENCILLA y robusta:
  - El submuestreo de variables se hace por split usando el parámetro nativo
    `max_features` de DecisionTree*, que es lo habitual en Random Forest.

Requisitos:
    pip install numpy scikit-learn
"""

from __future__ import annotations
from typing import Optional, Union, List
import numpy as np
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.preprocessing import LabelEncoder


class SimpleRandomForest:
    """
    Parámetros
    ----------
    n_estimators : int
        Nº de árboles en el ensamble.
    max_features : {'sqrt','log2'} o int o float o None
        Se pasa directo al DecisionTree* para submuestreo POR SPLIT.
        - 'sqrt' recomendado en clasificación.
        - None usa todas las variables.
    max_depth : Optional[int]
        Profundidad máx. de cada árbol (control de varianza).
    random_state : Optional[int]
        Semilla global (genera sub-semillas por árbol).
    task : {'auto','classification','regression'}
        'auto' infiere por el tipo de y.
    """

    def __init__(
        self,
        n_estimators: int = 100,
        max_features: Union[str, int, float, None] = "sqrt",
        max_depth: Optional[int] = None,
        random_state: Optional[int] = None,
        task: str = "auto",
    ):
        if n_estimators < 1:
            raise ValueError("n_estimators debe ser >= 1")
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.max_depth = max_depth
        self.random_state = random_state
        self.task = task

        # Atributos post-fit
        self._rng = np.random.RandomState(self.random_state)
        self._forest: List[Union[DecisionTreeClassifier, DecisionTreeRegressor]] = []
        self._is_classifier: Optional[bool] = None
        self._label_encoder: Optional[LabelEncoder] = None
        self.n_features_in_: Optional[int] = None

    # ------------------------- utilidades internas -------------------------

    def _bootstrap_sample(self, X: np.ndarray, y: np.ndarray):
        """Muestra bootstrap de filas con reemplazo."""
        n = X.shape[0]
        idx = self._rng.randint(0, n, size=n)  # con reemplazo
        return X[idx], y[idx]

    # ---------------------------- API pública -----------------------------

    def fit(self, X: np.ndarray, y: np.ndarray) -> "SimpleRandomForest":
        X = np.asarray(X)
        y = np.asarray(y)

        if X.ndim != 2:
            raise ValueError("X debe ser 2D (n_muestras, n_features).")
        if len(y.shape) != 1:
            raise ValueError("y debe ser 1D (n_muestras,).")
        if X.shape[0] != y.shape[0]:
            raise ValueError("X e y deben tener el mismo número de filas.")

        self.n_features_in_ = X.shape[1]

        # Detectar tarea
        if self.task == "auto":
            # Heurística simple: floats continuos -> regresión; lo demás -> clasificación
            self._is_classifier = not np.issubdtype(y.dtype, np.floating)
        else:
            if self.task not in {"classification", "regression"}:
                raise ValueError("task debe ser 'auto', 'classification' o 'regression'.")
            self._is_classifier = (self.task == "classification")

        # Codificación de etiquetas si clasificación
        if self._is_classifier:
            self._label_encoder = LabelEncoder()
            y_fit = self._label_encoder.fit_transform(y)
        else:
            self._label_encoder = None
            y_fit = y.astype(float)

        # Entrenamiento con bagging
        self._forest = []
        for _ in range(self.n_estimators):
            Xb, yb = self._bootstrap_sample(X, y_fit)
            seed = int(self._rng.randint(0, 10**9))

            if self._is_classifier:
                tree = DecisionTreeClassifier(
                    criterion="gini",
                    max_depth=self.max_depth,
                    max_features=self.max_features,  # submuestreo POR SPLIT (estándar en RF)
                    random_state=seed,
                )
            else:
                tree = DecisionTreeRegressor(
                    criterion="squared_error",
                    max_depth=self.max_depth,
                    max_features=self.max_features,  # submuestreo POR SPLIT
                    random_state=seed,
                )
            tree.fit(Xb, yb)
            self._forest.append(tree)

        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        if not self._forest:
            raise RuntimeError("Llama a fit(X, y) antes de predict(X).")

        X = np.asarray(X)
        if X.ndim != 2:
            raise ValueError("X debe ser 2D.")
        if self.n_features_in_ is not None and X.shape[1] != self.n_features_in_:
            raise ValueError(
                f"Columnas de X ({X.shape[1]}) no coinciden con las usadas en fit ({self.n_features_in_})."
            )

        preds = [est.predict(X).reshape(-1, 1) for est in self._forest]
        P = np.hstack(preds)  # (n_muestras, n_estimators)

        if self._is_classifier:
            # voto mayoritario
            n = P.shape[0]
            votes = np.empty(n, dtype=int)
            for i in range(n):
                counts = np.bincount(P[i, :].astype(int))
                votes[i] = np.argmax(counts)
            if self._label_encoder is not None:
                return self._label_encoder.inverse_transform(votes)
            return votes
        else:
            # promedio
            return P.mean(axis=1)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        if not self._is_classifier:
            raise AttributeError("predict_proba solo aplica a clasificación.")
        if not self._forest:
            raise RuntimeError("Llama a fit(X, y) antes de predict_proba(X).")

        X = np.asarray(X)
        proba = self._forest[0].predict_proba(X)
        acc = np.zeros_like(proba)
        acc += proba
        for est in self._forest[1:]:
            acc += est.predict_proba(X)
        acc /= float(len(self._forest))
        return acc


# ------------------------- DEMO rápida y compatible -------------------------
if __name__ == "__main__":
    from sklearn.datasets import load_iris, make_regression
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, mean_squared_error

    rng_seed = 42

    # Clasificación: Iris
    iris = load_iris()
    Xc, yc = iris.data, iris.target
    Xc_tr, Xc_te, yc_tr, yc_te = train_test_split(
        Xc, yc, test_size=0.30, random_state=rng_seed, stratify=yc
    )

    # Árbol único (baseline)
    dtc = DecisionTreeClassifier(random_state=rng_seed)
    dtc.fit(Xc_tr, yc_tr)
    acc_tree = accuracy_score(yc_te, dtc.predict(Xc_te))

    # Nuestro RF sencillo (más árboles para estabilidad en Iris)
    srf = SimpleRandomForest(
        n_estimators=200,
        max_features="sqrt",
        max_depth=None,
        random_state=rng_seed,
        task="classification",
    ).fit(Xc_tr, yc_tr)
    acc_rf = accuracy_score(yc_te, srf.predict(Xc_te))

    print("=== Clasificación (Iris) ===")
    print(f"Árbol único  -> Accuracy: {acc_tree:.3f}")
    print(f"SimpleRF     -> Accuracy: {acc_rf:.3f}")

    # Regresión sintética
    Xr, yr = make_regression(
        n_samples=1200, n_features=12, n_informative=8, noise=15.0, random_state=rng_seed
    )
    Xr_tr, Xr_te, yr_tr, yr_te = train_test_split(Xr, yr, test_size=0.30, random_state=rng_seed)

    dtr = DecisionTreeRegressor(random_state=rng_seed)
    dtr.fit(Xr_tr, yr_tr)
    rmse_tree = float(np.sqrt(mean_squared_error(yr_te, dtr.predict(Xr_te))))

    srf_reg = SimpleRandomForest(
        n_estimators=200,
        max_features="sqrt",
        max_depth=None,
        random_state=rng_seed,
        task="regression",
    ).fit(Xr_tr, yr_tr)
    rmse_rf = float(np.sqrt(mean_squared_error(yr_te, srf_reg.predict(Xr_te))))

    print("\n=== Regresión (sintético) ===")
    print(f"Árbol único  -> RMSE: {rmse_tree:.3f}")
    print(f"SimpleRF     -> RMSE: {rmse_rf:.3f}")


=== Clasificación (Iris) ===
Árbol único  -> Accuracy: 0.933
SimpleRF     -> Accuracy: 0.911

=== Regresión (sintético) ===
Árbol único  -> RMSE: 103.683
SimpleRF     -> RMSE: 73.264


In [6]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load the cleaned data
df_cleaned = pd.read_csv('iris_train_clean.csv')

# Assuming the target column is 'target' as detected in the cleaning step
TARGET_COLUMN = 'target'

# Prepare data for the model
X = df_cleaned.drop(columns=[TARGET_COLUMN])
y = df_cleaned[TARGET_COLUMN]

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=rng_seed, stratify=y
)

# Initialize and train the SimpleRandomForest model
srf_cleaned = SimpleRandomForest(
    n_estimators=200,
    max_features="sqrt",
    max_depth=None,
    random_state=rng_seed,
    task="classification",
)
srf_cleaned.fit(X_train.values, y_train.values) # Convert to numpy arrays for SimpleRandomForest

# Evaluate the model
y_pred = srf_cleaned.predict(X_test.values) # Convert to numpy array
accuracy_cleaned = accuracy_score(y_test, y_pred)

print(f"Accuracy of SimpleRandomForest on cleaned data: {accuracy_cleaned:.3f}")

Accuracy of SimpleRandomForest on cleaned data: 0.917


In [7]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Initialize and train the scikit-learn RandomForestClassifier model
sklearn_rf = RandomForestClassifier(
    n_estimators=200,
    max_features="sqrt",
    max_depth=None,
    random_state=rng_seed,
)

sklearn_rf.fit(X_train, y_train) # scikit-learn models can handle pandas DataFrames directly

# Evaluate the scikit-learn model
y_pred_sklearn = sklearn_rf.predict(X_test)
accuracy_sklearn = accuracy_score(y_test, y_pred_sklearn)

print(f"Accuracy of scikit-learn RandomForestClassifier on cleaned data: {accuracy_sklearn:.3f}")

Accuracy of scikit-learn RandomForestClassifier on cleaned data: 0.917


### Comparación de Modelos

Hemos entrenado y evaluado dos modelos de Random Forest en los datos limpios del dataset Iris:

1.  **SimpleRandomForest (implementación propia):** Obtuvimos una precisión (accuracy) de **{{accuracy_cleaned:.3f}}**.
2.  **scikit-learn RandomForestClassifier:** Obtuvimos una precisión (accuracy) de **{{accuracy_sklearn:.3f}}**.

En este caso particular y con los hiperparámetros utilizados, ambos modelos lograron la misma precisión en el conjunto de prueba. Esto sugiere que tu implementación `SimpleRandomForest` se comporta de manera muy similar a la versión estándar de scikit-learn para este dataset.

Consideraciones:
*   **Complejidad:** La implementación de scikit-learn es mucho más robusta, optimizada y probada, con soporte para más funcionalidades y tipos de datos.
*   **Personalización:** Tu `SimpleRandomForest` te da control total sobre el proceso, lo cual es útil para entender los fundamentos o implementar variaciones específicas.
*   **Rendimiento:** Para datasets grandes o entrenamientos intensivos, la versión de scikit-learn probablemente será más eficiente.

Para una comparación más profunda, se podrían analizar otras métricas como precisión, recall, F1-score, o la curva ROC (para clasificación binaria), así como el tiempo de entrenamiento y predicción.