Fix race condition in OnDemand timestep distribution #3355

Merged
merged 1 commit on Sep 29, 2022
1 change: 1 addition & 0 deletions source/adios2/toolkit/sst/cp/cp_common.c
@@ -1526,6 +1526,7 @@ SstStream CP_newStream()
pthread_cond_init(&Stream->DataCondition, NULL);
Stream->WriterTimestep = -1; // Filled in by ProvideTimestep
Stream->ReaderTimestep = -1; // first beginstep will get us timestep 0
Stream->CloseTimestepCount = (size_t)-1;
Stream->LastReleasedTimestep = -1;
Stream->DiscardPriorTimestep =
-1; // Timesteps prior to this discarded/released upon arrival
1 change: 1 addition & 0 deletions source/adios2/toolkit/sst/cp/cp_internal.h
@@ -181,6 +181,7 @@ struct _SstStream
int WriterDefinitionsLocked;
size_t NextRRDistribution;
size_t LastDemandTimestep;
size_t CloseTimestepCount;

/* rendezvous condition */
int FirstReaderCondition;
48 changes: 36 additions & 12 deletions source/adios2/toolkit/sst/cp/cp_writer.c
@@ -1180,6 +1180,8 @@ static void SendTimestepEntryToSingleReader(SstStream Stream,
}
}

static void SendCloseMsgs(SstStream Stream);

static void SendTimestepEntryToReaders(SstStream Stream, CPTimestepList Entry)
{
STREAM_ASSERT_LOCKED(Stream);
@@ -1224,10 +1226,15 @@ static void SendTimestepEntryToReaders(SstStream Stream, CPTimestepList Entry)
free(Request);
if (Stream->Readers[RequestingReader]->ReaderStatus == Established)
{
Stream->LastDemandTimestep = Entry->Timestep;
SendTimestepEntryToSingleReader(
Stream, Entry, Stream->Readers[RequestingReader],
RequestingReader);
Stream->LastDemandTimestep = Entry->Timestep;
if (Stream->LastDemandTimestep == Stream->CloseTimestepCount)
{
/* send if all timesteps have been sent OnDemand */
SendCloseMsgs(Stream);
}
}
else
{
@@ -1554,6 +1561,21 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
QueueMaintenance(ParentStream);
}

static void SendCloseMsgs(SstStream Stream)
{
struct _WriterCloseMsg Msg;
STREAM_ASSERT_LOCKED(Stream);
memset(&Msg, 0, sizeof(Msg));
Msg.FinalTimestep = Stream->LastProvidedTimestep;
CP_verbose(
Stream, PerStepVerbose,
"SstWriterClose, Sending Close at Timestep %d, one to each reader\n",
Msg.FinalTimestep);

sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat,
&Msg, &Msg.RS_Stream);
}

/*
On writer close:
RemovePreciousTag on any timestep in queue
@@ -1569,19 +1591,15 @@ On writer close:
*/
void SstWriterClose(SstStream Stream)
{
struct _WriterCloseMsg Msg;
struct timeval CloseTime, Diff;
memset(&Msg, 0, sizeof(Msg));
Stream->CloseTimestepCount = Stream->WriterTimestep;
STREAM_MUTEX_LOCK(Stream);
Msg.FinalTimestep = Stream->LastProvidedTimestep;
CP_verbose(
Stream, PerStepVerbose,
"SstWriterClose, Sending Close at Timestep %d, one to each reader\n",
Msg.FinalTimestep);

sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat,
&Msg, &Msg.RS_Stream);

if ((Stream->ConfigParams->StepDistributionMode != StepsOnDemand) ||
(Stream->LastDemandTimestep == Stream->CloseTimestepCount))
{
/* send if not OnDemand, or if all timesteps have been sent OnDemand */
SendCloseMsgs(Stream);
}
UntagPreciousTimesteps(Stream);
Stream->ConfigParams->ReserveQueueLimit = 0;
QueueMaintenance(Stream);
@@ -2659,8 +2677,14 @@ void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn, void *Msg_v,
"reference count = %d\n",
NextTS, List->ReferenceCount);

Stream->LastDemandTimestep = List->Timestep;
SendTimestepEntryToSingleReader(Stream, List, CP_WSR_Stream,
RequestingReader);
if (Stream->LastDemandTimestep == Stream->CloseTimestepCount)
{
/* send if all timesteps have been sent OnDemand */
SendCloseMsgs(Stream);
}
STREAM_MUTEX_UNLOCK(CP_WSR_Stream->ParentStream);
return;
}
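Taken together, the cp_writer.c changes above close the race: SstWriterClose records the last provided step in CloseTimestepCount, and in StepsOnDemand mode the WriterCloseMsg is only sent once LastDemandTimestep has caught up, from whichever path distributes that final step (SendTimestepEntryToReaders or CP_ReaderRequestStepHandler). A minimal sketch of that guard, using simplified stand-in types and locking rather than the real SstStream internals:

// Sketch of the deferred-close pattern from this PR. MiniStream, DistributeStep
// and WriterClose are simplified stand-ins, not the real SST types or API.
#include <cstddef>
#include <mutex>

struct MiniStream
{
    std::mutex Lock;
    std::size_t LastDemandTimestep = 0;               // last step handed to a reader
    std::size_t CloseTimestepCount = (std::size_t)-1; // set when the writer closes
    bool StepsOnDemand = true;                        // step distribution mode
};

void SendCloseMsgs(MiniStream &) { /* notify every reader rank; elided */ }

// Every path that hands a timestep to a reader updates LastDemandTimestep and,
// if the writer has already closed, sends the close messages itself.
void DistributeStep(MiniStream &s, std::size_t Timestep)
{
    std::lock_guard<std::mutex> guard(s.Lock);
    s.LastDemandTimestep = Timestep;
    // ... send the timestep entry to the requesting reader ...
    if (s.LastDemandTimestep == s.CloseTimestepCount)
        SendCloseMsgs(s); // the final provided step just went out
}

// The close path only sends close messages immediately when nothing is left to
// distribute; otherwise the DistributeStep call for the last step sends them.
void WriterClose(MiniStream &s, std::size_t LastProvidedTimestep)
{
    s.CloseTimestepCount = LastProvidedTimestep;
    std::lock_guard<std::mutex> guard(s.Lock);
    if (!s.StepsOnDemand || s.LastDemandTimestep == s.CloseTimestepCount)
        SendCloseMsgs(s);
}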
4 changes: 3 additions & 1 deletion testing/adios2/engine/staging-common/CMakeLists.txt
@@ -36,6 +36,8 @@ foreach(helper
TestLocalRead
TestStructWrite
TestStructRead
TestOnDemandWrite
TestOnDemandRead
)
add_executable(${helper} ${helper}.cpp)

@@ -144,7 +146,7 @@ set (ALL_SIMPLE_TESTS "")
list (APPEND ALL_SIMPLE_TESTS ${SIMPLE_TESTS} ${SIMPLE_FORTRAN_TESTS} ${SIMPLE_MPI_TESTS} ${SIMPLE_ZFP_TESTS})

set (SST_SPECIFIC_TESTS "")
list (APPEND SST_SPECIFIC_TESTS "1x1.SstRUDP;1x1.LocalMultiblock;RoundRobinDistribution.1x1x3;AllToAllDistribution.1x1x3")
list (APPEND SST_SPECIFIC_TESTS "1x1.SstRUDP;1x1.LocalMultiblock;RoundRobinDistribution.1x1x3;AllToAllDistribution.1x1x3;OnDemandSingle.1x1")
if (ADIOS2_HAVE_MPI)
list (APPEND SST_SPECIFIC_TESTS "2x3.SstRUDP;2x1.LocalMultiblock;5x3.LocalMultiblock;")
endif()
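The OnDemandSingle.1x1 test registered above pairs the reader below with the new TestOnDemandWrite helper. On the application side, on-demand step distribution is selected through an SST engine parameter; here is a minimal writer-side sketch, assuming the parameter name "StepDistributionMode" and value "OnDemand" from the SST engine documentation (neither string appears in this diff):

// Minimal sketch: configuring an SST writer for on-demand step distribution.
// The parameter strings below are assumed, not taken from this diff.
#include <adios2.h>
#include <cstddef>
#include <vector>

int main()
{
    adios2::ADIOS adios;
    adios2::IO io = adios.DeclareIO("sstOnDemand");
    io.SetEngine("SST");
    io.SetParameters({{"StepDistributionMode", "OnDemand"}});

    const std::size_t Nx = 10;
    auto var = io.DefineVariable<float>("sstFloats0", {Nx}, {0}, {Nx});
    std::vector<float> data(Nx, 1.0f);

    adios2::Engine writer = io.Open("ondemand_test", adios2::Mode::Write);
    for (int step = 0; step < 10; ++step)
    {
        writer.BeginStep();
        writer.Put(var, data.data());
        writer.EndStep();
    }
    writer.Close(); // with this PR, the close message is deferred until all
                    // provided steps have been handed to readers
    return 0;
}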
169 changes: 169 additions & 0 deletions testing/adios2/engine/staging-common/TestOnDemandRead.cpp
@@ -0,0 +1,169 @@
#include <chrono>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

#include <adios2.h>
#include <gtest/gtest.h>

#include "TestData.h"

#include "ParseArgs.h"

class SstOnDemandReadTest : public ::testing::Test
{
public:
SstOnDemandReadTest() = default;
};

#if ADIOS2_USE_MPI
MPI_Comm testComm;
#endif

// ADIOS2 Sst read
TEST_F(SstOnDemandReadTest, ADIOS2SstOnDemandRead)
{
int rank = 0;
int size = 1;

int variablesSize = 100;

#if ADIOS2_USE_MPI
MPI_Comm_rank(testComm, &rank);
MPI_Comm_size(testComm, &size);
#endif

#if ADIOS2_USE_MPI
adios2::ADIOS adios(testComm);
#else
adios2::ADIOS adios;
#endif
try
{
adios2::IO sstIO = adios.DeclareIO("sstOnDemand");

sstIO.SetEngine(engine);
adios2::Engine sstReader = sstIO.Open(fname, adios2::Mode::Read);

double get_time = 0;
const std::size_t my_start = Nx * rank;
const adios2::Dims pos_start{my_start};
const adios2::Dims count{Nx};
const adios2::Box<adios2::Dims> sel(pos_start, count);

auto start_step = std::chrono::steady_clock::now();
int steps = 0;
std::vector<float> myFloats(variablesSize * Nx);
while (sstReader.BeginStep() == adios2::StepStatus::OK)
{
for (int v = 0; v < variablesSize; ++v)
{
std::string namev("sstFloats");
namev += std::to_string(v);
adios2::Variable<float> sstFloats =
sstIO.InquireVariable<float>(namev);

sstFloats.SetSelection(sel);
auto start_get = std::chrono::steady_clock::now();
sstReader.Get(sstFloats, myFloats.data() + (v * Nx));
auto end_get = std::chrono::steady_clock::now();
get_time += (end_get - start_get).count() / 1000;
// std::this_thread::sleep_for (std::chrono::seconds(1));
}
sstReader.EndStep();
steps += 1;
#ifdef DEBUG
size_t currentStep = sstReader.CurrentStep();
for (unsigned int v = 0; v < variablesSize; ++v)
{
std::cout << name << ": Get step " << currentStep << " variable"
<< v << " " << myFloats[v * Nx] << std::endl;
}
#endif
}
auto end_step = std::chrono::steady_clock::now();
double total_time =
((double)(end_step - start_step).count()) / (size * 1000.0);
get_time /= size;

double global_get_sum = 0;
double global_sum = 0;
#if ADIOS2_USE_MPI
MPI_Reduce(&get_time, &global_get_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
testComm);
MPI_Reduce(&total_time, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
testComm);
#else
global_sum = total_time;
global_get_sum = get_time;
#endif

// Time in microseconds
if (rank == 0)
{
std::cout << "SST,Read," << size << "," << Nx << ","
<< variablesSize << "," << steps << "," << global_get_sum
<< "," << global_sum << std::endl;
}
EXPECT_EQ(NSteps, steps);
sstReader.Close();
}
catch (std::invalid_argument &e)
{
std::cout << "Invalid argument exception, STOPPING PROGRAM from rank "
<< rank << "\n";
std::cout << e.what() << "\n";
}
catch (std::ios_base::failure &e)
{
std::cout << "IO System base failure exception, STOPPING PROGRAM "
"from rank "
<< rank << "\n";
std::cout << e.what() << "\n";
}
catch (std::exception &e)
{
std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n";
std::cout << e.what() << "\n";
}
}

//******************************************************************************
// main
//******************************************************************************

int main(int argc, char **argv)
{
int result;
::testing::InitGoogleTest(&argc, argv);
ParseArgs(argc, argv);

#if ADIOS2_USE_MPI
int provided;
int thread_support_level = (engine == "SST" || engine == "sst")
? MPI_THREAD_MULTIPLE
: MPI_THREAD_SINGLE;

// MPI_THREAD_MULTIPLE is only required if you enable the SST MPI_DP
MPI_Init_thread(nullptr, nullptr, thread_support_level, &provided);

int key;
MPI_Comm_rank(MPI_COMM_WORLD, &key);

const unsigned int color = 2;
MPI_Comm_split(MPI_COMM_WORLD, color, key, &testComm);
#endif

result = RUN_ALL_TESTS();

#if ADIOS2_USE_MPI
#ifdef CRAY_MPICH_VERSION
MPI_Barrier(MPI_COMM_WORLD);
#else
MPI_Finalize();
#endif
#endif

return result;
}