Convex Optimization II - Final Project
Tina Halimi 400101078 - Heliya Shakeri 400101391


In [None]:
import numpy as np
import pandas as pd
import math
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import f1_score
import concurrent
from functools import partial
import datetime
import math

from pandas.api.types import is_numeric_dtype

from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# Internal Decision Tree

In [None]:
import numpy as np

class InternalDecisionTree:
    """
    A simple decision tree used internally.
    """

    def __init__(self, source_desc, classes_):
        self.source_desc = source_desc
        self.classes_ = classes_

        self.feature = None
        self.threshold = None
        self.children_left = None
        self.children_right = None
        self.node_prediction = None


    def copy_from_dt(self, dt):
        self.feature = dt.tree_.feature
        self.threshold = dt.tree_.threshold
        self.children_left = dt.tree_.children_left
        self.children_right = dt.tree_.children_right
        self.node_prediction = [dt.classes_[np.argmax(x)] for x in dt.tree_.value]

    def copy_from_values(self, feature, threshold, children_left, children_right):
        self.feature = feature
        self.threshold = threshold
        self.children_left = children_left
        self.children_right = children_right
        self.node_prediction = None


    def count_training_records(self, x, y):
        counts = np.zeros((len(self.feature), len(self.classes_)))

        for i in x.index:
            y_index = self.classes_.index(y.loc[i])
            row = x.loc[i]
            cur_node_idx = 0
            while self.feature[cur_node_idx] >= 0:
                cur_feature_name = x.columns[self.feature[cur_node_idx]]
                cur_threshold = self.threshold[cur_node_idx]
                if row[cur_feature_name] >= cur_threshold:
                    cur_node_idx = self.children_right[cur_node_idx]
                else:
                    cur_node_idx = self.children_left[cur_node_idx]
            counts[cur_node_idx][y_index] += 1

        # Prediction for each leaf node
        self.node_prediction = [""] * len(self.feature)
        for node_idx in range(len(self.feature)):
            if self.feature[node_idx] == -2:
                self.node_prediction[node_idx] = self.classes_[np.argmax(counts[node_idx])]


    def predict(self, x):
        ret = []
        for i in x.index:
            row = x.loc[i]
            cur_node_idx = 0
            while self.feature[cur_node_idx] >= 0:
                cur_feature_name = self.feature[cur_node_idx]
                cur_threshold = self.threshold[cur_node_idx]
                if row[cur_feature_name] >= cur_threshold:
                    cur_node_idx = self.children_right[cur_node_idx]
                else:
                    cur_node_idx = self.children_left[cur_node_idx]

            ret.append(self.node_prediction[cur_node_idx])
        return ret

# Genetic Decision Tree

In [None]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

