Импорты

In [12]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, mean_absolute_error, mean_squared_error, r2_score, recall_score

Загрузка датасета для классификации

In [2]:
df_heart = pd.read_csv('heart.csv')
X_cls = df_heart.drop('target', axis=1)
y_cls = df_heart['target']

Разделение на тестовые и тренировочные

In [3]:
X_train_cls, X_test_cls, y_train_cls, y_test_cls = train_test_split(
    X_cls, y_cls, test_size=0.2, random_state=42, stratify=y_cls
)

Загрузка датасета для регрессии

In [4]:
df_energy = pd.read_csv('energy_efficiency_data.csv')
X_reg = df_energy.drop(['Heating_Load', 'Cooling_Load'], axis=1)
y_reg = df_energy['Heating_Load']

Определение признаков

In [5]:
num_features = ['Relative_Compactness', 'Surface_Area', 'Wall_Area', 'Roof_Area', 'Overall_Height']
cat_features = ['Orientation', 'Glazing_Area', 'Glazing_Area_Distribution']

Разделение на тестовые и тренировочные

In [6]:
X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
    X_reg, y_reg, test_size=0.2, random_state=42
)

Дерево классификатор

In [10]:
class MyDecisionTreeClassifier:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree_ = None

    def _gini(self, y):
        _, counts = np.unique(y, return_counts=True)
        probs = counts / len(y)
        return 1.0 - np.sum(probs ** 2)

    def _best_split(self, X, y):
        n_samples, n_features = X.shape
        best_gain = -1
        best_feature, best_threshold = None, None
        parent_gini = self._gini(y)

        for feature in range(n_features):
            thresholds = np.unique(X[:, feature])
            for t in thresholds:
                left_mask = X[:, feature] <= t
                right_mask = ~left_mask

                if np.sum(left_mask) < self.min_samples_split or np.sum(right_mask) < self.min_samples_split:
                    continue

                gini_left = self._gini(y[left_mask])
                gini_right = self._gini(y[right_mask])
                weighted_gini = (np.sum(left_mask) * gini_left + np.sum(right_mask) * gini_right) / n_samples
                gain = parent_gini - weighted_gini

                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = t

        return best_feature, best_threshold

    def _grow_tree(self, X, y, depth):
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.bincount(y).argmax()

        feature, threshold = self._best_split(X, y)
        if feature is None:
            return np.bincount(y).argmax()

        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask

        left_subtree = self._grow_tree(X[left_mask], y[left_mask], depth + 1)
        right_subtree = self._grow_tree(X[right_mask], y[right_mask], depth + 1)

        return {
            'feature': feature,
            'threshold': threshold,
            'left': left_subtree,
            'right': right_subtree
        }

    def fit(self, X, y):
        X, y = np.array(X), np.array(y)
        self.tree_ = self._grow_tree(X, y, depth=0)
        return self

    def _predict_single(self, x, node):
        if not isinstance(node, dict):
            return node
        if x[node['feature']] <= node['threshold']:
            return self._predict_single(x, node['left'])
        else:
            return self._predict_single(x, node['right'])

    def predict(self, X):
        return np.array([self._predict_single(x, self.tree_) for x in np.array(X)])

Дерево регрессор

In [11]:
class MyDecisionTreeRegressor:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.tree_ = None

    def _mse(self, y):
        return np.mean((y - np.mean(y)) ** 2)

    def _best_split(self, X, y):
        n_samples, n_features = X.shape
        best_mse = np.inf
        best_feature, best_threshold = None, None

        for feature in range(n_features):
            thresholds = np.unique(X[:, feature])
            for t in thresholds:
                left_mask = X[:, feature] <= t
                right_mask = ~left_mask

                if np.sum(left_mask) < self.min_samples_split or np.sum(right_mask) < self.min_samples_split:
                    continue

                mse_left = self._mse(y[left_mask])
                mse_right = self._mse(y[right_mask])
                weighted_mse = (np.sum(left_mask) * mse_left + np.sum(right_mask) * mse_right) / n_samples

                if weighted_mse < best_mse:
                    best_mse = weighted_mse
                    best_feature = feature
                    best_threshold = t

        return best_feature, best_threshold

    def _grow_tree(self, X, y, depth):
        if (self.max_depth is not None and depth >= self.max_depth) or len(y) < self.min_samples_split:
            return np.mean(y)

        feature, threshold = self._best_split(X, y)
        if feature is None:
            return np.mean(y)

        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask

        left_subtree = self._grow_tree(X[left_mask], y[left_mask], depth + 1)
        right_subtree = self._grow_tree(X[right_mask], y[right_mask], depth + 1)

        return {
            'feature': feature,
            'threshold': threshold,
            'left': left_subtree,
            'right': right_subtree
        }

    def fit(self, X, y):
        X, y = np.array(X), np.array(y)
        self.tree_ = self._grow_tree(X, y, depth=0)
        return self

    def _predict_single(self, x, node):
        if not isinstance(node, dict):
            return node
        if x[node['feature']] <= node['threshold']:
            return self._predict_single(x, node['left'])
        else:
            return self._predict_single(x, node['right'])

    def predict(self, X):
        return np.array([self._predict_single(x, self.tree_) for x in np.array(X)])

