Skip to content

Commit

Permalink
More ufunc implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shane-J-Latham committed Aug 10, 2017
1 parent ac080cd commit f15c931
Show file tree
Hide file tree
Showing 17 changed files with 724 additions and 63 deletions.
4 changes: 4 additions & 0 deletions docs/source/reference/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
mpi_array_distribution_test
mpi_array_globale
mpi_array_globale_test
mpi_array_globale_creation
mpi_array_globale_creation_test
mpi_array_globale_ufunc
mpi_array_globale_ufunc_test
mpi_array_indexing
mpi_array_indexing_test
mpi_array_license
Expand Down
1 change: 1 addition & 0 deletions docs/source/reference/mpi_array_globale_creation.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: mpi_array.globale_creation
1 change: 1 addition & 0 deletions docs/source/reference/mpi_array_globale_creation_test.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: mpi_array.globale_creation_test
1 change: 1 addition & 0 deletions docs/source/reference/mpi_array_globale_ufunc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: mpi_array.globale_ufunc
1 change: 1 addition & 0 deletions docs/source/reference/mpi_array_globale_ufunc_test.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: mpi_array.globale_ufunc_test
124 changes: 122 additions & 2 deletions mpi_array/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import array_split as _array_split

from . import logging as _logging
from .distribution import BlockPartition
from .distribution import BlockPartition, ClonedDistribution, SingleLocaleDistribution

__author__ = "Shane J. Latham"
__license__ = _license()
Expand Down Expand Up @@ -591,8 +591,14 @@ def inter_locale_comm(self):
#: Hyper-slab partition distribution type
DT_SLAB = "slab"

#: Entire array repeated on each locale.
DT_CLONED = "cloned"

#: Entire array on single locale, no array elements on other locales.
DT_SINGLE_LOCALE = "single_locale"

#: List of value :samp:`distrib_type` values.
_valid_distrib_types = [DT_BLOCK, DT_SLAB]
_valid_distrib_types = [DT_BLOCK, DT_SLAB, DT_CLONED, DT_SINGLE_LOCALE]

