Knowledge of decision trees is assumed.
A random forest is an attempt to fix the high variance of a single decision tree \
by training many decision trees and combining their predictions.
Training:

1. Bootstrapping: draw n random samples from the data \
   (n is n_estimators, which is a hyperparameter), sampling with replacement so \
   that each sample has the same size as the original data set.
2. Decide on the max features (max_features is a hyperparameter) using a rule \
   such as sqrt or log2 of the number of features. In this case, we'll be using sqrt.
3. Randomly take the subset of features for each tree (the number of features is \
   random up to max features)
4. Run the decision tree algorithm on each bootstrap sample; the forest's prediction is \
   the majority vote (classification) or the average (regression) of the individual tree predictions.

Testing:
Same as running a decision tree on the entire test set, except that every tree in the forest \
predicts, the predictions are combined by average or majority vote, and the accuracy is then computed.


In [3]:
from __future__ import annotations
import numpy as np
from math import sqrt
import random
from typing import List
import pandas as pd
from sklearn.model_selection import train_test_split
from csv import reader

# dataset = list()
# with open("./sonar.all-data.csv", "r") as file:
#     csv_reader = reader(file)
#     for row in csv_reader:
#         if not row:
#             continue
#         dataset.append(row)
# dataset = np.array(dataset)
# X, y = dataset[:, :-1], pd.Categorical(pd.Series(dataset[:, -1])).codes
# df = pd.read_csv("./sonar.all-data.csv")
# X_train, X_test, y_train, y_test = train_test_split(
#     X, y, test_size=0.33, random_state=42
# )
# X_train.shape, y_train.shape, X_test.shape, y_test.shape

In [4]:
from sklearn.datasets import load_iris, load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# # Load the Iris dataset
# iris = load_iris()
# X = iris.data
# y = iris.target

# Load the Breast Cancer Wisconsin (Diagnostic) dataset.
# X is the feature matrix and y the class labels; both are read by the
# model cells further down.
data = load_breast_cancer()
X = data.data
y = data.target


# Split the dataset into training and testing sets: test_size=0.2 holds out
# 20% of the rows for testing, and random_state=42 makes the split repeatable.


X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

In [5]:
X_train

array([[9.029e+00, 1.733e+01, 5.879e+01, ..., 1.750e-01, 4.228e-01,
        1.175e-01],
       [2.109e+01, 2.657e+01, 1.427e+02, ..., 2.903e-01, 4.098e-01,
        1.284e-01],
       [9.173e+00, 1.386e+01, 5.920e+01, ..., 5.087e-02, 3.282e-01,
        8.490e-02],
       ...,
       [1.429e+01, 1.682e+01, 9.030e+01, ..., 3.333e-02, 2.458e-01,
        6.120e-02],
       [1.398e+01, 1.962e+01, 9.112e+01, ..., 1.827e-01, 3.179e-01,
        1.055e-01],
       [1.218e+01, 2.052e+01, 7.722e+01, ..., 7.431e-02, 2.694e-01,
        6.878e-02]])

