In [1]:
import math
import numpy as np
import pandas as pd

from collections import Counter, deque
from numbers import Number

from sklearn.datasets import load_wine
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier

In [2]:
def _entropy(y):
    # https://en.wikipedia.org/wiki/Entropy_(information_theory)
    result = 0
    for category, count in Counter(y).items():
        probability = count / len(y)
        result -= probability * math.log(probability, 2)
    return result


def _info_gain(x, y, x_is_continuous):
    # https://machinelearningmastery.com/information-gain-and-mutual-information/
    starting_entropy = _entropy(y)
    n = len(x)
    tuples = sorted([(x_i, y_i) for x_i, y_i in zip(x, y)], key=lambda tup: tup[0])
    sorted_x, sorted_y = [tup[0] for tup in tuples], [tup[1] for tup in tuples]
    best_gain, best_split_value = 0, None
    for i in range(1, n):
        if sorted_x[i] != sorted_x[i - 1]:
            entropy_left, entropy_right = _entropy(sorted_y[:i]), _entropy(sorted_y[i:])
            gain = starting_entropy - (entropy_left * i / n + entropy_right * (1 - i / n))
            if gain >= best_gain:
                best_gain = gain
                best_split_value = (sorted_x[i - 1] + sorted_x[i]) / 2 if x_is_continuous else x[i - 1]
    return best_gain, best_split_value


def max_info_gain(X: pd.DataFrame, y: pd.Series):
    continuous_columns = set(X.select_dtypes(include=np.number))
    best_column, best_gain, best_split_value = None, 0, None
    for column in X.columns:
        gain, split_value = _info_gain(X[column], y, column in continuous_columns)
        if gain > best_gain:
            best_column = column
            best_gain = gain
            best_split_value = split_value
    return best_column, best_gain, best_split_value

In [3]:
class DTNode:
    def __init__(self, rows):
        self.rows = rows

        self.column_name = None
        self.is_continuous = None
        self.split_val = None
        self.left_node = None
        self.right_node = None

    def split(self, X_train: pd.DataFrame, y_train: pd.Series, info_gain_threshold):
        X_train_new, y_train_new = X_train.loc[self.rows], y_train.loc[self.rows]
        column_name, info_gain, split_val = max_info_gain(X_train_new, y_train_new)
        if info_gain < info_gain_threshold:
            return None, None
        self.column_name = column_name
        self.is_continuous = isinstance(split_val, Number)
        self.split_val = split_val
        if self.is_continuous:
            indices_left = X_train_new.index[X_train_new[column_name] < split_val]
            indices_right = X_train_new.index[X_train_new[column_name] >= split_val]
        else:
            indices_left = X_train_new.index[X_train_new[column_name] == split_val]
            indices_right = X_train_new.index[X_train_new[column_name] != split_val]
        self.left_node = DTNode(indices_left)
        self.right_node = DTNode(indices_right)
        return self.left_node, self.right_node


class ID3Classifier:
    def __init__(self, info_gain_threshold):
        self.root = None
        self.X_train = None
        self.y_train = None
        self.info_gain_threshold = info_gain_threshold

    def fit(self, X_train: pd.DataFrame, y_train: pd.Series):
        self.root = DTNode(X_train.index)
        self.X_train = X_train
        self.y_train = y_train
        queue = deque([self.root])
        while len(queue) > 0:
            node = queue.popleft()
            node.split(X_train, y_train, self.info_gain_threshold)
            if node.left_node is not None:
                queue.append(node.left_node)
            if node.right_node is not None:
                queue.append(node.right_node)

    def predict(self, X_test: pd.DataFrame):
        def _classify_row(row):
            curr_node = self.root
            while curr_node.left_node is not None and curr_node.right_node is not None:
                val = row[curr_node.column_name]
                if curr_node.is_continuous:
                    curr_node = curr_node.left_node if val < curr_node.split_val else curr_node.right_node
                else:
                    curr_node = curr_node.left_node if val == curr_node.split_val else curr_node.right_node
            items = self.y_train[curr_node.rows].tolist()
            return max(items, key=items.count)

        return X_test.apply(_classify_row, axis=1)

In [8]:
X, y = load_wine(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(X, y)

sklearn_model = DecisionTreeClassifier(criterion="entropy")
sklearn_model.fit(X_train, y_train)
print("sklearn accuracy: %.2f" % sklearn_model.score(X_test, y_test))

my_model = ID3Classifier(0.1)
my_model.fit(X_train, y_train)
print("my accuracy: %.2f" % accuracy_score(y_test, my_model.predict(X_test)))

sklearn accuracy: 0.96
my accuracy: 0.96
