In [1]:
from aeon.classification.distance_based import ProximityTree
import numpy as np

from numba import njit

In [2]:
from aeon.datasets import load_italy_power_demand
X, y = load_italy_power_demand(return_type='numpy2d')

In [3]:
clf = ProximityTree(n_splitters = 7)

In [4]:
X_train, X_test = X[:900], X[900:]
y_train, y_test = y[:900],y[900:]
X_test.shape

(196, 24)

In [5]:
clf.fit(X_train,y_train)

In [6]:
from sklearn.metrics import accuracy_score

y_pred = clf.predict(X_test)
score = accuracy_score(y_pred, y_test)

In [7]:
score

0.9591836734693877

In [8]:
len(X)

1096

In [6]:
y.ndim

1

In [7]:
X.shape

(1096, 1, 24)

In [8]:
X.reshape((X.shape[0],-1))
X.ndim

3

In [9]:
y = np.array([1,1,2,2,4,4,2,2])
y_subs = [np.array([1,1,4,4]), np.array([2,2,2,2])]
a = np.array([], dtype = y.dtype)
a

array([], dtype=int32)

In [18]:
splitter = clf.get_candidate_splitter(X,y)
len(splitter)

2

In [16]:
X = np.random.rand(10, 50)
X.shape

(10, 50)

In [17]:
y = np.array([1,1,1,2,2,2,0,0,0,1])
y.shape

(10,)

In [20]:
splitter[1].keys()

dict_keys(['lcss'])

In [21]:
from sklearn.utils import check_random_state

In [22]:
random_state = 42
st1 = check_random_state(random_state)

In [23]:
st2 = check_random_state(random_state)

In [24]:
st1

RandomState(MT19937) at 0x1EFF83D0240

In [26]:
st2

RandomState(MT19937) at 0x1EFF83D0840

In [6]:
from numba import njit

@njit(cache=True, fastmath=True)
def gini(y):
        """Get gini score at a specific node.

        Parameters
        ----------
        y : 1d numpy array
            array of class labels

        Returns
        -------
        score : float
            gini score for the set of class labels (i.e. how pure they are). A
            larger score means more impurity. Zero means
            pure.
        """
        # get number instances at node
        n_instances = y.shape[0]
        if n_instances > 0:
            # count each class
            unique_labels = list(np.unique(y))
            class_counts = []
            for i in range(len(unique_labels)):
                cnt = 0
                for j in range(len(y)):
                    if y[j]==unique_labels[i]:
                        cnt += 1
                class_counts.append(cnt)
            class_counts = np.array(class_counts)
            # subtract class entropy from current score for each class
            class_counts = np.divide(class_counts, n_instances)
            class_counts = np.power(class_counts, 2)
            sum = np.sum(class_counts)
            return 1 - sum
        else:
            # y is empty, therefore considered pure
            raise ValueError("y empty")

In [11]:
gini(y)

0.625

In [36]:
unique_class_labels, class_counts = np.unique(y, return_counts=True)
unique_class_labels

array(['1', '2'], dtype='<U1')

In [37]:
class_counts

array([547, 549], dtype=int64)

In [72]:
@njit
def uniques(y):
    unique_labels = list(np.unique(y))
    class_counts = []
    for i in range(len(unique_labels)):
        cnt = 0
        for j in range(len(y)):
            if y[j]==unique_labels[i]:
                cnt += 1
        class_counts.append(cnt)
    unique_labels = np.array(unique_labels)
    class_counts = np.array(class_counts)
    return unique_labels, class_counts

In [73]:
unique_labels, class_counts = uniques(y)

In [74]:
unique_labels

array(['1', '2'], dtype='<U1')

In [62]:
class_counts

array([547, 549], dtype=int64)

In [54]:
y[2]

'2'

In [56]:
for i in range(len(list(np.unique(y)))):
    print(i)

0
1


In [63]:
unique_labels = np.array(np.unique(y))
unique_labels

