# DecisionTree
Below we give the code of the classification tree and the regression tree.

In [1]:
import numpy as np
from collections import Counter

### Loss Functions

In [2]:
# loss_function_1: classification, entropy
def Entropy(y):
    label_dic = Counter(y) # {label: counts}
    n = len(y)
    entropy = sum(-x/n*np.log2(x/n) for x in label_dic.values())
    return entropy

# loss_fucntion_2: classification, gini index
def Gini(y):
    label_dic = Counter(y) # {label: counts}
    n = len(y)
    gini = 1 - sum(np.square(x/n) for x in label_dic.values())
    return gini

# loss_function_3: regression, variance(l2_norm estimated by the mean value)

# loss_function_4: regression, variance by median
def Var_median(y):
    median = np.median(y)
    loss = np.mean((y - median)**2)
    return loss

### Value Estimator

In [3]:
# Classification Esitimator: most common vote
def most_common_vote(y):
    label_dict = Counter(y)
    most_common_label = label_dict.most_common(1)[0][0]
    return most_common_label

# Regression Estimator: mean or median


In [4]:
class DecisionTree():
    # initialize a tree
    def __init__(self, loss_function, leaf_value_estimator, max_depth=5,current_depth=0,min_sample=5,loss_threshold=1e-5):
        self.loss_function = loss_function # Classification: Gini or Entropy; Regression: MSE
        self.leaf_value_estimator = leaf_value_estimator
        self.max_depth = max_depth
        self.current_depth = current_depth
        self.min_sample = min_sample
        self.loss_threshold = loss_threshold
        # tree structure
        self.split_id = None
        self.split_value = None
        self.isleaf = None
        self.left = None
        self.right = None 
        self.value = None 

    # Choose the feature: run if the number of remaining features > 0 and the classification has not meet the standards
    def fit(self, X, y):
        num_sample, num_feature = X.shape
        isunique = (len(np.unique(y)) == 1)
        # If the number of remaining features = 0 or the classification has meet the standards, return as a leaf node
        # Only the leaf node has the value
        if self.current_depth >= self.max_depth or num_sample <= self.min_sample or isunique or self.loss_function(y)<self.loss_threshold:
            self.isleaf = True
            self.value = self.leaf_value_estimator(y)
            return self
        # Else, split and recurse to left and right sub-trees
        best_loss = self.loss_function(y)
        best_split_id = None
        best_split_position = None
        best_split_value = None
        best_X_left = None
        best_X_right = None
        best_y_left = None
        best_y_right = None
        num_feature = X.shape[1]
        y_copied = y.reshape(-1,1)
        Xy = np.concatenate([X, y_copied], 1)
        for feature_id in range(num_feature):
            # sort by given feature
            Xy_sorted = np.array(sorted(Xy, key=lambda x: x[feature_id])) 
            # choose the best split value of this feature
            for split_position in range(len(Xy_sorted)-1):
                X_left = Xy_sorted[:split_position+1,:-1]
                X_right = Xy_sorted[split_position+1:,:-1]
                y_left = Xy_sorted[:split_position+1,-1]
                y_right = Xy_sorted[split_position+1:,-1]
                # calculate loss
                loss_left = len(y_left)/len(y) * self.loss_function(y_left)
                loss_right = len(y_right)/len(y) * self.loss_function(y_right)
                # update the split position
                if (loss_left + loss_right < best_loss):
                    best_split_id = feature_id
                    best_split_position = split_position
                    best_split_value = Xy_sorted[best_split_position, best_split_id]
                    best_loss = loss_left + loss_right
                    best_X_left = X_left
                    best_X_right = X_right
                    best_y_left = y_left
                    best_y_right = y_right
        # Recurese and construct the decision tree
        if best_split_id != None:
            self.left = DecisionTree(self.loss_function, self.leaf_value_estimator, self.max_depth, current_depth=self.current_depth + 1, min_sample=self.min_sample, loss_threshold=self.loss_threshold)
            self.left.fit(best_X_left, best_y_left)
            self.right = DecisionTree(self.loss_function, self.leaf_value_estimator, self.max_depth, current_depth=self.current_depth + 1, min_sample=self.min_sample, loss_threshold=self.loss_threshold)
            self.right.fit(best_X_right, best_y_right)
            
            self.split_id = best_split_id
            self.split_value = best_split_value
        else: 
            self.isleaf = True
            self.value = self.leaf_value_estimator(y)
        return self

    # Predict the label/value given a new instance
    def predict(self, X_new):
        # Only leaf node has the value
        if self.isleaf:
            return self.value
        else:
            if X_new[self.split_id] <= self.split_value:
                return self.left.predict(X_new)
            else:
                return self.right.predict(X_new)
            


