In [73]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import pandas as pd

# LAB 11 - Decision Trees for Classification

Today, you will implement decision trees for classification using the Gini index as the splitting criterion. You’ll build the tree recursively, selecting splits that minimize Gini impurity and classifying samples based on majority class in each leaf. A good dataset to start with is the Iris dataset, which is small, well-labeled, and available directly via `sklearn.datasets.load_iris().`

In [74]:
# Load the Iris dataset
data = load_iris()
X = data.data
y = data.target

In [75]:
y

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])

We will focus only on binary classification today!

In [76]:
X = X[y != 2]
y = y[y != 2]

In [77]:
y

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

In [78]:
X = pd.DataFrame(X, columns=[i for i in range(X.shape[1])])
y = pd.Series(y)

In [79]:
X_train, X_split, t_train, t_split = train_test_split(X, y, train_size=.6)
X_validation, X_test, t_validation, t_test = train_test_split(X_split, t_split, train_size=.5)

## Please feel free to either use the code from last week as your structure, or any template you feel like works best for you!

In [80]:
def classification_criterion(region: pd.Series) -> float:
    """
    Implements the gini index criterion in a region
    
    Parameters
    ----------
    region : pd.Series
        Array of shape (N,) containing the values of the target values 
        for N datapoints in the training set.
    
    Returns
    -------
    float
        The sum of squared error
        
    Note
    ----
    The error for an empty region should be infinity (use: float("inf"))
    This avoids creating empty regions
    """
    if len(region) == 0:
        return float("inf")
    if isinstance(region, pd.Series):
        region = region.to_numpy()
    
    classes = np.unique(region)
    Gm = 0
    
    for k in classes:
        p_mk = len(region[region == k]) / len(region)
        Gm += p_mk * (1 - p_mk)

    return Gm

In [81]:
def split_region(region: pd.Series | pd.DataFrame, feature_index: int, tau: float):
    """
    Given a region, splits it based on the feature indicated by
    `feature_index`, the region will be split in two, where
    one side will contain all points with the feature with values 
    lower than `tau`, and the other split will contain the 
    remaining datapoints.
    
    Parameters
    ----------
    region : pd.Series | pd.DataFrame
        a partition of the dataset (or the full dataset) to be split
    feature_index : int
        the index of the feature (column of the region array) used to make this partition
    tau : float
        The threshold used to make this partition
        
    Return
    ------
    left_partition : pd.Series | pd.DataFrame
        indices of the datapoints in `region` where feature < `tau`
    right_partition : pd.Series | pd.DataFrame
        indices of the datapoints in `region` where feature >= `tau` 
    """
    left_partition = region[region[feature_index] < tau][feature_index].index
    right_partition = region[region[feature_index] >= tau][feature_index].index

    return left_partition, right_partition

In [82]:
def get_split(X: pd.DataFrame, y: pd.Series):
    """
    Given a dataset (full or partial), splits it on the feature of that minimizes the sum of squared error
    
    Parameters
    ----------
    X : pd.DataFrame
        features 
    y : pd.Series
        labels
    
    Returns
    -------
    decision : dictionary
        keys are:
        * 'feature_index' -> an integer that indicates the feature (column) of `X` on which the data is split
        * 'tau' -> the threshold used to make the split
        * 'left_region' -> array of indices where the `feature_index`th feature of X is lower than `tau`
        * 'right_region' -> indices not in `low_region`
    """
    best_tau_separations = []
    # List containing the best sse and tau registered for each feature, follows this structure

    for feature in X.columns:  # Going through every feature
        feature_gini = []
        # List containing all registered SSE for this features (It follows this structure:)
        # [ (tau_1, sse_1), (tau_2, sse_2), ..., (tau_j, sse_j) ]
        # Where 'tau_i' is the ith element of the feature's column and 'sse_i' is its registered SSE
        unique_features = pd.unique(X[feature])

        for tau in unique_features:  # Using each value of the feature as tau (For performance reasons)
            l_feature, r_feature = split_region(X, feature, tau)
            gini_left = classification_criterion(y[l_feature])
            gini_right = classification_criterion(y[r_feature])
            feature_gini.append((tau, gini_left + gini_right))

        minimum_tau_sse = min(feature_gini, key=lambda f_s: f_s[1])
        # Getting the minimum SSE on feature_sses
        best_tau_separations.append((feature, *minimum_tau_sse))
    
    best_separation_criterion = min(best_tau_separations, key=lambda s_c: s_c[2])

    l, r = split_region(X, best_separation_criterion[0], best_separation_criterion[1])

    return {
        'feature_index': best_separation_criterion[0],
        'tau': best_separation_criterion[1],
        'left_region': l,
        'right_region': r,
        'gini_index': best_separation_criterion[2]
    }

