Skip to content

Commit

Permalink
Python API: Adios() new argument 'config_file', Stream() new argument… (
Browse files Browse the repository at this point in the history
#3984) (#3999)

* Python API: Adios() new argument 'config_file', Stream() new argument 'io_name' that must go together with 'config_file'
Gray-Scott example's python scripts fixed to use the new Stream() object

* Make 'comm' a named optional argument for Stream.

---------

Co-authored-by: Greg Eisenhauer <eisen@cc.gatech.edu>
  • Loading branch information
pnorbert and eisenhauer committed Jan 12, 2024
1 parent 0fa6e62 commit 89a5817
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 56 deletions.
3 changes: 2 additions & 1 deletion examples/simulations/gray-scott/plot/decomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def Locate(rank, nproc, datasize):
class MPISetup(object):
readargs = []
size = 1
rank = {"app": 0, "x": 0, "y": 0}
rank = {"app": 0, "x": 0, "y": 0, "z": 0}

def __init__(self, args, appID):

Expand Down Expand Up @@ -56,6 +56,7 @@ def __init__(self, args, appID):
raise ValueError("nx must = 1 without MPI")
if self.ny != 1:
raise ValueError("ny must = 1 without MPI")
self.comm_app = None

# self.readargs.extend([args.xmlfile, "heat"])

Expand Down
37 changes: 18 additions & 19 deletions examples/simulations/gray-scott/plot/gsplot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
import adios2 # pylint: disable=import-error
from adios2 import Stream # pylint: disable=import-error
import argparse
import numpy as np # pylint: disable=import-error
import matplotlib.pyplot as plt # pylint: disable=import-error
Expand Down Expand Up @@ -98,21 +98,14 @@ def Plot2D(plane_direction, data, args, fullshape, step, fontsize):
plt.show()
plt.pause(displaysec)
elif args.outfile.endswith(".bp"):
if step == 0:
global adios
global ioWriter
global var
global writer
adios = adios2.ADIOS(mpi.comm_app)
ioWriter = adios.DeclareIO("VizOutput")
var = ioWriter.DefineVariable(
args.varname, data.shape, [0, 0], data.shape, adios2.ConstantDims, data
)
writer = ioWriter.Open(args.outfile, adios2.Mode.Write)

writer.BeginStep()
writer.Put(var, data, adios2.Mode.Sync)
writer.EndStep()
global writer
# print("plot to file, step = ", step)
# if step == 0:
# writer = Stream(args.outfile, "w")
#
writer.begin_step()
writer.write(args.varname, data, data.shape, [0, 0], data.shape)
writer.end_step()
else:
imgfile = args.outfile + "{0:0>5}".format(step) + "_" + plane_direction + ".png"
fig.savefig(imgfile)
Expand All @@ -139,14 +132,18 @@ def read_data(args, fr, start_coord, size_dims):
myrank = mpi.rank["app"]

# Read the data from this object
fr = adios2.open(args.instream, "r", mpi.comm_app, "adios2.xml", "SimulationOutput")
# vars_info = fr.availablevariables()
fr = Stream(args.instream, "r", comm=mpi.comm_app, config_file="adios2.xml",
io_name="SimulationOutput")

if args.outfile.endswith(".bp"):
global writer
writer = Stream(args.outfile, "w")

# Get the ADIOS selections -- equally partition the data if parallelization is requested

# Read through the steps, one at a time
plot_step = 0
for fr_step in fr:
for fr_step in fr.steps():
# if fr_step.current_step()
start, size, fullshape = mpi.Partition_3D_3D(fr, args)
cur_step = fr_step.current_step()
Expand Down Expand Up @@ -185,3 +182,5 @@ def read_data(args, fr, start_coord, size_dims):
plot_step = plot_step + 1

fr.close()
if args.outfile.endswith(".bp"):
writer.close()
23 changes: 9 additions & 14 deletions examples/simulations/gray-scott/plot/pdfplot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
import adios2 # pylint: disable=import-error
from adios2 import Stream # pylint: disable=import-error
import argparse
import numpy as np # pylint: disable=import-error
import matplotlib.pyplot as plt # pylint: disable=import-error
Expand Down Expand Up @@ -85,16 +85,12 @@ def read_data(args, fr, start_coord, size_dims):
myrank = mpi.rank["app"]

# Read the data from this object
if not args.nompi:
fr = adios2.open(
args.instream, "r", mpi.comm_app, "adios2.xml", "PDFAnalysisOutput"
)
else:
fr = adios2.open(args.instream, "r", "adios2.xml", "PDFAnalysisOutput")
fr = Stream(args.instream, "r", comm=mpi.comm_app, config_file="adios2.xml",
io_name="PDFAnalysisOutput")

# Read through the steps, one at a time
plot_step = 0
for fr_step in fr:
for fr_step in fr.steps():
cur_step = fr_step.current_step()
vars_info = fr_step.available_variables()
# print (vars_info)
Expand All @@ -103,13 +99,12 @@ def read_data(args, fr, start_coord, size_dims):
shape2_str = vars_info[pdfvar]["Shape"].split(",")
shape2 = list(map(int, shape2_str))

start = np.zeros(2, dtype=np.int64)
count = np.zeros(2, dtype=np.int64)
# Equally partition the PDF arrays among readers
start[0], count[0] = decomp.Locate(myrank, mpi.size, shape2[0])
start[1], count[1] = (0, shape2[1])
start_bins = np.array([0], dtype=np.int64)
count_bins = np.array([shape2[1]], dtype=np.int64)
s0, c0 = decomp.Locate(myrank, mpi.size, shape2[0])
start = [s0, 0]
count = [c0, shape2[1]]
start_bins = [0]
count_bins = [shape2[1]]

# print("Rank {0} reads {1} slices from offset {2}".format(myrank, count[0], start[0]))

Expand Down
14 changes: 10 additions & 4 deletions python/adios2/adios.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
class Adios:
"""High level representation of the ADIOS class in the adios2.bindings"""

def __init__(self, comm=None):
def __init__(self, config_file=None, comm=None):
if comm and not bindings.is_built_with_mpi:
raise RuntimeError("Cannot use MPI since ADIOS2 was built without MPI support")

if comm:
self.impl = bindings.ADIOS(comm)
if config_file:
if comm:
self.impl = bindings.ADIOS(config_file, comm)
else:
self.impl = bindings.ADIOS(config_file)
else:
self.impl = bindings.ADIOS()
if comm:
self.impl = bindings.ADIOS(comm)
else:
self.impl = bindings.ADIOS()

@property
def impl(self):
Expand Down
11 changes: 9 additions & 2 deletions python/adios2/file_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ class FileReader(Stream):
def __repr__(self):
return f"<adios.file named {self._io_name}>"

def __init__(self, path, comm=None, engine_type="BPStream", config_file=None):
super().__init__(path, "rra", comm, engine_type, config_file)
def __init__(self, path, *, comm=None, engine_type=None, config_file=None, io_name=None):
super().__init__(
path,
"rra",
comm=comm,
engine_type=engine_type,
config_file=config_file,
io_name=io_name,
)

def variables(self):
"""Returns the list of variables contained in the opened file"""
Expand Down
22 changes: 20 additions & 2 deletions python/adios2/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,42 @@ def type_adios_to_numpy(name):
class Stream:
"""High level implementation of the Stream class from the core API"""

def __init__(self, path, mode="r", comm=None, engine_type="BPStream", config_file=None):
def __init__(
self, path, mode="r", *, comm=None, engine_type=None, config_file=None, io_name=None
):

# pylint: disable=R0912 # Too many branches
if comm and not bindings.is_built_with_mpi:
raise RuntimeError("Cannot use MPI since ADIOS2 was built without MPI support")

if config_file and engine_type:
raise RuntimeError("Arguments 'engine_type' and 'config_file' cannot be used together")

if config_file and not io_name:
raise RuntimeError("Argument 'io_name' is required when using 'config_file'")

if not engine_type:
engine_type = "File"

# pylint: disable=E1121
if config_file:
if comm:
self._adios = Adios(config_file, comm)
else:
self._adios = Adios(config_file)
self._io_name = io_name

else:
if comm:
self._adios = Adios(comm)
else:
self._adios = Adios()
self._io_name = f"stream:{path}:engine_type:{engine_type}:mode:{mode}"

# pylint: enable=E1121
self._io_name = f"stream:{path}:engine_type:{engine_type}:mode:{mode}"
self._io = self._adios.declare_io(self._io_name)
if not config_file:
self._io.set_engine(engine_type)

if mode == "r":
self._mode = bindings.Mode.Read
Expand Down
4 changes: 2 additions & 2 deletions testing/adios2/python/TestBPChangingShapeHighLevelAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
count = [[nx[0]], [nx[1]]]

# Write different sized arrays as separate steps
with Stream("out.bp", "w", comm) as s:
with Stream("out.bp", "w", comm=comm) as s:
s.begin_step()
s.write("z", data[0], shape[0], start[0], count[0])
s.end_step()
Expand All @@ -34,7 +34,7 @@
s.end_step()

# Read back arrays
with Stream("out.bp", "r", comm) as s:
with Stream("out.bp", "r", comm=comm) as s:
for step in s.steps():
shape_z = int(step.available_variables()["z"]["Shape"])
print(shape_z)
Expand Down
4 changes: 2 additions & 2 deletions testing/adios2/python/TestBPPNGHighLevelAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def compress_png(compression_level):
count2D = [Nx, Ny]

# writer
with Stream(fname, "w", comm) as s:
with Stream(fname, "w", comm=comm) as s:
for step in s.steps(NSteps):
s.write(
"u8",
Expand Down Expand Up @@ -89,7 +89,7 @@ def compress_png(compression_level):
)

# reader
with Stream(fname, "r", comm) as s:
with Stream(fname, "r", comm=comm) as s:
for step in s.steps():
in_u8s = step.read("u8", start3D, count3D)
in_u32s = step.read("u32", start2D, count2D)
Expand Down
8 changes: 4 additions & 4 deletions testing/adios2/python/TestBPWriteReadString.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ def test_write_read_string_high_api(self):
bpFilename = "string_test_highAPI.bp"
varname = "mystringvar"

with Stream(bpFilename, "w", comm) as s:
with Stream(bpFilename, "w", comm=comm) as s:
for step in s.steps(N_STEPS):
s.write(varname, theString + str(step.current_step()))

with Stream(bpFilename, "r", comm) as s:
with Stream(bpFilename, "r", comm=comm) as s:
for _ in s.steps():
step = s.current_step()
result = s.read(varname)
Expand All @@ -34,13 +34,13 @@ def test_write_read_string_high_api(self):
def test_read_strings_all_steps(self):
comm = MPI.COMM_WORLD
fileName = "string_test_all.bp"
with Stream(fileName, "w", comm) as s:
with Stream(fileName, "w", comm=comm) as s:
i = 0
for _ in s.steps(N_STEPS):
s.write("string_variable", "written {}".format(i))
i += 1

# with Stream(fileName, "rra", comm) as s:
# with Stream(fileName, "rra", comm = comm) as s:
# n = s.num_steps()
# name = "string_variable"
# result = s.read_string(name, 0, n)
Expand Down
4 changes: 2 additions & 2 deletions testing/adios2/python/TestBPWriteTypesHighLevelAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
count = [nx]

# Writer
with Stream("types_np.bp", "w", comm) as s:
with Stream("types_np.bp", "w", comm=comm) as s:
for step in s.steps(5):
data.update(rank, step.current_step(), size)
s.write("rank", np.array(rank), shape=[LocalValueDim])
Expand Down Expand Up @@ -100,7 +100,7 @@
# Reader
data = SmallTestData()

with Stream("types_np.bp", "r", comm) as fr:
with Stream("types_np.bp", "r", comm=comm) as fr:
# file only
assert fr.num_steps() == 5

Expand Down
4 changes: 2 additions & 2 deletions testing/adios2/python/TestBPWriteTypesHighLevelAPILocal.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def check_array(np1, np2, hint):
count = [nx]

# Writer
with Stream("types_np_local.bp", "w", comm) as s:
with Stream("types_np_local.bp", "w", comm=comm) as s:
for step in s.steps(5):
data.update(rank, step.current_step(), size)
s.write("varI8", data.i8, shape, start, count)
Expand All @@ -50,7 +50,7 @@ def check_array(np1, np2, hint):
# Reader
data = SmallTestData()

with Stream("types_np_local.bp", "r", comm) as s:
with Stream("types_np_local.bp", "r", comm=comm) as s:
for fr_step in s.steps():
step = fr_step.current_step()

Expand Down
4 changes: 2 additions & 2 deletions testing/adios2/python/TestBPZfpHighLevelAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ def CompressZfp2D(rate):
count = [Ny, Nx]

# writer
with Stream(fname, "w", comm) as s:
with Stream(fname, "w", comm=comm) as s:
for _ in s.steps(NSteps):
s.write("r32", r32s, shape, start, count, [("zfp", {"accuracy": str(rate)})])
s.write("r64", r64s, shape, start, count, [("zfp", {"accuracy": str(rate)})])

# reader
with Stream(fname, "r", comm) as s:
with Stream(fname, "r", comm=comm) as s:
for _ in s.steps():
in_r32s = s.read("r32", start, count)
in_r64s = s.read("r64", start, count)
Expand Down

0 comments on commit 89a5817

Please sign in to comment.