In [23]:
import numpy as np
import pandas as pd
from graphviz import Digraph
import math
import random

col_names = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'type']
original_data = pd.read_csv(r"./dataset/Iris.csv", skiprows=1, header=None, names=col_names)

# col_names = ['Holiday', 'Discount', 'Free Delivery', 'Purchase']
# original_data = pd.read_csv(r"./dataset/Purchase_new.csv", skiprows=1, header=None, names=col_names)

data = original_data.copy()
data = data[data['type'] != 'Iris-setosa']
data = pd.concat([data,original_data[original_data['type'] == 'Iris-setosa'].sample(frac=1/3,random_state=42)])
data.shape

(117, 5)

In [24]:
counter = 0

class Node:
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, info_gain=None, value=None):
        self.feature_index = feature_index
        self.threshold = threshold
        self.left = left
        self.right = right
        self.info_gain = info_gain
        self.value = value

class DecisionTreeClassifier:
    def __init__(self, min_samples_split=2, max_depth=2):
        self.root = None
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth
        self.dot = Digraph(comment='Decision Tree')

    def build_tree(self, dataset, curr_depth=0):
        X, Y = np.array([row[:-1] for row in dataset]), np.array([row[-1] for row in dataset])
        num_samples, num_features = self.shape(X)

        if num_samples >= self.min_samples_split and curr_depth <= self.max_depth:
            best_split = self.get_best_split(dataset, num_samples, num_features)
            if best_split["info_gain"] > 0:
                left_subtree = self.build_tree(best_split["dataset_left"], curr_depth + 1)
                right_subtree = self.build_tree(best_split["dataset_right"], curr_depth + 1)
                node = Node(best_split["feature_index"], best_split["threshold"], 
                            left_subtree, right_subtree, best_split["info_gain"])
                
                self.print_tree(node)
                print()
                return Node(best_split["feature_index"], best_split["threshold"],
                            left_subtree, right_subtree, best_split["info_gain"])
                
        leaf_value = self.calculate_leaf_value(Y)
        return Node(value=leaf_value)

    def get_best_split(self, dataset, num_samples, num_features):
        best_split = {}
        max_info_gain = -float("inf")

        for feature_index in range(num_features):
            feature_values = np.array([row[feature_index] for row in dataset])
            possible_thresholds = self.unique(feature_values)
            for threshold in possible_thresholds:
                dataset_left, dataset_right = self.split(dataset, feature_index, threshold)
                if len(dataset_left) > 0 and len(dataset_right) > 0:
                    y, left_y, right_y = np.array([row[-1] for row in dataset]), np.array([row[-1] for row in dataset_left]), np.array([row[-1] for row in dataset_right])
                    curr_info_gain = self.information_gain(y, left_y, right_y, "gini")
                    if curr_info_gain > max_info_gain:
                        best_split["feature_index"] = feature_index
                        best_split["threshold"] = threshold
                        best_split["dataset_left"] = dataset_left
                        best_split["dataset_right"] = dataset_right
                        best_split["info_gain"] = curr_info_gain
                        max_info_gain = curr_info_gain
        return best_split

    def split(self, dataset, feature_index, threshold):
        dataset_left = [row for row in dataset if row[feature_index] <= threshold]
        dataset_right = [row for row in dataset if row[feature_index] > threshold]
        return dataset_left, dataset_right

    def information_gain(self, parent, l_child, r_child, mode="entropy"):
        weight_l = len(l_child) / len(parent)
        weight_r = len(r_child) / len(parent)
        if mode == "gini":
            gain = self.gini_index(parent) - (weight_l * self.gini_index(l_child) + weight_r * self.gini_index(r_child))
        else:
            gain = self.entropy(parent) - (weight_l * self.entropy(l_child) + weight_r * self.entropy(r_child))
        return gain

    def entropy(self, y):
        class_labels = self.unique(y)
        entropy = 0
        for cls in class_labels:
            p_cls = len([label for label in y if label == cls]) / len(y)
            entropy += -p_cls * self.log2(p_cls)
        return entropy

    def gini_index(self, y):
        class_labels = self.unique(y)
        gini = 0
        for cls in class_labels:
            p_cls = len([label for label in y if label == cls]) / len(y)
            gini += p_cls**2
        return 1 - gini

    def calculate_leaf_value(self, Y):
        Y = list(Y)
        return max(Y, key=Y.count)

    def print_tree(self, tree=None, indent=" "):
        if not tree:
            tree = self.root

        if tree.value is not None:
            print(tree.value)

        else:
            print("X_" + str(tree.feature_index), "<=", tree.threshold, "?", tree.info_gain)
            print("%sleft:" % (indent), end="")
            self.print_tree(tree.left, indent + indent)
            print("%sright:" % (indent), end="")
            self.print_tree(tree.right, indent + indent)

    def fit(self, X, Y):
        dataset = X.tolist()
        for i, y in enumerate(Y):
            dataset[i].append(y)
        self.root = self.build_tree(dataset)

    def predict(self, X):
        predictions = [self.make_prediction(x, self.root) for x in X]
        return predictions

    def make_prediction(self, x, tree):
        if tree.value is not None:
            return tree.value
        feature_val = x[tree.feature_index]
        if feature_val <= tree.threshold:
            return self.make_prediction(x, tree.left)
        else:
            return self.make_prediction(x, tree.right)

    def shape(self, array):
        return len(array), len(array[0])

    def unique(self, array):
        return np.unique(array)

    def log2(self, x):
        return 0 if x == 0 else math.log2(x)

    def visualize_tree(self, tree=None):
        global counter
        if not tree:
            tree = self.root

        if tree.value is not None:
            self.dot.node(str(id(tree)), str(tree.value), shape='oval', style='filled', color='lightblue')

        else:
            filename = f'tree_{counter}'
            self.dot.node(str(id(tree)), f'X_{str(tree.feature_index)} <= {str(tree.threshold)}\nInfo Gain: {str(tree.info_gain)}', shape='box', style='filled', color='lightgreen')
            if tree.left:
                self.dot.edge(str(id(tree)), str(id(tree.left)), label='True')
                self.visualize_tree(tree.left)
            if tree.right:
                self.dot.edge(str(id(tree)), str(id(tree.right)), label='False')
                self.visualize_tree(tree.right)

            self.dot.render(filename, view=True)
            counter += 1 


    def save_tree_graph(self, filename=None):
        global counter
        if filename is None:
            filename = f'decision_tree_{counter}'
        self.visualize_tree()
        self.dot.render(filename, view=True)
        counter += 1 

    def visualize_intermediate_trees(self):
        for depth in range(1, self.max_depth + 1):
            classifier = DecisionTreeClassifier(min_samples_split=3, max_depth=depth)
            classifier.fit(X_train, Y_train)
            classifier.visualize_tree()
            # classifier.save_tree_graph()
            print(f"Visualizing Intermediate Tree (Depth {depth})...")


