In [160]:
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from itertools import product
from collections import defaultdict

%matplotlib inline

In [161]:
BASE_DIR = os.getcwd()
csv_path = "sample_data/drug200.csv"

df = pd.read_csv(os.path.join(BASE_DIR, csv_path))
df

Unnamed: 0,Age,Sex,BP,Cholesterol,Na_to_K,Drug
0,23,F,HIGH,HIGH,25.355,drugY
1,47,M,LOW,HIGH,13.093,drugC
2,47,M,LOW,HIGH,10.114,drugC
3,28,F,NORMAL,HIGH,7.798,drugX
4,61,F,LOW,HIGH,18.043,drugY
...,...,...,...,...,...,...
195,56,F,LOW,HIGH,11.567,drugC
196,16,M,LOW,HIGH,12.006,drugC
197,52,M,NORMAL,HIGH,9.894,drugX
198,23,M,NORMAL,NORMAL,14.020,drugX


In [162]:
class Tree:
    def __init__(self, threshold=None, feature=None, prediction=None):
        if threshold is not None and feature is not None and prediction is None:
            self.feature = feature
            self.threshold = threshold
            self.is_leaf = False
        elif prediction is not None and threshold is None and feature is None:
            self.prediction = prediction
            self.is_leaf = True
        else:
            self.is_leaf = None
            print('Vertex in decision tree must be either predicat(only feature and threshold)'
                  'or leaf(only prediction)')

        self.right = None
        self.left = None