array(['1', '2'], dtype='<U1')

In [66]:
len(y)

1096

In [9]:
@njit(cache=True, fastmath=True)
def gini_gain(y, y_subs):
        """Get gini score of a split, i.e. the gain from parent to children.

        Parameters
        ----------
        y : 1d array
            array of class labels at parent
        y_subs : list of 1d array like
            list of array of class labels, one array per child

        Returns
        -------
        score : float
            gini score of the split from parent class labels to children. Note a
            higher score means better gain,
            i.e. a better split
        """
        # find number of instances overall
        parent_n_instances = len(y)
        # if parent has no instances then is pure
        if parent_n_instances == 0:
            for child in y_subs:
                if len(child) > 0:
                    raise ValueError("children populated but parent empty")
            return 0.5
        # find gini for parent node
        score = gini(y)
        # sum the children's gini scores
        for index in range(len(y_subs)):
            child_class_labels = y_subs[index]
            # ignore empty children
            if len(child_class_labels) > 0:
                # find gini score for this child
                child_score = gini(child_class_labels)
                # weight score by proportion of instances at child compared to
                # parent
                child_size = len(child_class_labels)
                child_score *= child_size / parent_n_instances
                # add to cumulative sum
                score -= child_score
        return score

In [10]:
y = np.array([1,1,2,2,4,4,2,2])
y_subs = [np.array([1,1,4,4]), np.array([2,2,2,2])]
gini_gain(y,y_subs)

0.375

In [8]:
from aeon.classification.distance_based._proximity_tree import gini_gain
y = np.array([1,1,2,2,4,4,2,2])
y_subs = [np.array([1,1,4,4]), np.array([2,2,2,2])]
gini_gain(y, y_subs)

0.375

In [10]:
len(np.unique(y))

3

In [11]:
np.unique(y)

array([1, 2, 4])

In [15]:
from typing import Type, Union

import numpy as np
from numba import njit
from sklearn.exceptions import NotFittedError
from sklearn.utils import check_random_state

from aeon.classification.base import BaseClassifier
from aeon.distances import distance


class Node:

    def __init__(
        self,
        node_id: int,
        _is_leaf: bool,
        label=None,
        class_distribution=None,
        splitter=None,
    ):
        self.node_id = node_id
        self._is_leaf = _is_leaf
        self.label = label
        self.splitter = splitter
        self.class_distribution = class_distribution or {}
        self.children = {}


