
# Lab 4: Experiments with Random Forest  
(repeats steps 2–4 of Lab 1)

This notebook covers steps **2–4**:
- **2. Building a baseline and evaluating quality** (sklearn)
- **3. Improving the baseline** (hypotheses → validation → improved baseline)
- **4. Implementing the algorithm** (Random Forest) **from scratch** + comparisons

## Open datasets by URL (UCI)
- **Classification:** Banknote Authentication  
  `https://archive.ics.uci.edu/ml/machine-learning-databases/00267/data_banknote_authentication.txt`
- **Regression:** Auto MPG  
  `https://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data`

## Metrics
- Classification: **accuracy**, **F1-macro**, **ROC-AUC**
- Regression: **MAE**, **RMSE**, **R²**
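
As a quick illustration of two of these metrics, the sketch below computes F1-macro and RMSE by hand with NumPy. The toy labels and predictions are hypothetical and are not taken from the notebook's datasets; the helper name `f1_for_class` is also an assumption made for this example.

```python
import numpy as np

# Hypothetical toy labels, only to show how the metrics are computed.
y_true = np.array([0, 0, 0, 1, 1])
y_pred = np.array([0, 0, 1, 1, 1])

def f1_for_class(y_true, y_pred, cls):
    """F1 for one class, treating `cls` as the positive label."""
    tp = np.sum((y_pred == cls) & (y_true == cls))
    fp = np.sum((y_pred == cls) & (y_true != cls))
    fn = np.sum((y_pred != cls) & (y_true == cls))
    return 2 * tp / (2 * tp + fp + fn)

# F1-macro is the unweighted mean of the per-class F1 scores.
f1_macro = float(np.mean([f1_for_class(y_true, y_pred, c) for c in (0, 1)]))

# RMSE is the square root of the mean squared error.
y_reg_true = np.array([20.0, 30.0, 25.0])
y_reg_hat = np.array([22.0, 27.0, 25.0])
rmse_value = float(np.sqrt(np.mean((y_reg_true - y_reg_hat) ** 2)))

print(f1_macro, rmse_value)
```

Because F1-macro weights both classes equally, it stays informative even when one class is rarer than the other.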


In [1]:

import numpy as np
import pandas as pd

from dataclasses import dataclass
from typing import Optional, Literal
import inspect

from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold, KFold
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer

from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import (
    accuracy_score, f1_score, roc_auc_score,
    mean_absolute_error, mean_squared_error, r2_score
)

import matplotlib.pyplot as plt

RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

def rmse(y_true, y_pred) -> float:
    return float(np.sqrt(mean_squared_error(y_true, y_pred)))

# Version-agnostic OneHotEncoder (avoids errors caused by the sparse/sparse_output rename)
def make_ohe_dense():
    sig = inspect.signature(OneHotEncoder)
    if "sparse_output" in sig.parameters:
        return OneHotEncoder(handle_unknown="ignore", sparse_output=False)
    return OneHotEncoder(handle_unknown="ignore", sparse=False)

pd.set_option("display.max_columns", 80)


## Loading the data (by URL)

In [2]:

# ===== Banknote Authentication (classification) =====
banknote_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00267/data_banknote_authentication.txt"
banknote_cols = ["variance", "skewness", "curtosis", "entropy", "class"]
df_cls = pd.read_csv(banknote_url, header=None, names=banknote_cols)

# ===== Auto MPG (regression) =====
auto_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data"
auto_cols = ["mpg", "cylinders", "displacement", "horsepower", "weight", "acceleration", "model_year", "origin", "car_name"]
df_reg = pd.read_csv(
    auto_url,
    sep=r"\s+",  # whitespace-delimited; replaces the deprecated delim_whitespace=True
    header=None,
    names=auto_cols,
    na_values="?"
)

display(df_cls.head())
display(df_reg.head())

print("Banknote shape:", df_cls.shape)
print("Auto MPG shape:", df_reg.shape)
print("\nMissing (Auto MPG):")
display(df_reg.isna().sum().to_frame("missing"))




|   | variance | skewness | curtosis | entropy | class |
|---|---|---|---|---|---|
| 0 | 3.6216 | 8.6661 | -2.8073 | -0.44699 | 0 |
| 1 | 4.5459 | 8.1674 | -2.4586 | -1.4621 | 0 |
| 2 | 3.866 | -2.6383 | 1.9242 | 0.10645 | 0 |
| 3 | 3.4566 | 9.5228 | -4.0112 | -3.5944 | 0 |
| 4 | 0.32924 | -4.4552 | 4.5718 | -0.9888 | 0 |


|   | mpg | cylinders | displacement | horsepower | weight | acceleration | model_year | origin | car_name |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 18.0 | 8 | 307.0 | 130.0 | 3504.0 | 12.0 | 70 | 1 | chevrolet chevelle malibu |
| 1 | 15.0 | 8 | 350.0 | 165.0 | 3693.0 | 11.5 | 70 | 1 | buick skylark 320 |
| 2 | 18.0 | 8 | 318.0 | 150.0 | 3436.0 | 11.0 | 70 | 1 | plymouth satellite |
| 3 | 16.0 | 8 | 304.0 | 150.0 | 3433.0 | 12.0 | 70 | 1 | amc rebel sst |
| 4 | 17.0 | 8 | 302.0 | 140.0 | 3449.0 | 10.5 | 70 | 1 | ford torino |


Banknote shape: (1372, 5)
Auto MPG shape: (398, 9)

Missing (Auto MPG):


| column | missing |
|---|---|
| mpg | 0 |
| cylinders | 0 |
| displacement | 0 |
| horsepower | 6 |
| weight | 0 |
| acceleration | 0 |
| model_year | 0 |
| origin | 0 |
| car_name | 0 |



## 2. Building a baseline and evaluating quality (sklearn)

### 2.1 Train/test split
- Classification: `stratify` by class, so both parts keep the original class ratio.
- Regression: a plain random split.
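
To make the effect of stratification concrete, here is a minimal NumPy sketch that splits each class separately. It is an illustration only: the function name `stratified_split_indices` and the toy labels are assumptions, and sklearn's `train_test_split(..., stratify=y)` uses its own, more careful allocation.

```python
import numpy as np

def stratified_split_indices(y, test_size=0.2, seed=42):
    """Return (train_idx, test_idx) so each class keeps its share in both parts."""
    rng = np.random.default_rng(seed)
    train_parts, test_parts = [], []
    for cls in np.unique(y):
        # Shuffle the row indices of this class, then cut off the test share.
        idx = rng.permutation(np.flatnonzero(y == cls))
        n_test = int(round(test_size * idx.size))
        test_parts.append(idx[:n_test])
        train_parts.append(idx[n_test:])
    return np.concatenate(train_parts), np.concatenate(test_parts)

# Hypothetical imbalanced labels: 80 zeros and 20 ones.
y = np.array([0] * 80 + [1] * 20)
train_idx, test_idx = stratified_split_indices(y)

# Both parts keep the 20% share of class 1.
print(y[train_idx].mean(), y[test_idx].mean())
```

A plain random split would only match the class ratio on average, which matters more as the dataset gets smaller or more imbalanced.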


In [3]:

# ===== Classification =====
X_cls = df_cls.drop(columns=["class"]).values
y_cls = df_cls["class"].values

X_cls_train, X_cls_test, y_cls_train, y_cls_test = train_test_split(
    X_cls, y_cls,
    test_size=0.2,
    random_state=RANDOM_STATE,
    stratify=y_cls
)

# ===== Regression =====
# drop car_name (a string feature)
df_reg_base = df_reg.drop(columns=["car_name"]).copy()
X_reg = df_reg_base.drop(columns=["mpg"])
y_reg = df_reg_base["mpg"]

X_reg_train, X_reg_test, y_reg_train, y_reg_test = train_test_split(
    X_reg, y_reg,
    test_size=0.2,
    random_state=RANDOM_STATE
)

print("cls train/test:", X_cls_train.shape, X_cls_test.shape)
print("reg train/test:", X_reg_train.shape, X_reg_test.shape)


cls train/test: (1097, 4) (275, 4)
reg train/test: (318, 7) (80, 7)



### 2.2 Baseline: RandomForestClassifier and RandomForestRegressor

Properties of Random Forest:
- it does not require feature scaling;
- it is more resistant to overfitting than a single tree because it averages many trees;
- for regression we still need to handle the missing values (`horsepower`) and encode the categorical `origin` properly.
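
The two preprocessing steps from the last bullet can be sketched in plain NumPy. The toy values below are hypothetical; the notebook itself uses `SimpleImputer` and `OneHotEncoder`, which learn the median and the category list on the training split only.

```python
import numpy as np

# Hypothetical toy columns, shaped like the Auto MPG features.
horsepower = np.array([130.0, np.nan, 150.0, 95.0])
origin = np.array([1, 3, 2, 1])

# Median imputation: replace NaN with the median of the observed values.
median = float(np.nanmedian(horsepower))
hp_filled = np.where(np.isnan(horsepower), median, horsepower)

# One-hot encoding: one 0/1 column per category, so no ordering is implied.
categories = np.unique(origin)
origin_ohe = (origin[:, None] == categories[None, :]).astype(float)

print(hp_filled)
print(origin_ohe)
```

Encoding `origin` as three indicator columns lets a tree isolate any single region with one split instead of relying on the arbitrary order 1 < 2 < 3.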


In [4]:

# ===== Baseline: Classification (RF) =====
rfc_base = RandomForestClassifier(
    random_state=RANDOM_STATE,
    n_estimators=200,
    n_jobs=-1
)
rfc_base.fit(X_cls_train, y_cls_train)

y_cls_pred = rfc_base.predict(X_cls_test)
y_cls_proba = rfc_base.predict_proba(X_cls_test)[:, 1]

cls_metrics_base = {
    "accuracy": accuracy_score(y_cls_test, y_cls_pred),
    "f1_macro": f1_score(y_cls_test, y_cls_pred, average="macro"),
    "roc_auc": roc_auc_score(y_cls_test, y_cls_proba),
}
print("Baseline (classification):", cls_metrics_base)

# ===== Baseline: Regression (RF + preprocess) =====
num_cols = ["cylinders", "displacement", "horsepower", "weight", "acceleration", "model_year"]
cat_cols = ["origin"]

reg_preprocess_base = ColumnTransformer([
    ("num", SimpleImputer(strategy="median"), num_cols),
    ("cat", make_ohe_dense(), cat_cols),
], remainder="drop")

rfr_base = Pipeline([
    ("prep", reg_preprocess_base),
    ("model", RandomForestRegressor(
        random_state=RANDOM_STATE,
        n_estimators=300,
        n_jobs=-1
    ))
])

rfr_base.fit(X_reg_train, y_reg_train)
y_reg_pred = rfr_base.predict(X_reg_test)

reg_metrics_base = {
    "mae": mean_absolute_error(y_reg_test, y_reg_pred),
    "rmse": rmse(y_reg_test, y_reg_pred),
    "r2": r2_score(y_reg_test, y_reg_pred),
}
print("Baseline (regression):", reg_metrics_base)


Baseline (classification): {'accuracy': 0.9963636363636363, 'f1_macro': 0.9963198394111743, 'roc_auc': 1.0}
Baseline (regression): {'mae': 1.5788458333333335, 'rmse': 2.1573822792011415, 'r2': 0.9134348866320341}



## 3. Improving the baseline

### 3.1 Improvement hypotheses

**Classification (RandomForestClassifier):**
1. Tuning `max_depth`, `min_samples_leaf`, and `min_samples_split` will reduce overfitting and improve test quality.
2. Tuning `max_features` changes how diverse the trees are, which may improve quality.
3. Tuning `n_estimators` and `bootstrap` affects stability and variance.

**Regression (RandomForestRegressor):**
1. One-hot encoding of `origin` is required (it is a category, not the number 1/2/3).
2. Tuning `max_depth`, `min_samples_leaf`, `max_features`, and `n_estimators` will lower RMSE/MAE.
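
Before running an exhaustive search over these hyperparameters, it helps to estimate its cost. The sketch below counts the candidates for the classification grid used in this notebook; the variable names are local to the example, and the fold count of 5 is assumed to match the cross-validation setup.

```python
from math import prod

# Same value lists as the classification grid searched in this notebook.
cls_param_grid = {
    "n_estimators": [200, 500],
    "max_depth": [None, 3, 5, 10],
    "min_samples_split": [2, 5, 10],
    "min_samples_leaf": [1, 2, 5],
    "max_features": ["sqrt", "log2", None],
    "bootstrap": [True, False],
}
n_folds = 5  # assumed number of cross-validation folds

# An exhaustive grid tries every combination of values.
n_candidates = prod(len(values) for values in cls_param_grid.values())
n_fits = n_candidates * n_folds

print(n_candidates, n_fits)  # 432 2160
```

The regression grid has the same shape (2 × 4 × 3 × 3 × 3 × 2), so it also yields 432 candidates and 2160 cross-validation fits, plus one final refit of the best model.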


In [5]:

# ===== Improved: Classification (GridSearchCV) =====
cls_param_grid = {
    "n_estimators": [200, 500],
    "max_depth": [None, 3, 5, 10],
    "min_samples_split": [2, 5, 10],
    "min_samples_leaf": [1, 2, 5],
    "max_features": ["sqrt", "log2", None],
    "bootstrap": [True, False],
}

cv_cls = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
cls_search = GridSearchCV(
    RandomForestClassifier(random_state=RANDOM_STATE, n_jobs=-1),
    cls_param_grid,
    cv=cv_cls,
    scoring="f1_macro",
    n_jobs=-1
)
cls_search.fit(X_cls_train, y_cls_train)

print("Best params (classification):", cls_search.best_params_)
print("CV best f1_macro:", cls_search.best_score_)

rfc_best = cls_search.best_estimator_
y_cls_pred_best = rfc_best.predict(X_cls_test)
y_cls_proba_best = rfc_best.predict_proba(X_cls_test)[:, 1]

cls_metrics_best = {
    "accuracy": accuracy_score(y_cls_test, y_cls_pred_best),
    "f1_macro": f1_score(y_cls_test, y_cls_pred_best, average="macro"),
    "roc_auc": roc_auc_score(y_cls_test, y_cls_proba_best),
}
print("Improved (classification):", cls_metrics_best)


# ===== Improved: Regression (preprocess + GridSearchCV) =====
reg_preprocess = ColumnTransformer([
    ("num", SimpleImputer(strategy="median"), num_cols),
    ("cat", make_ohe_dense(), cat_cols),
], remainder="drop")

reg_pipe = Pipeline([
    ("prep", reg_preprocess),
    ("model", RandomForestRegressor(random_state=RANDOM_STATE, n_jobs=-1))
])

reg_param_grid = {
    "model__n_estimators": [300, 700],
    "model__max_depth": [None, 5, 10, 20],
    "model__min_samples_split": [2, 5, 10],
    "model__min_samples_leaf": [1, 2, 5],
    "model__max_features": ["sqrt", "log2", 1.0],
    "model__bootstrap": [True, False],
}

cv_reg = KFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
reg_search = GridSearchCV(
    reg_pipe,
    reg_param_grid,
    cv=cv_reg,
    scoring="neg_root_mean_squared_error",
    n_jobs=-1
)
reg_search.fit(X_reg_train, y_reg_train)

print("Best params (regression):", reg_search.best_params_)
print("CV best (neg RMSE):", reg_search.best_score_)

rfr_best = reg_search.best_estimator_
y_reg_pred_best = rfr_best.predict(X_reg_test)

reg_metrics_best = {
    "mae": mean_absolute_error(y_reg_test, y_reg_pred_best),
    "rmse": rmse(y_reg_test, y_reg_pred_best),
    "r2": r2_score(y_reg_test, y_reg_pred_best),
}
print("Improved (regression):", reg_metrics_best)


compare = pd.DataFrame([
    {"task": "classification", "stage": "baseline", **cls_metrics_base},
    {"task": "classification", "stage": "improved", **cls_metrics_best},
    {"task": "regression", "stage": "baseline", **reg_metrics_base},
    {"task": "regression", "stage": "improved", **reg_metrics_best},
])
display(compare)


Best params (classification): {'bootstrap': False, 'max_depth': None, 'max_features': 'sqrt', 'min_samples_leaf': 1, 'min_samples_split': 2, 'n_estimators': 500}
CV best f1_macro: 0.995390423269211
Improved (classification): {'accuracy': 0.9963636363636363, 'f1_macro': 0.9963198394111743, 'roc_auc': 1.0}
Best params (regression): {'model__bootstrap': True, 'model__max_depth': 10, 'model__max_features': 'log2', 'model__min_samples_leaf': 1, 'model__min_samples_split': 2, 'model__n_estimators': 700}
CV best (neg RMSE): -2.9013398524118506
Improved (regression): {'mae': 1.5744949690451264, 'rmse': 2.1357032800011164, 'r2': 0.915165888199412}


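|   | task | stage | accuracy | f1_macro | roc_auc | mae | rmse | r2 |
|---|---|---|---|---|---|---|---|---|
| 0 | classification | baseline | 0.996364 | 0.99632 | 1.0 | NaN | NaN | NaN |
| 1 | classification | improved | 0.996364 | 0.99632 | 1.0 | NaN | NaN | NaN |
| 2 | regression | baseline | NaN | NaN | NaN | 1.578846 | 2.157382 | 0.913435 |
| 3 | regression | improved | NaN | NaN | NaN | 1.574495 | 2.135703 | 0.915166 |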



## 4. Implementing Random Forest (from scratch)

We build a simplified Random Forest:
- grow many trees on bootstrap samples;
- at each node, pick a random subset of features (`max_features`) and search for the best split among them only;
- **classification:** majority vote / averaged probabilities;
- **regression:** averaged predictions.

> The implementation uses NumPy and plain Python for teaching purposes (it will be slower than sklearn).
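
The two sources of randomness listed above can be sketched on their own, before any tree is built. The sizes and the seed below are hypothetical and chosen only for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
n_samples, n_features = 1000, 4  # hypothetical dataset shape

# Bootstrap: draw n_samples row indices with replacement.
boot_idx = rng.integers(0, n_samples, size=n_samples)

# Share of distinct rows in the sample; it tends to 1 - 1/e (about 0.632).
unique_share = np.unique(boot_idx).size / n_samples

# Per-node feature subset for max_features="sqrt": k features, no repeats.
k = max(1, int(np.sqrt(n_features)))
node_features = rng.choice(n_features, size=k, replace=False)

print(round(unique_share, 3), k, node_features)
```

Each tree therefore sees roughly two thirds of the distinct training rows, and each split considers only `k` of the features, which is what makes the trees differ from one another.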


In [6]:

def _gini(counts: np.ndarray) -> float:
    total = counts.sum()
    if total <= 0:
        return 0.0
    p = counts / total
    return float(1.0 - np.sum(p ** 2))

def _entropy(counts: np.ndarray) -> float:
    total = counts.sum()
    if total <= 0:
        return 0.0
    p = counts / total
    p = p[p > 0]
    return float(-np.sum(p * np.log2(p)))

def _mse_from_sums(sum_y: float, sum_y2: float, n: int) -> float:
    if n <= 0:
        return 0.0
    mean = sum_y / n
    return float(sum_y2 / n - mean * mean)

@dataclass
class _NodeC:
    feature: Optional[int] = None
    threshold: Optional[float] = None
    left: Optional["_NodeC"] = None
    right: Optional["_NodeC"] = None
    # leaf:
    class_counts: Optional[np.ndarray] = None  # (n_classes,)

@dataclass
class _NodeR:
    feature: Optional[int] = None
    threshold: Optional[float] = None
    left: Optional["_NodeR"] = None
    right: Optional["_NodeR"] = None
    value: Optional[float] = None  # mean in leaf


class DecisionTreeClassifierRF:
    def __init__(self,
                 max_depth: Optional[int] = None,
                 min_samples_split: int = 2,
                 min_samples_leaf: int = 1,
                 criterion: Literal["gini", "entropy"] = "gini",
                 max_features: Optional[int] = None,
                 random_state: Optional[int] = None):
        self.max_depth = max_depth
        self.min_samples_split = int(min_samples_split)
        self.min_samples_leaf = int(min_samples_leaf)
        self.criterion = criterion
        self.max_features = max_features
        self.rng_ = np.random.default_rng(random_state)
        self.root_ = None
        self.classes_ = None
        self.n_features_ = None
        self.n_classes_ = None

    def fit(self, X, y):
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=int)
        self.n_features_ = X.shape[1]
        self.classes_, y_enc = np.unique(y, return_inverse=True)
        self.n_classes_ = len(self.classes_)
        self.root_ = self._build(X, y_enc, depth=0)
        return self

    def _impurity(self, counts):
        return _gini(counts) if self.criterion == "gini" else _entropy(counts)

    def _feature_subset(self):
        if self.max_features is None or self.max_features >= self.n_features_:
            return np.arange(self.n_features_)
        return self.rng_.choice(self.n_features_, size=self.max_features, replace=False)

    def _best_split(self, X, y):
        n, d = X.shape
        total_counts = np.bincount(y, minlength=self.n_classes_).astype(float)

        best_imp = float("inf")
        best_f = None
        best_thr = None

        feat_idx = self._feature_subset()

        for f in feat_idx:
            xs = X[:, f]
            order = np.argsort(xs, kind="mergesort")
            xs_sorted = xs[order]
            y_sorted = y[order]

            diffs = xs_sorted[1:] != xs_sorted[:-1]
            if not np.any(diffs):
                continue

            left_counts = np.zeros((n - 1, self.n_classes_), dtype=float)
            running = np.zeros(self.n_classes_, dtype=float)
            for i in range(n - 1):
                running[y_sorted[i]] += 1.0
                left_counts[i] = running

            right_counts = total_counts - left_counts
            left_n = np.arange(1, n)
            right_n = n - left_n

            valid = diffs & (left_n >= self.min_samples_leaf) & (right_n >= self.min_samples_leaf)
            if not np.any(valid):
                continue

            imp_left = np.array([self._impurity(c) for c in left_counts])
            imp_right = np.array([self._impurity(c) for c in right_counts])
            weighted = (left_n * imp_left + right_n * imp_right) / n
            weighted[~valid] = np.inf

            i_best = int(np.argmin(weighted))
            if weighted[i_best] < best_imp:
                best_imp = float(weighted[i_best])
                best_f = int(f)
                best_thr = float((xs_sorted[i_best] + xs_sorted[i_best + 1]) / 2.0)

        return best_f, best_thr

    def _make_leaf(self, y):
        counts = np.bincount(y, minlength=self.n_classes_).astype(float)
        return _NodeC(class_counts=counts)

    def _build(self, X, y, depth):
        n = X.shape[0]

        if ((self.max_depth is not None and depth >= self.max_depth)
                or n < self.min_samples_split
                or len(np.unique(y)) == 1):
            return self._make_leaf(y)

        f, thr = self._best_split(X, y)
        if f is None:
            return self._make_leaf(y)

        left_mask = X[:, f] <= thr
        right_mask = ~left_mask

        if left_mask.sum() < self.min_samples_leaf or right_mask.sum() < self.min_samples_leaf:
            return self._make_leaf(y)

        node = _NodeC(feature=f, threshold=thr)
        node.left = self._build(X[left_mask], y[left_mask], depth + 1)
        node.right = self._build(X[right_mask], y[right_mask], depth + 1)
        return node

    def _leaf_counts(self, x, node: _NodeC):
        while node.feature is not None:
            node = node.left if x[node.feature] <= node.threshold else node.right
        return node.class_counts

    def predict_proba(self, X):
        X = np.asarray(X, dtype=float)
        probs = []
        for x in X:
            counts = self._leaf_counts(x, self.root_)
            p = counts / (counts.sum() + 1e-12)
            probs.append(p)
        probs = np.vstack(probs)
        # to original class order (already 0..k-1)
        return probs

    def predict(self, X):
        proba = self.predict_proba(X)
        pred_enc = np.argmax(proba, axis=1)
        return self.classes_[pred_enc]


class DecisionTreeRegressorRF:
    def __init__(self,
                 max_depth: Optional[int] = None,
                 min_samples_split: int = 2,
                 min_samples_leaf: int = 1,
                 max_features: Optional[int] = None,
                 random_state: Optional[int] = None):
        self.max_depth = max_depth
        self.min_samples_split = int(min_samples_split)
        self.min_samples_leaf = int(min_samples_leaf)
        self.max_features = max_features
        self.rng_ = np.random.default_rng(random_state)
        self.root_ = None
        self.n_features_ = None

    def fit(self, X, y):
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        self.n_features_ = X.shape[1]
        self.root_ = self._build(X, y, depth=0)
        return self

    def _feature_subset(self):
        if self.max_features is None or self.max_features >= self.n_features_:
            return np.arange(self.n_features_)
        return self.rng_.choice(self.n_features_, size=self.max_features, replace=False)

    def _best_split(self, X, y):
        n, d = X.shape
        best_loss = float("inf")
        best_f = None
        best_thr = None

        total_sum = float(np.sum(y))
        total_sum2 = float(np.sum(y ** 2))

        feat_idx = self._feature_subset()

        for f in feat_idx:
            xs = X[:, f]
            order = np.argsort(xs, kind="mergesort")
            xs_sorted = xs[order]
            y_sorted = y[order]

            diffs = xs_sorted[1:] != xs_sorted[:-1]
            if not np.any(diffs):
                continue

            prefix_sum = np.cumsum(y_sorted[:-1])
            prefix_sum2 = np.cumsum((y_sorted[:-1]) ** 2)

            left_n = np.arange(1, n)
            right_n = n - left_n

            right_sum = total_sum - prefix_sum
            right_sum2 = total_sum2 - prefix_sum2

            valid = diffs & (left_n >= self.min_samples_leaf) & (right_n >= self.min_samples_leaf)
            if not np.any(valid):
                continue

            left_mse = np.array([_mse_from_sums(prefix_sum[i], prefix_sum2[i], int(left_n[i])) for i in range(n - 1)])
            right_mse = np.array([_mse_from_sums(right_sum[i], right_sum2[i], int(right_n[i])) for i in range(n - 1)])
            weighted = (left_n * left_mse + right_n * right_mse) / n
            weighted[~valid] = np.inf

            i_best = int(np.argmin(weighted))
            if weighted[i_best] < best_loss:
                best_loss = float(weighted[i_best])
                best_f = int(f)
                best_thr = float((xs_sorted[i_best] + xs_sorted[i_best + 1]) / 2.0)

        return best_f, best_thr

    def _leaf_value(self, y):
        return float(np.mean(y)) if y.size else 0.0

    def _build(self, X, y, depth):
        n = X.shape[0]
        # Note: `<=` also turns a 2-sample node into a leaf when min_samples_leaf=1.
        if ((self.max_depth is not None and depth >= self.max_depth)
                or n < self.min_samples_split
                or n <= 2 * self.min_samples_leaf):
            return _NodeR(value=self._leaf_value(y))

        f, thr = self._best_split(X, y)
        if f is None:
            return _NodeR(value=self._leaf_value(y))

        left_mask = X[:, f] <= thr
        right_mask = ~left_mask

        if left_mask.sum() < self.min_samples_leaf or right_mask.sum() < self.min_samples_leaf:
            return _NodeR(value=self._leaf_value(y))

        node = _NodeR(feature=f, threshold=thr)
        node.left = self._build(X[left_mask], y[left_mask], depth + 1)
        node.right = self._build(X[right_mask], y[right_mask], depth + 1)
        return node

    def _predict_one(self, x, node: _NodeR):
        while node.feature is not None:
            node = node.left if x[node.feature] <= node.threshold else node.right
        return node.value

    def predict(self, X):
        X = np.asarray(X, dtype=float)
        return np.array([self._predict_one(x, self.root_) for x in X], dtype=float)


class RandomForestClassifierCustom:
    def __init__(self,
                 n_estimators: int = 100,
                 max_depth: Optional[int] = None,
                 min_samples_split: int = 2,
                 min_samples_leaf: int = 1,
                 criterion: Literal["gini", "entropy"] = "gini",
                 max_features: Optional[Literal["sqrt","log2", int]] = "sqrt",
                 bootstrap: bool = True,
                 random_state: Optional[int] = None):
        self.n_estimators = int(n_estimators)
        self.max_depth = max_depth
        self.min_samples_split = int(min_samples_split)
        self.min_samples_leaf = int(min_samples_leaf)
        self.criterion = criterion
        self.max_features = max_features
        self.bootstrap = bool(bootstrap)
        self.rng_ = np.random.default_rng(random_state)
        self.trees_ = []
        self.classes_ = None

    def _resolve_max_features(self, n_features: int) -> int:
        mf = self.max_features
        if mf is None:
            return n_features
        if mf == "sqrt":
            return max(1, int(np.sqrt(n_features)))
        if mf == "log2":
            return max(1, int(np.log2(n_features)))
        if isinstance(mf, int):
            return max(1, min(n_features, mf))
        raise ValueError("Unsupported max_features")

    def fit(self, X, y):
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=int)
        self.classes_ = np.unique(y)

        n, d = X.shape
        max_feat_n = self._resolve_max_features(d)
        self.trees_ = []

        for i in range(self.n_estimators):
            if self.bootstrap:
                idx = self.rng_.integers(0, n, size=n)
            else:
                idx = np.arange(n)

            tree = DecisionTreeClassifierRF(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                criterion=self.criterion,
                max_features=max_feat_n,
                random_state=int(self.rng_.integers(0, 1_000_000_000))
            )
            tree.fit(X[idx], y[idx])
            self.trees_.append(tree)

        return self

    def predict_proba(self, X):
        X = np.asarray(X, dtype=float)
        probs = None
        for t in self.trees_:
            p = t.predict_proba(X)  # (n, n_classes)
            probs = p if probs is None else (probs + p)
        probs = probs / max(1, len(self.trees_))
        return probs

    def predict(self, X):
        proba = self.predict_proba(X)
        pred_enc = np.argmax(proba, axis=1)
        # map the argmax column index back to the original class labels
        return self.classes_[pred_enc]


class RandomForestRegressorCustom:
    def __init__(self,
                 n_estimators: int = 100,
                 max_depth: Optional[int] = None,
                 min_samples_split: int = 2,
                 min_samples_leaf: int = 1,
                 max_features: Optional[Literal["sqrt","log2", int]] = "sqrt",
                 bootstrap: bool = True,
                 random_state: Optional[int] = None):
        self.n_estimators = int(n_estimators)
        self.max_depth = max_depth
        self.min_samples_split = int(min_samples_split)
        self.min_samples_leaf = int(min_samples_leaf)
        self.max_features = max_features
        self.bootstrap = bool(bootstrap)
        self.rng_ = np.random.default_rng(random_state)
        self.trees_ = []

    def _resolve_max_features(self, n_features: int) -> int:
        mf = self.max_features
        if mf is None:
            return n_features
        if mf == "sqrt":
            return max(1, int(np.sqrt(n_features)))
        if mf == "log2":
            return max(1, int(np.log2(n_features)))
        if isinstance(mf, int):
            return max(1, min(n_features, mf))
        raise ValueError("Unsupported max_features")

    def fit(self, X, y):
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float)
        n, d = X.shape
        max_feat_n = self._resolve_max_features(d)
        self.trees_ = []

        for i in range(self.n_estimators):
            if self.bootstrap:
                idx = self.rng_.integers(0, n, size=n)
            else:
                idx = np.arange(n)

            tree = DecisionTreeRegressorRF(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                min_samples_leaf=self.min_samples_leaf,
                max_features=max_feat_n,
                random_state=int(self.rng_.integers(0, 1_000_000_000))
            )
            tree.fit(X[idx], y[idx])
            self.trees_.append(tree)

        return self

    def predict(self, X):
        X = np.asarray(X, dtype=float)
        preds = np.zeros(X.shape[0], dtype=float)
        for t in self.trees_:
            preds += t.predict(X)
        preds /= max(1, len(self.trees_))
        return preds



### 4.1 Custom models vs the baseline (step 2)

For a fair comparison:
- **Classification:** we use the same raw features as in the sklearn baseline.
- **Regression:** we use the same preprocessing (imputation + one-hot `origin`), because the data has missing values and a categorical feature.


In [7]:

# ===== Custom baseline: classification =====
# (for speed, use fewer trees than in sklearn)
custom_rfc = RandomForestClassifierCustom(
    n_estimators=60,
    max_depth=None,
    min_samples_split=2,
    min_samples_leaf=1,
    criterion="gini",
    max_features="sqrt",
    bootstrap=True,
    random_state=RANDOM_STATE
).fit(X_cls_train, y_cls_train)

y_cls_pred_c = custom_rfc.predict(X_cls_test)
y_cls_proba_c = custom_rfc.predict_proba(X_cls_test)[:, 1]

cls_metrics_custom_base = {
    "accuracy": accuracy_score(y_cls_test, y_cls_pred_c),
    "f1_macro": f1_score(y_cls_test, y_cls_pred_c, average="macro"),
    "roc_auc": roc_auc_score(y_cls_test, y_cls_proba_c),
}
print("Custom baseline (classification):", cls_metrics_custom_base)

# ===== Custom baseline: regression (same preprocess as baseline) =====
X_reg_train_p = reg_preprocess_base.fit_transform(X_reg_train)
X_reg_test_p = reg_preprocess_base.transform(X_reg_test)

custom_rfr = RandomForestRegressorCustom(
    n_estimators=80,
    max_depth=None,
    min_samples_split=2,
    min_samples_leaf=1,
    max_features="sqrt",
    bootstrap=True,
    random_state=RANDOM_STATE
).fit(X_reg_train_p, y_reg_train.values)

y_reg_pred_c = custom_rfr.predict(X_reg_test_p)

reg_metrics_custom_base = {
    "mae": mean_absolute_error(y_reg_test, y_reg_pred_c),
    "rmse": rmse(y_reg_test, y_reg_pred_c),
    "r2": r2_score(y_reg_test, y_reg_pred_c),
}
print("Custom baseline (regression):", reg_metrics_custom_base)

display(pd.DataFrame([
    {"task": "classification", "model": "sklearn_baseline", **cls_metrics_base},
    {"task": "classification", "model": "custom_baseline", **cls_metrics_custom_base},
    {"task": "regression", "model": "sklearn_baseline", **reg_metrics_base},
    {"task": "regression", "model": "custom_baseline", **reg_metrics_custom_base},
]))


Custom baseline (classification): {'accuracy': 0.9963636363636363, 'f1_macro': 0.9963198394111743, 'roc_auc': 1.0}
Custom baseline (regression): {'mae': 1.6286431500652658, 'rmse': 2.124097960371601, 'r2': 0.9160853530459732}


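|   | task | model | accuracy | f1_macro | roc_auc | mae | rmse | r2 |
|---|---|---|---|---|---|---|---|---|
| 0 | classification | sklearn_baseline | 0.996364 | 0.99632 | 1.0 | NaN | NaN | NaN |
| 1 | classification | custom_baseline | 0.996364 | 0.99632 | 1.0 | NaN | NaN | NaN |
| 2 | regression | sklearn_baseline | NaN | NaN | NaN | 1.578846 | 2.157382 | 0.913435 |
| 3 | regression | custom_baseline | NaN | NaN | NaN | 1.628643 | 2.124098 | 0.916085 |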



### 4.2 Applying the improved-baseline techniques (step 3) to the custom models

We take the best hyperparameters found by GridSearchCV and apply them to the custom implementation.

> Note: the sklearn grid search may select a very large `n_estimators` (for example, 700).  
> For speed, the custom implementation caps `n_estimators` (at 120 for classification and 160 for regression) and keeps the other parameters.


In [8]:

# ===== Custom improved: classification (best params) =====
bp = cls_search.best_params_.copy()

best_n_estimators_cls = int(bp.get("n_estimators", 200))
best_n_estimators_cls = min(best_n_estimators_cls, 120)

best_max_depth_cls = bp.get("max_depth", None)
best_min_split_cls = int(bp.get("min_samples_split", 2))
best_min_leaf_cls = int(bp.get("min_samples_leaf", 1))
best_max_features_cls = bp.get("max_features", "sqrt")
best_bootstrap_cls = bool(bp.get("bootstrap", True))

custom_rfc_best = RandomForestClassifierCustom(
    n_estimators=best_n_estimators_cls,
    max_depth=best_max_depth_cls,
    min_samples_split=best_min_split_cls,
    min_samples_leaf=best_min_leaf_cls,
    criterion="gini",  # the custom tree supports gini/entropy; keep gini (could be extended)
    max_features=("sqrt" if best_max_features_cls == "sqrt" else ("log2" if best_max_features_cls == "log2" else None)),
    bootstrap=best_bootstrap_cls,
    random_state=RANDOM_STATE
).fit(X_cls_train, y_cls_train)

y_cls_pred_cb = custom_rfc_best.predict(X_cls_test)
y_cls_proba_cb = custom_rfc_best.predict_proba(X_cls_test)[:, 1]

cls_metrics_custom_improved = {
    "accuracy": accuracy_score(y_cls_test, y_cls_pred_cb),
    "f1_macro": f1_score(y_cls_test, y_cls_pred_cb, average="macro"),
    "roc_auc": roc_auc_score(y_cls_test, y_cls_proba_cb),
}
print("Custom improved (classification):", cls_metrics_custom_improved)


# ===== Custom improved: regression (best params + same preprocess) =====
bp_r = reg_search.best_params_.copy()
best_n_estimators_reg = int(bp_r.get("model__n_estimators", 300))
best_n_estimators_reg = min(best_n_estimators_reg, 160)

best_max_depth_reg = bp_r.get("model__max_depth", None)
best_min_split_reg = int(bp_r.get("model__min_samples_split", 2))
best_min_leaf_reg = int(bp_r.get("model__min_samples_leaf", 1))
best_max_features_reg = bp_r.get("model__max_features", "sqrt")
best_bootstrap_reg = bool(bp_r.get("model__bootstrap", True))

# same transformations as in the improved pipeline
X_reg_train_pp = reg_preprocess.fit_transform(X_reg_train)
X_reg_test_pp = reg_preprocess.transform(X_reg_test)

custom_rfr_best = RandomForestRegressorCustom(
    n_estimators=best_n_estimators_reg,
    max_depth=best_max_depth_reg,
    min_samples_split=best_min_split_reg,
    min_samples_leaf=best_min_leaf_reg,
    max_features=("sqrt" if best_max_features_reg == "sqrt" else ("log2" if best_max_features_reg == "log2" else None)),
    bootstrap=best_bootstrap_reg,
    random_state=RANDOM_STATE
).fit(X_reg_train_pp, y_reg_train.values)

y_reg_pred_cb = custom_rfr_best.predict(X_reg_test_pp)

reg_metrics_custom_improved = {
    "mae": mean_absolute_error(y_reg_test, y_reg_pred_cb),
    "rmse": rmse(y_reg_test, y_reg_pred_cb),
    "r2": r2_score(y_reg_test, y_reg_pred_cb),
}
print("Custom improved (regression):", reg_metrics_custom_improved)


summary = pd.DataFrame([
    {"task": "classification", "stage": "sklearn_baseline", **cls_metrics_base},
    {"task": "classification", "stage": "sklearn_improved", **cls_metrics_best},
    {"task": "classification", "stage": "custom_baseline", **cls_metrics_custom_base},
    {"task": "classification", "stage": "custom_improved", **cls_metrics_custom_improved},

    {"task": "regression", "stage": "sklearn_baseline", **reg_metrics_base},
    {"task": "regression", "stage": "sklearn_improved", **reg_metrics_best},
    {"task": "regression", "stage": "custom_baseline", **reg_metrics_custom_base},
    {"task": "regression", "stage": "custom_improved", **reg_metrics_custom_improved},
])
display(summary)


Custom improved (classification): {'accuracy': 0.9963636363636363, 'f1_macro': 0.9963198394111743, 'roc_auc': 1.0}
Custom improved (regression): {'mae': 1.5367794713790723, 'rmse': 2.0979930929170973, 'r2': 0.9181352771183077}


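|   | task | stage | accuracy | f1_macro | roc_auc | mae | rmse | r2 |
|---|---|---|---|---|---|---|---|---|
| 0 | classification | sklearn_baseline | 0.996364 | 0.99632 | 1.0 | NaN | NaN | NaN |
| 1 | classification | sklearn_improved | 0.996364 | 0.99632 | 1.0 | NaN | NaN | NaN |
| 2 | classification | custom_baseline | 0.996364 | 0.99632 | 1.0 | NaN | NaN | NaN |
| 3 | classification | custom_improved | 0.996364 | 0.99632 | 1.0 | NaN | NaN | NaN |
| 4 | regression | sklearn_baseline | NaN | NaN | NaN | 1.578846 | 2.157382 | 0.913435 |
| 5 | regression | sklearn_improved | NaN | NaN | NaN | 1.574495 | 2.135703 | 0.915166 |
| 6 | regression | custom_baseline | NaN | NaN | NaN | 1.628643 | 2.124098 | 0.916085 |
| 7 | regression | custom_improved | NaN | NaN | NaN | 1.536779 | 2.097993 | 0.918135 |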



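## Conclusions (briefly, by step)

**Classification:**
1. Random Forest is usually more stable than a single tree because it averages many decorrelated trees (bootstrap samples plus random feature subsets).
2. Tuning `max_depth`/`min_samples_leaf`/`min_samples_split` did not change the test metrics here: the grid search kept the default values (`max_depth=None`, `min_samples_leaf=1`, `min_samples_split=2`), and accuracy stayed at 0.9964 with ROC-AUC 1.0. The baseline is already close to the ceiling on this dataset.
3. `max_features` controls how diverse the trees are; the search selected `sqrt`, which is also the value used by the baseline.
4. The custom implementation (bagging + random features at each node) reproduces the core idea of Random Forest and matched the sklearn test metrics in this run, although it is slower and may differ on other data because of implementation details.

**Regression:**
1. Imputing `horsepower` is needed: the column has 6 missing values, and the custom trees have no NaN handling.
2. `origin` is a categorical feature, so it is one-hot encoded in every model. Because the baseline already uses this encoding, its separate effect was not measured.
3. Tuning the forest hyperparameters gave a small gain: RMSE 2.157 → 2.136, MAE 1.579 → 1.574, R² 0.913 → 0.915.
4. The custom forest on the same transformed features gives close results: RMSE 2.124 (baseline) and 2.098 (improved), slightly lower than sklearn in this run, while the custom baseline MAE (1.629) is slightly higher. With only 80 test rows these differences are small, and the custom forest is typically slower.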
