In [1]:
#!pip install numpy scipy matplotlib scikit-learn pandas

# Подключение библиотек

In [2]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error, r2_score, recall_score, make_scorer
from sklearn.model_selection import GridSearchCV
from sklearn.impute import SimpleImputer

# Реализация структуры данных дерева

In [3]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

# Собственная реализация классификатора дерева решений

In [None]:
class MyDecisionTreeClassifier:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split

    def _gini_impurity(self, counts):
        n = counts.sum()
        if n == 0:
            return 0.0
        probs = counts / n
        return 1.0 - np.dot(probs, probs)

    def _best_split(self, X, y):
        n, m = X.shape
        if n <= 1:
            return None, None

        best_gain = -1
        best_f, best_t = None, None
        total_counts = np.bincount(y, minlength=self.n_classes_)
        parent_gini = self._gini_impurity(total_counts)

        for f in range(m):
            idx = np.argsort(X[:, f])
            x_sorted = X[idx, f]
            y_sorted = y[idx]

            left_counts = np.zeros(self.n_classes_, dtype=int)
            right_counts = total_counts.copy()

            for i in range(1, n):
                if x_sorted[i] == x_sorted[i-1]:
                    cls = y_sorted[i-1]
                    left_counts[cls] += 1
                    right_counts[cls] -= 1
                    continue

                cls = y_sorted[i-1]
                left_counts[cls] += 1
                right_counts[cls] -= 1

                n_left, n_right = i, n - i
                if n_left < self.min_samples_split or n_right < self.min_samples_split:
                    continue

                gini_left = self._gini_impurity(left_counts)
                gini_right = self._gini_impurity(right_counts)
                gini_split = (n_left * gini_left + n_right * gini_right) / n
                gain = parent_gini - gini_split

                if gain > best_gain:
                    best_gain = gain
                    best_f, best_t = f, (x_sorted[i] + x_sorted[i-1]) / 2.0

        return (best_f, best_t) if best_gain > 1e-7 else (None, None)

    def _build_tree(self, X, y, depth=0):
        n = len(y)
        if n == 0:
            return Node(value=0)

        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return Node(value=int(np.bincount(y).argmax()))

        feature, threshold = self._best_split(X, y)
        if feature is None:
            return Node(value=int(np.bincount(y).argmax()))

        mask = X[:, feature] <= threshold
        left = self._build_tree(X[mask], y[mask], depth + 1)
        right = self._build_tree(X[~mask], y[~mask], depth + 1)
        return Node(feature=feature, threshold=threshold, left=left, right=right)

    def fit(self, X, y):
        X = np.asarray(X, dtype=np.float32)
        y = np.asarray(y)
        self.classes_, y = np.unique(y, return_inverse=True)
        self.n_classes_ = len(self.classes_)
        self.tree_ = self._build_tree(X, y)
        return self

    def predict(self, X):
        X = np.asarray(X)
        preds = np.empty(len(X), dtype=int)
        for i, x in enumerate(X):
            node = self.tree_
            while node.value is None:
                node = node.left if x[node.feature] <= node.threshold else node.right
            preds[i] = node.value
        return self.classes_[preds]
    
    def get_params(self, deep=True):
        return {
            'max_depth': self.max_depth,
            'min_samples_split': self.min_samples_split
        }

    def set_params(self, **params):
        for key, value in params.items():
            if key in {'max_depth', 'min_samples_split'}:
                setattr(self, key, value)
            else:
                raise ValueError(f"Invalid parameter {key} for estimator MyDecisionTreeClassifier.")
        return self

# Реализация регрессора дерева решения