In [83]:
def recursive_growth(
    node: dict,
    min_samples: int,
    max_depth: int,
    current_depth: int,
    X: pd.DataFrame,
    y: pd.Series
):
    """
    Recursively grows a decision tree.
    
    Parameters
    ----------
    node : dictionary
        If the node is terminal, it contains only the "value" key, which determines the value to be used as a prediction.
        If the node is not terminal, the dictionary has the structure defined by `get_split`
    min_samples : int
        parameter for stopping criterion if a node has <= min_samples datapoints
    max_depth : int
        parameter for stopping criterion if a node belongs to this depth
    depth : int
        current distance from the root
    X : pd.DataFrame
        features (full dataset)
    y : pd.Series
        labels (full dataset)
    
    Notes
    -----
    To create a terminal node, a dictionary is created with a single "value" key, with a value that
    is the mean of the target variable
    
    'left' and 'right' keys are added to non-terminal nodes, which contain (possibly terminal) nodes 
    from higher levels of the tree:
    'left' corresponds to the 'left_region' key, and 'right' to the 'right_region' key
    """
    l_idx = node['left_region']
    r_idx = node['right_region']

    if                                                                                  \
        (len(X) <= min_samples)             \
        or                                                                              \
        (current_depth >= max_depth)                                                    \
    :
        return {
            "value": np.mean(y)
        }
    
    # X = X.drop(node['feature_index'], axis=1)
    splited_infos = get_split(X, y)
    l_idx = splited_infos['left_region']
    r_idx = splited_infos['right_region']
    
    return {
        'feature_index': splited_infos['feature_index'],
        'tau': splited_infos['tau'],
        'left': recursive_growth(splited_infos, min_samples, max_depth, current_depth+1, X.loc[l_idx], y.loc[l_idx]),
        'right': recursive_growth(splited_infos, min_samples, max_depth, current_depth+1, X.loc[r_idx], y.loc[r_idx])
    }

In [84]:
# you will test 3 values for min_samples_split: 2, 4, 6
# Remember that this sets the minimum number of samples required in a node to be eligible for splitting. 
# These values are good for small datasets like Iris, but you can try other values for larger datasets to not make the tree too deep.

max_depth = 6
min_samples_tests = [2, 4, 6]
trees = []

for sample in min_samples_tests:
    root = get_split(X_train, t_train)
    tree_root = recursive_growth(root, sample, max_depth, 0, X_train, t_train)
    trees.append(tree_root)

In [85]:
def print_tree(node, depth):
    if 'value' in node.keys():
        print('.  '*(depth-1), f"[{node['value']}]")
    else:
        print('.  '*depth, f'X_{node["feature_index"]} < {node["tau"]}')
        print_tree(node['left'], depth+1)
        print_tree(node['right'], depth+1)

In [86]:
print_tree(trees[0], 0)

 X_2 < 3.0