def split(X, y, test_size):
    data = list(zip(X, y))
    num_samples = len(data)
    num_test_samples = int(num_samples * test_size)
    num_train_samples = num_samples - num_test_samples
    random.shuffle(data)
    train_data = data[:num_train_samples]
    test_data = data[num_train_samples:]
    X_train, y_train = zip(*train_data)
    X_test, y_test = zip(*test_data)
    return X_train, y_train, X_test, y_test

def accuracy_score(y_true, y_pred):
    correct = sum(1 for true, pred in zip(y_true, y_pred) if true == pred)
    return correct / len(y_true)


X = data.iloc[:, :-1].values
Y = data.iloc[:, -1].values.reshape(-1, 1)
X_train, Y_train, X_test, Y_test = split(X, Y, test_size=0.3)

X_train

Y_train

num_models = 5
models = []
accuracies = []
precisions = []
recalls = []
f1_scores = []

from sklearn.metrics import precision_score, recall_score, f1_score
for i in range(num_models):
    start_idx = i * 20
    end_idx = (i + 1) * 20
    X_train_subset = np.array(X_train[start_idx:end_idx])
    Y_train_subset = np.array(Y_train[start_idx:end_idx])

    classifier = DecisionTreeClassifier(min_samples_split=3, max_depth=3)
    classifier.fit(X_train_subset, Y_train_subset)

    Y_pred = classifier.predict(X_test)

    accuracy = accuracy_score(Y_test, Y_pred)
    precision = precision_score(Y_test, Y_pred, average='weighted')
    recall = recall_score(Y_test, Y_pred, average='weighted')
    f1 = f1_score(Y_test, Y_pred, average='weighted')

    models.append(classifier)
    accuracies.append(accuracy)
    precisions.append(precision)
    recalls.append(recall)
    f1_scores.append(f1)

    print("Final Tree:")
    # classifier.print_tree()
    print()
    print("Visualizing Final Tree...")
    # classifier.save_tree_graph()
    print()

