In [14]:
## imports
import numpy as np
import pandas as pd
import random
import time
from termcolor import colored
import itertools

## helper functions for the tree
from helpers import *

def make_seed(seed: int) -> None:
    """
    Sets the seed for the random number generator.
    """
    random.seed(seed)
    np.random.seed(seed)


def make_standard_sample(X_train: np.array, y_train: np.array) -> np.array:
    xboot, yboot = (
        X_train.values,
        y_train,
    )
    return xboot, yboot


def split_multiple_nodes(
    xboot: np.array, yboot: np.array, top_features: int
) -> dict:
    """ """
    all_features = []
    num_features = len(xboot[0])
    while len(all_features) <= top_features:
        fidx = random.sample(range(num_features), 1)
        if fidx not in all_features:
            all_features.extend(fidx)
    top_score = -9.99 * 1000
    curr_node = None
    for feature_idx in all_features:
        for splitter in xboot[:, feature_idx]:
            lc = {"xboot": [], "yboot": []}
            rc = {"xboot": [], "yboot": []}

            for i, value in enumerate(xboot[:, feature_idx]):
                if value <= splitter:
                    lc["xboot"].append(xboot[i])
                    lc["yboot"].append(yboot[i])
                else:
                    rc["xboot"].append(xboot[i])
                    rc["yboot"].append(yboot[i])

            curr_info_gain = information_gain(
                lc["yboot"], rc["yboot"]
            )
            if curr_info_gain > top_score:
                top_score = curr_info_gain
                lc["xboot"] = np.array(lc["xboot"])
                rc["xboot"] = np.array(rc["xboot"])
                curr_node = {
                    "information_gain": curr_info_gain,
                    "left_child": lc,
                    "right_child": rc,
                    "split_point": splitter,
                    "feature_idx": feature_idx,
                }

    return curr_node


def leaf(node: dict) -> int:
    """
    Returns a prediction of a class for a node.
    """
    return max(node["yboot"], key=node["yboot"].count)


def split_single_node(
    node: dict, top_features: int, min_obs: int, tiefe: int, depth: int
) -> None:

    """
    Checks if the node is splitted or not.

    PARAMS:
    -------
    node : the node to check if it is splitted or not

    top_features : the number of features to consider when doing the splits

    min_obs : the minimum number of samples to split a node

    tiefe : the maximum depth to split a node

    depth : the current depth of the node

    RETURNS:
    --------
    None : splits node of the tree

    """
    lc, rc = node["left_child"], node["right_child"]

    del node["left_child"]
    del node["right_child"]
    
    if len(lc["yboot"]) == 0 or len(rc["yboot"]) == 0:
        empty_child = {"yboot": lc["yboot"] + rc["yboot"]}
        node["left_split"] = leaf(empty_child)
        node["right_split"] = leaf(empty_child)
        return

    if depth >= tiefe:
        node["left_split"] = leaf(lc)
        node["right_split"] = leaf(rc)
        return node

    if len(lc["xboot"]) <= min_obs:
        node["left_split"] = node["right_split"] = leaf(lc)
    else:
        node["left_split"] = split_multiple_nodes(
            lc["xboot"], lc["yboot"], top_features
        )
        split_single_node(
            node["left_split"], tiefe, min_obs, tiefe, depth + 1
        )
    if len(rc["xboot"]) <= min_obs:
        node["right_split"] = node["left_split"] = leaf(rc)
    else:
        node["right_split"] = split_multiple_nodes(
            rc["xboot"], rc["yboot"], top_features
        )
        split_single_node(
            node["right_split"], top_features, min_obs, tiefe, depth + 1
        )


def grow_a_tree(
    xboot: np.array,
    yboot: np.array,
    tiefe: int,
    min_obs: int,
    top_features: int,
):
    base_ = split_multiple_nodes(xboot, yboot, top_features)
    split_single_node(base_, top_features, min_obs, tiefe, 1)
    return base_


def rf_model(
    X_train: np.array,
    y_train: np.array,
    n_estimators: int,
    top_features: int,
    tiefe: int = 10,
    min_obs: int = 2,
):
    all_trees = [None] * n_estimators
    for i in range(n_estimators):
        xboot, yboot = make_standard_sample(X_train, y_train)
        tree = grow_a_tree(
            xboot, yboot, top_features, tiefe, min_obs
        )
        all_trees[i] = tree
    return all_trees


