In [3]:
import pandas as pd
import numpy as np
import math
from collections import Counter

from sklearn import preprocessing
from sklearn.model_selection import train_test_split

from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier
from sklearn.tree import DecisionTreeRegressor

# Загрузка датасетов

In [4]:
data = pd.read_csv('train_and_test2.csv')

In [5]:
data = data[['Age', 'Fare', 'Sex', 'sibsp', 'Parch', 'Embarked', '2urvived']]

In [6]:
data.describe()

Unnamed: 0,Age,Fare,Sex,sibsp,Parch,Embarked,2urvived
count,1309.0,1309.0,1309.0,1309.0,1309.0,1307.0,1309.0
mean,29.503186,33.281086,0.355997,0.498854,0.385027,1.492731,0.261268
std,12.905241,51.7415,0.478997,1.041658,0.86556,0.814626,0.439494
min,0.17,0.0,0.0,0.0,0.0,0.0,0.0
25%,22.0,7.8958,0.0,0.0,0.0,1.0,0.0
50%,28.0,14.4542,0.0,0.0,0.0,2.0,0.0
75%,35.0,31.275,1.0,1.0,0.0,2.0,1.0
max,80.0,512.3292,1.0,8.0,9.0,2.0,1.0


In [7]:
features = data.drop(['2urvived'], axis=1)
target = data['2urvived']

X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.3, random_state=12345)

In [8]:
data = pd.read_csv('possum.csv')

features = data.drop(columns=['hdlngth', 'case', 'Pop'], axis=1)
target = data['hdlngth']

le = preprocessing.LabelEncoder()
for col in ['site', 'sex']:
    features[col] = le.fit_transform(features[col])

X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(features, target, test_size=0.3, random_state=12345)

# Метрики

In [9]:
#accuracy

def accuracy(y_test, y_pred):
    compare_df = pd.DataFrame({'y': list(y_test), 'y_pred': y_pred})
    
    TP = compare_df.loc[(compare_df['y'] == 1) & (compare_df['y_pred'] == 1)]
    FP = compare_df.loc[(compare_df['y'] == 0) & (compare_df['y_pred'] == 1)]
    FN = compare_df.loc[(compare_df['y'] == 1) & (compare_df['y_pred'] == 0)]
    TN = compare_df.loc[(compare_df['y'] == 0) & (compare_df['y_pred'] == 0)]
    
    return (len(TP) + len(TN)) / len(compare_df)

In [10]:
#precession

def precession(y_test, y_pred):
    compare_df = pd.DataFrame({'y': list(y_test), 'y_pred': y_pred})
    
    TP = compare_df.loc[(compare_df['y'] == 1) & (compare_df['y_pred'] == 1)]
    FP = compare_df.loc[(compare_df['y'] == 0) & (compare_df['y_pred'] == 1)]
    FN = compare_df.loc[(compare_df['y'] == 1) & (compare_df['y_pred'] == 0)]
    TN = compare_df.loc[(compare_df['y'] == 0) & (compare_df['y_pred'] == 0)]
    
    return len(TP) / (len(TP) + len(FP))

In [11]:
#recall

def recall(y_test, y_pred):
    compare_df = pd.DataFrame({'y': list(y_test), 'y_pred': y_pred})
    
    TP = compare_df.loc[(compare_df['y'] == 1) & (compare_df['y_pred'] == 1)]
    FP = compare_df.loc[(compare_df['y'] == 0) & (compare_df['y_pred'] == 1)]
    FN = compare_df.loc[(compare_df['y'] == 1) & (compare_df['y_pred'] == 0)]
    TN = compare_df.loc[(compare_df['y'] == 0) & (compare_df['y_pred'] == 0)]
    
    return len(TP) / (len(TP) + len(FN))

In [12]:
#f1