class ProximityTree(BaseClassifier):

    _tags = {
        "capability:multivariate": False,
        "capability:unequal_length": False,
        "algorithm_type": "distance",
        "X_inner_type": ["numpy2D"],
    }

    def __init__(
        self,
        n_splitters: int = 5,
        max_depth: int = None,
        min_samples_split: int = 2,
        random_state: Union[int, Type[np.random.RandomState], None] = None,
        n_jobs: int = 1,
        verbose: int = 0,
    ) -> None:
        self.n_splitters = n_splitters
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.random_state = random_state
        self.n_jobs = n_jobs
        self.verbose = verbose
        super().__init__()

    def get_parameter_value(self, X):
        """Generate random parameter values.

        For a list of distance measures, generate a dictionary
        of parameterized distances.

        Parameters
        ----------
        X : np.ndarray of shape (n_cases, n_timepoints)

        Returns
        -------
        distance_param : a dictionary of distances and their
        parameters.
        """
        rng = check_random_state(self.random_state)

        X_std = X.std()
        param_ranges = {
            "euclidean": {},
            "dtw": {"window": (0, 0.25)},
            "ddtw": {"window": (0, 0.25)},
            "wdtw": {"g": (0, 1)},
            "wddtw": {"g": (0, 1)},
            "erp": {"g": (X_std / 5, X_std)},
            "lcss": {"epsilon": (X_std / 5, X_std), "window": (0, 0.25)},
        }
        random_params = {}
        for measure, ranges in param_ranges.items():
            random_params[measure] = {
                param: np.round(rng.uniform(low, high), 3)
                for param, (low, high) in ranges.items()
            }
        # For TWE
        lmbda = rng.randint(0, 9)
        exponent_range = np.arange(1, 6)  # Exponents from -5 to 1 (inclusive)
        random_exponent = rng.choice(exponent_range)
        nu = 1 / 10**random_exponent
        random_params["twe"] = {"lmbda": lmbda, "nu": nu}

        # For MSM
        base = 10
        # Exponents from -2 to 2 (inclusive)
        exponents = np.arange(-2, 3, dtype=np.float64)
        # Randomly select an index from the exponent range
        random_index = rng.randint(0, len(exponents))
        c = base ** exponents[random_index]
        random_params["msm"] = {"c": c}

        return random_params

    def get_candidate_splitter(self, X, y):
        """Generate candidate splitter.

        Takes a time series dataset and a set of parameterized
        distance measures to create a candidate splitter, which
        contains a parameterized distance measure and a set of exemplars.

        Parameters
        ----------
        X : np.ndarray shape (n_cases, n_timepoints)
            The training input samples.
        y : np.array shape (n_cases,) or (n_cases,1)
        parameterized_distances : dictionary
            Contains the distances and their parameters.

        Returns
        -------
        splitter : list of two dictionaries
            A distance and its parameter values and a set of exemplars.
        """
        rng = check_random_state(self.random_state)

        exemplars = {}
        for label in np.unique(y):
            y_new = y[y == label]
            X_new = X[y == label]
            id = rng.randint(0, X_new.shape[0])
            exemplars[y_new[id]] = X_new[id, :]

        # Create a list with first element exemplars and second element a
        # random parameterized distance measure
        parameterized_distances = self.get_parameter_value(X)
        n = rng.randint(0, 9)
        dist = list(parameterized_distances.keys())[n]
        splitter = [exemplars, {dist: parameterized_distances[dist]}]

        return splitter

    def get_best_splitter(self, X, y):
        """Get the splitter for a node which maximizes the gini gain."""
        max_gain = float("-inf")
        best_splitter = None
        for _ in range(self.n_splitters):
            splitter = self.get_candidate_splitter(X, y)
            labels = list(splitter[0].keys())
            measure = list(splitter[1].keys())[0]
            y_subs = [[] for _ in range(len(labels))]
            for j in range(X.shape[0]):
                min_dist = float("inf")
                sub = None
                for k in range(len(labels)):
                    dist = distance(
                        X[j],
                        splitter[0][labels[k]],
                        metric=measure,
                        kwargs=splitter[1][measure],
                    )
                    if dist < min_dist:
                        min_dist = dist
                        sub = k
                y_subs[sub].append(y[j])
            y_subs = [np.array(ele) for ele in y_subs]
            gini_index = gini_gain(y, y_subs)
            if gini_index > max_gain:
                max_gain = gini_index
                best_splitter = splitter
        return best_splitter

    def _build_tree(self, X, y, depth, node_id, parent_target_value=None):

        # If the data reaching the node is empty
        if len(X) == 0:
            leaf_label = parent_target_value
            leaf_distribution = {}
            leaf = Node(
                node_id=node_id,
                _is_leaf=True,
                label=leaf_label,
                class_distribution=leaf_distribution,
            )
            return leaf

        # Target value in current node
        target_value = self._find_target_value(y)
        class_distribution = {
            label: count / len(y)
            for label, count in zip(*np.unique(y, return_counts=True))
        }

        # If min sample splits is reached
        if self.min_samples_split >= len(X):
            leaf_label = target_value
            leaf = Node(
                node_id=node_id,
                _is_leaf=True,
                label=leaf_label,
                class_distribution=class_distribution,
            )
            return leaf

        # If max depth is reached
        if (self.max_depth is not None) and (depth >= self.max_depth):
            leaf_label = target_value
            leaf = Node(
                node_id=node_id,
                _is_leaf=True,
                label=leaf_label,
                class_distribution=class_distribution,
            )
            return leaf

        # Pure node
        if len(np.unique(y)) == 1:
            leaf_label = target_value
            leaf = Node(
                node_id=node_id,
                _is_leaf=True,
                label=leaf_label,
                class_distribution=class_distribution,
            )
            return leaf

        # Find the best splitter
        splitter = self.get_best_splitter(X, y)

        # Create root node
        node = Node(node_id=node_id, _is_leaf=False, splitter=splitter)

        # For each exemplar split the data
        labels = list(splitter[0].keys())
        measure = list(splitter[1].keys())[0]
        X_child = [[] for _ in labels]
        y_child = [[] for _ in labels]
        for i in range(len(X)):
            min_dist = np.inf
            id = None
            for j in range(len(labels)):
                dist = distance(
                    X[i],
                    splitter[0][labels[j]],
                    metric=measure,
                    kwargs=splitter[1][measure],
                )
                if dist < min_dist:
                    min_dist = dist
                    id = j
            X_child[id].append(X[i])
            y_child[id].append(y[i])
        X_child = [np.array(ele) for ele in X_child]
        y_child = [np.array(ele) for ele in y_child]
        # For each exemplar, create a branch
        for i in range(len(labels)):
            child_node_id = node_id + "." + str(i)
            child_node = self._build_tree(
                X_child[i],
                y_child[i],
                depth=depth + 1,
                node_id=child_node_id,
                parent_target_value=target_value,
            )
            node.children[labels[i]] = child_node

        return node

    @staticmethod
    def _find_target_value(y):
        """Get the class label of highest frequency."""
        unique, counts = np.unique(y, return_counts=True)
        # Find the index of the maximum count
        max_index = np.argmax(counts)
        mode_value = unique[max_index]
        # mode_count = counts[max_index]
        return mode_value

    def _fit(self, X, y):
        # Set the unique class labels
        if (X.ndim != 2) or (y.ndim != 1):
            raise ValueError("X should be of shape (n_cases, n_timepoints).")
        self.classes_ = list(np.unique(y))

        self.root = self._build_tree(
            X, y, depth=0, node_id="0", parent_target_value=None
        )
        self._is_fitted = True

    def _predict(self, X):
        probas = self._predict_proba(X)
        predictions = np.argmax(probas, axis=1)
        return np.array([self.classes_[pred] for pred in predictions])

    def _predict_proba(self, X):
        if not self._is_fitted:
            raise NotFittedError(
                f"This instance of {self.__class__.__name__} has not "
                f"been fitted yet; please call `fit` first."
            )
        # Get the unique class labels
        classes = self.classes_
        class_count = len(classes)
        probas = []

        for i in range(len(X)):
            # Classify the data point and find the leaf node
            leaf_node = self._classify(self.root, X[i])

            # Create probability distribution based on class counts in the leaf node
            proba = np.zeros(class_count)
            for class_label, class_proba in leaf_node.class_distribution.items():
                proba[classes.index(class_label)] = class_proba
            probas.append(proba)

        return np.array(probas)

    def _classify(self, treenode, x):
        # Classify one data point using the proximity tree
        if treenode._is_leaf:
            return treenode
        else:
            measure = list(treenode.splitter[1].keys())[0]
            branches = list(treenode.splitter[0].keys())
            min_dist = np.inf
            id = None
            for i in range(len(branches)):
                dist = distance(
                    x,
                    treenode.splitter[0][branches[i]],
                    metric=measure,
                    kwargs=treenode.splitter[1][measure],
                )
                if dist < min_dist:
                    min_dist = dist
                    id = i
            return self._classify(treenode.children[branches[id]], x)


