# Decision Tree

In [307]:
import pandas as pd
from abc import ABC, abstractmethod
from collections import Counter
import numpy as np

In [308]:
class Classifier(ABC):
    """Base class bundling the tree node type and the Gini helpers used by tree classifiers."""

    class Tree():
        def __init__(self, feature_index: int = None, threshold: float = None, left: 'Classifier.Tree' = None, right: 'Classifier.Tree' = None, lvalue: int = None, rvalue: int = None):
            """
            A node in the decision tree.

            :param feature_index: Index of the feature to split on.
            :param threshold: Threshold value for the split.
            :param left: Left child node (samples with feature <= threshold).
            :param right: Right child node (samples with feature > threshold).
            :param lvalue: Class label predicted on the left side when there is no left child.
            :param rvalue: Class label predicted on the right side when there is no right child.
            """
            self.feature_index = feature_index
            self.threshold = threshold
            self.left = left
            self.right = right
            self.lvalue = lvalue
            self.rvalue = rvalue

    def gini_impurity(self, x):
        """
        Calculate the Gini impurity for a given set of labels.

        :param x: A sized iterable of labels (Series, list, DataFrame column, etc.).
        :return: A tuple ``(impurity, label_counts)`` where ``impurity`` is the Gini
                 impurity and ``label_counts`` is a ``Counter`` of the labels.
                 Empty input yields ``(0.0, Counter())``.
        """
        total_count = len(x)
        if total_count == 0:
            # Keep the return shape identical to the non-empty case so callers
            # can always unpack two values.
            return 0.0, Counter()

        label_counts = Counter(x)

        # Subtract term by term (rather than 1 - sum(...)) so the floating-point
        # result stays bit-identical for callers that compare impurities exactly.
        impurity = 1.0
        for count in label_counts.values():
            probability = count / total_count
            impurity -= probability ** 2

        return impurity, label_counts

    def weighted_average(self, gi1, gi2, w1, w2):
        """
        Calculate the weighted contributions of two Gini impurity values.

        :param gi1: Gini impurity of the first group.
        :param gi2: Gini impurity of the second group.
        :param w1: Weight (sample count) of the first group.
        :param w2: Weight (sample count) of the second group.
        :return: A dict ``{0: gi1 * w1 / (w1 + w2), 1: gi2 * w2 / (w1 + w2)}``;
                 the two entries sum to the weighted average impurity.
                 Both entries are ``0.0`` when the total weight is zero.
        """
        total = w1 + w2
        if total == 0:
            # Two empty groups contribute no impurity; avoid dividing by zero.
            return {0: 0.0, 1: 0.0}
        return {0: (gi1 * w1) / total, 1: (gi2 * w2) / total}

