## **Model**

#### 一些情况说明：
- adaboost 的参数命名是基于我们的readme file
- 有关决策树 - 根据hw05改的，有以下几个改变：
    - adaboost 的标签一般是 -1/1，而 decision tree 的标签是 0/1。因此增加了 converted 参数：当 converted = True 时，训练前把标签 -1 转成 0，预测后再把 0 转回 -1
    - 针对决策树增加了weight
    - 针对决策树增加了 threshold。本意是让数据集 X 不仅可以取 0/1，也可以取其他数值。但目前该功能还没有通过测试（TODO）
    - 在 adaboost 调用 decision tree 的 predict 时，会在 X 每行开头插入一个 0，得到 X_with_zero。因为 decision tree 的 predict 要求每行是 [y, x...] 的形式（第 0 列是标签占位，预测时不会被读取）
- 一些代码调试相关
    - decisiontree.ipynb 是decision tree单独拎出来的。目前可以过hw05的所有test。（估计过不了input有0/1以外的test)
    - adaboost.ipynb 注释掉了一段调用decision tree标准库的代码，可以用那段代码作为参照 - 当调用那段代码的时候，X的input可以是任意形式。但是需要注意，如果使用这段代码， adaboost的predict函数不要用X_with_zero， 要用X



In [114]:
import numpy as np

def node_score_error(prob):
    '''
        Score a two-class node by its training error.

        prob is the fraction of the node that belongs to one class; the
        score is C(p) = min{p, 1-p}, i.e. the share of the minority class.
    '''
    complement = 1.0 - prob
    return min(prob, complement)

def node_score_entropy(prob):
    '''
        Score a two-class node by its entropy (natural logarithm).

        C(p) = -p * log(p) - (1-p) * log(1-p), with 0*log0 taken as 0.
        Any prob at or outside the ends of [0, 1] scores 0.
    '''
    # Pure nodes and out-of-range inputs carry no entropy.
    if prob >= 1.0 or prob <= 0.0:
        return 0.0

    rest = 1.0 - prob
    first_term = prob * np.log(prob)
    second_term = rest * np.log(rest)
    return -first_term - second_term


def node_score_gini(prob):
    '''
        Score a two-class node by its gini index.

        C(p) = 2 * p * (1-p), which is 0 for a pure node and peaks at p = 0.5.
    '''
    doubled = 2.0 * prob
    return doubled * (1.0 - prob)

class Node:
    '''
    One vertex of the decision tree.

    Holds the links to its two children, its depth, the feature index and
    threshold it splits on, whether it is a leaf, the label it predicts and
    an info dict that is only used for visualization.
    '''
    def __init__(self, left=None, right=None, depth=0, index_split_on=0, isleaf=False, label=1):
        # Links to the child subtrees (None until the node is split).
        self.left, self.right = left, right
        self.depth = depth
        # Column index of the feature this node splits on.
        self.index_split_on = index_split_on
        self.isleaf, self.label = isleaf, label
        self.info = {}  # used for visualization
        # Split threshold; filled in by the tree when the node is split.
        self.threshold = None

    def _set_info(self, gain, num_samples):
        '''
        Record the gain and the sample count of this node in the info dict.
        '''
        self.info.update(gain=gain, num_samples=num_samples)


