In [1]:
import numpy as np

In [2]:
class MMIT:
    def __init__(self, max_depth=3, margin_length=0.1, p=2, min_split_sample=2):
        # Initialize hyperparameters
        self.max_depth = max_depth  # Maximum depth of the tree
        self.margin_length = margin_length  # Margin length for hinge error
        self.p = p  # Power for hinge error calculation
        self.min_split_sample = min_split_sample  # Minimum samples required to split a node
        self.tree = None  # Placeholder for the tree structure

    def relu(self, x):
        # ReLU function for hinge error calculation
        return np.maximum(0, x)

    def hinge_error(self, y_pred, y_low, y_up):
        # Calculate hinge error based on prediction and interval targets
        return self.relu(y_low - y_pred + self.margin_length) ** self.p + self.relu(y_pred - y_up + self.margin_length) ** self.p

    def best_split(self, X, y):
        # Determine the best feature and threshold for splitting the data
        _, n_features = X.shape
        best_feature, best_threshold = None, None
        min_error = float('inf')

        for feature in range(n_features):
            # Iterate over all unique thresholds for the current feature
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                left_mask = X[:, feature] <= threshold
                right_mask = X[:, feature] > threshold

                # Ensure both splits are valid
                if np.sum(left_mask) >= self.min_split_sample and np.sum(right_mask) >= self.min_split_sample:
                    left_y = y[left_mask]
                    right_y = y[right_mask]

                    # Evaluate the split using hinge error
                    error = self.evaluate_split(left_y) + self.evaluate_split(right_y)

                    # Update the best split if current split has lower error
                    if error < min_error:
                        min_error = error
                        best_feature = feature
                        best_threshold = threshold

        return best_feature, best_threshold

    def evaluate_split(self, y):
        # Evaluate the hinge error for a split
        y_low = y[:, 0] + self.margin_length
        y_up = y[:, 1] - self.margin_length
        valid_values = np.concatenate([y_low[y_low != -np.inf], y_up[y_up != np.inf]])

        if len(valid_values) == 0:
            # No valid values, return zero error
            return 0

        # Compute hinge error for each potential prediction value
        errors = [np.sum(self.hinge_error(val, y_low, y_up)) for val in valid_values]
        return np.min(errors)

    def predict_leaf_value(self, y):
        # Predict the value at a leaf node
        y_low = y[:, 0] + self.margin_length
        y_up = y[:, 1] - self.margin_length
        valid_values = np.concatenate([y_low[y_low != -np.inf], y_up[y_up != np.inf]])

        if len(valid_values) == 0:
            # Default prediction if no valid values
            return 0

        # Find the prediction value that minimizes hinge error
        errors = [np.sum(self.hinge_error(val, y_low, y_up)) for val in valid_values]
        return valid_values[np.argmin(errors)]

    def build_tree(self, X, y, depth):
        # Recursively build the decision tree
        if depth == self.max_depth or len(X) < self.min_split_sample:
            # Stop splitting if max depth is reached or not enough samples
            return {
                'type': 'leaf',
                'value': self.predict_leaf_value(y)
            }

        # Find the best split for the current node
        feature, threshold = self.best_split(X, y)
        if feature is None:
            # If no valid split, create a leaf node
            return {
                'type': 'leaf',
                'value': self.predict_leaf_value(y)
            }

        # Split the data into left and right subsets
        left_mask = X[:, feature] <= threshold
        right_mask = X[:, feature] > threshold

        # Recursively build left and right branches
        return {
            'type': 'node',
            'feature': feature,
            'threshold': threshold,
            'left': self.build_tree(X[left_mask], y[left_mask], depth + 1),
            'right': self.build_tree(X[right_mask], y[right_mask], depth + 1)
        }

    def fit(self, X, y):
        # Fit the decision tree to the data
        self.tree = self.build_tree(X, y, 0)

    def predict_one(self, x, node):
        # Predict the target for a single instance
        if node['type'] == 'leaf':
            return node['value']

        # Traverse the tree based on feature thresholds
        if x[node['feature']] <= node['threshold']:
            return self.predict_one(x, node['left'])
        else:
            return self.predict_one(x, node['right'])

    def predict(self, X):
        # Predict the target for all instances in the dataset
        return np.array([self.predict_one(x, self.tree) for x in X])

