Skip to content

Commit

Permalink
Fix race condition in OnDemand timestep distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Sep 29, 2022
1 parent 91207a2 commit 9d55dad
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 13 deletions.
1 change: 1 addition & 0 deletions source/adios2/toolkit/sst/cp/cp_common.c
Expand Up @@ -1526,6 +1526,7 @@ SstStream CP_newStream()
pthread_cond_init(&Stream->DataCondition, NULL);
Stream->WriterTimestep = -1; // Filled in by ProvideTimestep
Stream->ReaderTimestep = -1; // first beginstep will get us timestep 0
Stream->CloseTimestepCount = (size_t)-1;
Stream->LastReleasedTimestep = -1;
Stream->DiscardPriorTimestep =
-1; // Timesteps prior to this discarded/released upon arrival
Expand Down
1 change: 1 addition & 0 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Expand Up @@ -181,6 +181,7 @@ struct _SstStream
int WriterDefinitionsLocked;
size_t NextRRDistribution;
size_t LastDemandTimestep;
size_t CloseTimestepCount;

/* rendezvous condition */
int FirstReaderCondition;
Expand Down
48 changes: 36 additions & 12 deletions source/adios2/toolkit/sst/cp/cp_writer.c
Expand Up @@ -1180,6 +1180,8 @@ static void SendTimestepEntryToSingleReader(SstStream Stream,
}
}

/* Forward declaration: SendCloseMsgs() is defined further down in this file
 * but is called from the timestep-send paths that precede its definition. */
static void SendCloseMsgs(SstStream Stream);