for i in range(num_models):
    print("Training data: ")
    print(X_train_subset)
    print()
    print(Y_train_subset)
    print(f"Model {i + 1} Metrics:")
    print(f"Accuracy: {accuracies[i]}")
    print(f"Precision: {precisions[i]}")
    print(f"Recall: {recalls[i]}")
    print(f"F1 Score: {f1_scores[i]}")
    print()

def aggregate_predictions(models, X_test, Y_test):
    num_models = len(models)
    aggregated_predictions = []

    for i in range(len(X_test)):
        individual_predictions = [model.predict([X_test[i]])[0] for model in models]
        
        counts = {}
        for prediction in individual_predictions:
            prediction_tuple = tuple(prediction)
            if prediction_tuple in counts:
                counts[prediction_tuple] += 1
            else:
                counts[prediction_tuple] = 1
        aggregated_prediction_tuple = max(counts, key=counts.get)
        aggregated_prediction = np.array(aggregated_prediction_tuple)

        aggregated_predictions.append(aggregated_prediction)

#         print(f"Sample {i + 1} - Individual Predictions: {individual_predictions}, Actual Value: {Y_test[i][0]}, Aggregated Prediction: {aggregated_prediction[0]}")

    accuracy = sum(1 for p, y in zip(aggregated_predictions, Y_test) if np.array_equal(p, y)) / len(Y_test)
    precision = precision_score(Y_test, aggregated_predictions, average='weighted', zero_division=0)
    recall = recall_score(Y_test, aggregated_predictions, average='weighted', zero_division=0)
    f1 = f1_score(Y_test, aggregated_predictions, average='weighted', zero_division=0)

    print("\nAggregated Predictions:", aggregated_predictions)
    print("\nAccuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1 Score:", f1)

aggregate_predictions(models, X_test, Y_test)


X_train_2, Y_train_2, X_test_2, Y_test_2 = split(X, Y, test_size=0.2)

for i in range(num_models):
    start_idx = i * 20
    end_idx = (i + 1) * 20
    X_train_subset = np.array(X_train_2[start_idx:end_idx])
    Y_train_subset = np.array(Y_train_2[start_idx:end_idx])

    classifier = DecisionTreeClassifier(min_samples_split=3, max_depth=3)
    classifier.fit(X_train_subset, Y_train_subset)

    Y_pred_2 = classifier.predict(X_test_2)

    accuracy_2 = accuracy_score(Y_test_2, Y_pred_2)
    precision_2 = precision_score(Y_test_2, Y_pred_2, average='weighted')
    recall_2= recall_score(Y_test_2, Y_pred_2, average='weighted')
    f1_2 = f1_score(Y_test_2, Y_pred_2, average='weighted')

    models.append(classifier)
    accuracies.append(accuracy_2)
    precisions.append(precision_2)
    recalls.append(recall_2)
    f1_scores.append(f1_2)

    print("Final Tree:")
    classifier.print_tree()
    print()
    print("Visualizing Final Tree...")
    classifier.save_tree_graph()

