In [1]:
import numpy as np

### Data
h = np.array([1, 2, 3, 4, 3, 4, 5, 4, 3, 2, 3, 4, 5, 6, 5, 6, 7, 6, 5, 4, 3, 2, 1, 2, 1])

W = np.zeros((25, 25), dtype='int8')
W[np.eye(len(W), k=1, dtype='bool')] = 1
W[np.eye(len(W), k=-1, dtype='bool')] = 1

In [2]:
import pandas as pd
pd.DataFrame(W)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,15,16,17,18,19,20,21,22,23,24
0,0,1,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,1,0,1,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,0,1,0,1,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,0,0,1,0,1,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,0,0,0,1,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
5,0,0,0,0,1,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
6,0,0,0,0,0,1,0,1,0,0,...,0,0,0,0,0,0,0,0,0,0
7,0,0,0,0,0,0,1,0,1,0,...,0,0,0,0,0,0,0,0,0,0
8,0,0,0,0,0,0,0,1,0,1,...,0,0,0,0,0,0,0,0,0,0
9,0,0,0,0,0,0,0,0,1,0,...,0,0,0,0,0,0,0,0,0,0


In [4]:
## Typing
from typing import Tuple, Any, List, Iterable
IndexType = int
import numpy as np

def _lists_intersection(list1, list2) -> list:
    return list(set(list1).intersection(list2))

def _reduce_lists(lists: Iterable[list]) -> list:
    return sum(lists, [])


def _mock_pytest_data():
    n = 5
    v = np.array([5, 3, 2, 5, 4])
    w = np.zeros((n, n))
    w[np.eye(len(w), k=1, dtype='bool')] = 1
    w[np.eye(len(w), k=-1, dtype='bool')] = 1
    oracle = NdarrayEchelonOracle(v, w)
    return v, w


class EchelonOracleBase:
    def copy_indices(self) -> List[IndexType]:
        pass
    def nb(self, indices: List[IndexType]) -> List[IndexType]:
        pass
    def max_indices(self, indices: List[IndexType]) -> Tuple[List[IndexType], Any]:
        pass
    def find_peaktop(self, indices: List[IndexType]) -> Tuple[IndexType, Any]:
        """
        Examples:
            >>>NdarrayEchelonOracle(*_mock_pytest_data()).find_peaktop([1, 2, 3])
            (3, 5)
        """
        _argmax_list, _max = self.max_indices(indices)
        return _argmax_list[0], _max

    def max_nb(self, indices: List[IndexType]) -> Tuple[List[IndexType], Any]:
        """
        Examples:
            >>>NdarrayEchelonOracle(*_mock_pytest_data()).max_nb([1, 2])
            ([0, 3], 5)
        """
        nb_indices = self.nb(indices)
        return self.max_indices(nb_indices)

    def max_nb_indices(self, ind1, ind2):
        inds = _lists_intersection(self.nb(ind1), ind2)
        return self.max_indices(inds)

    def pop_extend_family(self, indices, families):
            _family_i = indices
            for i, family in enumerate(families):
                if _lists_intersection(self.nb(indices), family):
                    _family_i += family
                    del families[i]
            return _family_i, families

class NdarrayEchelonOracle(EchelonOracleBase):
    def __init__(self, values: np.ndarray, adjacency: np.ndarray):
        """Constructor.

        Params:
            data: 1-dimensional array of the observed values for the grids.
            adjacency: 2-dimensional adjacency matrix for the grids.
        """
        assert len(values) == len(adjacency)
        self.values = values
        self.adjacency = adjacency

    def copy_indices(self) -> List[IndexType]:
        return list(range(len(self.values)))

    def nb(self, indices: List[IndexType]) -> List[IndexType]:
        """
        Examples:
            >>>NdarrayEchelonOracle(*_mock_pytest_data()).nb([1, 2])
            [0, 3]
        """
        _adj = adjacency[indices]
        _adj = np.logical_or.reduce(_adj)
        _neighbors = list(np.nonzero(_adj)[0])
        return list(set(_neighbors)-set(indices))

    def max_indices(self, indices: List[IndexType]) -> Tuple[List[IndexType], Any]:
        """
        Examples:
            >>>NdarrayEchelonOracle(*_mock_pytest_data()).max_indices([1, 2])
            ([1], 3)
        """
        vals = self.values[indices]
        _max = np.max(vals)
        _argmax_list = [indices[i] for i in np.flatnonzero(vals == _max)]
        return _argmax_list, _max

In [35]:
def _mock_1dim_data():
    h = np.array([1, 2, 3, 4, 3, 4, 5, 4, 3, 2, 3, 4, 5, 6, 5, 6, 7, 6, 5, 4, 3, 2, 1, 2, 1])
    W = np.zeros((25, 25), dtype='int8')
    W[np.eye(len(W), k=1, dtype='bool')] = 1
    W[np.eye(len(W), k=-1, dtype='bool')] = 1
    return h, W