@njit(cache=True, fastmath=True)
def gini(y) -> float:
    """Get gini score at a specific node.

    Parameters
    ----------
    y : 1d numpy array
        array of class labels

    Returns
    -------
    score : float
        gini score for the set of class labels (i.e. how pure they are). A
        larger score means more impurity. Zero means
        pure.
    """
    # get number instances at node
    n_instances = y.shape[0]
    if n_instances > 0:
        # count each class
        unique_labels = list(np.unique(y))
        class_counts = []
        for i in range(len(unique_labels)):
            cnt = 0
            for j in range(len(y)):
                if y[j] == unique_labels[i]:
                    cnt += 1
            class_counts.append(cnt)
        class_counts = np.array(class_counts)
        # subtract class entropy from current score for each class
        class_counts = np.divide(class_counts, n_instances)
        class_counts = np.power(class_counts, 2)
        sum = np.sum(class_counts)
        return 1 - sum
    else:
        # y is empty, therefore considered pure
        raise ValueError("y empty")


# @njit(cache=True, fastmath=True)
def gini_gain(y, y_subs) -> float:
    """Get gini score of a split, i.e. the gain from parent to children.

    Parameters
    ----------
    y : 1d array
        array of class labels at parent
    y_subs : list of 1d array like
        list of array of class labels, one array per child

    Returns
    -------
    score : float
        gini score of the split from parent class labels to children. Note a
        higher score means better gain,
        i.e. a better split
    """
    # find number of instances overall
    parent_n_instances = y.shape[0]
    # if parent has no instances then is pure
    if parent_n_instances == 0:
        for child in y_subs:
            if len(child) > 0:
                raise ValueError("children populated but parent empty")
        return 0.5
    # find gini for parent node
    score = gini(y)
    # sum the children's gini scores
    for index in range(len(y_subs)):
        child_class_labels = y_subs[index]
        # ignore empty children
        if len(child_class_labels) > 0:
            # find gini score for this child
            child_score = gini(child_class_labels)
            # weight score by proportion of instances at child compared to
            # parent
            child_size = len(child_class_labels)
            child_score *= child_size / parent_n_instances
            # add to cumulative sum
            score -= child_score
    return score