Классификатор

In [None]:
class MyGradientBoostingClassifier:
    def __init__(self, n_estimators=10, learning_rate=0.1, max_depth=3):
        self.n_estimators = n_estimators
        self.lr = learning_rate
        self.max_depth = max_depth
        self.models = []
        self.base_logit = None
        self.pos_label = None

    def _sigmoid(self, z):
        z = np.clip(z, -100, 100)
        return 1.0 / (1.0 + np.exp(-z))

    def fit(self, X, y):
        X, y = np.asarray(X, dtype=np.float64), np.asarray(y)
        self.pos_label = 1
        y_bin = (y == self.pos_label).astype(np.float64)

        p = np.clip(np.mean(y_bin), 1e-8, 1 - 1e-8)
        self.base_logit = np.log(p / (1 - p))
        logits = np.full_like(y_bin, self.base_logit)

        for _ in range(self.n_estimators):
            probas = self._sigmoid(logits)
            pseudo_residuals = y_bin - probas
            tree = MyDecisionTreeRegressor(max_depth=self.max_depth)
            tree.fit(X, pseudo_residuals)
            self.models.append(tree)
            logits += self.lr * tree.predict(X)
        return self

    def predict(self, X):
        X = np.asarray(X, dtype=np.float64)
        logits = np.full(X.shape[0], self.base_logit)
        for tree in self.models:
            logits += self.lr * tree.predict(X)
        probas = self._sigmoid(logits)
        return np.where(probas >= 0.5, self.pos_label, 0)

Регрессор

In [14]:
class MyGradientBoostingRegressor:
    def __init__(self, n_estimators=10, learning_rate=0.1, max_depth=3):
        self.n_estimators = n_estimators
        self.lr = learning_rate
        self.max_depth = max_depth
        self.models = []
        self.base_pred = None

    def fit(self, X, y):
        X, y = np.asarray(X, dtype=np.float64), np.asarray(y, dtype=np.float64)
        self.base_pred = np.mean(y)
        current_pred = np.full_like(y, self.base_pred)

        for _ in range(self.n_estimators):
            errors = y - current_pred
            tree = MyDecisionTreeRegressor(max_depth=self.max_depth)
            tree.fit(X, errors)
            self.models.append(tree)
            current_pred += self.lr * tree.predict(X)
        return self

    def predict(self, X):
        X = np.asarray(X, dtype=np.float64)
        pred = np.full(X.shape[0], self.base_pred)
        for tree in self.models:
            pred += self.lr * tree.predict(X)
        return pred

Метрики для классификации

In [15]:
def evaluate_classification(y_true, y_pred):
    return {
        'Accuracy': accuracy_score(y_true, y_pred),
        'F1': f1_score(y_true, y_pred),
        'ROC-AUC': roc_auc_score(y_true, y_pred),
    }

Метрики для регрессии

In [16]:
def evaluate_regression(y_true, y_pred):
    return {
        'MAE': mean_absolute_error(y_true, y_pred),
        'RMSE': np.sqrt(mean_squared_error(y_true, y_pred)),
        'R2': r2_score(y_true, y_pred)
    }

Запуск базовой модели skrean (классификация)

In [17]:
sk_gb_cls_base = GradientBoostingClassifier(n_estimators=10, max_depth=3, learning_rate=0.1, random_state=42)
sk_gb_cls_base.fit(X_train_cls, y_train_cls)
y_pred_sk_cls = sk_gb_cls_base.predict(X_test_cls)

Запуск кастомной базовой модели (классификация)

In [18]:
my_gb_cls_base = MyGradientBoostingClassifier(n_estimators=10, max_depth=3, learning_rate=0.1)
my_gb_cls_base.fit(X_train_cls.values, y_train_cls.values)
y_pred_my_cls = my_gb_cls_base.predict(X_test_cls.values)

Вывод метрик классификации

In [19]:
print("Sklearn:", evaluate_classification(y_test_cls, y_pred_sk_cls))
print("Custom :", evaluate_classification(y_test_cls, y_pred_my_cls))