for i in range(num_models):
    print("Training data: ")
    print(X_train_subset)
    print()
    print(Y_train_subset)
    print(f"Model {i + 1} Metrics:")
    print(f"Accuracy: {accuracies[i]}")
    print(f"Precision: {precisions[i]}")
    print(f"Recall: {recalls[i]}")
    print(f"F1 Score: {f1_scores[i]}")
    print()
    
def aggregate_predictions(models, X_test_2, Y_test_2):
    num_models_2 = len(models)
    aggregated_predictions_2 = []

    for i in range(len(X_test_2)):
        individual_predictions_2 = [model.predict([X_test_2[i]])[0] for model in models]
        
        counts = {}
        for prediction in individual_predictions_2:
            prediction_tuple = tuple(prediction)
            if prediction_tuple in counts:
                counts[prediction_tuple] += 1
            else:
                counts[prediction_tuple] = 1
        aggregated_prediction_tuple = max(counts, key=counts.get)
        aggregated_prediction = np.array(aggregated_prediction_tuple)

        aggregated_predictions_2.append(aggregated_prediction)

    accuracy = sum(1 for p, y in zip(aggregated_predictions_2, Y_test_2) if np.array_equal(p, y)) / len(Y_test_2)
    precision = precision_score(Y_test_2, aggregated_predictions_2, average='weighted', zero_division=0)
    recall = recall_score(Y_test_2, aggregated_predictions_2, average='weighted', zero_division=0)
    f1 = f1_score(Y_test_2, aggregated_predictions_2, average='weighted', zero_division=0)

    print("\nAggregated Predictions:", aggregated_predictions_2)
    print("\nAccuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1 Score:", f1)

aggregate_predictions(models, X_test_2, Y_test_2)

X_0 <= 4.8 ? 0.4444444444444444
 left:['Iris-setosa']
 right:['Iris-virginica']

X_0 <= 4.9 ? 0.2916666666666667
 left:X_0 <= 4.8 ? 0.4444444444444444
  left:['Iris-setosa']
  right:['Iris-virginica']
 right:['Iris-versicolor']

X_3 <= 1.7 ? 0.34333333333333327
 left:X_0 <= 4.9 ? 0.2916666666666667
  left:X_0 <= 4.8 ? 0.4444444444444444
    left:['Iris-setosa']
    right:['Iris-virginica']
  right:['Iris-versicolor']
 right:['Iris-virginica']

Final Tree:

Visualizing Final Tree...

X_0 <= 4.4 ? 0.19753086419753085
 left:['Iris-setosa']
 right:['Iris-versicolor']

X_0 <= 5.9 ? 0.07438016528925631
 left:['Iris-virginica']
 right:['Iris-virginica']

X_3 <= 1.6 ? 0.3652020202020201
 left:X_0 <= 4.4 ? 0.19753086419753085
  left:['Iris-setosa']
  right:['Iris-versicolor']
 right:X_0 <= 5.9 ? 0.07438016528925631
  left:['Iris-virginica']
  right:['Iris-virginica']

Final Tree:

Visualizing Final Tree...

X_1 <= 3.3 ? 0.14201183431952646
 left:['Iris-versicolor']
 right:['Iris-setosa']

X_2 <

  _warn_prf(average, modifier, msg_start, len(result))


X_2 <= 1.6 ? 0.4970414201183432
 left:['Iris-setosa']
 right:['Iris-versicolor']

X_0 <= 6.1 ? 0.34192307692307694
 left:X_2 <= 1.6 ? 0.4970414201183432
  left:['Iris-setosa']
  right:['Iris-versicolor']
 right:['Iris-virginica']