class GeneticDecisionTree:

    def __init__(self,
                 max_depth=4,
                 max_iterations=5,
                 random_state=0):

        self.max_depth = max_depth
        self.max_iterations = max_iterations
        self.random_state = random_state

        self.column_names = None
        self.classes_ = None
        self.internal_dt = None


    def fit(self, x, y):
        """
        Create many candidate trees and optionally mutating and
        combining these to identify the tree best fit to the training data.
        """

        def get_dt_scores():
            return [f1_score(y, idt.predict(x), average='macro') for idt in idt_list]

        def combine_trees(parent_a, parent_b):

            new_tree = InternalDecisionTree(
                parent_a.source_desc + " & " + parent_b.source_desc + " Combined",
                self.classes_,
            )

            # New tree
            feature = [parent_a.feature[0]]
            threshold = [(parent_a.threshold[0] + parent_b.threshold[0]) / 2.0]

            start_right_tree_parent1 = parent_a.children_right[0]
            start_right_tree_parent2 = parent_b.children_right[0]
            feature.extend(parent_a.feature[1:start_right_tree_parent1])
            threshold.extend(parent_a.threshold[1:start_right_tree_parent1])
            children_left = list(parent_a.children_left[:start_right_tree_parent1].copy())
            children_right = list(parent_a.children_right[:start_right_tree_parent1].copy())

            offset = start_right_tree_parent2 - start_right_tree_parent1
            for node_idx in range(start_right_tree_parent2, len(parent_b.children_right)):
                feature.append(parent_b.feature[node_idx])
                threshold.append(parent_b.threshold[node_idx])
                children_left.append(parent_b.children_left[node_idx] - offset)
                children_right.append(parent_b.children_right[node_idx] - offset)

            new_tree.copy_from_values(feature, threshold, children_left, children_right)
            new_tree.count_training_records(x, y)
            idt_list.append(new_tree)


        x = pd.DataFrame(x)
        y = pd.Series(y)
        x = x.reset_index(drop=True)
        y = y.reset_index(drop=True)

        self.column_names = x.columns
        self.classes_ = sorted(y.unique())

        idt_list = []

        # Bootstrap samples of the data
        for i in range(10):
            x_sample = x.sample(n=len(x))
            y_sample = y_sample = y.iloc[x_sample.index]
            dt = DecisionTreeClassifier(max_depth=self.max_depth, random_state=self.random_state + i)
            dt.fit(x_sample, y_sample)
            idt = InternalDecisionTree("Original", self.classes_)
            idt.copy_from_dt(dt)
            idt_list.append(idt)

        idt_scores = get_dt_scores()

        for iteration_idx in range(self.max_iterations):
            top_scores_idxs = np.argsort(idt_scores)[::-1]

            # Mutate
            for i in top_scores_idxs[:10]:
                parent_idt = idt_list[i]
                ret_list = mutate_tree(parent_idt, x, y, self.classes_)
                idt_list.extend(ret_list)

            # Combine
            for i in range(len(top_scores_idxs)):
                for j in range(i + 1, len(top_scores_idxs)):
                    idt_idx_1 = top_scores_idxs[i]
                    idt_idx_2 = top_scores_idxs[j]
                    parent_idt_1 = idt_list[idt_idx_1]
                    parent_idt_2 = idt_list[idt_idx_2]

                    if parent_idt_1.feature[0] == parent_idt_2.feature[0]:
                        combine_trees(parent_idt_1, parent_idt_2)
                        combine_trees(parent_idt_2, parent_idt_1)

            # Generate more random trees
            num_random = 100 // (iteration_idx + 1)
            for i in range(num_random):
                sample_size_log = np.random.uniform(low=7, high=np.log2(len(x) * 2))
                sample_size = int(math.pow(2, sample_size_log))
                x_sample = x.sample(n=sample_size, replace=True)
                y_sample = y.iloc[x_sample.index]
                dt = DecisionTreeClassifier(
                    max_depth=self.max_depth,
                    random_state=self.random_state + iteration_idx + i
                )
                dt.fit(x_sample, y_sample)
                idt = InternalDecisionTree('Random', self.classes_)
                idt.copy_from_dt(dt)
                idt_list.append(idt)

            # Best 20 trees based on scores
            idt_scores = get_dt_scores()
            top_scores_idxs = np.argsort(idt_scores)[::-1][:20]
            idt_list = np.array(idt_list)[top_scores_idxs].tolist()
            idt_scores = np.array(idt_scores)[top_scores_idxs].tolist()

        top_scores_idxs = np.argsort(idt_scores)[::-1]
        idt_list = np.array(idt_list)[top_scores_idxs].tolist()
        self.internal_dt = idt_list[0]


def mutate_tree(parent_idt, x, y, classes_):
    idt_list = []
    for j in range(5): # nodes
        n_nodes_parent = len(parent_idt.feature)
        while True:
            node_idx = np.random.choice(n_nodes_parent)
            if parent_idt.feature[node_idx] >= 0:
                break
        feature_idx = parent_idt.feature[node_idx]

        for k in range(10):  # threshold
            idt = InternalDecisionTree(parent_idt.source_desc + " - Modified Threshold", classes_)
            new_threshold = parent_idt.threshold.copy()
            feat_name = x.columns[feature_idx]
            new_threshold[node_idx] = np.random.uniform(low=x[feat_name].min(), high=x[feat_name].max())

            idt.copy_from_values(
                feature=parent_idt.feature,
                threshold=new_threshold,
                children_left=parent_idt.children_left,
                children_right=parent_idt.children_right
            )
            idt.count_training_records(x, y)
            idt_list.append(idt)
    return idt_list