class DecisionTree:
    '''
    Binary decision tree over rows laid out as [label, feature_1, ..., feature_k].

    Labels are handled as 0/1 inside the tree. Every internal node stores a
    feature index and a threshold: rows whose feature value is <= threshold
    go to the left child, all other rows go to the right child. Optional
    per-sample weights influence which split is chosen.

    NOTE: leaf labels come from an unweighted majority vote; the sample
    weights only enter through the gain computation.
    '''

    def __init__(self, data, gain_function=node_score_entropy, max_depth=40, weight=None, converted=None):
        '''
        Build the tree from the training rows.

        data: sequence of rows; position 0 of each row is the label and the
            remaining positions are the features.
        gain_function: impurity score C(p) used to rate candidate splits.
        max_depth: a node whose depth reaches this value is not split.
        weight: optional per-row weights; they are normalised to sum to 1.
            Uniform weights are used when omitted.
        converted: when truthy, labels equal to -1 are treated as 0.
        '''
        if converted:
            # Relabel on a copy so the caller's rows are never modified.
            data = [[0 if row[0] == -1 else row[0], *row[1:]] for row in data]

        n_rows = len(data)
        # Label 1 wins only with a strict (unweighted) majority.
        self.majority_class = 1 if sum(row[0] for row in data) > n_rows / 2 else 0
        self.max_depth = max_depth
        self.root = Node(label=self.majority_class)
        self.gain_function = gain_function
        if weight is None:
            self.sample_weight = np.full(n_rows, 1.0 / n_rows) if n_rows else np.ones(0)
        else:
            weight = np.asarray(weight, dtype=float)
            self.sample_weight = weight / np.sum(weight)

        # Feature columns start at 1; with no rows there is nothing to split
        # on and the root simply becomes a leaf.
        indices = list(range(1, len(data[0]))) if n_rows else []
        self._split_recurs(self.root, data, indices, self.sample_weight)

    @staticmethod
    def _decode_label(prediction, converted):
        '''
        Map an internal 0 label back to -1 when converted is truthy.
        '''
        return -1 if converted and prediction == 0 else prediction

    def predict(self, features, converted=None):
        '''
        Predict the label for one row (1d input) or for every row (2d input).

        Rows must use the training layout: position 0 is the label slot
        (its value is never read) followed by the features. When converted
        is truthy a predicted 0 is reported as -1.

        Returns a single label for 1d input and a numpy array for 2d input.
        '''
        features = np.asarray(features)
        if features.ndim == 1:  # a single row
            return self._decode_label(self._predict_recurs(self.root, features), converted)
        return np.array([
            self._decode_label(self._predict_recurs(self.root, row), converted)
            for row in features
        ])

    def accuracy(self, data):
        '''
        Return the fraction of rows in data whose label is predicted correctly.
        '''
        return 1 - self.loss(data)

    def loss(self, data):
        '''
        Return the fraction of rows in data whose label is predicted wrongly.

        The true label is read from position 0 of each row and compared
        with the tree's 0/1 prediction.
        '''
        test_Y = np.array([row[0] for row in data])  # true labels
        predictions = self.predict(np.array(data))
        return np.mean(predictions != test_Y)

    def _predict_recurs(self, node, row):
        '''
        Walk down from node and return the label of the leaf that row reaches.
        '''
        if node.isleaf or node.index_split_on == 0:
            return node.label

        value = row[node.index_split_on]
        if node.threshold is None:
            # No threshold recorded: treat the feature as a 0/1 flag.
            goes_right = bool(value)
        else:
            # Same rule as training: <= threshold is left, otherwise right.
            goes_right = value > node.threshold
        return self._predict_recurs(node.right if goes_right else node.left, row)

    def _is_terminal(self, node, data, indices):
        '''
        Decide whether node must become a leaf.

        Returns (is_leaf, label). A node is a leaf when all labels agree,
        when it has no rows, when no feature index is left, or when its
        depth has reached max_depth. The label is the unweighted majority
        of the rows; ties and empty nodes use the tree-wide majority class.
        '''
        labels = [row[0] for row in data]
        n_rows = len(labels)
        positives = sum(labels)

        if n_rows - positives == positives:
            majority_label = self.majority_class
        else:
            majority_label = 1 if positives > n_rows / 2 else 0

        if len(set(labels)) == 1:
            return True, labels[0]
        if n_rows == 0:
            return True, self.majority_class
        if len(indices) == 0 or node.depth >= self.max_depth:
            return True, majority_label

        return False, majority_label

    def _split_recurs(self, node, data, indices, weights):
        '''
        Grow the subtree rooted at node.

        data: rows that reached this node.
        indices: feature columns that may still be split on; this list is
            not modified.
        weights: numpy array with one weight per row of data.
        '''
        node.isleaf, node.label = self._is_terminal(node, data, indices)
        if node.isleaf:
            node._set_info(0, len(data))
            return

        weights = np.asarray(weights)
        best_gain, best_index, best_threshold = -1, None, None

        for split_index in indices:
            values = sorted({row[split_index] for row in data})
            if len(values) == 1:
                # A constant feature still yields one candidate split.
                candidates = values
            else:
                candidates = [(low + high) / 2 for low, high in zip(values, values[1:])]

            for threshold in candidates:
                gain = self._calc_gain(data, split_index, self.gain_function, threshold, weights)
                if gain > best_gain:
                    best_gain, best_index, best_threshold = gain, split_index, threshold

        if best_index is None:
            # No candidate produced a comparable gain (e.g. every gain was
            # NaN): keep the majority label and stop here.
            node.isleaf = True
            node._set_info(0, len(data))
            return

        node.index_split_on = best_index
        node.threshold = best_threshold
        node._set_info(best_gain, len(data))
        node.left = Node(depth=node.depth + 1)
        node.right = Node(depth=node.depth + 1)

        # Both children get the same remaining features: what one sibling
        # uses must not be taken away from the other.
        remaining = [index for index in indices if index != best_index]
        goes_left = np.array([row[best_index] <= best_threshold for row in data], dtype=bool)
        left_rows = [row for row, is_left in zip(data, goes_left) if is_left]
        right_rows = [row for row, is_left in zip(data, goes_left) if not is_left]

        self._split_recurs(node.left, left_rows, remaining, weights[goes_left])
        self._split_recurs(node.right, right_rows, remaining, weights[~goes_left])

    def _calc_gain(self, data, split_index, gain_function, threshold=None, weights=None):
        '''
        Return the weighted gain of splitting data on one feature.

        data: rows as [label, features...] with 0/1 labels.
        split_index: column of the feature to test.
        gain_function: impurity score C(p).
        threshold: rows with feature value > threshold form the "above"
            side; defaults to 0.5.
        weights: one weight per row; defaults to equal weights.

        Returns 0 when there are no rows or when the weights sum to 0.
        '''
        if threshold is None:
            threshold = 0.5
        if len(data) == 0:
            return 0
        if weights is None:
            weights = np.ones(len(data))  # equal weights
        weights = np.asarray(weights, dtype=float)

        total_weight = np.sum(weights)
        if total_weight == 0:
            return 0

        labels = [row[0] for row in data]
        above = [1 if row[split_index] > threshold else 0 for row in data]

        prob_label = np.sum(weights * labels) / total_weight
        prob_above = np.sum(weights * above) / total_weight

        # Weight of label-1 rows above the threshold and of label-0 rows at
        # or below it.
        pos_above = sum(w for w, label, side in zip(weights, labels, above) if label == 1 and side == 1)
        neg_below = sum(w for w, label, side in zip(weights, labels, above) if label == 0 and side == 0)

        above_term = 0
        if prob_above > 0:
            above_term = prob_above * gain_function((pos_above / total_weight) / prob_above)
        below_term = 0
        if prob_above < 1.0:
            prob_below = 1.0 - prob_above
            below_term = prob_below * gain_function((neg_below / total_weight) / prob_below)

        return gain_function(prob_label) - above_term - below_term

