diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c
index a4c3a51c10..b185fc068e 100644
--- a/source/adios2/toolkit/sst/cp/cp_common.c
+++ b/source/adios2/toolkit/sst/cp/cp_common.c
@@ -1526,6 +1526,7 @@ SstStream CP_newStream()
     pthread_cond_init(&Stream->DataCondition, NULL);
     Stream->WriterTimestep = -1; // Filled in by ProvideTimestep
     Stream->ReaderTimestep = -1; // first beginstep will get us timestep 0
+    Stream->CloseTimestepCount = (size_t)-1;
     Stream->LastReleasedTimestep = -1;
     Stream->DiscardPriorTimestep =
         -1; // Timesteps prior to this discarded/released upon arrival
diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h
index 1777b700a1..0fdb77c499 100644
--- a/source/adios2/toolkit/sst/cp/cp_internal.h
+++ b/source/adios2/toolkit/sst/cp/cp_internal.h
@@ -181,6 +181,7 @@ struct _SstStream
    int WriterDefinitionsLocked;
    size_t NextRRDistribution;
    size_t LastDemandTimestep;
+   size_t CloseTimestepCount;
 
    /* rendezvous condition */
    int FirstReaderCondition;
diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c
index 5b02434deb..9e393ed3ec 100644
--- a/source/adios2/toolkit/sst/cp/cp_writer.c
+++ b/source/adios2/toolkit/sst/cp/cp_writer.c
@@ -1180,6 +1180,8 @@ static void SendTimestepEntryToSingleReader(SstStream Stream,
     }
 }
 
+static void SendCloseMsgs(SstStream Stream);
+
 static void SendTimestepEntryToReaders(SstStream Stream, CPTimestepList Entry)
 {
     STREAM_ASSERT_LOCKED(Stream);
@@ -1224,10 +1226,15 @@ static void SendTimestepEntryToReaders(SstStream Stream, CPTimestepList Entry)
             free(Request);
             if (Stream->Readers[RequestingReader]->ReaderStatus == Established)
             {
+                Stream->LastDemandTimestep = Entry->Timestep;
                 SendTimestepEntryToSingleReader(
                     Stream, Entry, Stream->Readers[RequestingReader],
                     RequestingReader);
-                Stream->LastDemandTimestep = Entry->Timestep;
+                if (Stream->LastDemandTimestep == Stream->CloseTimestepCount)
+                {
+                    /* send if all timesteps have been sent OnDemand */
+                    SendCloseMsgs(Stream);
+                }
             }
             else
             {
@@ -1554,6 +1561,21 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
     QueueMaintenance(ParentStream);
 }
 
+static void SendCloseMsgs(SstStream Stream)
+{
+    struct _WriterCloseMsg Msg;
+    STREAM_ASSERT_LOCKED(Stream);
+    memset(&Msg, 0, sizeof(Msg));
+    Msg.FinalTimestep = Stream->LastProvidedTimestep;
+    CP_verbose(
+        Stream, PerStepVerbose,
+        "SstWriterClose, Sending Close at Timestep %d, one to each reader\n",
+        Msg.FinalTimestep);
+
+    sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat,
+                            &Msg, &Msg.RS_Stream);
+}
+
 /*
 On writer close:
    RemovePreciousTag on any timestep in queue
@@ -1569,19 +1591,15 @@ On writer close:
 */
 void SstWriterClose(SstStream Stream)
 {
-    struct _WriterCloseMsg Msg;
     struct timeval CloseTime, Diff;
-    memset(&Msg, 0, sizeof(Msg));
+    Stream->CloseTimestepCount = Stream->WriterTimestep;
     STREAM_MUTEX_LOCK(Stream);
-    Msg.FinalTimestep = Stream->LastProvidedTimestep;
-    CP_verbose(
-        Stream, PerStepVerbose,
-        "SstWriterClose, Sending Close at Timestep %d, one to each reader\n",
-        Msg.FinalTimestep);
-
-    sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat,
-                            &Msg, &Msg.RS_Stream);
-
+    if ((Stream->ConfigParams->StepDistributionMode != StepsOnDemand) ||
+        (Stream->LastDemandTimestep == Stream->CloseTimestepCount))
+    {
+        /* send if not OnDemand, or if all timesteps have been sent OnDemand */
+        SendCloseMsgs(Stream);
+    }
     UntagPreciousTimesteps(Stream);
     Stream->ConfigParams->ReserveQueueLimit = 0;
     QueueMaintenance(Stream);
