Skip to content

Commit

Permalink
Implement parallelization configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
wmayner committed Jan 19, 2023
1 parent abfda19 commit 8a2f220
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 138 deletions.
16 changes: 3 additions & 13 deletions pyphi/actual.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import numpy as np

from . import compute, connectivity, exceptions, utils, validate
from . import compute, conf, connectivity, exceptions, utils, validate
from .compute.parallel import MapReduce
from .conf import config
from .direction import Direction
Expand Down Expand Up @@ -638,10 +638,6 @@ def _get_cuts(transition, direction):
yield ActualCut(direction, partition, transition.node_labels)


DEFAULT_AC_SIA_SEQUENTIAL_THRESHOLD = 4
DEFAULT_AC_SIA_CHUNKSIZE = 2 * DEFAULT_AC_SIA_SEQUENTIAL_THRESHOLD


# TODO(4.0) change parallel default to True?
def sia(transition, direction=Direction.BIDIRECTIONAL, **kwargs):
"""Return the minimal information partition of a transition in a specific
Expand Down Expand Up @@ -681,11 +677,7 @@ def sia(transition, direction=Direction.BIDIRECTIONAL, **kwargs):

cuts = _get_cuts(transition, direction)

kwargs = {
"parallel": config.PARALLEL_CUT_EVALUATION,
"progress": config.PROGRESS_BARS,
**kwargs,
}
parallel_kwargs = conf.parallel_kwargs(config.PARALLEL_CUT_EVALUATION, **kwargs)
result = MapReduce(
_evaluate_cut,
cuts,
Expand All @@ -699,9 +691,7 @@ def sia(transition, direction=Direction.BIDIRECTIONAL, **kwargs):
default=_null_ac_sia(transition, direction, alpha=float("inf"))
),
shortcircuit_func=utils.is_falsy,
chunksize=DEFAULT_AC_SIA_CHUNKSIZE,
sequential_threshold=DEFAULT_AC_SIA_SEQUENTIAL_THRESHOLD,
**kwargs,
**parallel_kwargs,
).run()
log.info("Finished calculating big-ac-phi data for %s.", transition)
log.debug("RESULT: \n%s", result)
Expand Down
11 changes: 5 additions & 6 deletions pyphi/compute/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import logging

from .. import exceptions, utils, validate
from .. import conf, exceptions, utils, validate
from ..conf import config
from ..models import _null_sia
from ..subsystem import Subsystem
Expand Down Expand Up @@ -72,7 +72,6 @@ def possible_complexes(network, state):
return reachable_subsystems(network, network.causally_significant_nodes, state)


# TODO(4.0) parallel: expose args in config
def all_complexes(network, state, **kwargs):
"""Return a generator for all complexes of the network.
Expand All @@ -88,14 +87,14 @@ def all_complexes(network, state, **kwargs):
SystemIrreducibilityAnalysis: A |SIA| for each |Subsystem| of the
|Network|.
"""
kwargs = {"parallel": config.PARALLEL_COMPLEX_EVALUATION, **kwargs}
parallel_kwargs = conf.parallel_kwargs(config.PARALLEL_COMPLEX_EVALUATION, **kwargs)
return MapReduce(
sia,
possible_complexes(network, state),
total=2 ** len(network) - 1,
map_kwargs=dict(progress=False),
desc="Evaluating complexes",
**kwargs,
**parallel_kwargs,
).run()


Expand Down Expand Up @@ -127,7 +126,7 @@ def major_complex(network, state, **kwargs):
log.info("Calculating major complex...")
empty_subsystem = Subsystem(network, state, ())
default = _null_sia(empty_subsystem)
kwargs = {"parallel": config.PARALLEL_COMPLEX_EVALUATION, **kwargs}
parallel_kwargs = conf.parallel_kwargs(config.PARALLEL_COMPLEX_EVALUATION, **kwargs)
result = MapReduce(
sia,
possible_complexes(network, state),
Expand All @@ -136,7 +135,7 @@ def major_complex(network, state, **kwargs):
reduce_kwargs=dict(default=default),
total=2 ** len(network) - 1,
desc="Evaluating complexes",
**kwargs,
**parallel_kwargs,
).run()
log.info("Finished calculating major complex.")
return result
Expand Down
16 changes: 4 additions & 12 deletions pyphi/compute/subsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from more_itertools import collapse

from .. import conf
from .. import connectivity, utils
from ..conf import config
from ..direction import Direction
Expand All @@ -31,10 +32,6 @@
log = logging.getLogger(__name__)


DEFAULT_CES_SEQUENTIAL_THRESHOLD = 4
DEFAULT_CES_CHUNKSIZE = 2 * DEFAULT_CES_SEQUENTIAL_THRESHOLD


def ces(
subsystem,
mechanisms=False,
Expand Down Expand Up @@ -82,16 +79,11 @@ def nonzero_phi(concepts):
def compute_concept(*args, **kwargs):
# Don't serialize the subsystem; this is replaced after returning.
# TODO(4.0) remove when subsystem reference is removed from Concept
concept = subsystem.concept(*args, **kwargs)
concept = subsystem.concept(*args, **kwargs, progress=False)
concept.subsystem = None
return concept

kwargs = {
"chunksize": DEFAULT_CES_CHUNKSIZE,
"sequential_threshold": DEFAULT_CES_SEQUENTIAL_THRESHOLD,
"parallel": config.PARALLEL_CONCEPT_EVALUATION,
**kwargs,
}
parallel_kwargs = conf.parallel_kwargs(config.PARALLEL_CONCEPT_EVALUATION, **kwargs)
concepts = MapReduce(
compute_concept,
mechanisms,
Expand All @@ -103,7 +95,7 @@ def compute_concept(*args, **kwargs):
reduce_func=nonzero_phi,
desc="Computing concepts",
total=total,
**kwargs,
**parallel_kwargs,
).run()
# Replace subsystem references
# TODO(4.0) remove when subsystem reference is removed from Concept
Expand Down
109 changes: 83 additions & 26 deletions pyphi/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import warnings
from copy import copy
from pathlib import Path
from typing import Mapping
from warnings import warn

import ray
Expand Down Expand Up @@ -379,6 +380,7 @@ def on_change_distinction_phi_normalization(obj):
)


# TODO(configuration) actual causation parallel config
class PyphiConfig(Config):
"""``pyphi.config`` is an instance of this class."""

Expand Down Expand Up @@ -452,50 +454,85 @@ def always_zero(a, b):
""",
)