def find_peak_echelons(oracle: EchelonOracleBase) -> List[List[IndexType]]:
    """
    Returns:
        peak_echelons

    Examples:
        >>>h, W = _mock_1dim_data()
        >>>oracle = NdarrayEchelonOracle(h, W)
        >>>find_peak_echelons(oracle)
        [[16, 17, 15], [13], [6, 5, 7], [3], [23]]

    Notes:
        After finding the initial peaktop in each iteration,
        the algorithm takes the "set" of argmax indices,
        i.e., it considers the contour set in the neighborhood
        simultaneously as candidates.
    """
    indices = oracle.copy_indices()
    peak_echelons = []
    while len(indices):
        _current_echelon = []
        _removed = []

        ## Start from finding a single peak point among the remaining candidates.
        _argmax_single, _max = oracle.find_peaktop(indices)
        _argmax = [_argmax_single]

        ## Extend from the peak point to its neighbors until the search hits a local minimum.
        ## Handle ties simultaneously (i.e., argmax is a set, not a point).
        while _max >= oracle.max_nb(_current_echelon + _argmax)[1]:
            ## If not local minimum, add to peak echelon
            ## and remove searched candidates.
            _current_echelon += _argmax
            _removed += _argmax
            ## And continue to next round
            _argmax, _max = oracle.max_nb(_current_echelon)
        _removed += _argmax # Remove searched candidates
        if len(_current_echelon):
            peak_echelons.append(_current_echelon)
        indices = list(set(indices) - set(_current_echelon) - set(_removed))
    return peak_echelons


from copy import deepcopy
def find_foundation_echelons(oracle: EchelonOracleBase, peak_echelons: List[List[IndexType]]) -> List[List[IndexType]]:
    unsearched_inds = list(set(oracle.copy_indices())-set(_reduce_lists(peak_echelons)))
    families = deepcopy(peak_echelons)
    foundation_echelons = []

    while len(unsearched_inds):
        _current_echelon = []
        ## Select a single point among the unsearched maxima.
        _argmax_single, _initial_max = oracle.find_peaktop(unsearched_inds)
        _argmax = [_argmax_single]
        _current_echelon += _argmax
        _current_family = _argmax
        unsearched_inds = list(set(unsearched_inds) - set(_argmax))

        while True:
            ## Blocking statement
            if (not oracle.nb(_current_family)) or (not unsearched_inds):
                break
            ## Extend to family
            _current_family, families = oracle.pop_extend_family(_current_family, families)
            ## Check neighbor argmax set
            _argmax, _max = oracle.max_nb(_current_family)
            # If the max value is the same as the original max, unconditionally accept them.
            if (_max == _initial_max):
                _current_family += _argmax
                _current_echelon += _argmax # Foundation echelon contains only those which do not consist the peak echelons.
                unsearched_inds = list(set(unsearched_inds) - set(_argmax))
            # If none of the neighboring candidates is a local minimum, accept the candidates.
            elif (_max > oracle.max_nb(_current_family + _argmax)[1]):
                _current_family += _argmax
                _current_echelon += _argmax # Foundation echelon contains only those which do not consist the peak echelons.
                unsearched_inds = list(set(unsearched_inds) - set(_argmax))
            else:
                break

        families.append(_current_family)
        foundation_echelons.append(_current_echelon)
        if not unsearched_inds: # If all marked as searched
            break
    return foundation_echelons

data, adjacency = _mock_1dim_data()
oracle = NdarrayEchelonOracle(data, adjacency)
peak_echelons = find_peak_echelons(oracle)
print(peak_echelons)
find_foundation_echelons(oracle, peak_echelons)

[[16, 17, 15], [13], [6, 5, 7], [3], [23]]


[[12, 14, 18, 19, 11, 10, 20], [2, 4, 8], [1, 9, 21], [0, 22, 24]]

In [37]:
from typing import Tuple, List
IndexSetType = List[int]
EchelonType = IndexSetType
EchelonsType = List[EchelonType]
NeighborsType = List[IndexSetType]

from dataclasses import dataclass

@dataclass
class Result_EchelonAnalysis:
    """Class for keeping the results of echelon analysis."""
    peak_echelons: EchelonsType
    foundation_echelons: EchelonsType


class OneDimEchelonAnalysis:
    def __call__(self, data: np.ndarray) -> Result_EchelonAnalysis:
        """
        Params:
            data: 1-dimensional data of realized values.
        
        Examples:
            >>>h, _ = _mock_1dim_data()
            >>>OneDimEchelonAnalysis()(h)
            Result_EchelonAnalysis(peak_echelons=[[16, 17, 15], [13], [6, 5, 7], [3], [23]], foundation_echelons=[[12, 14, 18, 19, 11, 10, 20], [2, 4, 8], [1, 9, 21], [0, 22, 24]])
        """
        W = np.zeros((len(data),)*2, dtype='int8')
        W[np.eye(len(W), k=1, dtype='bool')] = 1
        W[np.eye(len(W), k=-1, dtype='bool')] = 1
        oracle = NdarrayEchelonOracle(data, W)
        peak_echelons = find_peak_echelons(oracle)
        foundation_echelons = find_foundation_echelons(oracle, peak_echelons)
        return Result_EchelonAnalysis(
            peak_echelons=peak_echelons,
            foundation_echelons=foundation_echelons
        )


@dataclass
class Result_EchelonAnalysisR:
    """Class for keeping the results of echelon analysis."""
    coord: object
    region_names: object
    echelons: EchelonsType
    table: object


class EchelonAnalysisR:
    def __call__(self, x: np.ndarray, nb: np.ndarray, name: List[str]) -> Result_EchelonAnalysisR:
        """Perform the analysis.

        Params:
            x: 1-dim array of the observed value for each index.
            nb: 2-dim array representing the adjacency matrix.
            name: list of the names of the indices.

        Examples:
            >>>EchelonAnalysisR()(*_mock_1dim_data())
            Result_EchelonAnalysisR()
        """
        oracle = NdarrayEchelonOracle(x, nb)
        find_peak_echelons(oracle)
        find_foundation_echelons(oracle, peak_echelons)

        return EchelonAnalysisResult(
            coord,
            region_names,
            echelons,
            table,
        )