In [6]:
class BinaryTree:
    """Binary tree node that can render itself as ASCII art.

    Each node stores a value (``val``), a position (``pos``), an optional
    ``majority_class`` label and optional ``left`` / ``right`` children.
    Only ``pos`` is shown when the tree is drawn with :meth:`display`.
    """

    def __init__(
        self,
        val: float | None = None,
        pos: int | None = None,
        majority_class: int | None = None,
        left: BinaryTree | None = None,
        right: BinaryTree | None = None,
    ):
        self.val, self.pos = val, pos
        self.left, self.right = left, right
        self.majority_class = majority_class

    def display(self):
        """Print the ASCII drawing of the subtree rooted at this node."""
        rows = self._display_aux()[0]
        for row in rows:
            print(row)

    def _display_aux(self):
        """Build the ASCII drawing of this subtree.

        Returns a 4-tuple: the list of text rows, the drawing width, the
        drawing height, and the column of this node's label in the first row.
        """
        label = "%s" % self.pos
        label_w = len(label)
        gutter = " " * label_w

        # Leaf: the drawing is just the label.
        if self.left is None and self.right is None:
            return [label], label_w, 1, label_w // 2

        # Left child only: the label sits to the right of the child drawing.
        if self.right is None:
            rows, width, height, mid = self.left._display_aux()
            header = " " * (mid + 1) + "_" * (width - mid - 1) + label
            connector = " " * mid + "/" + " " * (width - mid - 1 + label_w)
            body = [row + gutter for row in rows]
            return (
                [header, connector] + body,
                width + label_w,
                height + 2,
                width + label_w // 2,
            )

        # Right child only: the label sits to the left of the child drawing.
        if self.left is None:
            rows, width, height, mid = self.right._display_aux()
            header = label + "_" * mid + " " * (width - mid)
            connector = " " * (label_w + mid) + "\\" + " " * (width - mid - 1)
            body = [gutter + row for row in rows]
            return (
                [header, connector] + body,
                width + label_w,
                height + 2,
                label_w // 2,
            )

        # Both children: draw them side by side with the label in between.
        left_rows, left_w, left_h, left_mid = self.left._display_aux()
        right_rows, right_w, right_h, right_mid = self.right._display_aux()
        header = (
            " " * (left_mid + 1)
            + "_" * (left_w - left_mid - 1)
            + label
            + "_" * right_mid
            + " " * (right_w - right_mid)
        )
        connector = (
            " " * left_mid
            + "/"
            + " " * (left_w - left_mid - 1 + label_w + right_mid)
            + "\\"
            + " " * (right_w - right_mid - 1)
        )
        # Pad the shorter drawing with blank rows so both have equal height.
        height_gap = left_h - right_h
        if height_gap < 0:
            left_rows = left_rows + [" " * left_w] * (-height_gap)
        elif height_gap > 0:
            right_rows = right_rows + [" " * right_w] * height_gap
        merged = [lhs + gutter + rhs for lhs, rhs in zip(left_rows, right_rows)]
        return (
            [header, connector] + merged,
            left_w + right_w + label_w,
            max(left_h, right_h) + 2,
            left_w + label_w // 2,
        )

    def print(self):
        """Print ``val`` and ``majority_class`` for every node, in pre-order."""
        if self:
            print(self.val, self.majority_class)
            for child in (self.left, self.right):
                if child:
                    child.print()

