Allow parallel I/O for hydro and particle reading #4730

Draft
wants to merge 12 commits into base: main
33 changes: 27 additions & 6 deletions yt/frontends/ramses/data_structures.py
@@ -18,6 +18,7 @@
from yt.utilities.cython_fortran_utils import FortranFile as fpu
from yt.utilities.lib.cosmology_time import friedman
from yt.utilities.on_demand_imports import _f90nml as f90nml
from yt.utilities.parallel_tools.parallel_analysis_interface import parallel_objects
from yt.utilities.physical_constants import kb, mp
from yt.utilities.physical_ratios import cm_per_mpc

@@ -548,19 +549,36 @@ def _initialize_oct_handler(self):
if self.ds._bbox is not None:
cpu_list = get_cpu_list(self.dataset, self.dataset._bbox)
else:
cpu_list = range(self.dataset["ncpu"])
cpu_list = list(range(self.dataset["ncpu"]))

if len(cpu_list) < self.comm.size:
mylog.error(
"The number of MPI tasks is larger than the number of files to read "
"on disk. This is not supported. Try running with at most "
f"{len(cpu_list)} MPI tasks."
)
self.comm.comm.Abort(2)

# Important note: we need to use method="sequential" so that the
# first MPI task gets domains 1, 2, ..., n, the second task gets
# domains n+1, n+2, ..., 2n, and so on.
self.domains = [
RAMSESDomainFile(self.dataset, i + 1)
for i in parallel_objects(cpu_list, method="sequential")
]

self.domains = [RAMSESDomainFile(self.dataset, i + 1) for i in cpu_list]
total_octs = sum(
dom.local_oct_count for dom in self.domains # + dom.ngridbound.sum()
)
max_level_loc = max(dom.max_level for dom in self.domains)

force_max_level, convention = self.ds._force_max_level
if convention == "yt":
force_max_level += self.ds.min_level + 1
self.max_level = min(
force_max_level, max(dom.max_level for dom in self.domains)
)
self.num_grids = total_octs

max_level_loc = min(force_max_level, max_level_loc)
self.max_level = self.comm.mpi_allreduce(max_level_loc, op="max")
self.num_grids = self.comm.mpi_allreduce(total_octs, op="sum")

def _detect_output_fields(self):
dsl = set()
@@ -622,6 +640,7 @@ def _chunk_io(self, dobj, cache=True, local_only=False):

def _initialize_level_stats(self):
levels = sum(dom.level_count for dom in self.domains)
levels = self.comm.mpi_allreduce(levels, op="sum")
desc = {"names": ["numcells", "level"], "formats": ["int64"] * 2}
max_level = self.dataset.min_level + self.dataset.max_level + 2
self.level_stats = blankRecordArray(desc, max_level)
@@ -644,6 +663,8 @@ def _get_particle_type_counts(self):
count = fh.local_particle_count
npart[fh.ptype] += count

npart = self.comm.par_combine_object(npart, op="sum")

return npart

def print_stats(self):
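
The data_structures.py hunk above gives each MPI rank a contiguous block of RAMSES domain files and then combines the per-rank results with reductions (max for the AMR level, sum for the oct count). A minimal sketch of that pattern, written against mpi4py directly rather than yt's parallel_objects/mpi_allreduce wrappers; the sequential_chunks helper and the domain counts are illustrative only, not part of the PR:

from mpi4py import MPI

comm = MPI.COMM_WORLD

def sequential_chunks(items, rank, size):
    # Contiguous split: rank 0 gets items[0:n], rank 1 gets items[n:2n], ...
    n = -(-len(items) // size)  # ceiling division
    return items[rank * n : (rank + 1) * n]

cpu_list = list(range(64))  # hypothetical ncpu = 64
my_domains = sequential_chunks(cpu_list, comm.rank, comm.size)

# Stand-ins for dom.local_oct_count and dom.max_level
local_octs = sum(1000 for _ in my_domains)
local_max_level = max((d % 10 for d in my_domains), default=0)

num_grids = comm.allreduce(local_octs, op=MPI.SUM)       # cf. mpi_allreduce(..., op="sum")
max_level = comm.allreduce(local_max_level, op=MPI.MAX)  # cf. mpi_allreduce(..., op="max")

Each rank ends up owning domains with consecutive IDs, which is the distribution the comment in the hunk asks for.
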
2 changes: 1 addition & 1 deletion yt/frontends/ramses/field_handlers.py
@@ -272,7 +272,7 @@ def offset(self):
self.domain.domain_id,
self.parameters["nvar"],
self.domain.amr_header,
skip_len=nvars * 8,
Nskip=nvars * 8,
)

self._offset = offset
29 changes: 24 additions & 5 deletions yt/frontends/ramses/io.py
@@ -176,11 +176,12 @@ def _read_fluid_selection(self, chunks, selector, fields, size):

# Set of field types
ftypes = {f[0] for f in fields}

for chunk in chunks:
# Gather fields by type to minimize i/o operations
for ft in ftypes:
# Get all the fields of the same type
field_subs = list(filter(lambda f, ft=ft: f[0] == ft, fields))
field_subs = [field for field in fields if field[0] == ft]

# Loop over subsets
for subset in chunk.objs:
@@ -209,12 +210,17 @@ def _read_fluid_selection(self, chunks, selector, fields, size):
d.max(),
d.size,
)
tr[(ft, f)].append(d)
tr[ft, f].append(np.atleast_1d(d))

d = {}
for field in fields:
d[field] = np.concatenate(tr.pop(field))
tmp = tr.pop(field, None)
if tmp:
d[field] = np.concatenate(tmp)
else:
d[field] = np.empty(0, dtype="=f8")

return d
return self.ds.index.comm.par_combine_object(d, op="cat")

def _read_particle_coords(self, chunks, ptf):
pn = "particle_position_%s"
@@ -258,6 +264,7 @@ def _read_particle_fields(self, chunks, ptf, selector):
yield (ptype, field), data

else:
tr = defaultdict(list)
for chunk in chunks:
for subset in chunk.objs:
rv = self._read_particle_subset(subset, fields)
@@ -270,7 +277,19 @@ def _read_particle_fields(self, chunks, ptf, selector):
mask = []
for field in field_list:
data = np.asarray(rv.pop((ptype, field))[mask], "=f8")
yield (ptype, field), data
tr[ptype, field].append(np.atleast_1d(data))

d = {}
for ptype, field_list in sorted(ptf.items()):
for field in field_list:
tmp = tr.pop((ptype, field), None)
if tmp:
d[ptype, field] = np.concatenate(tmp)
else:
d[ptype, field] = np.empty(0, dtype="=f8")

d = self.ds.index.comm.par_combine_object(d, op="cat")
yield from d.items()

def _read_particle_subset(self, subset, fields):
"""Read the particle files."""
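
Both readers in io.py now follow the same gather-then-concatenate pattern: per-chunk reads are appended to lists, concatenated per field, replaced by an empty float64 array when a rank selected nothing, and finally merged across ranks with par_combine_object(op="cat"). A rough sketch of the single-rank half of that pattern; read_chunk is a hypothetical stand-in for the per-subset reads, and the cross-rank merge is left to yt's communicator:

from collections import defaultdict
import numpy as np

def collect_fields(chunks, fields, read_chunk):
    tr = defaultdict(list)
    for chunk in chunks:
        for field in fields:
            data = read_chunk(chunk, field)        # may be 0-d or 1-d
            tr[field].append(np.atleast_1d(data))  # keep 0-d reads concatenable

    d = {}
    for field in fields:
        parts = tr.pop(field, None)
        if parts:
            d[field] = np.concatenate(parts)
        else:
            # Keep the key even when nothing was read, so every rank
            # hands the same set of fields to the op="cat" combine.
            d[field] = np.empty(0, dtype="=f8")
    return d

Returning the explicit empty array keeps the dictionaries structurally identical across ranks, which is what lets the final concatenation proceed field by field.
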
84 changes: 65 additions & 19 deletions yt/frontends/ramses/io_utils.pyx
@@ -14,6 +14,10 @@ ctypedef np.int32_t INT32_t
ctypedef np.int64_t INT64_t
ctypedef np.float64_t DOUBLE_t

cdef int INT32_SIZE = sizeof(np.int32_t)
cdef int INT64_SIZE = sizeof(np.int64_t)
cdef int DOUBLE_SIZE = sizeof(np.float64_t)

@cython.cpow(True)
@cython.boundscheck(False)
@cython.wraparound(False)
@@ -76,19 +80,28 @@ def read_amr(FortranFile f, dict headers,
f.skip(skip_len)
# Note that we're adding *grids*, not individual cells.
if ilevel >= min_level:
n = oct_handler.add(icpu + 1, ilevel - min_level, pos[:ng, :],
count_boundary = 1)
n = oct_handler.add(
icpu + 1,
ilevel - min_level,
pos[:ng, :],
skip_boundary = 1,
count_boundary = 1,
)
if n > 0:
max_level = max(ilevel - min_level, max_level)

return max_level


cdef inline int skip_len(int Nskip, int record_len) noexcept nogil:
return Nskip * (record_len * DOUBLE_SIZE + INT64_SIZE)

@cython.cpow(True)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
@cython.nonecheck(False)
cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int skip_len):
cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t nvar, dict headers, int Nskip):

cdef np.ndarray[np.int64_t, ndim=2] offset, level_count
cdef INT64_t ndim, twotondim, nlevelmax, n_levels, nboundary, ncpu, ncpu_and_bound
@@ -104,8 +117,8 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
ncpu_and_bound = nboundary + ncpu
twotondim = 2**ndim

if skip_len == -1:
skip_len = twotondim * nvar
if Nskip == -1:
Nskip = twotondim * nvar

# It goes: level, CPU, 8-variable (1 oct)
offset = np.full((ncpu_and_bound, n_levels), -1, dtype=np.int64)
@@ -130,7 +143,7 @@ cpdef read_offset(FortranFile f, INT64_t min_level, INT64_t domain_id, INT64_t n
if ilevel >= min_level:
offset_view[icpu, ilevel - min_level] = f.tell()
level_count_view[icpu, ilevel - min_level] = <INT64_t> file_ncache
f.skip(skip_len)
f.seek(skip_len(Nskip, file_ncache), 1)

return offset, level_count

Expand All @@ -154,41 +167,74 @@ def fill_hydro(FortranFile f,
cdef dict tmp
cdef str field
cdef INT64_t twotondim
cdef int ilevel, icpu, ifield, nfields, nlevels, nc, ncpu_selected
cdef np.ndarray[np.uint8_t, ndim=1] mask
cdef int ilevel, icpu, nlevels, nc, ncpu_selected, nfields_selected
cdef int i, j, ii

twotondim = 2**ndim
nfields = len(all_fields)
nfields_selected = len(fields)

nlevels = offsets.shape[1]
ncpu_selected = len(cpu_enumerator)

mask = np.array([(field in fields) for field in all_fields], dtype=np.uint8)
cdef np.int64_t[::1] cpu_list = np.asarray(cpu_enumerator, dtype=np.int64)

cdef np.int64_t[::1] jumps = np.zeros(nfields_selected + 1, dtype=np.int64)
cdef int jump_len
cdef np.ndarray[np.float64_t, ndim=3] buffer

jump_len = 0
j = 0
for i, field in enumerate(all_fields):
if field in fields:
jumps[j] = jump_len
j += 1
jump_len = 0
else:
jump_len += 1
jumps[j] = jump_len

cdef int nc_largest = 0
# Loop over levels
for ilevel in range(nlevels):
# Loop over cpu domains
for icpu in cpu_enumerator:
for ii in range(ncpu_selected):
icpu = cpu_list[ii]
nc = level_count[icpu, ilevel]
if nc == 0:
continue
offset = offsets[icpu, ilevel]
if offset == -1:
continue
f.seek(offset)
tmp = {}
# Initialize temporary data container for io
# note: we use Fortran ordering to reflect the in-file ordering
for field in all_fields:
tmp[field] = np.empty((nc, twotondim), dtype="float64", order='F')
if nc > nc_largest:
nc_largest = nc
buffer = np.empty((nc, twotondim, nfields_selected), dtype="float64", order='F')

jump_len = 0
for i in range(twotondim):
# Read the selected fields
for ifield in range(nfields):
if not mask[ifield]:
f.skip()
else:
tmp[all_fields[ifield]][:, i] = f.read_vector('d') # i-th cell
for j in range(nfields_selected):
jump_len += jumps[j]
if jump_len > 0:
f.seek(skip_len(jump_len, nc), 1)
jump_len = 0
f.read_vector_inplace('d', <void*> &buffer[0, i, j])

jump_len += jumps[nfields_selected]

# In principle, we may be left with some fields to skip
# but since we're doing an absolute seek at the beginning of
# the loop on CPUs, we can spare one seek here
## if jump_len > 0:
## f.seek(skip_len(jump_len, nc), 1)

# Alias buffer into dictionary
tmp = {}
for i, field in enumerate(fields):
tmp[field] = buffer[:, :, i]

if ncpu_selected > 1:
oct_handler.fill_level_with_domain(
ilevel, levels, cell_inds, file_inds, domains, tr, tmp, domain=icpu+1)
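
The fill_hydro rewrite replaces the per-record mask test with a precomputed skip table: jumps[j] is the number of unselected records sitting immediately before the j-th selected field (with one trailing entry for records after the last selected field), and the skip_len helper turns a record count into a byte offset (record_len doubles plus 8 bytes of Fortran record markers per record). A plain-Python sketch of that bookkeeping; the field names below are hypothetical:

import numpy as np

def build_jumps(all_fields, fields):
    # jumps[j] = records to skip right before reading the j-th selected field
    jumps = np.zeros(len(fields) + 1, dtype=np.int64)
    jump_len = 0
    j = 0
    for field in all_fields:
        if field in fields:
            jumps[j] = jump_len
            j += 1
            jump_len = 0
        else:
            jump_len += 1
    jumps[j] = jump_len  # trailing records after the last selected field
    return jumps

def skip_bytes(nskip, record_len):
    # Mirrors the skip_len helper above: record_len float64 values
    # plus 8 bytes of Fortran record markers per skipped record.
    return nskip * (record_len * 8 + 8)

all_fields = ["Density", "x-velocity", "y-velocity", "z-velocity", "Pressure"]
fields = ["Density", "Pressure"]
print(build_jumps(all_fields, fields))  # -> [0 3 0]
print(skip_bytes(3, 512))               # bytes to jump over the three velocity records
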
35 changes: 35 additions & 0 deletions yt/geometry/oct_container.pxd
@@ -82,6 +82,41 @@ cdef class OctreeContainer:
cdef public object fill_style
cdef public int max_level

cpdef int add(
self,
const int curdom,
const int curlevel,
const np.float64_t[:, ::1] pos,
int skip_boundary = ?,
int count_boundary = ?,
np.uint64_t[::1] levels = ?,
np.int64_t[::1] file_inds = ?,
np.uint8_t[::1] domain_inds = ?,
)

cpdef void fill_level(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
dict dest_fields,
dict source_fields,
np.int64_t offset = ?
)
cpdef int fill_level_with_domain(
self,
const int level,
const np.uint8_t[:] levels,
const np.uint8_t[:] cell_inds,
const np.int64_t[:] file_inds,
const np.int32_t[:] domains,
dict dest_fields,
dict source_fields,
const np.int32_t domain,
np.int64_t offset = ?
)

cdef class SparseOctreeContainer(OctreeContainer):
cdef OctKey *root_nodes
cdef void *tree_root