def make_prediction(tree, X_test):
    fidx = tree["feature_idx"]

    if X_test[fidx] <= tree["split_point"]:
        if type(tree["left_split"]) == dict:
            return make_prediction(tree["left_split"], X_test)
        else:
            value = tree["left_split"]
            return value
    else:
        if type(tree["right_split"]) == dict:
            return make_prediction(tree["right_split"], X_test)
        else:
            return tree["right_split"]


def predictor(all_trees: list, X_test: np.array) -> np.array:
    rf_predictions = []
    for observation in range(len(X_test)):
        all_preds = [
            make_prediction(tree, X_test.values[observation]) for tree in all_trees
        ]
        final_pred = max(all_preds, key=all_preds.count)
        rf_predictions.append(final_pred)
    return np.array(rf_predictions)

In [15]:
SEED = 42
make_seed(SEED)
def runif(M):
    return np.random.randint(low=1,high=M+1)
# parser = argparse.ArgumentParser()
# parser.add_argument("-ds",type=str, help="dataset")
# args = parser.parse_args()
## include the argparse
DATA_SET_SIZE = 'small'
X_train, y_train, X_test, y_test = prepare_dataset(DATA_SET_SIZE)
## NAME FOR THE FILE 
NAME = "Decision_Forest"
## NUMBER OF TREES 
NUM_TREES = [1, 10, 25, 50, 75, 100]
## NUMBER OF FEATURES 
M = len(X_train)
F = [int(M/4), int(M/2), int(M*3/4), runif(M)]
## make combinations of all the parameters
combs = list(itertools.product(NUM_TREES, F))
## run the model for each combination
holders = []
for nt, f in combs:
    ips = [(X_train, y_train, nt, f)]
    tic = time.time()
    model = rf_model(
        X_train,
        y_train,
        n_estimators=nt,
        top_features=f,
        tiefe=10,
        min_obs=2
    )
    toc = time.time()
    tic_toc = toc - tic
    preds = predictor(model, X_test)
    acc = sum(preds == y_test) / len(y_test)
    ## print the results: Trees | Features | Accuracy | Time , in green
    print(
        colored(
            f"Model {NAME} | Trees {nt} | Number of Features {f} | Accuracy {acc*100:.2f} | Time {tic_toc:.2f}s",
            "green",
        )
    )
    ## save the results to a csv file
    model_df = pd.DataFrame.from_records(model)
    model_df.drop(["left_split", "right_split"], axis=1, inplace=True)
    ## add the columns for nt & f
    model_df["Num_trees"] = nt
    model_df["Num_features"] = f
    model_df["Accuracy"] = acc
    holders.append(model_df)
    ## combine the list of dataframes
combined_df = pd.concat(holders)
combined_df.to_csv(f"./Data/out/{DATA_SET_SIZE}_{NAME}.csv", index=False)


[32mModel Decision_Forest | Trees 1 | Number of Features 42 | Accuracy 34.88 | Time 2.64s[0m
[32mModel Decision_Forest | Trees 1 | Number of Features 85 | Accuracy 34.88 | Time 6.29s[0m
[32mModel Decision_Forest | Trees 1 | Number of Features 128 | Accuracy 37.21 | Time 14.48s[0m
[32mModel Decision_Forest | Trees 1 | Number of Features 103 | Accuracy 27.91 | Time 4.44s[0m
[32mModel Decision_Forest | Trees 10 | Number of Features 42 | Accuracy 30.23 | Time 23.89s[0m
[32mModel Decision_Forest | Trees 10 | Number of Features 85 | Accuracy 34.88 | Time 46.20s[0m
[32mModel Decision_Forest | Trees 10 | Number of Features 128 | Accuracy 27.91 | Time 72.23s[0m
[32mModel Decision_Forest | Trees 10 | Number of Features 103 | Accuracy 23.26 | Time 64.62s[0m
[32mModel Decision_Forest | Trees 25 | Number of Features 42 | Accuracy 27.91 | Time 92.64s[0m
[32mModel Decision_Forest | Trees 25 | Number of Features 85 | Accuracy 32.56 | Time 160.91s[0m
[32mModel Decision_Forest | Tre