In [17]:
clf1 = ProximityTree(n_splitters = 7, max_depth = 6)

In [18]:
clf1._fit(X_train, y_train)

In [20]:
from sklearn.metrics import accuracy_score

y_pred = clf1._predict(X_test)
score = accuracy_score(y_pred, y_test)

In [21]:
score

0.9438775510204082

In [2]:
from aeon.datasets import load_unit_test
from aeon.classification.distance_based import ProximityTree
X_train, y_train = load_unit_test(split="train")
X_test, y_test = load_unit_test(split="test")
classifier = ProximityTree(n_splitters = 3, max_depth = 5)
classifier.fit(X_train, y_train)

In [8]:
@njit(cache=True, fastmath=True)
def _find_target_value(y):
        """Get the class label of highest frequency."""
        unique_labels = list(np.unique(y))
        class_counts = []
        for i in range(len(unique_labels)):
            cnt = 0
            for j in range(len(y)):
                if y[j] == unique_labels[i]:
                    cnt += 1
            class_counts.append(cnt)
        class_counts = np.array(class_counts)
        # Find the index of the maximum count
        max_index = np.argmax(class_counts)
        mode_value = unique_labels[max_index]
        # mode_count = counts[max_index]
        return mode_value

In [10]:
_find_target_value(y)

'2'

In [5]:
np.unique(y, return_counts=True)

(array(['1', '2'], dtype='<U1'), array([547, 549], dtype=int64))

In [6]:
   def _find_target_value(y):
        """Get the class label of highest frequency."""
        unique, counts = np.unique(y, return_counts=True)
        # Find the index of the maximum count
        max_index = np.argmax(counts)
        mode_value = unique[max_index]
        # mode_count = counts[max_index]
        return mode_value

In [7]:
_find_target_value(y)

'2'