In [16]:
class DecisionTree:
    """Entropy-based classification tree with randomised feature sampling.

    Splits have the form ``feature <= threshold``; the thresholds tried are
    the feature values of the rows reaching a node.  For every candidate row
    a random subset of between 1 and ``int(sqrt(n_features))`` features is
    examined, so two fits on the same data can produce different trees.

    Class labels must be non-negative integers (they are counted with
    ``np.bincount``).  The fitted tree is stored in ``self.decision_tree`` as
    ``BinaryTree`` nodes: internal nodes carry the threshold in ``val`` and
    the feature index in ``pos``; every node carries the ``majority_class``
    of the training rows that reached it.
    """

    def __init__(self, max_tree_depth=5):
        # Maximum number of split levels below the root.
        self.max_tree_depth = max_tree_depth

    def _entropy(self, probabilities):
        """Return the Shannon entropy (in bits) of a probability vector.

        A small epsilon is added to every entry so that zero probabilities
        do not evaluate ``log2(0)``.
        """
        epsilon = 1e-10
        smoothed = probabilities + epsilon
        return -np.sum(smoothed * np.log2(smoothed))

    def _information_gain(self, parent_probabilities, children_probabilities, weights):
        """Return parent entropy minus the weighted sum of child entropies."""
        weighted_child_entropy = np.sum(
            [
                weight * self._entropy(child_probabilities)
                for weight, child_probabilities in zip(weights, children_probabilities)
            ]
        )
        return self._entropy(parent_probabilities) - weighted_child_entropy

    def _gain_split(self, value, index, dataset):
        """Split ``dataset`` rows on column ``index``: ``<= value`` and ``> value``."""
        left_dataset = dataset[dataset[:, index] <= value]
        right_dataset = dataset[dataset[:, index] > value]
        return left_dataset, right_dataset

    def _class_probabilities(self, labels, class_size):
        """Return the class distribution of ``labels`` as a length-``class_size`` vector.

        The counts are normalised by the number of labels themselves, so the
        result sums to 1; an empty ``labels`` yields all zeros.
        """
        counts = np.bincount(np.asarray(labels).astype(int), minlength=class_size)
        total = counts.sum()
        return counts / total if total else counts.astype(float)

    def _find_best_split(
        self, parent_dataset, parent_dataset_size, parent_probabilities, class_size
    ):
        """Search for the split of ``parent_dataset`` with the highest information gain.

        Returns ``(value, index, left_dataset, right_dataset,
        left_probabilities, right_probabilities)``, or ``None`` when no
        candidate separates the rows into two non-empty groups.
        """
        # The last column holds the class label; every other column is a feature.
        n_features = parent_dataset.shape[1] - 1
        if n_features < 1:
            return None
        max_subset_size = max(1, int(sqrt(n_features)))

        best = None
        max_ig = float("-inf")
        for row in parent_dataset:
            subset_size = random.randint(1, max_subset_size)
            # Sample from *all* feature columns (the label column is excluded).
            for i in np.random.choice(n_features, size=subset_size, replace=False):
                left_dataset, right_dataset = self._gain_split(
                    row[i], i, parent_dataset
                )
                left_size = len(left_dataset)
                right_size = len(right_dataset)
                # A split that leaves one side empty separates nothing.
                if not left_size or not right_size:
                    continue

                # Each child's distribution is normalised by its own size.
                probabilities_left = self._class_probabilities(
                    left_dataset[:, -1], class_size
                )
                probabilities_right = self._class_probabilities(
                    right_dataset[:, -1], class_size
                )
                ig = self._information_gain(
                    parent_probabilities,
                    np.array([probabilities_left, probabilities_right]),
                    np.array(
                        [
                            left_size / parent_dataset_size,
                            right_size / parent_dataset_size,
                        ]
                    ),
                )
                if max_ig < ig:
                    max_ig = ig
                    best = (
                        row[i],
                        i,
                        left_dataset,
                        right_dataset,
                        probabilities_left,
                        probabilities_right,
                    )
        return best

    def _get_best_split(
        self,
        parent_dataset: np.ndarray,
        parent_dataset_size: int,
        parent_probabilities: np.ndarray,
        class_size: int,
        counter=0,
    ):
        """Recursively build the subtree for ``parent_dataset``.

        Args:
            parent_dataset: rows reaching this node; the last column is the label.
            parent_dataset_size: number of rows in ``parent_dataset``.
            parent_probabilities: class distribution of ``parent_dataset``.
            class_size: length of every class-probability vector.
            counter: depth of this node (the root is 0).

        Returns:
            The ``BinaryTree`` root of the subtree, or ``None`` for an empty dataset.
        """
        if parent_dataset is None or not parent_dataset.size:
            return None

        labels = parent_dataset[:, -1].astype(int)
        majority_class = np.argmax(np.bincount(labels))

        best = None
        is_pure = bool(np.all(labels == labels[0]))
        # Pure nodes and nodes at the depth limit become leaves without
        # running the (expensive) split search.
        if counter < self.max_tree_depth and not is_pure:
            best = self._find_best_split(
                parent_dataset, parent_dataset_size, parent_probabilities, class_size
            )
        if best is None:
            return BinaryTree(majority_class=majority_class)

        value, index, left_dataset, right_dataset, prob_left, prob_right = best
        root = BinaryTree(value, index, majority_class)
        root.left = self._get_best_split(
            left_dataset, len(left_dataset), prob_left, class_size, counter + 1
        )
        root.right = self._get_best_split(
            right_dataset, len(right_dataset), prob_right, class_size, counter + 1
        )
        return root

    def fit(self, X: np.ndarray, class_values: np.ndarray):
        """Build the tree from feature matrix ``X`` and integer labels ``class_values``.

        Raises:
            ValueError: if ``X`` is not a non-empty 2-D array or its length
                differs from that of ``class_values``.
        """
        X = np.asarray(X)
        class_values = np.asarray(class_values)
        if X.ndim != 2 or len(X) == 0:
            raise ValueError("X must be a non-empty 2-D array")
        if len(X) != len(class_values):
            raise ValueError("X and class_values must have the same length")

        labels = class_values.astype(int)
        # Size probability vectors by the largest label, not the number of
        # distinct labels, so gaps in the label range (e.g. a bootstrap sample
        # that misses a class) still give vectors of equal length.
        class_size = int(labels.max()) + 1
        self.class_values = class_values
        dataset = np.concatenate((X, labels.reshape((-1, 1))), axis=1)
        dataset_size = len(dataset)
        self.row_size = len(dataset[0])
        self.decision_tree = self._get_best_split(
            dataset,
            dataset_size,
            np.bincount(labels, minlength=class_size) / dataset_size,
            class_size,
        )

    def _traverse_tree(self, X: np.ndarray, root: "BinaryTree | None", indices=None):
        """Route the rows of ``X`` down the tree and record their predictions.

        Args:
            X: rows reaching ``root``.
            root: current node.
            indices: positions of the rows of ``X`` inside ``self.results``;
                defaults to ``0..len(X)-1``, i.e. ``X`` is the full matrix
                passed to :meth:`predict`.
        """
        if root is None or len(X) == 0:
            return
        if indices is None:
            indices = np.arange(len(X))

        if root.left is None and root.right is None:
            for i in indices:
                self.results[i] = root.majority_class
            return

        goes_left = X[:, root.pos] <= root.val
        for child, mask in ((root.left, goes_left), (root.right, ~goes_left)):
            if child is None:
                # No subtree on this side: fall back to this node's majority
                # class rather than leaving the default prediction in place.
                for i in indices[mask]:
                    self.results[i] = root.majority_class
            else:
                self._traverse_tree(X[mask], child, indices[mask])

    def predict(self, X: np.ndarray):
        """Return the predicted class for every row of ``X`` as a NumPy array."""
        X = np.asarray(X)
        self.results = [0] * len(X)
        self.X = X
        self._traverse_tree(X, self.decision_tree)
        return np.array(self.results)