# Other Functions

In [None]:
def visualize_tree(genetic_dt):
    """
    Visualizes the trained GeneticDecisionTree by displaying its decision rules.

    :param genetic_dt: A trained GeneticDecisionTree instance.
    """
    internal_dt = genetic_dt.internal_dt  # Get the internal tree representation
    column_names = genetic_dt.column_names

    # Get the depth of each node
    depths_arr = [-1] * len(internal_dt.feature)
    depths_arr[0] = 0  # Root node starts at depth 0
    changed_any = True

    while changed_any:
        changed_any = False
        for i in range(len(depths_arr)):
            if depths_arr[i] > -1:
                left_child_idx = internal_dt.children_left[i]
                right_child_idx = internal_dt.children_right[i]
                if (left_child_idx > 0) and (depths_arr[left_child_idx] == -1):
                    depths_arr[left_child_idx] = depths_arr[i] + 1
                    changed_any = True
                if (right_child_idx > 0) and (depths_arr[right_child_idx] == -1):
                    depths_arr[right_child_idx] = depths_arr[i] + 1
                    changed_any = True

    # Generate tree structure
    lines_arr = []
    for cur_node_idx in range(len(internal_dt.feature)):
        indent = "|   " * depths_arr[cur_node_idx]  # Indentation for tree levels
        if internal_dt.feature[cur_node_idx] >= 0:
            feature = internal_dt.feature[cur_node_idx]
            threshold = internal_dt.threshold[cur_node_idx]
            lines_arr.append(f"{indent}IF {column_names[feature]} < {threshold:.2f}")
        else:
            lines_arr.append(f"{indent}THEN class = {internal_dt.node_prediction[cur_node_idx]}")

    # Create a map from the node ids to the line ids
    node_id_to_line_id_map = {x: x for x in range(len(internal_dt.feature))}

    # Create a line for the right side of each condition
    for cur_node_idx in range(len(internal_dt.feature) - 1, -1, -1):
        if internal_dt.feature[cur_node_idx] >= 0:
            indent = "|   " * depths_arr[cur_node_idx]
            feature = internal_dt.feature[cur_node_idx]
            threshold = internal_dt.threshold[cur_node_idx]
            node_idx = internal_dt.children_right[cur_node_idx]
            lines_idx = node_id_to_line_id_map[node_idx]
            lines_arr.insert(lines_idx, f"{indent}ELSE {column_names[feature]} > {threshold:.2f}")

            # Adjust line mapping for nodes after this insertion
            for i in range(node_idx, len(internal_dt.feature)):
                node_id_to_line_id_map[i] += 1

    # Print the tree structure
    for line in lines_arr:
        print(line)

# Example

In [None]:
# Binary Classification Problem
from sklearn.datasets import load_wine

np.random.seed(0)
data = load_wine()
df = pd.DataFrame(data.data)
df.columns = data.feature_names
y_true = data.target
#print(pd.Series(y_true).value_counts())

X_train, X_test, y_train, y_test = train_test_split(df, y_true, test_size=0.3, random_state=42)

In [None]:
# Standard Decision Tree
clf = DecisionTreeClassifier(max_depth=2)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print("DT:", f1_score(y_test, y_pred, average='macro'))

DT: 0.880759176148813


In [None]:
# Genetic Decision Tree
np.random.seed(0)
gdt = GeneticDecisionTree(max_depth=3, max_iterations=5)
gdt.fit(X_train, y_train)
y_pred = gdt.internal_dt.predict(X_test)
print("GDT:", f1_score(y_test, y_pred, average='macro'))

Genetic DT: 0.9799023830031581


In [None]:
visualize_tree(gdt)

IF flavanoids < 1.40
|   IF color_intensity < 3.72
|   |   THEN class = 1
|   ELSE color_intensity > 3.72
|   |   THEN class = 2
ELSE flavanoids > 1.40
|   IF proline < 724.50
|   |   THEN class = 1
|   ELSE proline > 724.50
|   |   THEN class = 0