Sklearn: {'Accuracy': 0.8439024390243902, 'F1': 0.8558558558558559, 'ROC-AUC': 0.8423809523809525}
Custom : {'Accuracy': 0.8536585365853658, 'F1': 0.8611111111111112, 'ROC-AUC': 0.8528571428571429}


Запуск базовой модели sklearn (регрессия)

In [20]:
sk_gb_reg_base = GradientBoostingRegressor(n_estimators=10, max_depth=3, learning_rate=0.05, random_state=42)
sk_gb_reg_base.fit(X_train_reg, y_train_reg)
y_pred_sk_reg = sk_gb_reg_base.predict(X_test_reg)

Запуск кастомной базовой модели (регрессия)

In [21]:
my_gb_reg_base = MyGradientBoostingRegressor(n_estimators=10, max_depth=3, learning_rate=0.05)
my_gb_reg_base.fit(X_train_reg, y_train_reg)
y_pred_my_reg = my_gb_reg_base.predict(X_test_reg)

Вывод метрик регрессии

In [22]:
print("Sklearn:", evaluate_regression(y_test_reg, y_pred_sk_reg))
print("Custom :", evaluate_regression(y_test_reg, y_pred_my_reg))

Sklearn: {'MAE': 5.756919094057019, 'RMSE': np.float64(6.498014749562232), 'R2': 0.5949000491252425}
Custom : {'MAE': 5.756919094057019, 'RMSE': np.float64(6.498014749562232), 'R2': 0.5949000491252425}


Применим улучшения

Подбор гиперпараметров

In [24]:
param_grid_cls = {
    'n_estimators': [50, 100],
    'max_depth': [3, 4],
    'learning_rate': [0.01, 0.05, 0.1]
}

grid_cls = GridSearchCV(
    GradientBoostingClassifier(random_state=42),
    param_grid_cls,
    cv=5,
    scoring='f1'
)
grid_cls.fit(X_train_cls, y_train_cls)

best_params = grid_cls.best_params_

Запуск улучшенной модели sklearn (классификация)

In [25]:
best_sk_cls = grid_cls.best_estimator_
y_pred_sk_cls_opt = best_sk_cls.predict(X_test_cls)

Запуск кастомной улучшенной модели (классификация)

In [26]:
my_gb_cls_opt = MyGradientBoostingClassifier(
    n_estimators=best_params['n_estimators'],
    max_depth=best_params['max_depth'],
    learning_rate=best_params['learning_rate']
)
my_gb_cls_opt.fit(X_train_cls.values, y_train_cls.values)
y_pred_my_cls_opt = my_gb_cls_opt.predict(X_test_cls.values)

Вывод метрик классификации

In [27]:
print("Sklearn:", evaluate_classification(y_test_cls, y_pred_sk_cls_opt))
print("Custom :", evaluate_classification(y_test_cls, y_pred_my_cls_opt))

Sklearn: {'Accuracy': 1.0, 'F1': 1.0, 'ROC-AUC': 1.0}
Custom : {'Accuracy': 0.9073170731707317, 'F1': 0.91324200913242, 'ROC-AUC': 0.9061904761904761}


Подбор гиперпараметров

In [28]:
param_grid_reg = {
    'n_estimators': [50, 100],
    'max_depth': [3, 4],
    'learning_rate': [0.01, 0.1]
}

grid_reg = GridSearchCV(
    GradientBoostingRegressor(random_state=42),
    param_grid_reg,
    cv=5,
    scoring='r2'
)
grid_reg.fit(X_train_reg, y_train_reg)
best_params_r = grid_reg.best_params_


Запуск улучшенной модели sklearn (регрессия)

In [29]:
best_sk_reg = grid_reg.best_estimator_
y_pred_sk_reg_opt = best_sk_reg.predict(X_test_reg)

Запуск кастомной улучшенной модели (регрессия)

In [30]:
my_gb_reg_opt = MyGradientBoostingRegressor(
    n_estimators=best_params_r['n_estimators'],
    max_depth=best_params_r['max_depth'],
    learning_rate=best_params_r['learning_rate']
)
my_gb_reg_opt.fit(X_train_reg, y_train_reg)
y_pred_my_reg_opt = my_gb_reg_opt.predict(X_test_reg)

Вывод метрик регрессии

In [31]:
print("Sklearn:", evaluate_regression(y_test_reg, y_pred_sk_reg_opt))
print("Custom :", evaluate_regression(y_test_reg, y_pred_my_reg_opt))

Sklearn: {'MAE': 0.32100057333149834, 'RMSE': np.float64(0.43480077531102945), 'R2': 0.9981862332983186}
Custom : {'MAE': 0.2975157660032006, 'RMSE': np.float64(0.4003855745537311), 'R2': 0.9984619953489432}