In [None]:
class MyDecisionTreeRegressor:
    def __init__(self, max_depth=None, min_samples_split=2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split

    def _mse_from_sums(self, sum_y, sum_sq_y, n):
        if n == 0:
            return 0.0
        return (sum_sq_y - (sum_y ** 2) / n) / n

    def _best_split(self, X, y):
        n, m = X.shape
        if n <= 1:
            return None, None

        best_mse = np.inf
        best_f, best_t = None, None

        total_sum = y.sum()
        total_sq = np.dot(y, y)

        for f in range(m):
            idx = np.argsort(X[:, f])
            x_sorted = X[idx, f]
            y_sorted = y[idx]

            sum_left = 0.0
            sq_left = 0.0

            for i in range(1, n):
                y_val = y_sorted[i-1]
                sum_left += y_val
                sq_left += y_val * y_val

                n_left, n_right = i, n - i
                if n_left < self.min_samples_split or n_right < self.min_samples_split:
                    continue

                if x_sorted[i] == x_sorted[i-1]:
                    continue

                sum_right = total_sum - sum_left
                sq_right = total_sq - sq_left

                mse_left = self._mse_from_sums(sum_left, sq_left, n_left)
                mse_right = self._mse_from_sums(sum_right, sq_right, n_right)
                mse_split = (n_left * mse_left + n_right * mse_right) / n

                if mse_split < best_mse:
                    best_mse = mse_split
                    best_f = f
                    best_t = (x_sorted[i] + x_sorted[i-1]) / 2.0

        return (best_f, best_t) if best_mse < np.inf else (None, None)

    def _build_tree(self, X, y, depth=0):
        n = len(y)
        if (self.max_depth is not None and depth >= self.max_depth) or n < self.min_samples_split:
            return Node(value=np.mean(y))

        feature, threshold = self._best_split(X, y)
        if feature is None:
            return Node(value=np.mean(y))

        mask = X[:, feature] <= threshold
        left = self._build_tree(X[mask], y[mask], depth + 1)
        right = self._build_tree(X[~mask], y[~mask], depth + 1)
        return Node(feature=feature, threshold=threshold, left=left, right=right)

    def fit(self, X, y):
        X = np.asarray(X, dtype=np.float32)
        y = np.asarray(y, dtype=np.float32)
        self.tree_ = self._build_tree(X, y)
        return self

    def predict(self, X):
        X = np.asarray(X)
        preds = np.empty(len(X), dtype=np.float32)
        for i, x in enumerate(X):
            node = self.tree_
            while node.value is None:
                node = node.left if x[node.feature] <= node.threshold else node.right
            preds[i] = node.value
        return preds
    
    def get_params(self, deep=True):
        return {
            'max_depth': self.max_depth,
            'min_samples_split': self.min_samples_split
        }

    def set_params(self, **params):
        for key, value in params.items():
            if key in {'max_depth', 'min_samples_split'}:
                setattr(self, key, value)
            else:
                raise ValueError(f"Invalid parameter {key} for estimator MyDecisionTreeClassifier.")
        return self

# Проверка классификации

In [6]:
df = pd.read_csv('classification.csv')

classification_X = df.drop('Bankrupt?', axis=1)
classification_y = df['Bankrupt?']

classification_X_train, classification_X_test, classification_y_train, classification_y_test = train_test_split(
    classification_X, classification_y, test_size=0.2, random_state=42, stratify=classification_y
)

sk_clf = DecisionTreeClassifier()
sk_clf.fit(classification_X_train, classification_y_train)
classification_y_pred_sk = sk_clf.predict(classification_X_test)

my_clf = MyDecisionTreeClassifier()
my_clf.fit(classification_X_train, classification_y_train)
classification_y_pred_my = my_clf.predict(classification_X_test)

print(f"Sklearn accuracy={accuracy_score(classification_y_test, classification_y_pred_sk):.4f}, f1={f1_score(classification_y_test, classification_y_pred_sk, average='weighted'):.4f}, recall={recall_score(classification_y_test, classification_y_pred_sk, pos_label=1)}")
print(f"Custom  accuracy={accuracy_score(classification_y_test, classification_y_pred_my):.4f}, f1={f1_score(classification_y_test, classification_y_pred_my, average='weighted'):.4f}, recall={recall_score(classification_y_test, classification_y_pred_my, pos_label=1)}")

Sklearn accuracy=0.9597, f1=0.9595, recall=0.36363636363636365
Custom  accuracy=0.9626, f1=0.9605, recall=0.3181818181818182


# Провека регрессии

In [7]:
df = pd.read_csv('regression.csv')
df = df.loc[:, ~df.columns.str.contains('^Unnamed')]

regression_X = df.drop(columns=['salary_in_usd', 'salary', 'salary_currency'], axis=1)
regression_y = df['salary_in_usd'].to_numpy()

regression_X = pd.get_dummies(regression_X, drop_first=True).to_numpy(dtype=np.float32)

regression_X_train, regression_X_test, regression_y_train, regression_y_test = train_test_split(
    regression_X, regression_y, test_size=0.2, random_state=42
)


sk_reg = DecisionTreeRegressor()
sk_reg.fit(regression_X_train, regression_y_train)
regression_y_pred_sk = sk_reg.predict(regression_X_test)

my_reg = MyDecisionTreeRegressor()
my_reg.fit(regression_X_train, regression_y_train)
regression_y_pred_my = my_reg.predict(regression_X_test)
print(f"SkLearn RMSE={np.sqrt(mean_squared_error(regression_y_test, regression_y_pred_sk)):.4f}, R2={r2_score(regression_y_test, regression_y_pred_sk):.4f}")
print(f"Custom  RMSE={np.sqrt(mean_squared_error(regression_y_test, regression_y_pred_my)):.4f}, R2={r2_score(regression_y_test, regression_y_pred_my):.4f}")

SkLearn RMSE=51448.4117, R2=0.3094
Custom  RMSE=47908.9323, R2=0.4011


### Улучшение

### Классификация

### Подбор гиперпараметров

In [8]:
scorer = make_scorer(recall_score, pos_label=1)

sk_param_grid = {
    'max_depth': [4, 5, 6, 7],
    'min_samples_split': [8, 10, 12],
    'min_samples_leaf': [3, 5],
    'class_weight': ['balanced', {0: 1, 1: 5}, {0: 1, 1: 10}],
    'criterion': ['gini']
}

my_param_grid = {
    'max_depth': [4, 5, 6, 7],
    'min_samples_split': [8, 10, 12]
}

classification_sk_grid = GridSearchCV(
    DecisionTreeClassifier(random_state=42),
    sk_param_grid,
    cv=5,
    scoring=scorer,
    n_jobs=-1
)

classification_sk_grid.fit(classification_X_train, classification_y_train)
sk_best_clf = classification_sk_grid.best_estimator_

classification_my_grid = GridSearchCV(
    MyDecisionTreeClassifier(),
    my_param_grid,
    cv=5,
    scoring=scorer,
    n_jobs=-1
)

classification_my_grid.fit(classification_X_train, classification_y_train)
my_best_clf = classification_my_grid.best_estimator_

### Предсказание

In [9]:
classification_y_pred_sk = sk_best_clf.predict(classification_X_test)

classification_y_pred_my = my_best_clf.predict(classification_X_test)

### Метрики

In [10]:
print(f"Sklearn accuracy={accuracy_score(classification_y_test, classification_y_pred_sk):.4f}, f1={f1_score(classification_y_test, classification_y_pred_sk, average='weighted'):.4f}, recall={recall_score(classification_y_test, classification_y_pred_sk, pos_label=1)}")
print(f"Custom  accuracy={accuracy_score(classification_y_test, classification_y_pred_my):.4f}, f1={f1_score(classification_y_test, classification_y_pred_my, average='weighted'):.4f}, recall={recall_score(classification_y_test, classification_y_pred_my, pos_label=1)}")

Sklearn accuracy=0.8842, f1=0.9163, recall=0.7727272727272727
Custom  accuracy=0.9677, f1=0.9648, recall=0.3409090909090909


### Регрессия

### Подбор гиперпараметров

In [11]:
scorer = make_scorer(r2_score)

sk_param_grid = {
    'max_depth': [4, 5, 6, 7],
    'min_samples_split': [8, 10, 12],
    'min_samples_leaf': [3, 5]
}

my_param_grid = {
    'max_depth': [4, 5, 6, 7],
    'min_samples_split': [8, 10, 12]
}

regression_sk_grid = GridSearchCV(
    DecisionTreeRegressor(random_state=42),
    sk_param_grid,
    cv=5,
    scoring=scorer,
    n_jobs=-1
)

regression_sk_grid.fit(regression_X_train, regression_y_train)
sk_best_reg = regression_sk_grid.best_estimator_

regression_my_grid = GridSearchCV(
    MyDecisionTreeRegressor(),
    my_param_grid,
    cv=5,
    scoring=scorer,
    n_jobs=-1
)

regression_my_grid.fit(regression_X_train, regression_y_train)
my_best_reg = regression_my_grid.best_estimator_

### Предсказание

In [12]:
regression_y_pred_sk = sk_best_reg.predict(regression_X_test)

regression_y_pred_my = my_best_reg.predict(regression_X_test)

### Метрики

In [13]:
print(f"SkLearn RMSE={np.sqrt(mean_squared_error(regression_y_test, regression_y_pred_sk)):.4f}, R2={r2_score(regression_y_test, regression_y_pred_sk):.4f}")
print(f"Custom  RMSE={np.sqrt(mean_squared_error(regression_y_test, regression_y_pred_my)):.4f}, R2={r2_score(regression_y_test, regression_y_pred_my):.4f}")

SkLearn RMSE=43917.1502, R2=0.4968
Custom  RMSE=45480.9295, R2=0.4603