In [309]:
class DecisionTreeClassifier(Classifier):
    """
    Binary decision tree classifier that chooses splits by minimum weighted Gini impurity.

    Each node splits one feature at a threshold; rows with ``feature <= threshold``
    go left, the others go right. A side without a child node predicts the
    majority label stored in ``lvalue`` / ``rvalue``.
    """

    def __init__(self, max_depth:int =None):
        """
        :param max_depth: Maximum number of split levels (>= 1). ``None`` means
                          the tree grows until every side is pure or cannot be split.
        """
        self.max_depth = max_depth
        self.root = None

    def __check_GI(self, X, y):
        """
        Find the feature and threshold with the lowest weighted Gini impurity.

        Candidate thresholds are the midpoints between consecutive distinct
        values of each feature.

        :param X: Feature DataFrame of the samples at the current node.
        :param y: Label Series, positionally aligned with ``X``.
        :return: ``(feature_index, threshold, gini_left, gini_right, counts_left, counts_right)``
                 for the best split, or a tuple of six ``None`` when no feature
                 has at least two distinct values.
        """
        best = (None, None, None, None, None, None)
        best_score = None

        for i in range(X.shape[1]):
            feature = X.iloc[:, i].to_numpy()
            distinct = np.unique(feature)
            if distinct.size < 2:
                continue  # a constant feature cannot separate the samples

            for j in (distinct[:-1] + distinct[1:]) / 2:
                # Plain numpy masks select by position, so the split does not
                # depend on X and y sharing the same index labels.
                left_mask = feature <= j
                right_mask = feature > j
                y_l = y[left_mask]
                y_r = y[right_mask]
                if len(y_l) == 0 or len(y_r) == 0:
                    # A midpoint can round onto a data value for adjacent
                    # floats; such a threshold separates nothing.
                    continue

                gi_l, lc_l = self.gini_impurity(y_l)
                gi_r, lc_r = self.gini_impurity(y_r)

                gi_avg = self.weighted_average(gi_l, gi_r, len(y_l), len(y_r))
                score = gi_avg[0] + gi_avg[1]
                # "<=" lets a later candidate win an exact tie, matching the
                # tie-breaking of the previous score-keyed dictionary.
                if best_score is None or score <= best_score:
                    best_score = score
                    best = (i, j, gi_l, gi_r, lc_l, lc_r)

        return best

    def __build_tree(self, X, y, root, depth=1):
        """
        Recursively build the decision tree.

        :param X: Feature DataFrame of the samples reaching ``root``.
        :param y: Label Series, positionally aligned with ``X``.
        :param root: The current node in the tree; it is filled in place.
        :param depth: Current depth of the tree (the root node has depth 1).
        """
        fi, thr, gi_l, gi_r, lc_l, lc_r = self.__check_GI(X, y)

        if fi is None:
            # No usable split (e.g. identical rows with different labels):
            # make this node a leaf that always predicts the majority label.
            majority = Counter(y).most_common(1)[0][0]
            root.lvalue = majority
            root.rvalue = majority
            return

        root.feature_index = fi
        root.threshold = thr

        at_depth_limit = self.max_depth is not None and depth >= self.max_depth
        feature = X.iloc[:, fi].to_numpy()
        left_mask = feature <= thr
        right_mask = feature > thr

        if gi_l != 0 and not at_depth_limit:
            root.left = self.Tree()
            self.__build_tree(X[left_mask], y[left_mask], root.left, depth + 1)
        else:
            root.lvalue = lc_l.most_common(1)[0][0]

        if gi_r != 0 and not at_depth_limit:
            root.right = self.Tree()
            self.__build_tree(X[right_mask], y[right_mask], root.right, depth + 1)
        else:
            root.rvalue = lc_r.most_common(1)[0][0]

    def fit(self, X, y):
        """
        Build the tree from training data.

        :param X: Feature DataFrame (numeric columns).
        :param y: Label Series with one entry per row of ``X``.
        :raises TypeError: If ``X`` is not a DataFrame or ``y`` is not a Series.
        :raises ValueError: If ``X`` and ``y`` differ in length, are empty, or
                            ``max_depth`` is set to a value below 1.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("X must be a pandas DataFrame")
        if not isinstance(y, pd.Series):
            raise TypeError("y must be a pandas Series")
        if len(X) != len(y):
            raise ValueError("X and y must have the same number of samples")
        if len(X) == 0:
            raise ValueError("X and y must contain at least one sample")
        if self.max_depth is not None and self.max_depth < 1:
            raise ValueError("max_depth must be None or an integer >= 1")
        self.root = self.Tree()
        self.__build_tree(X, y, root=self.root)

    def _predict_row(self, row):
        """Walk the tree for a single row and return the predicted label."""
        node = self.root
        while True:
            if node.feature_index is None:
                # Leaf created because no split was possible.
                return node.lvalue
            if row.iloc[node.feature_index] <= node.threshold:
                child, value = node.left, node.lvalue
            else:
                child, value = node.right, node.rvalue
            if child is None:
                return value
            node = child

    def predict(self, X):
        """
        Predict a label for every row of ``X``.

        :param X: Feature DataFrame with the same column order used in ``fit``
                  (features are looked up by position).
        :return: Series of predicted labels indexed like ``X``.
        :raises TypeError: If ``X`` is not a DataFrame.
        :raises RuntimeError: If called before ``fit``.
        """
        if not isinstance(X, pd.DataFrame):
            raise TypeError("X must be a pandas DataFrame")
        if self.root is None:
            raise RuntimeError("fit must be called before predict")
        y_pred = [self._predict_row(row) for _, row in X.iterrows()]
        return pd.Series(y_pred, index=X.index)

In [310]:
# Load the sample data and encode every text column as 1 for "Yes", 0 otherwise.
df = pd.read_csv("data/sample1.csv")
for col in df.columns:
    if df[col].dtype != "object":
        continue
    df[col] = df[col].map(lambda answer: int(answer == "Yes"))
# Features and target used to train the tree.
train_X = df.loc[:, ["Loves Popcorn", "Loves Soda", "Age"]]
train_Y = df.loc[:, "Loves Cool As Ice"]
df

Unnamed: 0,Loves Popcorn,Loves Soda,Age,Loves Cool As Ice
0,0,1,35,1
1,1,1,7,0
2,1,1,38,1
3,1,0,12,0
4,1,0,50,0
5,0,1,18,1
6,0,0,83,0


In [311]:
# Show the rows ordered by age to make the candidate Age thresholds easy to see.
df.sort_values("Age")

Unnamed: 0,Loves Popcorn,Loves Soda,Age,Loves Cool As Ice
1,1,1,7,0
3,1,0,12,0
5,0,1,18,1
0,0,1,35,1
2,1,1,38,1
4,1,0,50,0
6,0,0,83,0


In [312]:
# Fit a depth-limited tree on the training set, then predict on the same rows.
model = DecisionTreeClassifier(max_depth=3)
model.fit(X=train_X, y=train_Y)
model.predict(X=train_X)

0    1
1    0
2    1
3    0
4    0
5    1
6    0
dtype: int64