Skip to content

Commit

Permalink
Merge pull request #993 from eisenhauer/SstLastTimeoutDiscard
Browse files Browse the repository at this point in the history
Support discard on writer side, LastAvailable and timeout on reader side
  • Loading branch information
eisenhauer committed Nov 10, 2018
2 parents 6577a33 + 5f9cf91 commit 24274c6
Show file tree
Hide file tree
Showing 37 changed files with 612 additions and 105 deletions.
3 changes: 2 additions & 1 deletion bindings/CXX11/cxx11/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class Engine
* @param timeoutSeconds
* @return current step status
*/
StepStatus BeginStep(const StepMode mode, const float timeoutSeconds = 0.f);
StepStatus BeginStep(const StepMode mode,
const float timeoutSeconds = -1.f);

/**
* Inspect current logical step
Expand Down
6 changes: 3 additions & 3 deletions bindings/Fortran/modules/adios2_engine_begin_step_mod.f90
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ subroutine adios2_begin_step_mode(engine, step_mode, ierr)
!local
integer status

call adios2_begin_step_f2c(engine%f2c, step_mode, 0._4, status, ierr)
call adios2_begin_step_f2c(engine%f2c, step_mode, -1._4, status, ierr)

end subroutine

Expand All @@ -55,11 +55,11 @@ subroutine adios2_begin_step_default(engine, ierr)

if( engine%mode == adios2_mode_read ) then
call adios2_begin_step_f2c(engine%f2c, &
adios2_step_mode_next_available, 0._4, &
adios2_step_mode_next_available, -1._4, &
status, ierr)
else
call adios2_begin_step_f2c(engine%f2c, &
adios2_step_mode_append, 0._4, &
adios2_step_mode_append, -1.0_4, &
status, ierr)
end if

Expand Down
3 changes: 2 additions & 1 deletion bindings/Python/py11Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class Engine

~Engine() = default;

StepStatus BeginStep(const StepMode mode, const float timeoutSeconds = 0.f);
StepStatus BeginStep(const StepMode mode,
const float timeoutSeconds = -1.f);

void Put(core::VariableBase *variable, const pybind11::array &array,
const Mode launch = Mode::Deferred);
Expand Down
2 changes: 1 addition & 1 deletion bindings/Python/py11glue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ PYBIND11_MODULE(adios2, m)
pybind11::class_<adios2::py11::Engine>(m, "Engine")
.def("BeginStep", &adios2::py11::Engine::BeginStep,
pybind11::arg("mode") = adios2::StepMode::NextAvailable,
pybind11::arg("timeoutSeconds") = 0.f)
pybind11::arg("timeoutSeconds") = -1.f)

.def("Put", (void (adios2::py11::Engine::*)(
adios2::core::VariableBase *, const pybind11::array &,
Expand Down
2 changes: 1 addition & 1 deletion examples/heatTransfer/read/heatRead.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ int main(int argc, char *argv[])
while (true)
{
adios2::StepStatus status =
reader.BeginStep(adios2::StepMode::NextAvailable, 0.0f);
reader.BeginStep(adios2::StepMode::NextAvailable);
if (status != adios2::StepStatus::OK)
{
break;
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ StepStatus Engine::BeginStep()
{
if (m_OpenMode == Mode::Read)
{
return BeginStep(StepMode::NextAvailable,
std::numeric_limits<float>::max());
return BeginStep(StepMode::NextAvailable, -1.0);
}
else
{
return BeginStep(StepMode::Append, std::numeric_limits<float>::max());
return BeginStep(StepMode::Append, -1.0);
}
}

Expand Down
5 changes: 2 additions & 3 deletions source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ class Engine
* @param timeoutSeconds (not yet implemented)
* @return current step status
*/
virtual StepStatus
BeginStep(StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max());
virtual StepStatus BeginStep(StepMode mode,
const float timeoutSeconds = -1.0);