In [5]:
from pathlib import Path
import pandas as pd

# Resolve project paths and organize outputs
PROJECT_ROOT = Path.cwd()
if not (PROJECT_ROOT / "datasets").exists() and (PROJECT_ROOT.parent / "datasets").exists():
    PROJECT_ROOT = PROJECT_ROOT.parent
if not (PROJECT_ROOT / "datasets").exists():
    raise FileNotFoundError("Could not locate datasets folder. Run from project root or src directory.")

DATA_DIR = PROJECT_ROOT / "datasets" / "boston-housing"
OUTPUT_DIR = PROJECT_ROOT / "outputs"
PLOTS_DIR = OUTPUT_DIR / "plots"
MODELS_DIR = OUTPUT_DIR / "models"
PREDICTIONS_DIR = OUTPUT_DIR / "predictions"
for path in [OUTPUT_DIR, PLOTS_DIR, MODELS_DIR, PREDICTIONS_DIR]:
    path.mkdir(parents=True, exist_ok=True)

print(f"Project root: {PROJECT_ROOT}")
print(f"Outputs directory: {OUTPUT_DIR}")
data_path = DATA_DIR / "HousingData.csv"
df = pd.read_csv(data_path, sep=",")


Project root: /Users/xuguangjie/Desktop/PhD Courses/博二上/数据挖掘/大作业/data-mining
Outputs directory: /Users/xuguangjie/Desktop/PhD Courses/博二上/数据挖掘/大作业/data-mining/outputs


In [6]:
X = np.array(df.iloc[:,:-1])
y = np.array(df.iloc[:,-1])

In [10]:
regtree = DecisionTree(np.var,np.mean, loss_threshold=0.01)

In [11]:
regtree.fit(X,y)

<__main__.DecisionTree at 0x10fdfae00>

In [12]:
regtree.predict(X[200])

np.float64(34.15555555555555)

In [13]:
y[200]

np.float64(32.9)

In [1]:
from pathlib import Path
import pandas as pd

# Resolve project paths and organize outputs
PROJECT_ROOT = Path.cwd()
if not (PROJECT_ROOT / "datasets").exists() and (PROJECT_ROOT.parent / "datasets").exists():
    PROJECT_ROOT = PROJECT_ROOT.parent
if not (PROJECT_ROOT / "datasets").exists():
    raise FileNotFoundError("Could not locate datasets folder. Run from project root or src directory.")

DATA_DIR = PROJECT_ROOT / "datasets" / "bank-marketing"
OUTPUT_DIR = PROJECT_ROOT / "outputs"
PLOTS_DIR = OUTPUT_DIR / "plots"
MODELS_DIR = OUTPUT_DIR / "models"
PREDICTIONS_DIR = OUTPUT_DIR / "predictions"
for path in [OUTPUT_DIR, PLOTS_DIR, MODELS_DIR, PREDICTIONS_DIR]:
    path.mkdir(parents=True, exist_ok=True)

print(f"Project root: {PROJECT_ROOT}")
print(f"Outputs directory: {OUTPUT_DIR}")
data_path = DATA_DIR / "bank-full.csv"
df = pd.read_csv(data_path, sep=";")


Project root: /Users/xuguangjie/Desktop/PhD Courses/博二上/数据挖掘/大作业/data-mining
Outputs directory: /Users/xuguangjie/Desktop/PhD Courses/博二上/数据挖掘/大作业/data-mining/outputs


In [3]:
import numpy as np
X = np.array(df.iloc[:,:-1])
y = np.array(df.iloc[:,-1])

In [7]:
X_train = X[:400]
y_train = y[:400]
clstree = DecisionTree(loss_function=Entropy, leaf_value_estimator=most_common_vote)

In [8]:
clstree.fit(X_train, y_train)

<__main__.DecisionTree at 0x111e1d0f0>

In [9]:
clstree.predict(X[401])

'no'

In [4]:
y[401]

'no'