In [163]:
class Decision_tree:
    def __init__(self, max_depth=4, min_impurity_decrease=0.05, min_samples_leaf=5, criterion='entropy'):
        self.classes = []
        self.min_impurity_decrease = min_impurity_decrease
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.criterion = criterion
        self.tree = None
        self.prediction = None

    def _find_classes_amount(self, y):
        classes_amount = defaultdict(int)

        for i in y:
            classes_amount[i] += 1

        return classes_amount

    def _find_classes_weight(self, y):
        k, N = self.classes.shape[0], len(y)
        amount = self._find_classes_amount(y)
        weight = []

        for i in y:
            weight.append(N / (k * amount[i]))

        return np.array(weight)

    def _find_impurity(self, y, weight):
        impurity = 0
        total_weight = np.sum(weight)

        if self.criterion == 'entropy':
            for cls in self.classes:
                class_weight = weight[y == cls]
                p = np.sum(class_weight) / total_weight
                if p > 0:
                    impurity -= p * np.log2(p)
            return impurity
        elif self.criterion == 'gini':
            for cls in self.classes:
                class_weight = weight[y == cls]
                p = np.sum(class_weight) / total_weight
                impurity -= p * (1 - p)
            return impurity

    def _find_best_split(self, X, y):
        best_separator = None
        best_gain = -float('inf')
        best_main_impurity = 0

        # Рассчитываем общий вес и примесь для всего узла
        weight_node = self._find_classes_weight(y)
        total_weight = np.sum(weight_node)
        main_impurity = self._find_impurity(y, weight_node)

        for feature_index, feature in enumerate(X.T):
            # Для числовых признаков
            if isinstance(feature[0], (int, float)):
                # Создаем копии для безопасной работы
                feature_copy = feature.copy()
                y_copy = y.copy()

                # Сортируем данные по признаку
                sorted_indices = np.argsort(feature_copy)
                feature_sorted = feature_copy[sorted_indices]
                y_sorted = y_copy[sorted_indices]

                # Инициализируем предыдущее значение
                prev_value = feature_sorted[0]

                for i in range(1, len(feature_sorted)):
                    current_value = feature_sorted[i]

                    # Пропускаем одинаковые значения
                    if current_value == prev_value:
                        prev_value = current_value
                        continue

                    # Рассчитываем порог как среднее между соседними значениями
                    threshold = (prev_value + current_value) / 2.0

                    # Разделяем данные
                    left_mask = feature_copy <= threshold
                    right_mask = feature_copy > threshold

                    left_y = y_copy[left_mask]
                    right_y = y_copy[right_mask]

                    # Проверяем минимальный размер листьев
                    if len(left_y) < self.min_samples_leaf or len(right_y) < self.min_samples_leaf:
                        prev_value = current_value
                        continue

                    # Рассчитываем веса для подвыборок
                    weight_left = self._find_classes_weight(left_y)
                    weight_right = self._find_classes_weight(right_y)

                    # Рассчитываем примеси
                    left_impurity = self._find_impurity(left_y, weight_left)
                    right_impurity = self._find_impurity(right_y, weight_right)

                    # Рассчитываем общие веса для подвыборок
                    left_total_weight = np.sum(weight_left)
                    right_total_weight = np.sum(weight_right)

                    # Вычисляем прирост информации
                    gain = total_weight * main_impurity - (left_total_weight * left_impurity + right_total_weight * right_impurity)

                    # Обновляем лучшее разбиение
                    if gain > best_gain:
                        best_gain = gain
                        best_main_impurity = main_impurity
                        best_separator = [
                            feature_index,
                            np.where(left_mask)[0],  # Индексы левой подвыборки
                            np.where(right_mask)[0],  # Индексы правой подвыборки
                            threshold
                        ]

                    prev_value = current_value
            else:
                unique_val = np.unique(feature)
                n = len(unique_val)
                for mask in product([False, True], repeat=n):
                    if all(mask) or not any(mask):
                        continue

                    mask = np.array(mask)
                    val_mask = np.isin(feature, unique_val[mask])
                    left_mask, right_mask = val_mask, ~val_mask

                    weight = self._find_classes_weight(y)
                    total_weight = np.sum(weight)
                    main_impurity = self._find_impurity(y, weight)

                    left_y = y[left_mask]
                    left_weight = self._find_classes_weight(left_y)
                    left_impurity = self._find_impurity(left_y, left_weight)

                    right_y = y[right_mask]
                    right_weight = self._find_classes_weight(right_y)
                    right_impurity = self._find_impurity(right_y, right_weight)

                    left = np.sum(left_weight)
                    right = total_weight - left
                    gain = total_weight * main_impurity - (left * left_impurity + right * right_impurity)

                    if gain > best_gain:
                        best_gain = gain
                        best_main_impurity = main_impurity
                        threshold = unique_val[mask]
                        best_separator = [feature_index, left_mask, right_mask, threshold]

        return best_main_impurity, best_gain, best_separator

    def _is_pure(self, y):
        amount = self._find_classes_amount(y)
        n = len(y)

        for _, num in amount.items():
            if num / n >= 1:
                return True
        return False

    def _build_tree(self, X, y, depth=0):

        probability_vector = self._find_classes_amount(y)

        stop_conditions = [
            len(y) < self.min_samples_leaf,
            depth >= self.max_depth,
            self._is_pure(y)
        ]

        if any(stop_conditions):
            return Tree(prediction=probability_vector)

        main_impurity, gain, split_criteria = self._find_best_split(X, y)

        if gain < self.min_impurity_decrease * main_impurity:
            return Tree(prediction=probability_vector)

        if split_criteria is None:
            return Tree(prediction=probability_vector)

        feature_index, left_index, right_index, threshold = split_criteria

        root = Tree(feature=feature_index, threshold=threshold)
        root.left = self._build_tree(X[left_index], y[left_index], depth + 1)
        root.right = self._build_tree(X[right_index], y[right_index], depth + 1)

        return root

    def _draw_node(self, ax, node, x, y, dx=1, dy=1):
        if node.is_leaf is False and isinstance(node.threshold, (int, float)):
            ax.text(x, y, f"X{node.feature} ≤ {node.threshold:.2f}",
                    bbox=dict(facecolor='white'),
                    ha='center', va='center')
        elif node.is_leaf is False:
            ax.text(x, y, f"X{node.feature} == {node.threshold}",
                    bbox=dict(facecolor='white'),
                    ha='center', va='center')
        elif node.is_leaf is True:
            ax.text(x, y, f"{len(self.prediction) + 1}",
                    bbox=dict(facecolor='white'),
                    ha='center', va='center')
            self.prediction.append(node.prediction)

        if node.left:
            x_left = x - dx
            y_left = y - dy
            ax.plot([x, x_left], [y, y_left], 'k-')
            self._draw_node(ax, node.left, x_left, y_left, dx/2, dy)

        if node.right:
            x_right = x + dx
            y_right = y - dy
            ax.plot([x, x_right], [y, y_right], 'k-')
            self._draw_node(ax, node.right, x_right, y_right,  dx/2, dy)

    def visualize_tree(self):
        self.prediction = []
        if self.tree:
            fig, ax = plt.subplots(figsize=(20, 8))
            self._draw_node(ax, self.tree, x=0, y=0, dx=10, dy=1)
            ax.axis('off')
            plt.show()
            for i in range(len(self.prediction)):
                print(f'{i + 1}: {self.prediction[i]}')
        else:
            print('Before visualize, tree should be train first')

    def train(self, X, y):
        self.classes = np.unique(y)

        self.tree = self._build_tree(X, y)
        return self

    def predict(self, X):
        prediction = []
        for i in X:
            tree = self.tree
            while not tree.is_leaf:
                if isinstance(tree.threshold, (int, float)) and i[tree.feature] >= tree.threshold:
                    tree = tree.right
                elif type(tree.threshold) == list and i[tree.feature] not in tree.threshold:
                    tree = tree.right
                else:
                    tree = tree.left
            res = ['', 0]
            for key, val in tree.prediction.items():
                if val > res[1]: res = [key, val]
            prediction.append(res[0])

        return prediction

In [164]:
from sklearn.model_selection import train_test_split

num = 500
res = [0, 1, 0]
for _ in range(num):
    df = df.sample(frac=1)

    data = df.to_numpy()
    m, n = data.shape

    X, y = data[: , :n-1], data[:, -1]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=0.2,
        stratify=y
    )

    a = Decision_tree(max_depth=10, min_impurity_decrease=0.01, min_samples_leaf=17)
    a.train(X_train, y_train)
    # a.visualize_tree()
    acc = np.mean(a.predict(X_test) == y_test)
    res[0] += acc
    if res[1] > acc: res[1] = acc
    if res[2] < acc: res[2] = acc
print(f'Mean accuracy: {res[0] / num}, Min: {res[1]}, Max: {res[2]}')

Mean accuracy: 0.7195500000000018, Min: 0.575, Max: 0.725