SyntaxError: invalid syntax (3342573055.py, line 32)

In [8]:
class RandomForest:
    """Bagged ensemble of ``DecisionTree`` classifiers combined by majority vote.

    Every tree is trained on a bootstrap sample (rows drawn with replacement,
    same size as the training set).  ``fit`` resolves ``max_features`` to an
    integer but does not pass it to the trees, and
    ``_get_random_features_positions`` is not called by ``fit``.
    """

    def __init__(self, max_tree_depth=5, n_estimators=100, max_features="sqrt"):
        self.max_tree_depth = max_tree_depth
        self.n_estimators = n_estimators
        self.max_features = max_features
        self.forest: List[DecisionTree] = []

    def _bootstrap(self, dataset: np.ndarray):
        """Return ``len(dataset)`` rows of ``dataset`` drawn with replacement."""
        size = len(dataset)
        random_indices = np.random.choice(size, size=size, replace=True)
        return dataset[random_indices]

    def _get_random_features_positions(self, row_size: int, num_features: int):
        """Return ``num_features`` distinct random column indices below ``row_size``."""
        return list(np.random.choice(row_size, size=num_features, replace=False))

    @staticmethod
    def _majority_vote(predictions: np.ndarray):
        """Return the most frequent label in each column of ``predictions``.

        ``predictions`` has one row per tree and one column per sample; ties
        are resolved in favour of the smallest label (``np.argmax``).
        """
        return np.array([np.argmax(np.bincount(votes)) for votes in predictions.T])

    def fit(self, X: np.ndarray, class_values: np.ndarray):
        """Train ``n_estimators`` trees, each on its own bootstrap sample.

        Any trees from a previous call to ``fit`` are discarded.

        Raises:
            ValueError: if ``X`` is not a non-empty 2-D array or its length
                differs from that of ``class_values``.
        """
        X = np.asarray(X)
        class_values = np.asarray(class_values)
        if X.ndim != 2 or len(X) == 0:
            raise ValueError("X must be a non-empty 2-D array")
        if len(X) != len(class_values):
            raise ValueError("X and class_values must have the same length")

        row_size = X.shape[1]
        if self.max_features == "sqrt":
            self.max_features = int(sqrt(row_size))
        dataset = np.concatenate((X, class_values.reshape((-1, 1))), axis=1)

        # Build into a fresh list so that refitting replaces the old trees
        # instead of mixing them with the new ones.
        forest = []
        for _ in range(self.n_estimators):
            sample = self._bootstrap(dataset)
            tree = DecisionTree(self.max_tree_depth)
            tree.fit(sample[:, :-1], sample[:, -1].astype(int))
            forest.append(tree)
        self.forest = forest

    def predict(self, X: np.ndarray):
        """Return the majority-vote class of every row of ``X``.

        Raises:
            ValueError: if the forest holds no trees (``fit`` was not called).
        """
        if not self.forest:
            raise ValueError("RandomForest has no trees; call fit() before predict()")
        predictions = np.array([tree.predict(X) for tree in self.forest])
        # save(predictions)
        return self._majority_vote(predictions)


