A little decision tree implementation :)

In [1]:
import numpy as np

Originally I wrote this just to handle categorical/integer-valued data, but it can handle continuous data as well (it's just a lot slower). It needs a integer-valued target, though. 

In [2]:
np.random.seed(64)
n_rows = 90
seq1 = np.array([i % 5 for i in range(0, n_rows)])
seq2 = np.array([i % 4 for i in range(0, n_rows)])
seq3 = np.array([i % 7 for i in range(0, n_rows)])
seq4 = np.array([i % 2 for i in range(0, n_rows)])
# seq5 = np.array(np.sin(seq4) + np.cos(seq3) + np.random.random(n_rows))
data = np.array(
    [
        seq1,
        seq2,
        seq3,
        seq4,
        # seq5,
    ]
).T
# target = np.array([1 if (i % 3 == 0) else 0 for i in range(0, 50)])
# target = np.array([i % 3 for i in range(0, n_rows)])
target = np.array([0] * int((2*n_rows)/3) + [1] * int(n_rows/3))

In [3]:
def gini(target_vec):
    """Get the Gini impurity for a vector"""
    # feature vec needs to be filtered to only a certain value in the feature vector 
    # in the function where this is called
    # target vector needs to be filtered to correct indices
    target_unique = np.unique(target_vec)
    p_is = np.zeros((len(target_unique),))
    for i in range(len(target_unique)):
        p_i = sum(target_vec == target_unique[i]) / len(target_vec)
        p_is[i] = p_i
    gini_info = 1 - sum(p_is**2)

    return gini_info


def split_decider(data_array, target_vec, verbose=False):
    """Based on Gini impurity, decide which feature/value to split on."""
    min_impurity = np.inf
    split_feature_index = 0
    split_feature_val = 0
    for col in range(data_array.shape[1]):
        feature = data_array[:, col]
        feature_classes = np.unique(feature)
        for j in range(len(feature_classes)):
            feature_val = feature_classes[j]
            # calculate gini impurity for every potential split
            # (impurity of target vector filtered to indices where feature_vec == feature_val)
            gini_impurity = gini(target_vec[feature == feature_val])
            if verbose:
               print(f'feature: {col}, class: {feature_val}, gini: {gini_impurity}')
            if gini_impurity < min_impurity:
                min_impurity = gini_impurity
                split_feature_index = col
                split_feature_val = feature_classes[j]

    return {'feature': split_feature_index, 'value': split_feature_val}

split_decider(data, target)

{'feature': 2, 'value': np.int64(0)}

This needs a little more validation. But upon preliminary inspection it looks like it's alright.

In [4]:
def decision_tree(data_array, target_vec):
    # take in a data matrix and target array
    # get feature to split on + value to split on
    # split dataset into two (data_array[feature == split_val] & data_array[feature != split_val])
    # check if recursion break condition is met - maybe like gini is above a certain threshold?
    # call decision tree function again to deepen tree
    instructions = split_decider(data_array, target_vec)
    print(f'splitting on {instructions["feature"]}')
    true_mask = data_array[:,instructions['feature']] == instructions['value']
    false_mask = data_array[:,instructions['feature']] != instructions['value']
    mat1 = data_array[true_mask, :]
    mat2 = data_array[false_mask, :]
 
    # prints updates out so you can see how the tree deepens
    for i in np.unique(target_vec[true_mask]):
        print(f'(t) {sum(target_vec[true_mask] == i)} {i}s in node (gini = {gini(target_vec[true_mask])})')
    for i in np.unique(target_vec[false_mask]):
        print(f'(f) {sum(target_vec[false_mask] == i)} {i}s in other node (gini = {gini(target_vec[false_mask])})')
    
    # break condition(s)
    if ((gini(target_vec[true_mask]) <= .2) and (gini(target_vec[false_mask]) <= .2)) or \
        ((len(true_mask) <= (.2 * data_array.shape[0])) and (len(false_mask) <= (.2 * data_array.shape[0]))):
        print('done')
        return
    
    
    if ((gini(target_vec[true_mask]) <= .2) or (len(true_mask) <= (.2 * data_array.shape[0]))) and \
    (gini(target_vec[false_mask]) >= .2) or (len(false_mask) >= (.2 * data_array.shape[0])):
        decision_tree(mat2, target_vec[false_mask])

    elif ((gini(target_vec[false_mask]) <= .2) or (len(false_mask) <= (.2 * data_array.shape[0]))) and \
        ((gini(target_vec[true_mask]) >= .2) or (len(true_mask) >= (.2 * data_array.shape[0]))):
        decision_tree(mat1, target_vec[true_mask])
    else:  # dead code for some of the test cases but i left it in here just in case
        decision_tree(mat1, target_vec[true_mask])
        decision_tree(mat2, target_vec[false_mask])
    
decision_tree(data, target)

splitting on 2
(t) 9 0s in node (gini = 0.42603550295857995)
(t) 4 1s in node (gini = 0.42603550295857995)
(f) 51 0s in other node (gini = 0.4472929667734863)
(f) 26 1s in other node (gini = 0.4472929667734863)
splitting on 2
(t) 9 0s in node (gini = 0.42603550295857995)
(t) 4 1s in node (gini = 0.42603550295857995)
(f) 42 0s in other node (gini = 0.451171875)
(f) 22 1s in other node (gini = 0.451171875)
splitting on 0
(t) 9 0s in node (gini = 0.42603550295857995)
(t) 4 1s in node (gini = 0.42603550295857995)
(f) 33 0s in other node (gini = 0.45674740484429055)
(f) 18 1s in other node (gini = 0.45674740484429055)
splitting on 2
(t) 7 0s in node (gini = 0.42000000000000004)
(t) 3 1s in node (gini = 0.42000000000000004)
(f) 26 0s in other node (gini = 0.46400951814396196)
(f) 15 1s in other node (gini = 0.46400951814396196)
splitting on 1
(t) 7 0s in node (gini = 0.42000000000000004)
(t) 3 1s in node (gini = 0.42000000000000004)
(f) 19 0s in other node (gini = 0.47450572320499484)
(f) 12