In [None]:
# The Platonic Ideal: Verify empirically that
# - train, test, oob mutually disjoint
# - train U test U oob = entire sample
# - all oob observations get a leaf assignment
# - all observations within leaf cell bounds
# - any way to verify optimal splits subject to constraints?
#
# The Capitulation to Reality:
# quite a bit of shenanigans to work around the fact that the base
# DecisionTreeClassifier does not retain training indices in the nodes,
# and therefore node membership by index cannot be verified post hoc
#
# instead we settle for the following procedure
# - eliminate randomness
# - train on untampered data to identify purported honest, structure, and oob
#   sample indices
# - alter honest X values such that if they affect splits in any way,
#   the changes should result in different splits
# - verify that the splits remain the same
# - this tests a stronger assumption than the honesty assumption
#   (that honest Y values are not considered when selecting splits): stratified
#   sampling necessarily considers the Y distribution when partitioning observations
#   into honest/structure sets, so altering Y values would change the partition itself
#   and we could not get stable partitions across trials; hence we alter X values instead
# - next we alter structure X values similarly
# - verify that the splits change as expected


In [4]:
import numpy as np

def make_trunk_classification(
    n_samples,
    n_dim,
    n_informative=1,
    simulation: str = "trunk",
    mu_0: float = 0,
    mu_1: float = 1,
    rho: int = 0,
    band_type: str = "ma",
    return_params: bool = False,
    mix: float = 0.5,
    seed=None,
):
    """Generate a balanced two-class Gaussian classification dataset.

    The first ``n_samples // 2`` rows are labelled 0 and the remaining
    ``n_samples // 2`` rows are labelled 1. Only the first ``n_informative``
    columns depend on the class; any further columns are standard-normal noise.

    Parameters
    ----------
    n_samples : int
        Requested number of rows; ``2 * (n_samples // 2)`` rows are produced.
    n_dim : int
        Total number of columns in ``X``.
    n_informative : int, default=1
        Number of leading informative columns; must not exceed ``n_dim``.
    simulation : {"trunk", "trunk_overlap", "trunk_mix"}, default="trunk"
        - ``"trunk"``: class 0 is drawn from N(mu_0, cov), class 1 from N(mu_1, cov).
        - ``"trunk_overlap"``: both classes are drawn from the same two-component
          mixture of N(mu_0, cov) and N(mu_1, cov), using the same per-row
          component assignment for both classes.
        - ``"trunk_mix"``: class 0 is drawn from N(0, cov), class 1 from the
          two-component mixture.
    mu_0, mu_1 : float
        Scalar means; the i-th informative coordinate (1-based) of the mean
        vector is ``mu / sqrt(i)``.
    rho : float, default=0
        If non-zero, the covariance is built by ``_moving_avg_cov`` or
        ``_autoregressive_cov`` (selected by ``band_type``); otherwise identity.
    band_type : {"ma", "ar"}, default="ma"
        Covariance family used when ``rho != 0``.
    return_params : bool, default=False
        If True, return a list with the generating parameters appended.
    mix : float, default=0.5
        Probability of choosing the ``mu_0`` component in the mixtures.
    seed : int or None
        Seed passed to ``np.random.default_rng``.

    Returns
    -------
    (X, y) when ``return_params`` is False. Otherwise a list:
    ``[X, y, means, covs]`` for ``"trunk"`` and ``"trunk_overlap"``, and
    ``[X, y, (mu_0, mu_1), (cov, cov), X_mixture]`` for ``"trunk_mix"``.

    Raises
    ------
    ValueError
        If ``n_dim < n_informative``, ``band_type`` or ``simulation`` is
        unknown, or ``mix`` lies outside [0, 1].
    """
    if n_dim < n_informative:
        raise ValueError(
            f"Number of informative dimensions {n_informative} must be less than or "
            f"equal to number of dimensions, {n_dim}"
        )

    # Two generators built from the same seed: they emit identical streams until
    # one of them is advanced independently of the other.
    rng = np.random.default_rng(seed=seed)
    rng1 = np.random.default_rng(seed=seed)

    # Mean vectors whose i-th coordinate (1-based) shrinks as 1 / sqrt(i).
    mu_0 = np.array([mu_0 / np.sqrt(i) for i in range(1, n_informative + 1)])
    mu_1 = np.array([mu_1 / np.sqrt(i) for i in range(1, n_informative + 1)])

    if rho != 0:
        if band_type == "ma":
            cov = _moving_avg_cov(n_informative, rho)
        elif band_type == "ar":
            cov = _autoregressive_cov(n_informative, rho)
        else:
            raise ValueError(f'Band type {band_type} must be one of "ma", or "ar".')
    else:
        cov = np.identity(n_informative)

    if mix < 0 or mix > 1:
        raise ValueError("Mix must be between 0 and 1.")

    # Cholesky is the faster factorisation for large covariance matrices; keep
    # the slower but more robust SVD for everything else.
    if n_informative > 1000:
        method = "cholesky"
    else:
        method = "svd"

    n_per_class = n_samples // 2

    if simulation == "trunk":
        X = np.vstack(
            (
                rng.multivariate_normal(mu_0, cov, n_per_class, method=method),
                rng1.multivariate_normal(mu_1, cov, n_per_class, method=method),
            )
        )
    elif simulation == "trunk_overlap":
        # One component assignment per row, reused for both classes.
        mixture_idx = rng.choice(
            2, n_per_class, replace=True, shuffle=True, p=[mix, 1 - mix]
        )
        norm_params = [[mu_0, cov], [mu_1, cov]]
        X_mixture = np.fromiter(
            (
                rng.multivariate_normal(*(norm_params[i]), size=1, method=method)
                for i in mixture_idx
            ),
            dtype=np.dtype((float, n_informative)),
        )
        X_mixture_2 = np.fromiter(
            (
                rng1.multivariate_normal(*(norm_params[i]), size=1, method=method)
                for i in mixture_idx
            ),
            dtype=np.dtype((float, n_informative)),
        )
        X = np.vstack(
            (
                X_mixture.reshape(n_per_class, n_informative),
                X_mixture_2.reshape(n_per_class, n_informative),
            )
        )
    elif simulation == "trunk_mix":
        mixture_idx = rng.choice(
            2, n_per_class, replace=True, shuffle=True, p=[mix, 1 - mix]
        )
        norm_params = [[mu_0, cov], [mu_1, cov]]
        X_mixture = np.fromiter(
            (
                rng1.multivariate_normal(*(norm_params[i]), size=1, method=method)
                for i in mixture_idx
            ),
            dtype=np.dtype((float, n_informative)),
        )
        X = np.vstack(
            (
                rng.multivariate_normal(
                    np.zeros(n_informative), cov, n_per_class, method=method
                ),
                X_mixture.reshape(n_per_class, n_informative),
            )
        )
    else:
        raise ValueError("Simulation must be: trunk, trunk_overlap, trunk_mix")

    # Pad with pure-noise columns up to n_dim.
    if n_dim > n_informative:
        X = np.hstack(
            (X, rng.normal(loc=0, scale=1, size=(X.shape[0], n_dim - n_informative)))
        )
    y = np.concatenate((np.zeros(n_per_class), np.ones(n_per_class)))

    if return_params:
        returns = [X, y]
        # The keys below must match the accepted `simulation` values (underscores);
        # the earlier hyphenated spellings could never match, so the parameters
        # were silently dropped for the two mixture simulations.
        if simulation == "trunk":
            returns += [[mu_0, mu_1], [cov, cov]]
        elif simulation == "trunk_overlap":
            returns += [[np.zeros(n_informative), np.zeros(n_informative)], [cov, cov]]
        elif simulation == "trunk_mix":
            returns += [*list(zip(*norm_params)), X_mixture]
        return returns
    return X, y