/**
* Returns current step information for each engine.
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/bp3/BP3Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ class BP3Reader : public Engine

virtual ~BP3Reader() = default;

StepStatus BeginStep(
StepMode mode = StepMode::NextAvailable,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode = StepMode::NextAvailable,
const float timeoutSeconds = -1.0) final;

size_t CurrentStep() const final;

Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/bp3/BP3Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ class BP3Writer : public core::Engine

~BP3Writer();

StepStatus BeginStep(
StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode,
const float timeoutSeconds = -1.0) final;
size_t CurrentStep() const final;
void PerformPuts() final;
void EndStep() final;
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ StepStatus DataManReader::BeginStep(StepMode stepMode,
// timeout == std::numeric_limits<float>::max() means there is no
// timeout, and it should block
// forever until it receives something.
if (timeoutSeconds != std::numeric_limits<float>::max())
if (timeoutSeconds >= 0.0)
{
if (duration.count() > timeoutSeconds)
{
std::cout << "Dataman Reader timeing out" << std::endl;
return StepStatus::NotReady;
}
}
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/dataman/DataManReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ class DataManReader : public DataManCommon
DataManReader(IO &io, const std::string &name, const Mode mode,
MPI_Comm mpiComm);
virtual ~DataManReader();
StepStatus BeginStep(
StepMode stepMode,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode stepMode,
const float timeoutSeconds = -1.0) final;
size_t CurrentStep() const final;
void PerformGets() final;
void EndStep() final;
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/dataman/DataManWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ class DataManWriter : public DataManCommon
MPI_Comm mpiComm);
~DataManWriter() = default;

StepStatus BeginStep(
StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode,
const float timeoutSeconds = -1.0) final;
size_t CurrentStep() const;
void PerformPuts() final;
void EndStep() final;
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/hdf5/HDF5ReaderP.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ class HDF5ReaderP : public Engine

bool IsValid();

StepStatus BeginStep(
StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode,
const float timeoutSeconds = -1.0) final;
size_t CurrentStep() const final;
void EndStep() final;

Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/hdf5/HDF5WriterP.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ class HDF5WriterP : public Engine

~HDF5WriterP();

StepStatus BeginStep(
StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode,
const float timeoutSeconds = -1.0) final;
void EndStep() final;

void PerformPuts() final;
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/insitumpi/InSituMPIReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ class InSituMPIReader : public Engine
MPI_Comm mpiComm);

~InSituMPIReader();
StepStatus BeginStep(
StepMode mode = StepMode::NextAvailable,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode = StepMode::NextAvailable,
const float timeoutSeconds = -1.0) final;
void PerformGets() final;
size_t CurrentStep() const final;
void EndStep() final;
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/insitumpi/InSituMPIWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ class InSituMPIWriter : public Engine

~InSituMPIWriter();

StepStatus BeginStep(
StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode,
const float timeoutSeconds = -1.0) final;
void PerformPuts() final;
void EndStep() final;

Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/skeleton/SkeletonReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ class SkeletonReader : public Engine
MPI_Comm mpiComm);

~SkeletonReader();
StepStatus BeginStep(
StepMode mode = StepMode::NextAvailable,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode = StepMode::NextAvailable,
const float timeoutSeconds = -1.0) final;
void PerformGets() final;
size_t CurrentStep() const final;
void EndStep() final;
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/skeleton/SkeletonWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ class SkeletonWriter : public Engine

~SkeletonWriter() = default;

StepStatus BeginStep(
StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode,
const float timeoutSeconds = -1.0) final;
size_t CurrentStep() const final;
void PerformPuts() final;
void EndStep() final;
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ StepStatus SstReader::BeginStep(StepMode mode, const float timeout_sec)
{
return StepStatus::EndOfStream;
}
if (result == SstTimeout)
{
return StepStatus::NotReady;
}

if (result != SstSuccess)
{
return StepStatus::OtherError;
Expand Down
4 changes: 1 addition & 3 deletions source/adios2/engine/sst/SstReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ class SstReader : public Engine
virtual ~SstReader();

StepStatus BeginStep();
StepStatus
BeginStep(StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max());
StepStatus BeginStep(StepMode mode, const float timeoutSeconds = -1.0);
size_t CurrentStep() const final;
void EndStep();
void PerformGets();
Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/sst/SstWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ class SstWriter : public Engine

virtual ~SstWriter();

StepStatus BeginStep(
StepMode mode,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStep(StepMode mode,
const float timeoutSeconds = -1.0) final;
void PerformPuts() final;
void EndStep() final;
void Flush(const int transportIndex = -1) final;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/sst/cp/cp_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,8 @@ SstStream CP_newStream()
pthread_cond_init(&Stream->DataCondition, NULL);
Stream->WriterTimestep = -1; // Filled in by ProvideTimestep
Stream->ReaderTimestep = -1; // first beginstep will get us timestep 0
Stream->DiscardPriorTimestep =
-1; // Timesteps prior to this discarded/released upon arrival
if (getenv("SstVerbose"))
{
Stream->Verbose = 1;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ struct _SstStream
SstFullMetadata CurrentMetadata;
struct _SstParams *WriterConfigParams;
void *ParamsBlock;
long DiscardPriorTimestep; /* timesteps numerically less than this will be
discarded with prejudice */

/* reader side marshal info */
FFSContext ReaderFFSContext;
Expand Down

0 comments on commit 24274c6

Please sign in to comment.