def f1(y_test, y_pred):
    compare_df = pd.DataFrame({'y': list(y_test), 'y_pred': y_pred})
    
    TP = compare_df.loc[(compare_df['y'] == 1) & (compare_df['y_pred'] == 1)]
    FP = compare_df.loc[(compare_df['y'] == 0) & (compare_df['y_pred'] == 1)]
    FN = compare_df.loc[(compare_df['y'] == 1) & (compare_df['y_pred'] == 0)]
    TN = compare_df.loc[(compare_df['y'] == 0) & (compare_df['y_pred'] == 0)]
    
    precession_ = precession(y_test, y_pred)
    recall_ = recall(y_test, y_pred)
    return 2 * precession_ * recall_ / (recall_+ precession_)

In [13]:
# mse
def mse(y_pred, y_targ):
    y_pred = np.array(y_pred)
    y_targ = np.array(y_targ)
    return np.mean(np.square(y_targ - y_pred))

# rmse
def rmse(y_pred, y_targ):
    y_pred = np.array(y_pred)
    y_targ = np.array(y_targ)
    return (np.sum((y_targ - y_pred) ** 2) / len(y_test)) ** 0.5

# r2
def r2(y_pred, y_targ):
    y_pred = np.array(y_pred)
    y_targ = np.array(y_targ)
    
    sse = np.sum((y_pred - y_targ) ** 2)
    sst = np.sum((y_pred - np.mean(y_pred)) ** 2)
    return abs(1 - sse / sst)

# Алгоритмы

In [14]:
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, prediction=None, left_proba=None):
        self.feature = feature  # индекс признака
        self.threshold = threshold  # порог для разбиения
        self.left = left  # левый дочерний узел
        self.right = right  # правый дочерний узел
        self.prediction = prediction  # предсказание для листового узла
        self.left_proba = left_proba
        self.num_elems = 0


class DecisionTreeID3Classifier:
    def __init__(self, max_depth, method, prun=False):
        self.root = None
        self.max_depth = max_depth
        self.method = method
        self.probas = {}
        self.max_proba = 0
        self.majority = None
        self.prun = prun

    def fit(self, X, y):
        for i in set(y):
            self.probas[i] = len(y.loc[y == i]) / len(y)

        self.root = self.build_tree(np.array(X), np.array(y), 0)

        self.majority = y.value_counts().argmax()

    def predict_instance(self, node, sample):
        if np.nan not in sample:
            if node.prediction is not None:
                node.num_elems += 1
                return node.prediction
            else:
                if sample[node.feature] <= node.threshold:
                    node.num_elems += 1
                    return self.predict_instance(node.left, sample)
                else:
                    node.num_elems += 1
                    return self.predict_instance(node.right, sample)
        else:
            if node.feature is None:
                return node.prediction
            else:
                return self.count_nan(node, sample, 0)

    def pruning(self, node):
        if node.num_elems == 0:
            node.prediction = self.majority
        else:
            if node.prediction is None:
                self.pruning(node.left)
                self.pruning(node.right)

    def count_nan(self, node, sample, proba):
        if node.prediction is not None:
            if self.max_proba < proba * self.probas[int(node.prediction)]:
                self.max_proba = proba * self.probas[int(node.prediction)]
                return self.probas[int(node.prediction)]
        else:
            if proba != 0:
                self.count_nan(node.left, sample, node.left_proba*proba)
                self.count_nan(node.right, sample, (1-node.left_proba)*proba)
            else:
                self.count_nan(node.left, sample, node.left_proba)
                self.count_nan(node.right, sample, (1 - node.left_proba))

    def predict(self, X):
        pred = [self.predict_instance(self.root, sample) for sample in np.array(X)]
        if self.prun is True:
            self.pruning(self.root)
            pred = [self.predict_instance(self.root, sample) for sample in np.array(X)]
            return pred
        else:
            return pred

    @staticmethod
    def entropy(y):
        counter = Counter(y)
        total_instances = len(y)
        return -sum((count / total_instances) * math.log2(count / total_instances) for count in counter.values())

    @staticmethod
    def donskoy(y, left_mask, right_mask):
        left = {}
        right = {}
        delta = 0

        for i in set(y):
            left[i] = len(pd.Series(y[left_mask]).loc[pd.Series(y[left_mask]) == i])
            right[i] = len(pd.Series(y[right_mask]).loc[pd.Series(y[right_mask]) == i])
        if len(left.keys()) >= len(right.keys()):
            for key, value in left.items():
                if key in right.keys():
                    delta += value * right[key]
        else:
            for key, value in right.items():
                if key in left.keys():
                    delta += value * left[key]
        return delta

    def information_gain(self, y, x_column, threshold):
        left_mask = x_column <= threshold
        right_mask = x_column > threshold
        n_total = len(y)

        if len(y[left_mask]) == 0 or len(y[right_mask]) == 0:
            return 0, 0

        n_left = len(y[left_mask])
        n_right = len(y[right_mask])
        left_proba = n_left / n_total
        delta = 0

        if self.method == 'entropy':
            parent = self.entropy(y)
            child = (n_left / n_total) * self.entropy(y[left_mask]) + (n_right / n_total) * self.entropy(y[right_mask])
            delta = parent - child
        else:
            delta = self.donskoy(y, left_mask, right_mask)
                
        return delta, left_proba

    def best_split(self, X, y):
        best_gain = 0
        best_feature = None
        best_threshold = None
        best_proba_left = None

        n_features = X.shape[1]

        for feature in range(n_features):
            thresholds = set(X[:, feature])

            for threshold in thresholds:
                gain, left_proba = self.information_gain(y, X[:, feature], threshold)

                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold
                    best_proba_left = left_proba

        return best_feature, best_threshold, best_proba_left

    def build_tree(self, X, y, depth):
        # Если все примеры принадлежат одному классу
        if len(set(y)) == 1:
            return Node(prediction=y[0])

        if self.max_depth is not None and depth >= self.max_depth:
            return Node(prediction=Counter(y).most_common(1)[0][0])

        # Если нет признаков для разбиения
        if X.shape[1] == 0:
            return Node(prediction=Counter(y).most_common(1)[0][0])

        # Находим лучший признак и порог
        feature, threshold, left_proba = self.best_split(X, y)

        if feature is None:
            return Node(prediction=Counter(y).most_common(1)[0][0])

        left_mask = X[:, feature] <= threshold
        right_mask = X[:, feature] > threshold

        left_node = self.build_tree(X[left_mask], y[left_mask], depth + 1)
        right_node = self.build_tree(X[right_mask], y[right_mask], depth + 1)

        return Node(feature=feature, threshold=threshold, left=left_node, right=right_node, left_proba=left_proba)

    