In [115]:
# HW_adaboost.py
from sklearn.tree import DecisionTreeClassifier  # Importing DecisionTreeClassifier
    
class AdaBoostClassifier:
    """
    AdaBoost (Adaptive Boosting) Classifier
    An ensemble learning algorithm that combines multiple weak classifiers
    (decision trees of limited depth) into one weighted vote.
    """

    def __init__(self, n_estimators=10, max_depth=1):
        """
        Initialize the AdaBoost classifier.

        Parameters:
        - n_estimators: Number of weak classifiers to train.
        - max_depth: Maximum depth passed to every weak DecisionTree.
        """
        self.n_estimators = n_estimators
        self.max_depth = max_depth  # Store max_depth for DecisionTree
        self.w = []  # Vote weight of each weak classifier
        self.models = []  # The weak classifiers, in training order

    @staticmethod
    def _classifier_weight(error):
        """
        Return the vote weight 0.5 * log((1 - error) / error) for a weak
        classifier with the given weighted error.

        The error is kept inside [0, 1 - eps] and eps is added to the
        denominator, so the result stays finite for error 0 and error 1.
        """
        eps = 1e-10
        error = np.clip(error, 0.0, 1.0 - eps)
        return 0.5 * np.log((1.0 - error) / (error + eps))

    @staticmethod
    def _update_distribution(D, w_t, y, y_pred):
        """
        Return the next sample distribution.

        Each weight is multiplied by exp(-w_t * y * y_pred), which grows the
        weight of misclassified samples when w_t is positive, and the result
        is rescaled to sum to 1.
        """
        D = D * np.exp(-w_t * y * y_pred)
        return D / np.sum(D)

    def train(self, X, y):
        """
        Fit the AdaBoost model to the training data.

        Any previously trained ensemble is replaced.

        Parameters:
        - X: Training data, shape (n_samples, n_features)
        - y: Target labels, shape (n_samples,); expected to be -1 or 1

        Raises:
        - ValueError: if X is not a non-empty 2-D array or y does not hold
          exactly one label per row of X.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2 or X.shape[0] == 0:
            raise ValueError("X must be a non-empty 2-D array of shape (n_samples, n_features)")
        if y.shape != (X.shape[0],):
            raise ValueError("y must be a 1-D array with one label per row of X")

        n_samples = X.shape[0]
        # Validation passed: start a fresh ensemble.
        self.w = []
        self.models = []

        # Start from the uniform distribution over the samples.
        D = np.full(n_samples, 1.0 / n_samples)
        # DecisionTree expects rows shaped [label, features...].
        labelled = np.column_stack((y, X))

        for _ in range(self.n_estimators):
            # The tree gets its own copy because it may relabel column 0.
            weak_model = DecisionTree(data=labelled.copy(), max_depth=self.max_depth, weight=D, converted=True)
            y_pred = weak_model.predict(features=labelled, converted=True)

            # Weighted error under the current distribution (D sums to 1).
            error = np.sum(D * (y_pred != y))
            w_t = self._classifier_weight(error)
            D = self._update_distribution(D, w_t, y, y_pred)

            self.models.append(weak_model)
            self.w.append(w_t)

    def predict(self, X, converted = True):
        """
        Predict the class labels for the input data.

        Parameters:
        - X: Input data, shape (n_samples, n_features)
        - converted: forwarded to each weak classifier's predict; when
          truthy the weak classifiers report -1 instead of 0.

        Returns:
        - Sign of the weighted vote, shape (n_samples,). An exactly tied
          vote yields 0.

        Raises:
        - ValueError: if X is not 2-D.
        """
        X = np.asarray(X)
        if X.ndim != 2:
            raise ValueError("X must be a 2-D array of shape (n_samples, n_features)")

        scores = np.zeros(X.shape[0])
        # The weak classifiers read features from column 1 onwards, so put a
        # placeholder in the label slot.
        X_with_zero = np.insert(X, 0, 0, axis=1)
        for w_i, model in zip(self.w, self.models):
            scores += w_i * model.predict(X_with_zero, converted)
        return np.sign(scores)

    def accuracy(self, X, y):
        """
        Calculate the accuracy of the model.

        Parameters:
        - X: Input data, shape (n_samples, n_features)
        - y: True labels, shape (n_samples,)

        Returns:
        - Fraction of samples whose prediction equals the true label.
        """
        predictions = self.predict(X)
        return np.mean(predictions == y)


In [116]:
if __name__ == "__main__":
    # Toy problem: 10 samples, 4 binary features each.
    X = np.array([
        [0, 0, 1, 0],
        [1, 1, 0, 1],
        [1, 0, 1, 0],
        [0, 1, 0, 1],
        [0, 0, 0, 0],
        [1, 1, 1, 1],
        [0, 1, 1, 0],
        [1, 0, 0, 1],
        [1, 1, 0, 0],
        [0, 0, 1, 1],
    ])
    # One label per sample, encoded as -1 / 1.
    y = np.array([-1, 1, 1, -1, -1, 1, 1, -1, 1, -1])

    # Ten boosted trees of depth 1, fitted on the toy problem.
    model = AdaBoostClassifier(n_estimators=10, max_depth=1)
    model.train(X, y)

    # Report the accuracy on the training samples themselves.
    accuracy = model.accuracy(X, y)
    print("Accuracy:", accuracy)


Accuracy: 1.0


## **Check Model**

In [117]:
import pytest
import numpy as np

# Fix the NumPy random seed so repeated runs of these checks behave the same
np.random.seed(0)

# Dataset 1: 10 samples, 4 binary features, labels in {-1, 1}
x1 = np.array([
    [0, 0, 1, 0],
    [1, 1, 0, 1],
    [1, 0, 1, 0],
    [0, 1, 0, 1],
    [0, 0, 0, 0],
    [1, 1, 1, 1],
    [0, 1, 1, 0],
    [1, 0, 0, 1],
    [1, 1, 0, 0],
    [0, 0, 1, 1],
])
y1 = np.array([-1, 1, 1, -1, -1, 1, 1, -1, 1, -1])

# Dataset 2: 10 samples, 6 binary features, labels in {-1, 1}
x2 = np.array([
    [0, 1, 0, 1, 1, 0],
    [1, 0, 1, 0, 0, 1],
    [1, 1, 0, 1, 0, 0],
    [0, 0, 1, 1, 1, 1],
    [1, 0, 0, 0, 1, 0],
    [0, 1, 1, 0, 0, 1],
    [1, 1, 1, 0, 1, 1],
    [0, 0, 0, 1, 0, 0],
    [1, 0, 1, 1, 1, 0],
    [0, 1, 0, 0, 1, 1],
])
y2 = np.array([-1, 1, 1, -1, 1, -1, 1, -1, 1, -1])

# Dataset 3: 10 samples, 6 binary features, labels in {-1, 1}
x3 = np.array([
    [1, 1, 0, 0, 1, 1],
    [0, 0, 1, 1, 0, 0],
    [1, 0, 1, 0, 1, 0],
    [0, 1, 0, 1, 1, 1],
    [1, 1, 1, 0, 0, 1],
    [0, 0, 0, 1, 0, 1],
    [1, 0, 0, 1, 1, 0],
    [0, 1, 1, 0, 1, 1],
    [1, 1, 1, 1, 0, 0],
    [0, 0, 1, 0, 1, 0],
])
y3 = np.array([1, -1, 1, -1, 1, -1, 1, -1, 1, -1])

# One classifier per dataset, each with a different ensemble size
test_model1 = AdaBoostClassifier(n_estimators=10)
test_model2 = AdaBoostClassifier(n_estimators=50)
test_model3 = AdaBoostClassifier(n_estimators=20)

# Test Model Train
def check_train_dtype(model, X, y):
    """Assert that a trained model stores a non-empty list of weak learners
    and exactly one weight per learner. X and y are accepted but not used."""
    learners = model.models
    assert isinstance(learners, list)
    assert len(learners) > 0, "Model should have trained at least one weak learner."
    assert len(model.w) == len(learners), "Weights should match the number of models."

# Fit every classifier on its own dataset, checking the stored ensemble
# right after each fit
for _clf, _features, _labels in (
    (test_model1, x1, y1),
    (test_model2, x2, y2),
    (test_model3, x3, y3),
):
    _clf.train(_features, _labels)
    check_train_dtype(_clf, _features, _labels)

# Test Model Predictions
def check_test_dtype(pred, X_test):
    """Assert that pred is a 1-D numpy array holding one entry per row of X_test."""
    assert isinstance(pred, np.ndarray)
    expected_shape = (X_test.shape[0],)
    assert pred.ndim == 1 and pred.shape == expected_shape

# Predict on each training set; every prediction must have the right type
# and shape and reproduce the training labels exactly
pred1 = test_model1.predict(x1)
check_test_dtype(pred1, x1)
assert np.all(pred1 == y1), "Predictions should match the expected labels for model 1."

pred2 = test_model2.predict(x2)
check_test_dtype(pred2, x2)
assert np.all(pred2 == y2), "Predictions should match the expected labels for model 2."

pred3 = test_model3.predict(x3)
check_test_dtype(pred3, x3)
assert np.all(pred3 == y3), "Predictions should match the expected labels for model 3."

# Test Model Accuracy
def check_accuracy(model, X, y, expected_accuracy):
    """Assert that model.accuracy(X, y) is exactly expected_accuracy."""
    accuracy = model.accuracy(X, y)
    assert expected_accuracy == accuracy, f"Expected accuracy: {expected_accuracy}, but got: {accuracy}"

# Every model is expected to reach 100% accuracy on its own training set
for _clf, _features, _labels in (
    (test_model1, x1, y1),
    (test_model2, x2, y2),
    (test_model3, x3, y3),
):
    check_accuracy(_clf, _features, _labels, 1.0)

# Additional Tests for Edge Cases
def test_empty_train():
    """Training on empty arrays must raise ValueError."""
    no_samples = np.array([])
    no_labels = np.array([])
    with pytest.raises(ValueError):
        test_model1.train(no_samples, no_labels)

def test_empty_predict():
    """Predicting on an empty array must raise ValueError."""
    no_samples = np.array([])
    with pytest.raises(ValueError):
        test_model1.predict(no_samples)

def test_accuracy_empty():
    """Scoring on empty arrays must raise ValueError."""
    no_samples = np.array([])
    no_labels = np.array([])
    with pytest.raises(ValueError):
        test_model1.accuracy(no_samples, no_labels)

# Run the empty-input checks in order: train, predict, accuracy
for _edge_case in (test_empty_train, test_empty_predict, test_accuracy_empty):
    _edge_case()

# Reaching this line means no check above raised
print("All tests completed successfully.")

All tests completed successfully.
