In [1]:
from collections import Counter
import numpy as np
import pandas as pd
import random

In [2]:
def calc_logloss(y, p):
    p = np.clip(p, 1e-16, 1-1e-16)
    return (- y * np.log(p) - (1.0 - y) * np.log(1.0 - p)).mean()

In [4]:
class RandomForest:
    
    def __init__(self, n_trees=1, criterion='gini', min_leaf=1, max_depth=np.inf, random_state=0, n_features='squared'):
        
        random.seed(random_state)
        self.forest = None
        self.n_trees = n_trees
        self.min_leaf = min_leaf
        self.max_depth = max_depth
        self.criterion = criterion
        self.n_features = n_features
    
    class Node:
        
        def __init__(self, index, t, true_branch, false_branch):
            
            self.index = index  # индекс признака, по которому ведется сравнение с порогом в этом узле
            self.t = t  # значение порога
            self.true_branch = true_branch  # поддерево, удовлетворяющее условию в узле
            self.false_branch = false_branch  # поддерево, не удовлетворяющее условию в узле
    
    class Leaf:
        
        def __init__(self, data, labels, criterion):
            
            self.data = data
            self.labels = labels
            self.criterion = criterion
            self.prediction = self.predict()
    
        def predict(self):
            
            if self.criterion == 'regression':
                return self.labels.mean()
            
            else:
                classes = Counter(self.labels)
                return max(classes, key=classes.get)
    
    def get_bootstrap(self, data, labels, n):
        n_samples = data.shape[0]
        bootstrap = []
        
        for i in range(n):
            
            b_data = np.zeros(data.shape)
            b_labels = np.zeros(labels.shape)
            oob_indexes = set(range(n_samples))
            indexes = set()

            for j in range(n_samples):
                sample_index = random.randint(0, n_samples-1)
                indexes.add(sample_index)
                b_data[j] = data.iloc[sample_index]
                b_labels[j] = labels.iloc[sample_index]
            oob_indexes -= indexes
            bootstrap.append((b_data, b_labels, oob_indexes))

        return bootstrap
    
    def get_subsample(self, len_sample):
        match self.n_features:
            case 'squared': 
                return random.sample(range(len_sample), k=int(np.sqrt(len_sample)))
            case 'regression':
                return random.sample(range(len_sample), k=len_sample // 3)
            case 'all':
                return range(len_sample)
        
    
    # Расчет критерия информативности
    def impurity_criterion(self, labels, criterion):
        
        match criterion:
            case 'gini':
                classes = Counter(labels)
                impurity = 1
                for label in classes:
                    p = classes[label] / len(labels)
                    impurity -= p ** 2   
            
            case 'entropy':
                classes = Counter(labels)
                impurity = 0
                for label in classes:
                    p = classes[label] / len(labels)
                    impurity -= p * np.log2(p) if p != 0 else 0
        
            case 'regression':
                impurity = ((labels - labels.mean())**2).sum() / labels.shape[0]
        
        return impurity
    
    # Расчет качества
    def quality(self, left_labels, right_labels, criterion, current_criterion):
        
        # доля выборки, ушедшая в левое поддерево
        p = left_labels.shape[0] / (left_labels.shape[0] + right_labels.shape[0])
        
        return current_criterion - p * self.impurity_criterion(left_labels, criterion) - (1 - p) * self.impurity_criterion(right_labels, criterion)
    
    # Разбиение датасета в узле
    def split(self, data, labels, index, t):
        
        left = np.where(data[:, index] <= t)
        right = np.where(data[:, index] > t)

        true_data = data[left]
        false_data = data[right]
        true_labels = labels[left]
        false_labels = labels[right]
        
        return true_data, false_data, true_labels, false_labels
    
    # Нахождение наилучшего разбиения
    def find_best_split(self, data, labels, criterion, min_leaf):
        
        current_criterion = self.impurity_criterion(labels, criterion)
        
        best_quality = 0
        best_t = None
        best_index = None
    
        n_features = data.shape[1]
    
        # выбор индекса из подвыборки длиной sqrt(n_features)
        subsample = self.get_subsample(n_features)
        
        for index in subsample:
            # будем проверять только уникальные значения признака, исключая повторения
            t_values = np.unique(data[:, index])

            for t in t_values:
                true_data, false_data, true_labels, false_labels = self.split(data, labels, index, t)
                
                #  пропускаем разбиения, в которых в узле остается менее min_leaf объектов
                if len(true_data) < min_leaf or len(false_data) < min_leaf:
                    continue
                    
                current_quality = self.quality(true_labels, false_labels, criterion, current_criterion)
                
                #  выбираем порог, на котором получается максимальный прирост качества
                if current_quality > best_quality:
                    best_quality, best_t, best_index = current_quality, t, index
        
        return best_quality, best_t, best_index
    
    # Построение дерева с помощью рекурсивной функции
    def build_tree(self, data, labels, criterion, min_leaf, max_depth):
        
        quality, t, index = self.find_best_split(data, labels, criterion, min_leaf)
    
        #  Базовый случай - прекращаем рекурсию, когда нет прироста качества
        #  или достигнут критерий останова

        if (max_depth == 0) or (quality == 0):
            return self.Leaf(data, labels, criterion)
        
        true_data, false_data, true_labels, false_labels = self.split(data, labels, index, t)
        
        # Рекурсивно строим два поддерева, обращая внимание на критерий останова
        true_branch = self.build_tree(true_data, true_labels, criterion, min_leaf, max_depth=max_depth-1)
        false_branch = self.build_tree(false_data, false_labels, criterion, min_leaf, max_depth=max_depth-1)
        
        # Возвращаем класс узла со всеми поддеревьями, то есть целого дерева
        return self.Node(index, t, true_branch, false_branch)
    
    def random_forest(self, data, labels, n_trees, criterion, min_leaf, max_depth):
        
        forest = []
        bootstrap = self.get_bootstrap(data, labels, n_trees)

        for b_data, b_labels, oob_indexes in bootstrap:
            forest.append((self.build_tree(b_data, b_labels, criterion, min_leaf, max_depth), oob_indexes))

        return forest

    def fit(self, X, y):
        self.forest = self.random_forest(X, y, n_trees=self.n_trees, criterion=self.criterion, min_leaf=self.min_leaf, max_depth=self.max_depth)
    
    def predict_object(self, obj, node):

        #  Останавливаем рекурсию, если достигли листа
        if isinstance(node, self.Leaf):
            answer = node.prediction
            return answer

        if obj[node.index] <= node.t:
            return self.predict_object(obj, node.true_branch)
        else:
            return self.predict_object(obj, node.false_branch)
    
    # функция формирования предсказания по выборке на одном дереве
    def predict_tree(self, data, tree):
        
        if self.criterion == 'regression':
            return [self.predict_object(data.iloc[i, :], tree) for i in range(data.shape[0])]
        
        else:
            classes = []
            for obj in data:
                prediction = self.predict_object(obj, tree)
                classes.append(prediction)
            return classes
    
    # предсказание голосованием деревьев
    def predict(self, data, probas=False):

        # добавим предсказания всех деревьев в список
        predictions = []
        for tree, _ in self.forest:
            predictions.append(self.predict_tree(data, tree))

        # сформируем список с предсказаниями для каждого объекта
        predictions_per_object = list(zip(*predictions))

        # выберем в качестве итогового предсказания для каждого объекта то,
        # за которое проголосовало большинство деревьев, либо, в случае
        # регрессии, среднее значение предсказаний
        
        if probas or self.criterion == 'regression':
            return [np.mean(obj) for obj in predictions_per_object]
        
        else:
            voted_predictions = []
            for obj in predictions_per_object:
                voted_predictions.append(max(set(obj), key=obj.count))

            return voted_predictions
    
    def out_of_bag(self):
        
        logloss = []
        
        for idx, obj in enumerate(self.data):
            answers = []
            for tree, oob_indexes in self.forest:
                if idx in oob_indexes:
                    answer = self.predict_tree([obj], tree)
                    answers.append(answer)
            
            if answers:
                p = np.array(answers).mean()
                logloss.append(calc_logloss(self.labels[idx], p))
        
        return np.array(logloss).mean()