class DecisionTreeID3Regressor:
    def __init__(self, max_depth, prun=False):
        self.root = None
        self.max_depth = max_depth
        self.probas = {}
        self.max_proba = 0
        self.majority = None
        self.prun = prun

    def fit(self, X, y):
        for i in set(y):
            self.probas[i] = len(y.loc[y == i]) / len(y)

        self.root = self.build_tree(np.array(X), np.array(y), 0)

        self.majority = y.value_counts().argmax()

    def predict_instance(self, node, sample):
        if np.nan not in sample:
            if node.prediction is not None:
                node.num_elems += 1
                return node.prediction
            else:
                if sample[node.feature] <= node.threshold:
                    return self.predict_instance(node.left, sample)
                else:
                    return self.predict_instance(node.right, sample)
        else:
            if node.feature is None:
                return node.prediction
            else:
                return self.count_nan(node, sample, 0)

    def pruning(self, node):
        if node.num_elems == 0:
            node.prediction = self.majority
        else:
            self.pruning(node.left)
            self.pruning(node.right)

    def count_nan(self, node, sample, proba):
        if node.prediction is not None:
            if self.max_proba < proba * self.probas[int(node.prediction)]:
                self.max_proba = proba * self.probas[int(node.prediction)]
                return self.probas[int(node.prediction)]
        else:
            if proba != 0:
                self.count_nan(node.left, sample, node.left_proba*proba)
                self.count_nan(node.right, sample, (1-node.left_proba)*proba)
            else:
                self.count_nan(node.left, sample, node.left_proba)
                self.count_nan(node.right, sample, (1 - node.left_proba))

    def predict(self, X):
        pred = [self.predict_instance(self.root, sample) for sample in np.array(X)]
        self.pruning(self.root)
        return pred

    @staticmethod
    def calculate_mse(left, right):
        total_samples = len(left) + len(right)
        mean_left = np.mean(left) if len(left) > 0 else 0
        mean_right = np.mean(right) if len(right) > 0 else 0

        mse_left = np.sum((left - mean_left) ** 2) if len(left) > 0 else 0
        mse_right = np.sum((right - mean_right) ** 2) if len(right) > 0 else 0

        return (mse_left + mse_right) / total_samples

    def information_gain(self, y, x_column, threshold):
        left_mask = x_column <= threshold
        right_mask = x_column > threshold
        n_total = len(y)

        if len(y[left_mask]) == 0 or len(y[right_mask]) == 0:
            return 0, 0

        n_left = len(y[left_mask])
        left_proba = n_left / n_total

        delta = self.calculate_mse(y[left_mask], y[right_mask])

        return delta, left_proba

    def best_split(self, X, y):
        best_gain = 0
        best_feature = None
        best_threshold = None
        best_proba_left = None

        n_features = X.shape[1]

        for feature in range(n_features):
            thresholds = set(X[:, feature])

            for threshold in thresholds:
                gain, left_proba = self.information_gain(y, X[:, feature], threshold)

                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold
                    best_proba_left = left_proba

        return best_feature, best_threshold, best_proba_left

    def build_tree(self, X, y, depth):
        # Если все принадлежат одному классу
        if len(set(y)) == 1:
            return Node(prediction=y[0])

        if self.max_depth is not None and depth >= self.max_depth:
            return Node(prediction=Counter(y).most_common(1)[0][0])

        # Если нет признаков для разбиения
        if X.shape[1] == 0:
            return Node(prediction=Counter(y).most_common(1)[0][0])

        # Находим лучший признак и порог
        feature, threshold, left_proba = self.best_split(X, y)

        if feature is None:
            return Node(prediction=Counter(y).most_common(1)[0][0])

        left_mask = X[:, feature] <= threshold
        right_mask = X[:, feature] > threshold

        left_node = self.build_tree(X[left_mask], y[left_mask], depth + 1)
        right_node = self.build_tree(X[right_mask], y[right_mask], depth + 1)

        return Node(feature=feature, threshold=threshold, left=left_node, right=right_node, left_proba=left_proba)