.   X_0 < 5.4
.  .   X_0 < 5.0
.  .  .   X_0 < 4.6
.  .  .   [0.0]
.  .  .  .   X_0 < 4.8
.  .  .  .  .   X_0 < 4.7
.  .  .  .  .   [0.0]
.  .  .  .  .   [0.0]
.  .  .  .  .   X_1 < 3.4
.  .  .  .  .   [0.0]
.  .  .  .  .   [0.0]
.  .  .   X_0 < 5.1
.  .  .  .   X_1 < 3.5
.  .  .  .   [0.0]
.  .  .  .   [0.0]
.  .  .  .   X_0 < 5.2
.  .  .  .  .   X_1 < 3.4
.  .  .  .  .   [0.0]
.  .  .  .  .   [0.0]
.  .  .  .   [0.0]
.  .   X_0 < 5.5
.  .  .   X_1 < 3.9
.  .  .   [0.0]
.  .  .   [0.0]
.  .  .   X_0 < 5.7
.  .  .   [0.0]
.  .  .   [0.0]
.   X_0 < 6.2
.  .   X_0 < 6.0
.  .  .   X_0 < 5.5
.  .  .  .   X_0 < 5.0
.  .  .  .   [1.0]
.  .  .  .  .   X_0 < 5.2
.  .  .  .  .   [1.0]
.  .  .  .  .   [1.0]
.  .  .  .   X_0 < 5.7
.  .  .  .  .   X_0 < 5.6
.  .  .  .  .   [1.0]
.  .  .  .  .   [1.0]
.  .  .  .  .   X_0 < 5.9
.  .  .  .  .   [1.0]
.  .  .  .  .   [1.0]
.  .  .   X_0 < 6.1
.  .  .  .   X_1 < 3.4
.  .  .  .  .   X_1 < 2.7
.  .  .  .  .   [1.0]
.  .  .  .  .   [1.0]
.  .  

Use accuracy to find the best split. Don't import it from sklearn, calculate it yourself, it's a one-liner ;)

In [87]:
def predict_sample(node: dict, sample: pd.Series):
    """
    Makes a prediction based on the decision tree defined by `node`
    
    Parameters
    ----------
    node : dictionary
        A node created one of the methods above
    sample : array of size (n_features,)
        a sample datapoint
    """
    if 'value' in node.keys():
        return node['value']
    else:
        if sample[node['feature_index']] < node['tau']:
            return predict_sample(node['left'], sample)
        else:
            return predict_sample(node['right'], sample)


def predict(node, X):
    """
    Makes a prediction based on the decision tree defined by `node`
    
    Parameters
    ----------
    node : dictionary
        A node created one of the methods above
    X : array of size (n_samples, n_features)
        n_samples predictions will be made
    """
    predicted_values = pd.Series(np.array([x for x in range(len(X))]))

    predicted_values = predicted_values.apply(lambda i: predict_sample(node, X.iloc[i]))

    return predicted_values

In [88]:
def get_precision(t: pd.Series, y: pd.Series) -> float:
    """Gets the precision using the passed parameters

    Args
    ----------
    t: pd.Series
        True labels
    y: pd.Series
        Predictions
    
    Returns
    ----------
    float: The precision of the given prediction
    """
    y = y.to_numpy()
    t = t.to_numpy()
    TP = len(y[
        (y == 1) & (t == 1  )
    ])
    FP = len(y[
        (y == 1) & (t == 0)
    ])
    if TP + FP == 0:
        return 0
    precision = TP / (TP + FP)
    return precision

In [93]:
# # Getting the best min sample
precisions = []
for tree_root in trees:
    y_validation = predict(tree_root, X_validation)
    precisions.append(get_precision(t_validation, y_validation))

# Checking if we don't have equal probabilities
last_value = None
all_equal = True
for value in precisions:
    if not last_value:
        last_value = value
        continue
    
    if value != last_value:
        all_equal = False

best_min_samples = 0

if all_equal:
    best_min_samples = min_samples_tests[-1]
else:
    min_precision = min(precisions)
    best_min_samples = min_samples_tests[precisions.index(min_precision)]

best_min_samples

6

In [102]:
new_dataset = pd.concat([X_train, X_validation], axis=0)
new_target = pd.concat([t_train, t_validation], axis=0)
root = get_split(new_dataset, new_target)
tree_root = recursive_growth(root, best_min_samples, max_depth, 0, new_dataset, new_target)

In [103]:
y_test = predict(tree_root, X_test)
get_precision(t_test, y_test)

1.0