@@ -2659,8 +2677,14 @@ void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn, void *Msg_v,
                        "reference count = %d\n",
                        NextTS, List->ReferenceCount);
 
+            Stream->LastDemandTimestep = List->Timestep;
             SendTimestepEntryToSingleReader(Stream, List, CP_WSR_Stream,
                                             RequestingReader);
+            if (Stream->LastDemandTimestep == Stream->CloseTimestepCount)
+            {
+                /* send if all timesteps have been sent OnDemand */
+                SendCloseMsgs(Stream);
+            }
             STREAM_MUTEX_UNLOCK(CP_WSR_Stream->ParentStream);
             return;
         }
diff --git a/testing/adios2/engine/staging-common/CMakeLists.txt b/testing/adios2/engine/staging-common/CMakeLists.txt
index 65064e04c1..bae5d32f28 100644
--- a/testing/adios2/engine/staging-common/CMakeLists.txt
+++ b/testing/adios2/engine/staging-common/CMakeLists.txt
@@ -36,6 +36,8 @@ foreach(helper
   TestLocalRead
   TestStructWrite
   TestStructRead
+  TestOnDemandWrite
+  TestOnDemandRead
   )
 
   add_executable(${helper} ${helper}.cpp)
@@ -144,7 +146,7 @@ set (ALL_SIMPLE_TESTS "")
 list (APPEND ALL_SIMPLE_TESTS ${SIMPLE_TESTS} ${SIMPLE_FORTRAN_TESTS} ${SIMPLE_MPI_TESTS} ${SIMPLE_ZFP_TESTS})
 
 set (SST_SPECIFIC_TESTS "")
-list (APPEND SST_SPECIFIC_TESTS "1x1.SstRUDP;1x1.LocalMultiblock;RoundRobinDistribution.1x1x3;AllToAllDistribution.1x1x3")
+list (APPEND SST_SPECIFIC_TESTS "1x1.SstRUDP;1x1.LocalMultiblock;RoundRobinDistribution.1x1x3;AllToAllDistribution.1x1x3;OnDemandSingle.1x1")
 if (ADIOS2_HAVE_MPI)
   list (APPEND SST_SPECIFIC_TESTS "2x3.SstRUDP;2x1.LocalMultiblock;5x3.LocalMultiblock;")
 endif()