# Расчет

### Классификация

***Энтропийный критерий***

In [39]:
%%time
# с прунингом
tree = DecisionTreeID3Classifier(method='entropy', max_depth=7, prun=True)
tree.fit(X_train, y_train)
y_pred = tree.predict(X_test)

CPU times: total: 500 ms
Wall time: 772 ms


In [40]:
print(f'accuracy: {round(accuracy(y_test, y_pred), 2)}')
print(f'recall: {round(recall(y_test, y_pred), 2)}')
print(f'precession: {round(precession(y_test, y_pred), 2)}')
print(f'f1: {round(f1(y_test, y_pred), 2)}')

accuracy: 0.8
recall: 0.54
precession: 0.6
f1: 0.57


In [17]:
%%time

tree = DecisionTreeID3Classifier(method='entropy', max_depth=7)
tree.fit(X_train, y_train)
y_pred = tree.predict(X_test)

CPU times: total: 344 ms
Wall time: 744 ms


In [18]:
print(f'accuracy: {round(accuracy(y_test, y_pred), 2)}')
print(f'recall: {round(recall(y_test, y_pred), 2)}')
print(f'precession: {round(precession(y_test, y_pred), 2)}')
print(f'f1: {round(f1(y_test, y_pred), 2)}')

accuracy: 0.8
recall: 0.54
precession: 0.6
f1: 0.57


***Критерий Донского***

In [19]:
%%time
tree = DecisionTreeID3Classifier(method='donskoy', max_depth=15)
tree.fit(X_train, y_train)
y_pred = tree.predict(X_test)

CPU times: total: 4.94 s
Wall time: 10.2 s


In [20]:
print(f'accuracy: {round(accuracy(y_test, y_pred), 2)}')
print(f'recall: {round(recall(y_test, y_pred), 2)}')
print(f'precession: {round(precession(y_test, y_pred), 2)}')
print(f'f1: {round(f1(y_test, y_pred), 2)}')

