
CoRelAy – Composing Relevance Analysis


CoRelAy is a tool to compose small-scale (single-machine) analysis pipelines. Pipelines are designed as a number of steps (Tasks) with default operations (Processors). Any step of the pipeline may then be individually changed by assigning a new operator (Processor). Processors have Params which configure their operation.
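As a minimal sketch of this design, a custom Processor with a configurable Param might look as follows. The Scale processor and its factor parameter are made up for illustration; the Param and Processor imports match those in the example further below.

from corelay.base import Param
from corelay.processor.base import Processor


class Scale(Processor):
    # Params declare a type and a default value; 'factor' is hypothetical
    factor = Param(float, 2.0)

    def function(self, data):
        # the function attribute defines the operation applied to the input
        return data * self.factor


# Params can be overridden at instantiation, e.g. Scale(factor=0.5)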

CoRelAy was created to quickly implement pipelines to generate analysis data which can then be visualized using ViRelAy.

If you find CoRelAy useful for your research, why not cite our related paper:

@article{anders2021software,
      author  = {Anders, Christopher J. and
                 Neumann, David and
                 Samek, Wojciech and
                 Müller, Klaus-Robert and
                 Lapuschkin, Sebastian},
      title   = {Software for Dataset-wide XAI: From Local Explanations to Global Insights with {Zennit}, {CoRelAy}, and {ViRelAy}},
      journal = {CoRR},
      volume  = {abs/2106.13200},
      year    = {2021},
}

Documentation

The latest documentation is hosted at corelay.readthedocs.io.

Install

CoRelAy may be installed using pip with

$ pip install corelay

To install optional HDBSCAN and UMAP support, use

$ pip install corelay[umap,hdbscan]
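With the extras installed, the corresponding Processors become available. The following import sketch assumes UMAPEmbedding and HDBSCAN live in the embedding and clustering modules, analogous to TSNEEmbedding and KMeans used below; consult the API documentation if an import fails.

# assumption: UMAP and HDBSCAN support is exposed as Processors in the
# embedding and clustering modules, like TSNEEmbedding and KMeans below
from corelay.processor.embedding import UMAPEmbedding
from corelay.processor.clustering import HDBSCAN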

Usage

Examples to highlight some features of CoRelAy can be found in example/.

We mainly use HDF5 files to store results. The structure used by ViRelAy is documented in the ViRelAy repository at docs/database_specification.md. An example of creating HDF5 files that can be used with ViRelAy is shown in example/hdf5_structure.py.
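For orientation, a minimal sketch of writing results to HDF5 with h5py is shown below; the file and dataset names are placeholders, and real analyses should follow the layout in docs/database_specification.md.

import h5py
import numpy as np

# 'analysis.h5' and 'embedding' are placeholder names; use the group and
# dataset layout from the ViRelAy database specification for real analyses
with h5py.File('analysis.h5', 'w') as fd:
    fd.create_dataset('embedding', data=np.random.normal(size=(64, 2)))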

To run a full SpRAy analysis that can be visualized with ViRelAy, see the more advanced script in example/virelay_analysis.py.

The following shows the contents of example/memoize_spectral_pipeline.py:

'''Example using memoization to store (intermediate) results.'''
import time

import h5py
import numpy as np

from corelay.base import Param
from corelay.processor.base import Processor
from corelay.processor.flow import Sequential, Parallel
from corelay.pipeline.spectral import SpectralClustering
from corelay.processor.clustering import KMeans
from corelay.processor.embedding import TSNEEmbedding, EigenDecomposition
from corelay.io.storage import HashedHDF5


# custom processors can be implemented by defining a function attribute
class Flatten(Processor):
    def function(self, data):
        return data.reshape(data.shape[0], np.prod(data.shape[1:]))


class SumChannel(Processor):
    # parameters can be assigned by defining a class-owned Param instance
    axis = Param(int, 1)

    def function(self, data):
        return data.sum(self.axis)


class Normalize(Processor):
    def function(self, data):
        # normalize each sample to unit sum over its spatial dimensions
        data = data / data.sum((1, 2), keepdims=True)
        return data


def main():
    np.random.seed(0xDEADBEEF)
    fpath = 'test.analysis.h5'
    with h5py.File(fpath, 'a') as fd:
        # HashedHDF5 is an io-object that stores outputs of Processors based on hashes in hdf5
        iobj = HashedHDF5(fd.require_group('proc_data'))

        # generate some exemplary data
        data = np.random.normal(size=(64, 3, 32, 32))
        n_clusters = range(2, 20)

        # SpectralClustering is an example of a pre-defined Pipeline
        pipeline = SpectralClustering(
            # processors, such as EigenDecomposition, can be assigned to pre-defined tasks
            embedding=EigenDecomposition(n_eigval=8, io=iobj),
            # flow-based Processors, such as Parallel, can combine multiple Processors
            # broadcast=True copies the input as many times as there are Processors
            # broadcast=False instead attempts to match each input to a Processor
            clustering=Parallel([
                Parallel([
                    KMeans(n_clusters=k, io=iobj) for k in n_clusters
                ], broadcast=True),
                # io-objects will be used during computation when supplied to Processors
                # if a corresponding output value (here identified by hashes) already exists,
                # the value is not computed again but instead loaded from the io object
                TSNEEmbedding(io=iobj)
            ], broadcast=True, is_output=True)
        )
        # Processors (and Params) can be updated by simply assigning corresponding attributes
        pipeline.preprocessing = Sequential([
            SumChannel(),
            Normalize(),
            Flatten()
        ])

        start_time = time.perf_counter()

        # Processors flagged with "is_output=True" will be accumulated in the output
        # the output will be a tree of tuples, with the same hierarchy as the pipeline
        # (i.e. clusterings here contains a tuple of the k-means outputs)
        clusterings, tsne = pipeline(data)

        # since we memoize our results in a hdf5 file, subsequent calls will not compute
        # the values (for the same inputs), but rather load them from the hdf5 file
        # try running the script multiple times
        duration = time.perf_counter() - start_time
        print(f'Pipeline execution time: {duration:.4f} seconds')


if __name__ == '__main__':
    main()