In [3]:
class MMIT2:
    def __init__(self, max_depth=3, margin_length=0.1, p=2, min_split_sample=2):
        # Initialize hyperparameters
        self.max_depth = max_depth
        self.margin_length = margin_length
        self.p = p
        self.min_split_sample = min_split_sample
        self.tree = None

    def relu(self, x):
        # Efficient ReLU function
        return np.maximum(0, x)

    def hinge_error(self, y_pred, y_low, y_up):
        # Vectorized hinge error calculation
        return (self.relu(y_low - y_pred + self.margin_length) ** self.p + 
                self.relu(y_pred - y_up + self.margin_length) ** self.p)

    def best_split(self, X, y):
        _, n_features = X.shape
        best_feature, best_threshold = None, None
        min_error = float('inf')

        # Precompute y_low and y_up for all samples
        y_low = y[:, 0] + self.margin_length
        y_up = y[:, 1] - self.margin_length

        for feature in range(n_features):
            # Unique thresholds for the current feature
            thresholds = np.unique(X[:, feature])
            for threshold in thresholds:
                # Compute masks for splitting
                left_mask = X[:, feature] <= threshold
                right_mask = ~left_mask  # Avoid recomputing

                # Ensure valid splits
                if np.sum(left_mask) < self.min_split_sample or np.sum(right_mask) < self.min_split_sample:
                    continue

                # Evaluate split hinge error
                error = (self.evaluate_split(y_low[left_mask], y_up[left_mask]) + self.evaluate_split(y_low[right_mask], y_up[right_mask]))

                # Update the best split if current split is better
                if error < min_error:
                    min_error = error
                    best_feature = feature
                    best_threshold = threshold

        return best_feature, best_threshold

    def evaluate_split(self, y_low, y_up):
        # Efficiently evaluate the hinge error for a split
        valid_values = np.concatenate([y_low[y_low != -np.inf], y_up[y_up != np.inf]])
        if valid_values.size == 0:
            return 0

        # Vectorized computation of errors for all valid values
        errors = np.sum(self.hinge_error(valid_values[:, None], y_low, y_up), axis=1)
        return np.min(errors)

    def predict_leaf_value(self, y):
        # Predict the optimal value at a leaf node
        y_low = y[:, 0] + self.margin_length
        y_up = y[:, 1] - self.margin_length
        valid_values = np.concatenate([y_low[y_low != -np.inf], y_up[y_up != np.inf]])
        if valid_values.size == 0:
            return 0

        # Vectorized computation of errors for all valid values
        errors = np.sum(self.hinge_error(valid_values[:, None], y_low, y_up), axis=1)
        return valid_values[np.argmin(errors)]

    def build_tree(self, X, y, depth):
        # Build the decision tree recursively
        if depth == self.max_depth or len(X) < self.min_split_sample:
            return {'type': 'leaf', 'value': self.predict_leaf_value(y)}

        # Find the best split
        feature, threshold = self.best_split(X, y)
        if feature is None:
            return {'type': 'leaf', 'value': self.predict_leaf_value(y)}

        # Split the data
        left_mask = X[:, feature] <= threshold
        right_mask = ~left_mask

        # Build subtrees recursively
        return {
            'type': 'node',
            'feature': feature,
            'threshold': threshold,
            'left': self.build_tree(X[left_mask], y[left_mask], depth + 1),
            'right': self.build_tree(X[right_mask], y[right_mask], depth + 1)
        }

    def fit(self, X, y):
        # Fit the decision tree to the data
        self.tree = self.build_tree(X, y, 0)

    def predict_one(self, x, node):
        # Predict the target for a single instance
        while node['type'] != 'leaf':
            if x[node['feature']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return node['value']

    def predict(self, X):
        # Predict the target for all instances
        return np.array([self.predict_one(x, self.tree) for x in X])

In [4]:
# Example Usage
n_data = 1000
X = np.array([[i, 1] for i in range(1, n_data)])
y = np.array([[i+1, i+2] for i in range(1, n_data)])

In [5]:
dtr = MMIT(max_depth=2, margin_length=0.1, p=2, min_split_sample=2)
dtr.fit(X, y)
y_pred1 = (dtr.predict(X))

In [6]:
dtr = MMIT2(max_depth=2, margin_length=0.1, p=2, min_split_sample=2)
dtr.fit(X, y)
y_pred2 = (dtr.predict(X))

In [7]:
np.sum(y_pred1 - y_pred2)

np.float64(0.0)