#: Node (NUMA) locale type
LT_NODE = "node"
Expand Down Expand Up @@ -627,6 +633,107 @@ def inter_locale_comm(self):
"""


def create_locale_comms(
locale_type=None,
peer_comm=None,
intra_locale_comm=None,
inter_locale_comm=None
):
if locale_type.lower() == LT_PROCESS:
if (intra_locale_comm is not None) and (intra_locale_comm.size > 1):
raise ValueError(
"Got locale_type=%s, but intra_locale_comm.size=%s"
%
(locale_type, intra_locale_comm.size)
)
intra_locale_comm = _mpi.COMM_SELF
locale_comms = \
LocaleComms(
peer_comm=peer_comm,
intra_locale_comm=intra_locale_comm,
inter_locale_comm=inter_locale_comm
)
inter_locale_rank_to_peer_rank = locale_comms.inter_locale_rank_to_peer_rank_map
this_locale = locale_comms.this_locale_rank_info

# Broadcast on intra_locale_comm to get peer_rank mapping to all
# peer_comm ranks
inter_locale_rank_to_peer_rank, this_locale = \
locale_comms.intra_locale_comm.bcast(
(inter_locale_rank_to_peer_rank, this_locale),
0
)
locale_comms.rank_logger.debug(
"inter_locale_rank_to_peer_rank=%s",
inter_locale_rank_to_peer_rank
)

return locale_comms, inter_locale_rank_to_peer_rank, this_locale


def create_cloned_distribution(
shape,
locale_type=None,
halo=0,
peer_comm=None,
intra_locale_comm=None,
inter_locale_comm=None
):
"""
Factory function for creating :obj:`ClonedDistribution` distribution instance.
:rtype: :obj:`CommsAndDistribution`
:return: A :obj:`CommsAndDistribution` pair.
"""
locale_comms, inter_locale_rank_to_peer_rank, this_locale =\
create_locale_comms(
locale_type=locale_type,
peer_comm=peer_comm,
intra_locale_comm=intra_locale_comm,
inter_locale_comm=inter_locale_comm
)

cloned_distrib = \
ClonedDistribution(
globale_extent=shape,
inter_locale_rank_to_peer_rank=inter_locale_rank_to_peer_rank,
num_locales=locale_comms.num_locales,
halo=halo
)
return CommsAndDistribution(locale_comms, cloned_distrib, this_locale)


def create_single_locale_distribution(
shape,
locale_type=None,
halo=0,
peer_comm=None,
intra_locale_comm=None,
inter_locale_comm=None
):
"""
Factory function for creating :obj:`SingleLocaleDistribution` distribution instance.
:rtype: :obj:`CommsAndDistribution`
:return: A :obj:`CommsAndDistribution` pair.
"""
locale_comms, inter_locale_rank_to_peer_rank, this_locale =\
create_locale_comms(
locale_type=locale_type,
peer_comm=peer_comm,
intra_locale_comm=intra_locale_comm,
inter_locale_comm=inter_locale_comm
)

cloned_distrib = \
SingleLocaleDistribution(
globale_extent=shape,
inter_locale_rank_to_peer_rank=inter_locale_rank_to_peer_rank,
halo=halo
)
return CommsAndDistribution(locale_comms, cloned_distrib, this_locale)


def create_block_distribution(
shape,
locale_type=None,
Expand Down Expand Up @@ -735,6 +842,19 @@ def create_distribution(shape, distrib_type=DT_BLOCK, locale_type=LT_NODE, **kwa
dims = _np.ones_like(shape, dtype="int64")
dims[axis] = 0
comms_and_distrib = create_block_distribution(shape, locale_type, dims=dims, **kwargs)
elif distrib_type.lower() == DT_CLONED:
comms_and_distrib = create_cloned_distribution(shape, locale_type, **kwargs)
elif distrib_type.lower() == DT_SINGLE_LOCALE:
comms_and_distrib = create_single_locale_distribution(shape, locale_type, **kwargs)
else:
raise ValueError(
"Got invalid distrib_type='%s', valid distrib_type values are: %s"
%
(
distrib_type,
", ".join(["'" + str(dt) + "'" for dt in _valid_distrib_types])
)
)

return comms_and_distrib

Expand Down
45 changes: 37 additions & 8 deletions mpi_array/distribution.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ def __init__(
globale_extent,
locale_extents,
halo=0,
globale_extent_type=LocaleExtent,
locale_extent_type=GlobaleExtent,
globale_extent_type=GlobaleExtent,
locale_extent_type=LocaleExtent,
inter_locale_rank_to_peer_rank=None
):
"""
Expand Down Expand Up @@ -444,7 +444,14 @@ def create_globale_extent(self, globale_extent, halo=0):
and
_np.all([isinstance(e, slice) for e in iter(globale_extent)])
):
globale_extent = GlobaleExtent(slice=globale_extent, halo=halo)
globale_extent = self._globale_extent_type(slice=globale_extent, halo=halo)
elif hasattr(globale_extent, "start") and hasattr(globale_extent, "stop"):
globale_extent = \
self._globale_extent_type(
start=globale_extent.start,
stop=globale_extent.stop,
halo=halo
)
elif (
(hasattr(globale_extent, "__iter__") or hasattr(globale_extent, "__getitem__"))
and
Expand All @@ -456,10 +463,8 @@ def create_globale_extent(self, globale_extent, halo=0):
)
):
stop = _np.array(globale_extent)
globale_extent = GlobaleExtent(start=_np.zeros_like(stop), stop=stop, halo=halo)
elif hasattr(globale_extent, "start") and hasattr(globale_extent, "stop"):
globale_extent = \
GlobaleExtent(start=globale_extent.start, stop=globale_extent.stop, halo=halo)
self._globale_extent_type(start=_np.zeros_like(stop), stop=stop, halo=halo)
else:
raise ValueError(
"Could not construct %s instance from globale_extent=%s."
Expand Down Expand Up @@ -506,6 +511,28 @@ def create_locale_extent(
halo=halo,
**kwargs
)
elif (
(hasattr(locale_extent, "__iter__") or hasattr(locale_extent, "__getitem__"))
and
_np.all(
[
(hasattr(e, "__int__") or hasattr(e, "__long__"))
for e in iter(locale_extent)
]
)
):
stop = _np.array(locale_extent)
locale_extent = \
self._locale_extent_type(
peer_rank=peer_rank,
inter_locale_rank=inter_locale_rank,
globale_extent=globale_extent,
start=_np.zeros_like(stop),
stop=stop,
halo=halo,
**kwargs
)

else:
raise ValueError(
"Could not construct %s instance from locale_extent=%s."
Expand Down Expand Up @@ -564,7 +591,7 @@ def __init__(self, globale_extent, num_locales, halo=0, inter_locale_rank_to_pee
Distribution.__init__(
self,
globale_extent=globale_extent,
locale_extents=[globale_extent.deep_copy() for i in range(num_locales)],
locale_extents=[_copy.deepcopy(globale_extent) for i in range(num_locales)],
halo=halo,
inter_locale_rank_to_peer_rank=inter_locale_rank_to_peer_rank
)
Expand Down Expand Up @@ -642,6 +669,7 @@ def __init__(
communicator coordinate (:meth:`mpi4py.MPI.CartComm.Get_coords`)
and cartesian communicator peer_rank.
"""
self._globale_extent_type = GlobaleExtent
globale_extent = self.create_globale_extent(globale_extent, halo)
self._num_locales = _np.product(dims)
self._dims = dims
Expand Down Expand Up @@ -678,7 +706,8 @@ def __init__(
locale_extents=locale_extents,
inter_locale_rank_to_peer_rank=inter_locale_rank_to_peer_rank,
halo=halo,
locale_extent_type=CartLocaleExtent
locale_extent_type=CartLocaleExtent,
globale_extent_type=self._globale_extent_type
)

def create_locale_extent(
Expand Down
13 changes: 12 additions & 1 deletion mpi_array/globale.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from .update import MpiPairExtentUpdate as _MpiPairExtentUpdate
from .update import MpiPairExtentUpdateDifferentDtypes as _MpiPairExtentUpdateDifferentDtypes
from .indexing import HaloIndexingExtent as _HaloIndexingExtent
from .globale_ufunc import gndarray_array_ufunc as _gndarray_array_ufunc

__author__ = "Shane J. Latham"
__license__ = _license()
Expand Down Expand Up @@ -459,6 +458,8 @@ def __eq__(self, other):
def __array_ufunc__(self, *args, **kwargs):
"""
"""
from .globale_ufunc import gndarray_array_ufunc as _gndarray_array_ufunc

return _gndarray_array_ufunc(self, *args, **kwargs)

@property
Expand All @@ -485,6 +486,16 @@ def rma_window_buffer(self):
def lndarray_proxy(self):
return self._lndarray_proxy

@property
def ndim(self):
return len(self.shape)

@property
def num_locales(self):
"""
"""
return self._comms_and_distrib.locale_comms.num_locales

@property
def shape(self):
return self._comms_and_distrib.distribution.globale_extent.shape_n
Expand Down
74 changes: 74 additions & 0 deletions mpi_array/globale_creation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
============================================
The :mod:`mpi_array.globale_creation` Module
============================================
Defines :obj:`mpi_array.globale.gndarray` creation functions.
Functions
=========
.. autosummary::
:toctree: generated/
asarray - Returns :obj:`mpi_array.globale.gndarray` equivalent of input.
"""

from __future__ import absolute_import

import numpy as _np

from .license import license as _license, copyright as _copyright, version as _version
from .globale import gndarray as _gndarray, empty as _empty
from . import logging as _logging # noqa: E402,F401
from . import comms as _comms

__author__ = "Shane J. Latham"
__license__ = _license()
__copyright__ = _copyright()
__version__ = _version()


def asarray(a, dtype=None, order=None, **kwargs):
"""
:rtype: :obj:`mpi_array.globale.gndarray`
"""
if hasattr(a, "__class__") and (a.__class__ is _gndarray):
ret_ary = a
elif isinstance(a, _gndarray):
ret_ary =\
_gndarray(
comms_and_distrib=a.comms_and_distrib,
rma_window_buffer=a.rma_window_buffer,
lndarray_proxy=a.lndarray_proxy
)
else:
if "distrib_type" not in kwargs.keys() or kwargs["distrib_type"] is None:
kwargs["distrib_type"] = _comms.DT_CLONED
np_ary = _np.asanyarray(a, dtype, order)
ret_ary = \
_empty(
np_ary.shape,
dtype=np_ary.dtype,
**kwargs
)
if (ret_ary.ndim == 0) and (ret_ary.locale_comms.have_valid_inter_locale_comm):
ret_ary.lndarray_proxy.lndarray[...] = np_ary
else:
locale_rank_view_slice_n = ret_ary.lndarray_proxy.rank_view_slice_n
globale_rank_view_slice_n = \
ret_ary.lndarray_proxy.locale_extent.locale_to_globale_slice_h(
locale_rank_view_slice_n
)

ret_ary.lndarray_proxy.lndarray[locale_rank_view_slice_n] =\
np_ary[globale_rank_view_slice_n]

ret_ary.intra_locale_barrier()

return ret_ary


__all__ = [s for s in dir() if not s.startswith('_')]

0 comments on commit f15c931

Please sign in to comment.