accuracy: 0.66
recall: 0.28
precession: 0.3
f1: 0.29


In [21]:
%%time
tree = DecisionTreeID3Classifier(method='donskoy', max_depth=15, prun=True)
tree.fit(X_train, y_train)
y_pred = tree.predict(X_test)

CPU times: total: 5.19 s
Wall time: 10.2 s


In [22]:
print(f'accuracy: {round(accuracy(y_test, y_pred), 2)}')
print(f'recall: {round(recall(y_test, y_pred), 2)}')
print(f'precession: {round(precession(y_test, y_pred), 2)}')
print(f'f1: {round(f1(y_test, y_pred), 2)}')

accuracy: 0.66
recall: 0.28
precession: 0.3
f1: 0.29


***Автоматический алгоритм***

In [33]:
%%time
tree_auto = DecisionTreeClassifier(random_state=0, max_depth=8)
tree_auto.fit(X_train, y_train)
y_pred_auto = tree_auto.predict(X_test)

CPU times: total: 0 ns
Wall time: 15.9 ms


In [34]:
print(f'accuracy: {round(accuracy(y_test, y_pred_auto), 2)}')
print(f'recall: {round(recall(y_test, y_pred_auto), 2)}')
print(f'precession: {round(precession(y_test, y_pred_auto), 2)}')
print(f'f1: {round(f1(y_test, y_pred_auto), 2)}')

accuracy: 0.8
recall: 0.53
precession: 0.59
f1: 0.56


### Регрессия

In [25]:
%%time

tree = DecisionTreeID3Regressor(max_depth=10, prun=True)
tree.fit(X_train_reg, y_train_reg)
y_pred_reg = tree.predict(X_test_reg)

CPU times: total: 93.8 ms
Wall time: 194 ms


In [26]:
scaler = StandardScaler()

scaler.fit(np.array(y_pred_reg).reshape(-1, 1))
y_pred_reg = scaler.transform(np.array(y_pred_reg).reshape(-1, 1))

scaler.fit(np.array(y_test_reg).reshape(-1, 1))
y_test_reg = scaler.transform(np.array(y_test_reg).reshape(-1, 1))

print(f'mse: {round(mse(y_pred_reg, y_test_reg), 2)}')
print(f'rmse: {round(rmse(y_pred_reg, y_test_reg), 2)}')

mse: 1.83
rmse: 0.39


In [27]:
%%time

tree = DecisionTreeID3Regressor(max_depth=10)
tree.fit(X_train_reg, y_train_reg)
y_pred_reg = tree.predict(X_test_reg)

CPU times: total: 109 ms
Wall time: 172 ms


In [28]:
scaler = StandardScaler()

scaler.fit(np.array(y_pred_reg).reshape(-1, 1))
y_pred_reg = scaler.transform(np.array(y_pred_reg).reshape(-1, 1))

scaler.fit(np.array(y_test_reg).reshape(-1, 1))
y_test_reg = scaler.transform(np.array(y_test_reg).reshape(-1, 1))

print(f'mse: {round(mse(y_pred_reg, y_test_reg), 2)}')
print(f'rmse: {round(rmse(y_pred_reg, y_test_reg), 2)}')

mse: 1.83
rmse: 0.39


In [29]:
%%time
tree_auto = DecisionTreeRegressor(random_state=0, max_depth=10)
tree_auto.fit(X_train_reg, y_train_reg)
y_pred_auto = tree_auto.predict(X_test_reg)

CPU times: total: 0 ns
Wall time: 7.46 ms


In [30]:
scaler = StandardScaler()
scaler.fit(np.array(y_pred_auto).reshape(-1, 1))
y_pred_auto = scaler.transform(np.array(y_pred_auto).reshape(-1, 1))

scaler.fit(np.array(y_test_reg).reshape(-1, 1))
y_test_reg = scaler.transform(np.array(y_test_reg).reshape(-1, 1))

print(f'mse: {round(mse(y_pred_auto, y_test_reg), 2)}')
print(f'rmse: {round(rmse(y_pred_auto, y_test_reg), 2)}')

mse: 0.7
rmse: 0.24