static void SendTimestepEntryToReaders(SstStream Stream, CPTimestepList Entry)
{
STREAM_ASSERT_LOCKED(Stream);
Expand Down Expand Up @@ -1224,10 +1226,15 @@ static void SendTimestepEntryToReaders(SstStream Stream, CPTimestepList Entry)
free(Request);
if (Stream->Readers[RequestingReader]->ReaderStatus == Established)
{
Stream->LastDemandTimestep = Entry->Timestep;
SendTimestepEntryToSingleReader(
Stream, Entry, Stream->Readers[RequestingReader],
RequestingReader);
Stream->LastDemandTimestep = Entry->Timestep;
if (Stream->LastDemandTimestep == Stream->CloseTimestepCount)
{
/* send close messages once all timesteps have been sent OnDemand */
SendCloseMsgs(Stream);
}
}
else
{
Expand Down Expand Up @@ -1554,6 +1561,21 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream,
QueueMaintenance(ParentStream);
}

/*
 * Build a WriterClose message carrying the last provided timestep and hand
 * it to sendOneToEachReaderRank() using the shared WriterCloseFormat.
 *
 * The stream lock must already be held by the caller; this is checked with
 * STREAM_ASSERT_LOCKED.
 */
static void SendCloseMsgs(SstStream Stream)
{
    struct _WriterCloseMsg CloseMsg;

    STREAM_ASSERT_LOCKED(Stream);

    /* Zero the whole message first so every field starts from a known state */
    memset(&CloseMsg, 0, sizeof(CloseMsg));
    CloseMsg.FinalTimestep = Stream->LastProvidedTimestep;

    CP_verbose(
        Stream, PerStepVerbose,
        "SstWriterClose, Sending Close at Timestep %d, one to each reader\n",
        CloseMsg.FinalTimestep);

    /* The address of the RS_Stream field is passed alongside the message;
     * presumably the helper fills it in per destination reader -- confirm
     * against sendOneToEachReaderRank(). */
    sendOneToEachReaderRank(Stream,
                            Stream->CPInfo->SharedCM->WriterCloseFormat,
                            &CloseMsg, &CloseMsg.RS_Stream);
}

/*
On writer close:
RemovePreciousTag on any timestep in queue
Expand All @@ -1569,19 +1591,15 @@ On writer close:
*/
void SstWriterClose(SstStream Stream)
{
struct _WriterCloseMsg Msg;
struct timeval CloseTime, Diff;
memset(&Msg, 0, sizeof(Msg));
Stream->CloseTimestepCount = Stream->WriterTimestep;
STREAM_MUTEX_LOCK(Stream);
Msg.FinalTimestep = Stream->LastProvidedTimestep;
CP_verbose(
Stream, PerStepVerbose,
"SstWriterClose, Sending Close at Timestep %d, one to each reader\n",
Msg.FinalTimestep);

sendOneToEachReaderRank(Stream, Stream->CPInfo->SharedCM->WriterCloseFormat,
&Msg, &Msg.RS_Stream);

if ((Stream->ConfigParams->StepDistributionMode != StepsOnDemand) ||
(Stream->LastDemandTimestep == Stream->CloseTimestepCount))
{
/* send if not OnDemand, or if all timesteps have already been sent OnDemand */
SendCloseMsgs(Stream);
}
UntagPreciousTimesteps(Stream);
Stream->ConfigParams->ReserveQueueLimit = 0;
QueueMaintenance(Stream);
Expand Down Expand Up @@ -2659,8 +2677,14 @@ void CP_ReaderRequestStepHandler(CManager cm, CMConnection conn, void *Msg_v,
"reference count = %d\n",
NextTS, List->ReferenceCount);

Stream->LastDemandTimestep = List->Timestep;
SendTimestepEntryToSingleReader(Stream, List, CP_WSR_Stream,
RequestingReader);
if (Stream->LastDemandTimestep == Stream->CloseTimestepCount)
{
/* send close messages once all timesteps have been sent OnDemand */
SendCloseMsgs(Stream);
}
STREAM_MUTEX_UNLOCK(CP_WSR_Stream->ParentStream);
return;
}
Expand Down
4 changes: 3 additions & 1 deletion testing/adios2/engine/staging-common/CMakeLists.txt
Expand Up @@ -36,6 +36,8 @@ foreach(helper
TestLocalRead
TestStructWrite
TestStructRead
TestOnDemandWrite
TestOnDemandRead
)
add_executable(${helper} ${helper}.cpp)

Expand Down Expand Up @@ -144,7 +146,7 @@ set (ALL_SIMPLE_TESTS "")
list (APPEND ALL_SIMPLE_TESTS ${SIMPLE_TESTS} ${SIMPLE_FORTRAN_TESTS} ${SIMPLE_MPI_TESTS} ${SIMPLE_ZFP_TESTS})

set (SST_SPECIFIC_TESTS "")
list (APPEND SST_SPECIFIC_TESTS "1x1.SstRUDP;1x1.LocalMultiblock;RoundRobinDistribution.1x1x3;AllToAllDistribution.1x1x3")
list (APPEND SST_SPECIFIC_TESTS "1x1.SstRUDP;1x1.LocalMultiblock;RoundRobinDistribution.1x1x3;AllToAllDistribution.1x1x3;OnDemandSingle.1x1")
if (ADIOS2_HAVE_MPI)
list (APPEND SST_SPECIFIC_TESTS "2x3.SstRUDP;2x1.LocalMultiblock;5x3.LocalMultiblock;")
endif()
Expand Down
169 changes: 169 additions & 0 deletions testing/adios2/engine/staging-common/TestOnDemandRead.cpp
@@ -0,0 +1,169 @@
#include <chrono>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

#include <adios2.h>
#include <gtest/gtest.h>

#include "TestData.h"

#include "ParseArgs.h"

/// GoogleTest fixture for the SST OnDemand reader test. It carries no state
/// of its own; the implicitly generated default constructor is sufficient.
class SstOnDemandReadTest : public ::testing::Test
{
};

#if ADIOS2_USE_MPI
// Communicator used by the test body for rank/size queries, ADIOS
// construction and the timing reductions. It is created in main() by
// splitting MPI_COMM_WORLD.
MPI_Comm testComm;
#endif

// ADIOS2 Sst read
//
// Opens the stream `fname` with the configured `engine`, then for every step
// the writer provides, reads `variablesSize` float variables named
// "sstFloats<N>" (this rank's Nx-element slice of each). The number of steps
// received must equal NSteps. Timing of the Get calls and of the whole read
// loop is reported by rank 0 as a CSV line, in microseconds.
//
// Any exception raised while reading is reported AND recorded as a test
// failure, so a broken stream can no longer produce a passing test.
TEST_F(SstOnDemandReadTest, ADIOS2SstOnDemandRead)
{
    int rank = 0;
    int size = 1;

    const int variablesSize = 100;

    // Floating-point microseconds: avoids truncating sub-microsecond
    // intervals to zero and does not depend on the tick period of
    // std::chrono::steady_clock.
    using MicroSeconds = std::chrono::duration<double, std::micro>;

#if ADIOS2_USE_MPI
    MPI_Comm_rank(testComm, &rank);
    MPI_Comm_size(testComm, &size);
#endif

#if ADIOS2_USE_MPI
    adios2::ADIOS adios(testComm);
#else
    adios2::ADIOS adios;
#endif
    try
    {
        adios2::IO sstIO = adios.DeclareIO("sstOnDemand");

        sstIO.SetEngine(engine);
        adios2::Engine sstReader = sstIO.Open(fname, adios2::Mode::Read);

        double get_time = 0;
        // Each rank reads its own contiguous Nx-element slice.
        const std::size_t my_start = Nx * rank;
        const adios2::Dims pos_start{my_start};
        const adios2::Dims count{Nx};
        const adios2::Box<adios2::Dims> sel(pos_start, count);

        auto start_step = std::chrono::steady_clock::now();
        int steps = 0;
        // One Nx-sized slot per variable, laid out back to back.
        std::vector<float> myFloats(variablesSize * Nx);
        while (sstReader.BeginStep() == adios2::StepStatus::OK)
        {
            for (int v = 0; v < variablesSize; ++v)
            {
                std::string namev("sstFloats");
                namev += std::to_string(v);
                adios2::Variable<float> sstFloats =
                    sstIO.InquireVariable<float>(namev);

                sstFloats.SetSelection(sel);
                auto start_get = std::chrono::steady_clock::now();
                sstReader.Get(sstFloats, myFloats.data() + (v * Nx));
                auto end_get = std::chrono::steady_clock::now();
                get_time += MicroSeconds(end_get - start_get).count();
                // std::this_thread::sleep_for (std::chrono::seconds(1));
            }
            sstReader.EndStep();
            steps += 1;
#ifdef DEBUG
            // NOTE(review): `name` is not declared anywhere in this file;
            // presumably it comes from an included header -- confirm before
            // building with DEBUG defined.
            size_t currentStep = sstReader.CurrentStep();
            for (int v = 0; v < variablesSize; ++v)
            {
                std::cout << name << ": Get step " << currentStep << " variable"
                          << v << " " << myFloats[v * Nx] << std::endl;
            }
#endif
        }
        auto end_step = std::chrono::steady_clock::now();
        // Pre-divide by the rank count so that the MPI_SUM reductions below
        // yield per-rank averages.
        double total_time = MicroSeconds(end_step - start_step).count() / size;
        get_time /= size;

        double global_get_sum = 0;
        double global_sum = 0;
#if ADIOS2_USE_MPI
        MPI_Reduce(&get_time, &global_get_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
                   testComm);
        MPI_Reduce(&total_time, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
                   testComm);
#else
        global_sum = total_time;
        global_get_sum = get_time;
#endif

        // Time in microseconds
        if (rank == 0)
        {
            std::cout << "SST,Read," << size << "," << Nx << ","
                      << variablesSize << "," << steps << "," << global_get_sum
                      << "," << global_sum << std::endl;
        }
        EXPECT_EQ(NSteps, steps);
        sstReader.Close();
    }
    catch (const std::invalid_argument &e)
    {
        std::cout << "Invalid argument exception, STOPPING PROGRAM from rank "
                  << rank << "\n";
        std::cout << e.what() << "\n";
        ADD_FAILURE() << "Invalid argument exception on rank " << rank << ": "
                      << e.what();
    }
    catch (const std::ios_base::failure &e)
    {
        std::cout << "IO System base failure exception, STOPPING PROGRAM "
                     "from rank "
                  << rank << "\n";
        std::cout << e.what() << "\n";
        ADD_FAILURE() << "IO system failure exception on rank " << rank << ": "
                      << e.what();
    }
    catch (const std::exception &e)
    {
        std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n";
        std::cout << e.what() << "\n";
        ADD_FAILURE() << "Exception on rank " << rank << ": " << e.what();
    }
}

//******************************************************************************
// main
//******************************************************************************

// Test driver: initialises GoogleTest, parses the staging-test command line,
// brings up MPI when the build uses it, runs every registered test and
// returns the GoogleTest result as the process exit code.
int main(int argc, char **argv)
{
    ::testing::InitGoogleTest(&argc, argv);
    ParseArgs(argc, argv);

#if ADIOS2_USE_MPI
    // MPI_THREAD_MULTIPLE is only required if you enable the SST MPI_DP
    int requestedThreadLevel = MPI_THREAD_SINGLE;
    if (engine == "SST" || engine == "sst")
    {
        requestedThreadLevel = MPI_THREAD_MULTIPLE;
    }

    int providedThreadLevel;
    MPI_Init_thread(nullptr, nullptr, requestedThreadLevel,
                    &providedThreadLevel);

    // The world rank is used as the ordering key within the split
    // communicator.
    int worldRank;
    MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);

    const unsigned int splitColor = 2;
    MPI_Comm_split(MPI_COMM_WORLD, splitColor, worldRank, &testComm);
#endif

    const int result = RUN_ALL_TESTS();

#if ADIOS2_USE_MPI
#ifdef CRAY_MPICH_VERSION
    // With Cray MPICH only a barrier is issued here; MPI_Finalize is not
    // called.
    MPI_Barrier(MPI_COMM_WORLD);
#else
    MPI_Finalize();
#endif
#endif

    return result;
}

0 comments on commit 9d55dad

Please sign in to comment.