def save(predictions_list: List[List], file_path: str = "predictions.txt"):
    """Write each sub-list to ``file_path`` as one line of space-separated values.

    Args:
        predictions_list: rows to write; every element is converted with ``str``.
        file_path: destination file, overwritten if it already exists.
            Defaults to ``"predictions.txt"`` in the working directory.
    """
    with open(file_path, "w", encoding="utf-8") as file:
        file.writelines(
            " ".join(str(element) for element in sublist) + "\n"
            for sublist in predictions_list
        )

In [17]:
# Baseline: fit a single DecisionTree on the training split and score its
# predictions on the test split.
model = DecisionTree(max_tree_depth=5)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(y_pred)
# Bare expression: the notebook shows its value as the cell output.
accuracy

[1 0 0 1 1 0 0 0 0 1 1 0 1 0 1 0 1 1 1 0 1 1 0 1 1 1 1 1 1 0 1 1 1 1 1 1 1
 1 0 1 1 0 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 0 0 1 1 0 0 1 1 1 0 0 1 1 0 0 1 0
 1 1 1 1 1 1 0 1 1 0 0 0 0 0 1 1 1 1 1 1 1 1 0 0 1 0 0 1 0 0 1 1 1 0 0 1 0
 1 1 0]


0.9473684210526315

In [19]:
from sklearn.metrics import accuracy_score, precision_score

# Fit the RandomForest defined above (100 trees, depth limit 5) on the training
# split and score its majority-vote predictions on the test split.
model = RandomForest(max_tree_depth=5, n_estimators=100)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(y_pred)
# Both shapes are printed to confirm one prediction per test label.
print(y_test.shape, y_pred.shape)
# print("Percision:", accuracy)
print("Accuracy:", accuracy)

In [15]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Create a Random Forest classifier from scikit-learn as a reference, using
# the same number of trees and depth limit as the RandomForest run above.
rf_classifier = RandomForestClassifier(
    n_estimators=100,
    criterion="entropy",
    max_depth=5,
    max_features="sqrt",
    bootstrap=True,
)

# Train the Random Forest classifier
rf_classifier.fit(X_train, y_train)

# Make predictions on the testing set
y_pred = rf_classifier.predict(X_test)
print(y_pred)
print(X_test.shape)
# Calculate accuracy
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

[1 0 0 1 1 0 0 0 0 1 1 0 1 0 1 0 1 1 1 0 1 1 0 1 1 1 1 1 1 0 1 1 1 1 1 1 0
 1 0 1 1 0 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 0 0 1 1 0 0 1 1 1 0 0 1 1 0 0 1 0
 1 1 1 1 1 1 0 1 1 0 0 0 0 0 1 1 1 1 1 1 1 1 0 0 1 0 0 1 0 0 1 1 1 0 1 1 0
 1 1 0]
(114, 30)
Accuracy: 0.9649122807017544