PARALLEL_CONCEPT_EVALUATION = Option(
PARALLEL = Option(
True,
type=bool,
doc="""
Controls whether concepts are evaluated in parallel when computing
cause-effect structures.""",
Global switch to turn off parallelization: if ``False``, parallelization is
never used, regardless of parallelization settings for individual options;
otherwise parallelization is determined by those settings.""",
)

PARALLEL_COMPLEX_EVALUATION = Option(
dict(
parallel=True,
sequential_threshold=2**4,
chunksize=2**6,
progress=True,
),
type=Mapping,
doc="""
Controls parallel evaluation of candidate systems within a network.""",
)

PARALLEL_CUT_EVALUATION = Option(
True,
type=bool,
dict(
parallel=True,
sequential_threshold=2**10,
chunksize=2**12,
progress=True,
),
type=Mapping,
doc="""
Controls whether system cuts are evaluated in parallel, which is faster but
requires more memory. If cuts are evaluated sequentially, only two
|SystemIrreducibilityAnalysis| instances need to be in memory at once.""",
Controls parallel evaluation of system partitions.""",
)

PARALLEL_COMPLEX_EVALUATION = Option(
True,
type=bool,
PARALLEL_CONCEPT_EVALUATION = Option(
dict(
parallel=True,
sequential_threshold=2**6,
chunksize=2**8,
progress=True,
),
type=Mapping,
doc="""
Controls whether systems are evaluated in parallel when computing
complexes.""",
Controls parallel evaluation of candidate mechanisms.""",
)

PARALLEL_PURVIEW_EVALUATION = Option(
False,
dict(
parallel=True,
sequential_threshold=2**6,
chunksize=2**8,
progress=True,
),
type=Mapping,
doc="""
Controls parallel evaluation of candidate purviews. A numeric value may
be used to threshold parallelization on mechanism size (inclusive).""",
Controls parallel evaluation of candidate purviews.""",
)

PARALLEL_MECHANISM_PARTITION_EVALUATION = Option(
True,
type=bool,
dict(
parallel=True,
sequential_threshold=2**10,
chunksize=2**12,
progress=True,
),
type=Mapping,
doc="""
Controls parallel evaluation of mechanism partitions.""",
)

PARALLEL_RELATION_EVALUATION = Option(
False,
type=bool,
dict(
parallel=True,
sequential_threshold=2**10,
chunksize=2**12,
progress=True,
),
type=Mapping,
doc="""
Controls parallel evaluation of mechanism partitions.
Controls parallel evaluation of relations.
Only applies if RELATION_COMPUTATION = 'CONCRETE'.
""",
Expand All @@ -505,9 +542,9 @@ def always_zero(a, b):
-1,
type=int,
doc="""
Controls the number of CPU cores used to evaluate unidirectional cuts.
Negative numbers count backwards from the total number of available cores,
with ``-1`` meaning 'use all available cores.'""",
Controls the number of CPU cores used in parallel evaluation. Negative
numbers count backwards from the total number of available cores, with
``-1`` meaning all available cores.""",
)

MAXIMUM_CACHE_MEMORY_PERCENTAGE = Option(
Expand All @@ -524,8 +561,8 @@ def always_zero(a, b):
dict(),
type=dict,
doc="""
Keyword arguments to ``ray.init()``. Controls the initialization of the ray
cluster.""",
Keyword arguments to ``ray.init()``. Controls the initialization of the Ray
cluster used for parallelization / distributed computation.""",
)

CACHE_REPERTOIRES = Option(
Expand Down Expand Up @@ -970,3 +1007,23 @@ def fallback(*args):
for arg in args:
if arg is not None:
return arg


def parallel_kwargs(option_kwargs, **user_kwargs):
"""Return the kwargs for a parallel function call.
Applies user overrides to the global configuration.
"""
kwargs = copy(option_kwargs)
if not config.PROGRESS_BARS:
kwargs["progress"] = False
if not config.PARALLEL:
kwargs["parallel"] = False
kwargs.update(
{
user_kwarg: value
for user_kwarg, value in user_kwargs.items()
if user_kwarg in kwargs
}
)
return kwargs
3 changes: 1 addition & 2 deletions pyphi/macro.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ def __init__(
state,
nodes=None,
cut=None,
mice_cache=None,
time_scale=1,
blackbox=None,
coarse_grain=None,
Expand All @@ -173,7 +172,7 @@ def __init__(
self.blackbox = blackbox
self.coarse_grain = coarse_grain

super().__init__(network, state, micro_node_indices, cut, mice_cache)
super().__init__(network, state, micro_node_indices, cut)

validate.blackbox_and_coarse_grain(blackbox, coarse_grain)

Expand Down
3 changes: 2 additions & 1 deletion pyphi/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def size(self):
@property
def num_states(self):
"""int: The number of possible states of the network."""
return 2 ** self.size
return 2**self.size

@property
def node_indices(self):
Expand Down Expand Up @@ -224,6 +224,7 @@ def reducible(purview):
_from, to = direction.order(mechanism, purview)
return connectivity.block_reducible(cm, _from, to)

# TODO(4.0) use generator?
return [purview for purview in purviews if not reducible(purview)]


Expand Down
16 changes: 3 additions & 13 deletions pyphi/new_big_phi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from textwrap import indent
from typing import Iterable, Optional, Tuple, Union

from .. import compute, connectivity, utils
from .. import compute, conf, connectivity, utils
from ..compute.network import reachable_subsystems
from ..compute.parallel import MapReduce
from ..conf import config, fallback
Expand All @@ -22,10 +22,6 @@
from ..subsystem import Subsystem
from ..warnings import warn_about_tie_serialization

DEFAULT_PARTITION_SEQUENTIAL_THRESHOLD = 2**4
DEFAULT_PARTITION_CHUNKSIZE = 2**2 * DEFAULT_PARTITION_SEQUENTIAL_THRESHOLD


##############################################################################
# Information
##############################################################################
Expand Down Expand Up @@ -321,13 +317,7 @@ def _null_sia(**kwargs):

default_sia = _null_sia(reasons=[ShortCircuitConditions.NO_VALID_PARTITIONS])

kwargs = {
"parallel": config.PARALLEL_CUT_EVALUATION,
"progress": config.PROGRESS_BARS,
"chunksize": DEFAULT_PARTITION_CHUNKSIZE,
"sequential_threshold": DEFAULT_PARTITION_SEQUENTIAL_THRESHOLD,
**kwargs,
}
parallel_kwargs = conf.parallel_kwargs(config.PARALLEL_CUT_EVALUATION, **kwargs)
sias = MapReduce(
evaluate_partition,
partitions,
Expand All @@ -339,7 +329,7 @@ def _null_sia(**kwargs):
),
shortcircuit_func=utils.is_falsy,
desc="Evaluating partitions",
**kwargs,
**parallel_kwargs,
).run()

# Find MIP in one pass, keeping track of ties
Expand Down

0 comments on commit 8a2f220

Please sign in to comment.