In [8]:
import math
from sklearn.ensemble import HonestRandomForestClassifier as HonestForestClassifier
#from treeple.ensemble import HonestForestClassifier


# For this check to be meaningful, the honest split-rejection criteria must
# never veto a split that the shadow structure tree wants to make.
# The lazy way to guarantee that is to supply enough honest observations that
# both sides of any candidate structure split stay well populated,
# i.e. more dims => more samples.

# --- forest / sampling configuration ---
N_TREES = 1
N_DIM = 10
SAMPLE_SIZE = 2098
RANDOM_STATE = 1
HONEST_FRACTION = 0.95
STRATIFY = True

# --- data: one informative dimension with class means at -5 and +5 ---
X, y = make_trunk_classification(
    SAMPLE_SIZE, N_DIM, n_informative=1, mu_0=-5, mu_1=5, seed=0
)

# Working copies that the trials are free to modify; X and y stay pristine.
X_t = X.copy()
y_t = y.copy()


def perturb(X, y, indices, seed=None):
    """Randomly shift the feature values of the selected rows, in place.

    For every feature column and every row ``i`` in ``indices``:

    - if ``y[i] == 0``, a fair coin decides whether to subtract 5; when it
      does not, a second fair coin decides whether to subtract 2;
    - otherwise a single fair coin decides whether to subtract 2.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_features)
        Feature matrix. It is modified in place and also returned.
    y : array-like of shape (n_samples,)
        Class labels; only read, returned unchanged.
    indices : iterable of int
        Row indices to perturb. An empty iterable leaves ``X`` untouched.
    seed : int, Generator or None, default=None
        Passed to ``np.random.default_rng``. Supply a value to make the
        perturbation reproducible; ``None`` draws fresh entropy.

    Returns
    -------
    (X, y) : the same objects that were passed in.
    """
    rng = np.random.default_rng(seed)
    # Take the column count from the data itself rather than a notebook global.
    n_features = X.shape[1]

    for d in range(n_features):
        for i in indices:
            # `integers(0, 2)` yields a scalar 0 or 1, i.e. a fair coin flip.
            if y[i] == 0 and rng.integers(0, 2) > 0:
                X[i, d] -= 5
            elif rng.integers(0, 2) > 0:
                X[i, d] -= 2

    return X, y