Final Tree:
X_0 <= 6.1 ? 0.34192307692307694
 left:X_2 <= 1.6 ? 0.4970414201183432
  left:['Iris-setosa']
  right:['Iris-versicolor']
 right:['Iris-virginica']

Visualizing Final Tree...
X_2 <= 1.5 ? 0.3550295857988165
 left:['Iris-setosa']
 right:['Iris-versicolor']

X_2 <= 4.9 ? 0.37423076923076926
 left:X_2 <= 1.5 ? 0.3550295857988165
  left:['Iris-setosa']
  right:['Iris-versicolor']
 right:['Iris-virginica']

Final Tree:
X_2 <= 4.9 ? 0.37423076923076926
 left:X_2 <= 1.5 ? 0.3550295857988165
  left:['Iris-setosa']
  right:['Iris-versicolor']
 right:['Iris-virginica']

Visualizing Final Tree...
X_1 <= 2.2 ? 0.17999999999999994
 left:['Iris-virginica']
 right:['Iris-versicolor']

X_2 <= 1.6 ? 0.2527777777777778
 left:['Iris-setosa']
 right:X_1 <= 2.2 ? 0.17

  _warn_prf(average, modifier, msg_start, len(result))


X_0 <= 4.9 ? 0.40816326530612246
 left:['Iris-setosa']
 right:['Iris-versicolor']

X_3 <= 1.7 ? 0.3979591836734694
 left:X_0 <= 4.9 ? 0.40816326530612246
  left:['Iris-setosa']
  right:['Iris-versicolor']
 right:['Iris-virginica']

Final Tree:
X_3 <= 1.7 ? 0.3979591836734694
 left:X_0 <= 4.9 ? 0.40816326530612246
  left:['Iris-setosa']
  right:['Iris-versicolor']
 right:['Iris-virginica']

Visualizing Final Tree...
Training data: 
[[6.2 2.8 4.8 1.8]
 [6.9 3.1 5.4 2.1]
 [6.2 3.4 5.4 2.3]
 [4.6 3.1 1.5 0.2]
 [6.9 3.1 4.9 1.5]
 [6.6 3.  4.4 1.4]
 [5.7 2.8 4.5 1.3]
 [7.2 3.6 6.1 2.5]
 [7.7 3.  6.1 2.3]
 [4.9 3.1 1.5 0.1]
 [5.5 2.6 4.4 1.2]
 [6.  3.  4.8 1.8]
 [6.7 3.  5.  1.7]
 [5.8 2.8 5.1 2.4]]

[['Iris-virginica']
 ['Iris-virginica']
 ['Iris-virginica']
 ['Iris-setosa']
 ['Iris-versicolor']
 ['Iris-versicolor']
 ['Iris-versicolor']
 ['Iris-virginica']
 ['Iris-virginica']
 ['Iris-setosa']
 ['Iris-versicolor']
 ['Iris-virginica']
 ['Iris-versicolor']
 ['Iris-virginica']]
Model 1 Metrics:


In [25]:
# import pandas as pd
# import numpy as np

# # Load the Iris dataset
# col_names = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'type']
# data = pd.read_csv(r"./dataset/Iris.csv", skiprows=1, header=None, names=col_names)

# # Choose the class to under-sample (e.g., 'Iris-setosa')
# class_to_under_sample = 'Iris-setosa'

# # Set the desired imbalance ratio (e.g., 1:3 for the chosen class)
# imbalance_ratio = 3

# # Randomly under-sample the chosen class
# data_imbalanced = data.copy()
# data_imbalanced = data_imbalanced[data_imbalanced['type'] != class_to_under_sample]
# data_imbalanced = pd.concat([
#     data_imbalanced,
#     data[data['type'] == class_to_under_sample].sample(
#         frac=1/imbalance_ratio,
#         random_state=42  # You can change the random state for reproducibility
#     )
# ])

# # Print the class distribution in the imbalanced dataset
# print(data_imbalanced['type'].value_counts())