diff --git a/testing/adios2/engine/staging-common/TestOnDemandRead.cpp b/testing/adios2/engine/staging-common/TestOnDemandRead.cpp
new file mode 100644
index 0000000000..8323d87c2d
--- /dev/null
+++ b/testing/adios2/engine/staging-common/TestOnDemandRead.cpp
@@ -0,0 +1,169 @@
+#include <chrono>
+#include <iostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <adios2.h>
+#include <gtest/gtest.h>
+
+#include "TestData.h"
+
+#include "ParseArgs.h"
+
+class SstOnDemandReadTest : public ::testing::Test
+{
+public:
+    SstOnDemandReadTest() = default;
+};
+
+#if ADIOS2_USE_MPI
+MPI_Comm testComm;
+#endif
+
+// ADIOS2 Sst read
+TEST_F(SstOnDemandReadTest, ADIOS2SstOnDemandRead)
+{
+    int rank = 0;
+    int size = 1;
+
+    int variablesSize = 100;
+
+#if ADIOS2_USE_MPI
+    MPI_Comm_rank(testComm, &rank);
+    MPI_Comm_size(testComm, &size);
+#endif
+
+#if ADIOS2_USE_MPI
+    adios2::ADIOS adios(testComm);
+#else
+    adios2::ADIOS adios;
+#endif
+    try
+    {
+        adios2::IO sstIO = adios.DeclareIO("sstOnDemand");
+
+        sstIO.SetEngine(engine);
+        adios2::Engine sstReader = sstIO.Open(fname, adios2::Mode::Read);
+
+        double get_time = 0;
+        const std::size_t my_start = Nx * rank;
+        const adios2::Dims pos_start{my_start};
+        const adios2::Dims count{Nx};
+        const adios2::Box<adios2::Dims> sel(pos_start, count);
+
+        auto start_step = std::chrono::steady_clock::now();
+        int steps = 0;
+        std::vector<float> myFloats(variablesSize * Nx);
+        while (sstReader.BeginStep() == adios2::StepStatus::OK)
+        {
+            for (int v = 0; v < variablesSize; ++v)
+            {
+                std::string namev("sstFloats");
+                namev += std::to_string(v);
+                adios2::Variable<float> sstFloats =
+                    sstIO.InquireVariable<float>(namev);
+
+                sstFloats.SetSelection(sel);
+                auto start_get = std::chrono::steady_clock::now();
+                sstReader.Get(sstFloats, myFloats.data() + (v * Nx));
+                auto end_get = std::chrono::steady_clock::now();
+                get_time += (end_get - start_get).count() / 1000;
+                // std::this_thread::sleep_for (std::chrono::seconds(1));
+            }
+            sstReader.EndStep();
+            steps += 1;
+#ifdef DEBUG
+            size_t currentStep = sstReader.CurrentStep();
+            for (unsigned int v = 0; v < variablesSize; ++v)
+            {
+                std::cout << fname << ": Get step " << currentStep
+                          << " variable" << v << " " << myFloats[v * Nx]
+                          << std::endl;
+            }
+#endif
+        }
+        auto end_step = std::chrono::steady_clock::now();
+        double total_time =
+            ((double)(end_step - start_step).count()) / (size * 1000.0);
+        get_time /= size;
+
+        double global_get_sum = 0;
+        double global_sum = 0;
+#if ADIOS2_USE_MPI
+        MPI_Reduce(&get_time, &global_get_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
+                   testComm);
+        MPI_Reduce(&total_time, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
+                   testComm);
+#else
+        global_sum = total_time;
+        global_get_sum = get_time;
+#endif
+
+        // Time in microseconds
+        if (rank == 0)
+        {
+            std::cout << "SST,Read," << size << "," << Nx << ","
+                      << variablesSize << "," << steps << "," << global_get_sum
+                      << "," << global_sum << std::endl;
+        }
+        EXPECT_EQ(NSteps, steps);
+        sstReader.Close();
+    }
+    catch (std::invalid_argument &e)
+    {
+        std::cout << "Invalid argument exception, STOPPING PROGRAM from rank "
+                  << rank << "\n";
+        std::cout << e.what() << "\n";
+    }
+    catch (std::ios_base::failure &e)
+    {
+        std::cout << "IO System base failure exception, STOPPING PROGRAM "
+                     "from rank "
+                  << rank << "\n";
+        std::cout << e.what() << "\n";
+    }
+    catch (std::exception &e)
+    {
+        std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n";
+        std::cout << e.what() << "\n";
+    }
+}
+
+//******************************************************************************
+// main
+//******************************************************************************
+
+int main(int argc, char **argv)
+{
+    int result;
+    ::testing::InitGoogleTest(&argc, argv);
+    ParseArgs(argc, argv);
+
+#if ADIOS2_USE_MPI
+    int provided;
+    int thread_support_level = (engine == "SST" || engine == "sst")
+                                   ? MPI_THREAD_MULTIPLE
+                                   : MPI_THREAD_SINGLE;
+
+    // MPI_THREAD_MULTIPLE is only required if you enable the SST MPI_DP
+    MPI_Init_thread(nullptr, nullptr, thread_support_level, &provided);
+
+    int key;
+    MPI_Comm_rank(MPI_COMM_WORLD, &key);
+
+    const unsigned int color = 2;
+    MPI_Comm_split(MPI_COMM_WORLD, color, key, &testComm);
+#endif
+
+    result = RUN_ALL_TESTS();
+
+#if ADIOS2_USE_MPI
+#ifdef CRAY_MPICH_VERSION
+    MPI_Barrier(MPI_COMM_WORLD);
+#else
+    MPI_Finalize();
+#endif
+#endif
+
+    return result;
+}
diff --git a/testing/adios2/engine/staging-common/TestOnDemandWrite.cpp b/testing/adios2/engine/staging-common/TestOnDemandWrite.cpp
new file mode 100644
index 0000000000..ad32a4965a
--- /dev/null
+++ b/testing/adios2/engine/staging-common/TestOnDemandWrite.cpp
@@ -0,0 +1,171 @@
+#include <algorithm>
+#include <chrono>
+#include <cstdlib>
+#include <iostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <adios2.h>
+
+#include <gtest/gtest.h>
+
+#include "TestData.h"
+
+#include "ParseArgs.h"
+
+std::vector<float> create_random_data(int n)
+{
+    std::vector<float> v(n);
+
+    std::generate(begin(v), end(v), []() { return ((float)(rand() % 100)); });
+    return v;
+}
+
+class SstOnDemandWriteTest : public ::testing::Test
+{
+public:
+    SstOnDemandWriteTest() = default;
+};
+
+#if ADIOS2_USE_MPI
+MPI_Comm testComm;
+#endif
+
+// ADIOS2 Sst Write
+TEST_F(SstOnDemandWriteTest, ADIOS2SstOnDemandWrite)
+{
+    int rank = 0;
+    int size = 1;
+
+    int variablesSize = 100;
+
+#if ADIOS2_USE_MPI
+    MPI_Comm_rank(testComm, &rank);
+    MPI_Comm_size(testComm, &size);
+#endif
+
+#if ADIOS2_USE_MPI
+    adios2::ADIOS adios(testComm);
+#else
+    adios2::ADIOS adios;
+#endif
+
+    auto myFloats = create_random_data((int)Nx * variablesSize);
+
+    try
+    {
+        adios2::IO sstIO = adios.DeclareIO("sstOnDemand");
+
+        sstIO.SetEngine(engine);
+        sstIO.SetParameters({{"AlwaysProvideLatestTimestep", "true"}});
+        std::vector<adios2::Variable<float>> sstFloats(variablesSize);
+        for (int v = 0; v < variablesSize; ++v)
+        {
+            std::string namev("sstFloats");
+            namev += std::to_string(v);
+            sstFloats[v] = sstIO.DefineVariable<float>(namev, {size * Nx},
+                                                       {rank * Nx}, {Nx});
+        }
+
+        adios2::Engine sstWriter = sstIO.Open(fname, adios2::Mode::Write);
+        double put_time = 0;
+        auto start_step = std::chrono::steady_clock::now();
+        for (int timeStep = 0; timeStep < NSteps; ++timeStep)
+        {
+            sstWriter.BeginStep();
+            for (int v = 0; v < variablesSize; ++v)
+            {
+                myFloats[v * Nx] = (float)v + timeStep * variablesSize;
+                auto start_put = std::chrono::steady_clock::now();
+                sstWriter.Put(sstFloats[v], myFloats.data() + v * Nx);
+                auto end_put = std::chrono::steady_clock::now();
+                put_time += (end_put - start_put).count() / 1000;
+                // std::this_thread::sleep_for (std::chrono::seconds(10));
+#ifdef DEBUG
+                std::cout << fname << ": Put step " << timeStep << " variable"
+                          << v << " " << myFloats[v * Nx] << std::endl;
+#endif
+            }
+            sstWriter.EndStep();
+        }
+        auto end_step = std::chrono::steady_clock::now();
+        double total_time =
+            ((double)(end_step - start_step).count()) / (size * 1000.0);
+
+        double global_put_sum;
+        double global_sum;
+#if ADIOS2_USE_MPI
+        MPI_Reduce(&put_time, &global_put_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
+                   testComm);
+        MPI_Reduce(&total_time, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
+                   testComm);
+#else
+        global_sum = total_time;
+        global_put_sum = put_time;
+#endif
+        // Time in microseconds
+        if (rank == 0)
+            std::cout << "SST,Write," << size << "," << Nx << ","
+                      << variablesSize << "," << NSteps << ","
+                      << global_put_sum / size << "," << global_sum / size
+                      << std::endl;
+        sstWriter.Close();
+    }
+    catch (std::invalid_argument &e)
+    {
+        std::cout << "Invalid argument exception, STOPPING PROGRAM from rank "
+                  << rank << "\n";
+        std::cout << e.what() << "\n";
+    }
+    catch (std::ios_base::failure &e)
+    {
+        std::cout
+            << "IO System base failure exception, STOPPING PROGRAM from rank "
+            << rank << "\n";
+        std::cout << e.what() << "\n";
+    }
+    catch (std::exception &e)
+    {
+        std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n";
+        std::cout << e.what() << "\n";
+    }
+}
+
+int main(int argc, char **argv)
+{
+    int result;
+    ::testing::InitGoogleTest(&argc, argv);
+
+    DelayMS = 0; // zero for common writer
+
+    ParseArgs(argc, argv);
+
+#if ADIOS2_USE_MPI
+    int provided;
+    int thread_support_level = (engine == "SST" || engine == "sst")
+                                   ? MPI_THREAD_MULTIPLE
+                                   : MPI_THREAD_SINGLE;
+
+    // MPI_THREAD_MULTIPLE is only required if you enable the SST MPI_DP
+    MPI_Init_thread(nullptr, nullptr, thread_support_level, &provided);
+
+    int key;
+    MPI_Comm_rank(MPI_COMM_WORLD, &key);
+
+    const unsigned int color = 1;
+    MPI_Comm_split(MPI_COMM_WORLD, color, key, &testComm);
+#endif
+
+    result = RUN_ALL_TESTS();
+
+#if ADIOS2_USE_MPI
+#ifdef CRAY_MPICH_VERSION
+    MPI_Barrier(MPI_COMM_WORLD);
+#else
+    MPI_Finalize();
+#endif
+#endif
+
+    return result;
+}
diff --git a/testing/adios2/engine/staging-common/TestSupp.cmake b/testing/adios2/engine/staging-common/TestSupp.cmake
index 9133714083..efadc464d0 100644
--- a/testing/adios2/engine/staging-common/TestSupp.cmake
+++ b/testing/adios2/engine/staging-common/TestSupp.cmake
@@ -168,6 +168,7 @@ set (PreciousTimestepDiscard.3x2_TIMEOUT "300")
 # Writer StepDistributionModes. Here we run the writer and three clients
 set (AllToAllDistribution.1x1x3_CMD "run_test.py.$<CONFIG> --test_protocol multi_client -nw 1 -nr 1 -w $<TARGET_FILE:TestCommonWrite> -r $<TARGET_FILE:TestCommonRead> --warg=RendezvousReaderCount=3,WENGINE_PARAMS")
 set (RoundRobinDistribution.1x1x3_CMD "run_test.py.$<CONFIG> --test_protocol multi_client -nw 1 -nr 1 -w $<TARGET_FILE:TestCommonWrite> -r $<TARGET_FILE:TestCommonRead> --warg=RendezvousReaderCount=3,WENGINE_PARAMS --warg=--round_robin --rarg=--round_robin")
+set (OnDemandSingle.1x1_CMD "run_test.py.$<CONFIG> -w $<TARGET_FILE:TestOnDemandWrite> -r $<TARGET_FILE:TestOnDemandRead>")
 set (OnDemandDistribution.1x1x3_CMD "run_test.py.$<CONFIG> --test_protocol multi_client -nw 1 -nr 1 -w $<TARGET_FILE:TestCommonWrite> -r $<TARGET_FILE:TestCommonRead> --warg=RendezvousReaderCount=3,WENGINE_PARAMS --warg=--on_demand --rarg=--on_demand --warg=--num_steps --warg=20")
 
 # Readers using BeginStep with timeout. Here we run the writer with a longer delay to make the reader timeout