class Trial:
    """Fit one honest forest on ``(X, y)`` and snapshot what the checks compare.

    Attributes set by ``__init__``:

    - ``est``: the fitted ``HonestForestClassifier``.
    - ``tree``: its first estimator (``est.estimators_[0]``).
    - ``honest_tree``: that estimator's ``tree_``.
    - ``structure_tree``: ``honest_tree.target_tree``.
    - ``honest_indices`` / ``structure_indices``: sorted copies of the
      estimator's ``honest_indices_`` / ``structure_indices_``.
    - ``threshold``: a copy of the target tree's ``threshold`` array.
    """

    def __init__(self, X, y):
        # All knobs come from the notebook-level constants so that every trial
        # is configured identically.
        forest_params = dict(
            n_estimators=N_TREES,
            max_samples=1.0,
            max_features=0.3,
            bootstrap=True,
            stratify=STRATIFY,
            n_jobs=-2,
            random_state=RANDOM_STATE,
            honest_prior="ignore",
            honest_fraction=HONEST_FRACTION,
        )
        self.est = HonestForestClassifier(**forest_params)
        self.est.fit(X, y)

        fitted = self.est.estimators_[0]
        self.tree = fitted
        self.honest_tree = fitted.tree_
        self.structure_tree = self.honest_tree.target_tree
        # Sort so that index sets from different trials compare element-wise.
        self.honest_indices = np.sort(fitted.honest_indices_)
        self.structure_indices = np.sort(fitted.structure_indices_)
        # Copy so the snapshot does not alias the fitted tree's array.
        self.threshold = self.honest_tree.target_tree.threshold.copy()


# Trial 0: baseline fit on the untouched data.
baseline = Trial(X_t, y_t)
trial_results = [baseline]

# Trial 1: perturb the honest rows' X values; the thresholds should not change.
X_t, y_t = perturb(X_t, y_t, baseline.honest_indices)
trial_results.append(Trial(X_t, y_t))

# The honest/structure partition must be identical to the baseline ...
for partition in ("honest_indices", "structure_indices"):
    assert np.array_equal(
        getattr(trial_results[0], partition),
        getattr(trial_results[1], partition),
    )
# ... and so must every split threshold.
assert np.array_equal(
    trial_results[0].threshold, trial_results[1].threshold
), f"threshold1 = {trial_results[0].threshold}\nthreshold2 = {trial_results[1].threshold}"


# Trial 2: perturb the structure rows' X values; the thresholds should change.
X_t, y_t = perturb(X_t, y_t, baseline.structure_indices)
trial_results.append(Trial(X_t, y_t))

# Same partition as the baseline ...
for partition in ("honest_indices", "structure_indices"):
    assert np.array_equal(
        getattr(trial_results[0], partition),
        getattr(trial_results[2], partition),
    )
# ... but this time the thresholds are expected to differ.
assert not np.array_equal(
    trial_results[0].threshold, trial_results[2].threshold
)

print("done")


done


In [7]:
from sklearn.model_selection import StratifiedShuffleSplit

# Two StratifiedShuffleSplit instances built with the same random_state must
# select exactly the same indices, i.e. the splitter adds no hidden randomness.
ss = StratifiedShuffleSplit(n_splits=1, test_size=0.5, random_state=1)
# n_splits=1, so the generator yields exactly one (train, test) pair.
structure_idx, _ = next(
    ss.split(np.zeros((20, 1)), [1 if i > 10 else 0 for i in range(20)])
)
structure_idx1 = structure_idx.copy()

ss = StratifiedShuffleSplit(n_splits=1, test_size=0.5, random_state=1)
structure_idx, _ = next(
    ss.split(np.zeros((20, 1)), [1 if i > 10 else 0 for i in range(20)])
)
structure_idx2 = structure_idx.copy()

assert np.array_equal(structure_idx1, structure_idx2)

print("done")


done