In [None]:
class RegressionTree():
    loss_function_dict = {
        "MSE_mean": np.var,
        "MSE_median":Var_median
    }
    estimator_dict = {
        "mean":np.mean,
        "median": np.median
    }

    # initialize a tree
    def __init__(self, loss_function="MSE_mean", leaf_value_estimator="mean", max_depth=5,current_depth=0,min_sample=5,loss_threshold=1e-5):
        try:
            self.loss_function = self.loss_function_dict[loss_function]
            self.leaf_value_estimator = self.estimator_dict[leaf_value_estimator]
        except KeyError:
            raise ValueError(
            f"Unknown loss_function '{loss_function}'. "
            f"Available: {list(self.loss_function_dict.keys())}"
        )
        try:
            self.leaf_value_estimator = self.estimator_dict[leaf_value_estimator]
        except KeyError:
            raise ValueError(
            f"Unknown leaf_value_estimator '{leaf_value_estimator}'. "
            f"Available: {list(self.estimator_dict.keys())}"
        )
        
        self.max_depth = max_depth
        self.current_depth = current_depth
        self.min_sample = min_sample
        self.loss_threshold = loss_threshold
        # tree structure
        self.split_id = None
        self.split_value = None
        self.isleaf = None
        self.left = None
        self.right = None 
        self.value = None 

    # Check the node infomation
    def node_info(self):
        return {
            "depth": self.current_depth,
            "is_leaf": self.isleaf,
            "split_id": self.split_id,
            "split_value": self.split_value,
            "value": self.value,
        }

    # Choose the feature: run if the number of remaining features > 0 and the classification has not meet the standards
    def fit(self, X, y):
        num_sample, num_feature = X.shape
        isunique = (len(np.unique(y)) == 1)
        # If the number of remaining features = 0 or the classification has meet the standards, return as a leaf node
        # Only the leaf node has the value
        if self.current_depth >= self.max_depth or num_sample <= self.min_sample or isunique or self.loss_function(y)<self.loss_threshold:
            self.isleaf = True
            self.value = self.leaf_value_estimator(y)
            return self
        # Else, split and recurse to left and right sub-trees
        best_loss = self.loss_function(y)
        best_split_id = None
        best_split_position = None
        best_split_value = None
        best_X_left = None
        best_X_right = None
        best_y_left = None
        best_y_right = None
        num_feature = X.shape[1]
        # y is (n,)
        y_copied = y.reshape(-1,1)
        Xy = np.concatenate([X, y_copied], 1)
        # RegressionTree can use the same feature in different nodes
        for feature_id in range(num_feature):
            # sort by given feature
            Xy_sorted = np.array(sorted(Xy, key=lambda x: x[feature_id])) 
            # choose the best split value of this feature
            for split_position in range(len(Xy_sorted)-1):
                X_left = Xy_sorted[:split_position+1,:-1]
                X_right = Xy_sorted[split_position+1:,:-1]
                y_left = Xy_sorted[:split_position+1,-1]
                y_right = Xy_sorted[split_position+1:,-1]
                # calculate loss
                loss_left = len(y_left)/len(y) * self.loss_function(y_left)
                loss_right = len(y_right)/len(y) * self.loss_function(y_right)
                # update the split position
                if (loss_left + loss_right < best_loss):
                    best_split_id = feature_id
                    best_split_position = split_position
                    best_split_value = Xy_sorted[best_split_position, best_split_id]
                    best_loss = loss_left + loss_right
                    best_X_left = X_left
                    best_X_right = X_right
                    best_y_left = y_left
                    best_y_right = y_right
        # Recurese and construct the decision tree
        if best_split_id != None:
            self.left = DecisionTree(self.loss_function, self.leaf_value_estimator, self.max_depth, current_depth=self.current_depth + 1, min_sample=self.min_sample, loss_threshold=self.loss_threshold)
            self.left.fit(best_X_left, best_y_left)
            self.right = DecisionTree(self.loss_function, self.leaf_value_estimator, self.max_depth, current_depth=self.current_depth + 1, min_sample=self.min_sample, loss_threshold=self.loss_threshold)
            self.right.fit(best_X_right, best_y_right)
            
            self.split_id = best_split_id
            self.split_value = best_split_value
        else: 
            self.isleaf = True
            self.value = self.leaf_value_estimator(y)
        return self

    # Predict the value given a new instance
    def predict(self, X_new):
        # Only leaf node has the value
        if self.isleaf:
            return self.value
        else:
            if X_new[self.split_id] <= self.split_value:
                return self.left.predict(X_new)
            else:
                return self.right.predict(X_new)
    
    # Predict the value given a batch of new instances
    def predict_batch(self,X_new):
        X = np.array(X_new)
        preds = [self.predict(x) for x in X]
        return np.array(preds)            
