Merge pull request #1012 from keichi/insitumpi-scalability
Optimized algorithm for exchanging read schedules in InSituMPI
pnorbert committed Nov 27, 2018
2 parents 703441d + af7a0fa commit a97eabe
Showing 7 changed files with 210 additions and 125 deletions.
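
At a high level, this commit changes the read-schedule exchange so that each reader serializes a schedule only for the writers it actually reads from, and the writers learn in advance how many schedules to expect: every reader marks its peer writers, the counts are summed onto the reader root with MPI_Reduce, and the reader root forwards the totals to the writer root under the new NumReaderPerWriter tag (see InSituMPIReader.cpp below). The following is a minimal, self-contained MPI sketch of just that counting step; the communicator, the rank constants, and the writer-selection line are illustrative assumptions, not values taken from the engine.

#include <mpi.h>
#include <vector>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    MPI_Comm readerComm = MPI_COMM_WORLD; // stand-in for the reader-only communicator
    int readerRank;
    MPI_Comm_rank(readerComm, &readerRank);

    const int nWriters = 4;       // assumed number of writer ranks
    const int readerRootRank = 0; // assumed reader root within readerComm

    // 1 where this reader has a read schedule for that writer, 0 otherwise
    std::vector<int> nReaderPerWriter(nWriters, 0);
    nReaderPerWriter[readerRank % nWriters] = 1; // illustrative selection only

    // Accumulate the per-writer reader counts on the reader root
    void *sendBuf = nReaderPerWriter.data();
    if (readerRank == readerRootRank)
    {
        sendBuf = MPI_IN_PLACE; // root reduces into its own buffer
    }
    MPI_Reduce(sendBuf, nReaderPerWriter.data(), nWriters, MPI_INT, MPI_SUM,
               readerRootRank, readerComm);

    // In the engine, the reader root then forwards nReaderPerWriter to the
    // writer root with an MPI_Send tagged NumReaderPerWriter.

    MPI_Finalize();
    return 0;
}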
35 changes: 35 additions & 0 deletions source/adios2/engine/insitumpi/InSituMPIFunctions.cpp
@@ -212,6 +212,41 @@ int ConnectDirectPeers(const MPI_Comm commWorld, const bool IAmSender,
return writeRootGlobalRank;
}

std::vector<MPI_Status> CompleteRequests(std::vector<MPI_Request> &requests,
const bool IAmWriter,
const int localRank)
{
std::vector<MPI_Status> statuses(requests.size());

const auto ierr =
MPI_Waitall(requests.size(), requests.data(), statuses.data());

if (ierr == MPI_ERR_IN_STATUS)
{
for (size_t i = 0; i < requests.size(); i++)
{
if (statuses[i].MPI_ERROR == MPI_ERR_PENDING)
{
std::cerr << "InSituMPI " << (IAmWriter ? "Writer" : "Reader")
<< " " << localRank
<< " Pending transfer error when waiting for all "
"data transfers to complete. request id = "
<< i << std::endl;
}
else if (statuses[i].MPI_ERROR != MPI_SUCCESS)
{
std::cerr << "InSituMPI " << (IAmWriter ? "Writer" : "Reader")
<< " " << localRank
<< " MPI Error when waiting for all data transfers "
"to complete. Error code = "
<< statuses[i].MPI_ERROR << std::endl;
}
}
}

return statuses;
}

} // end namespace insitumpi

} // end namespace adios2
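
For reference, CompleteRequests() above centralizes the MPI_Waitall-plus-error-report pattern that was previously open-coded at each wait site. A minimal usage sketch, assuming the header from this commit is on the include path and that peers holds the global ranks of direct peer writers (both assumptions for illustration):

#include "InSituMPIFunctions.h" // source/adios2/engine/insitumpi/ in this commit

#include <mpi.h>
#include <vector>

// Post one nonblocking receive per peer, then let CompleteRequests() wait on
// all of them and print a message for any request that failed.
void ReceiveStepFromPeers(const std::vector<int> &peers, MPI_Comm commWorld,
                          const int localRank)
{
    std::vector<MPI_Request> requests(peers.size());
    std::vector<int> steps(peers.size());

    for (size_t i = 0; i < peers.size(); i++)
    {
        MPI_Irecv(&steps[i], 1, MPI_INT, peers[i],
                  adios2::insitumpi::MpiTags::Step, commWorld, &requests[i]);
    }

    // false: this rank acts as a reader in this example
    adios2::insitumpi::CompleteRequests(requests, false, localRank);
}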
6 changes: 6 additions & 0 deletions source/adios2/engine/insitumpi/InSituMPIFunctions.h
@@ -35,6 +35,7 @@ enum MpiTags
FixedRemoteSchedule,
MetadataLength,
Metadata,
NumReaderPerWriter,
ReadScheduleLength,
ReadSchedule,
Data,
@@ -65,6 +66,11 @@ int ConnectDirectPeers(const MPI_Comm commWorld, const bool IAmSender,
const bool IAmWriterRoot, const int globalRank,
const std::vector<int> &peers);

// Wait for multiple MPI requests to complete and check errors
std::vector<MPI_Status> CompleteRequests(std::vector<MPI_Request> &requests,
const bool IAmWriter,
const int localRank);

} // end namespace insitumpi

} // end namespace adios2
96 changes: 53 additions & 43 deletions source/adios2/engine/insitumpi/InSituMPIReader.cpp
@@ -128,14 +128,13 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,
// If some peers time out (and never check back) while the others succeed,
// the global metadata operation will hang
std::vector<MPI_Request> requests(m_RankDirectPeers.size());
std::vector<MPI_Status> statuses(m_RankDirectPeers.size());
std::vector<int> steps(m_RankDirectPeers.size());
for (int peerID = 0; peerID < m_RankDirectPeers.size(); peerID++)
{
MPI_Irecv(&steps[peerID], 1, MPI_INT, m_RankDirectPeers[peerID],
insitumpi::MpiTags::Step, m_CommWorld, &requests[peerID]);
}
MPI_Waitall(m_RankDirectPeers.size(), requests.data(), statuses.data());
insitumpi::CompleteRequests(requests, false, m_ReaderRank);

if (m_Verbosity == 5)
{
@@ -333,32 +332,67 @@ void InSituMPIReader::SendReadSchedule(
{
const bool profile = m_BP3Deserializer.m_Profiler.IsActive;

// serialized schedules, one per-writer
std::vector<std::vector<char>> serializedSchedules =
// Serialized schedules, one per-writer
std::map<int, std::vector<char>> serializedSchedules =
insitumpi::SerializeLocalReadSchedule(m_RankAllPeers.size(),
variablesSubFileInfo);

std::vector<MPI_Request> request(m_RankAllPeers.size());
std::vector<int> mdLen(m_RankAllPeers.size());
for (int i = 0; i < m_RankAllPeers.size(); i++)
// Writer ID -> number of peer readers
std::vector<int> nReaderPerWriter(m_RankAllPeers.size());

// Fill nReaderPerWriter for this reader
for (const auto &schedulePair : serializedSchedules)
{
const auto peerID = schedulePair.first;
nReaderPerWriter[peerID] = 1;
}

// Accumulate nReaderPerWriter for all readers
void *sendBuf = nReaderPerWriter.data();
if (m_ReaderRootRank == m_ReaderRank)
{
sendBuf = MPI_IN_PLACE;
}
MPI_Reduce(sendBuf, nReaderPerWriter.data(), nReaderPerWriter.size(),
MPI_INT, MPI_SUM, m_ReaderRootRank, m_MPIComm);

// Reader root sends nReaderPerWriter to writer root
if (m_ReaderRootRank == m_ReaderRank)
{
MPI_Send(nReaderPerWriter.data(), nReaderPerWriter.size(), MPI_INT,
m_WriteRootGlobalRank, insitumpi::MpiTags::NumReaderPerWriter,
m_CommWorld);
}

// *2 because we need two requests per writer (one for sending the length
// of the read schedule and another for the actual read schedule)
std::vector<MPI_Request> request(serializedSchedules.size() * 2);
std::vector<int> rsLengths(serializedSchedules.size());
auto i = 0;

for (const auto &schedulePair : serializedSchedules)
{
mdLen[i] = serializedSchedules[i].size();
const auto peerID = schedulePair.first;
const auto &schedule = schedulePair.second;
rsLengths[i] = schedule.size();

if (m_Verbosity == 5)
{
std::cout << "InSituMPI Reader " << m_ReaderRank
<< " Send Read Schedule len = " << mdLen[i]
<< " to Writer " << i << " global rank "
<< m_RankAllPeers[i] << std::endl;
<< " Send Read Schedule len = " << rsLengths[i]
<< " to Writer " << peerID << " global rank "
<< m_RankAllPeers[peerID] << std::endl;
}
MPI_Isend(&(mdLen[i]), 1, MPI_INT, m_RankAllPeers[i],
MPI_Isend(&rsLengths[i], 1, MPI_INT, m_RankAllPeers[peerID],
insitumpi::MpiTags::ReadScheduleLength, m_CommWorld,
&(request[i]));
MPI_Isend(serializedSchedules[i].data(), mdLen[i], MPI_CHAR,
m_RankAllPeers[i], insitumpi::MpiTags::ReadSchedule,
m_CommWorld, &(request[i]));
&request[i * 2]);
MPI_Isend(schedule.data(), rsLengths[i], MPI_CHAR,
m_RankAllPeers[peerID], insitumpi::MpiTags::ReadSchedule,
m_CommWorld, &request[i * 2 + 1]);

i++;
}
std::vector<MPI_Status> status(m_RankAllPeers.size());
MPI_Waitall(m_RankAllPeers.size(), request.data(), status.data());
insitumpi::CompleteRequests(request, false, m_ReaderRank);
}

void InSituMPIReader::AsyncRecvAllVariables()
@@ -395,32 +429,8 @@ void InSituMPIReader::AsyncRecvAllVariables()
void InSituMPIReader::ProcessReceives()
{
const int nRequests = m_OngoingReceives.size();
std::vector<MPI_Status> statuses(nRequests);
int ierr;

// Wait for all transfers to complete
ierr = MPI_Waitall(nRequests, m_MPIRequests.data(), statuses.data());

if (ierr == MPI_ERR_IN_STATUS)
{
for (int i = 0; i < nRequests; i++)
{
if (statuses[i].MPI_ERROR == MPI_ERR_PENDING)
{
std::cerr << "InSituMPI Reader " << m_ReaderRank
<< " Pending transfer error when waiting for all "
"data transfers to complete. Receive index = "
<< i << std::endl;
}
else if (statuses[i].MPI_ERROR != MPI_SUCCESS)
{
std::cerr << "InSituMPI Reader " << m_ReaderRank
<< " MPI Error when waiting for all data transfers "
"to complete. Error code = "
<< ierr << std::endl;
}
}
}
insitumpi::CompleteRequests(m_MPIRequests, false, m_ReaderRank);

// Send final acknowledgment to the Writer
int dummy = 1;
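
The writer-side counterpart of this exchange lives in the changed files that are not shown in this excerpt. Purely as a hypothetical sketch (the function name, arguments, and the scatter step are assumptions, not the engine's actual code): the writer root could receive the NumReaderPerWriter counts from the reader root, distribute them across the writers, and each writer would then post exactly its expected number of (length, schedule) receive pairs.

#include "InSituMPIFunctions.h" // for the MpiTags used in this commit

#include <mpi.h>
#include <vector>

// Hypothetical writer-side sketch: consume the per-writer reader counts and
// receive one read schedule from each peer reader.
void ReceiveReadSchedules(MPI_Comm writerComm, MPI_Comm commWorld,
                          const int writerRank, const int writerRootRank,
                          const int readerRootGlobalRank, const int nWriters)
{
    std::vector<int> nReaderPerWriter;
    if (writerRank == writerRootRank)
    {
        nReaderPerWriter.resize(nWriters);
        MPI_Recv(nReaderPerWriter.data(), nWriters, MPI_INT,
                 readerRootGlobalRank,
                 adios2::insitumpi::MpiTags::NumReaderPerWriter, commWorld,
                 MPI_STATUS_IGNORE);
    }

    // Each writer learns how many readers will send it a schedule
    int myReaderCount = 0;
    MPI_Scatter(nReaderPerWriter.data(), 1, MPI_INT, &myReaderCount, 1,
                MPI_INT, writerRootRank, writerComm);

    for (int r = 0; r < myReaderCount; r++)
    {
        // A reader sends at most one schedule per writer per step, so the
        // length/payload pair from a given source cannot be confused
        MPI_Status status;
        int rsLength = 0;
        MPI_Recv(&rsLength, 1, MPI_INT, MPI_ANY_SOURCE,
                 adios2::insitumpi::MpiTags::ReadScheduleLength, commWorld,
                 &status);

        std::vector<char> schedule(rsLength);
        MPI_Recv(schedule.data(), rsLength, MPI_CHAR, status.MPI_SOURCE,
                 adios2::insitumpi::MpiTags::ReadSchedule, commWorld,
                 MPI_STATUS_IGNORE);
        // ... deserialize and merge the schedule here ...
    }
}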
42 changes: 23 additions & 19 deletions source/adios2/engine/insitumpi/InSituMPISchedules.cpp
@@ -80,21 +80,13 @@ void FixSeeksToZeroOffset(helper::SubFileInfo &record, bool isRowMajor) noexcept
record.Seeks.second = pos + nElements - 1;
}

std::vector<std::vector<char>>
std::map<int, std::vector<char>>
SerializeLocalReadSchedule(const int nWriters,
const std::map<std::string, helper::SubFileInfoMap>
&variablesSubFileInfo) noexcept
{
std::vector<std::vector<char>> buffers(nWriters);

// Create a buffer for each writer
std::vector<int> nVarPerWriter(nWriters);
for (size_t i = 0; i < nWriters; i++)
{
nVarPerWriter[i] = 0;
// allocate first 4 bytes
helper::InsertToBuffer(buffers[i], &nVarPerWriter[i], 1);
}
std::map<int, std::vector<char>> buffers;
std::map<int, int> nVarPerWriter;

for (const auto &variableNamePair : variablesSubFileInfo)
{
@@ -103,10 +95,17 @@ SerializeLocalReadSchedule(const int nWriters,
for (const auto &subFileIndexPair : variableNamePair.second)
{
const size_t subFileIndex = subFileIndexPair.first; // writer
auto &lrs = buffers[subFileIndex];
// <steps, <SubFileInfo>> but there is only one step
for (const auto &stepPair : subFileIndexPair.second)
{
if (buffers.find(subFileIndex) == buffers.end())
{
nVarPerWriter[subFileIndex] = 0;
// allocate first 4 bytes (number of requested variables)
helper::InsertToBuffer(buffers[subFileIndex],
&nVarPerWriter[subFileIndex], 1);
}

// LocalReadSchedule sfi = subFileIndexPair.second[0];
const std::vector<helper::SubFileInfo> &sfi = stepPair.second;
SerializeLocalReadSchedule(buffers[subFileIndex], variableName,
@@ -118,10 +117,13 @@ SerializeLocalReadSchedule(const int nWriters,
}

// Record the number of actually requested variables for each buffer
for (int i = 0; i < nWriters; i++)
for (auto &bufferPair : buffers)
{
size_t pos = 0;
helper::CopyToBuffer(buffers[i], pos, &nVarPerWriter[i]);
const auto peerID = bufferPair.first;
auto &buffer = bufferPair.second;

helper::CopyToBuffer(buffer, pos, &nVarPerWriter[peerID]);
}
return buffers;
}
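
For clarity, the serialization above reserves the first 4 bytes of each per-writer buffer for the number of requested variables and patches that count in once all variables have been appended. A standalone sketch of that pattern on a plain byte vector (the memcpy calls stand in for helper::InsertToBuffer / helper::CopyToBuffer, and the int entries stand in for real serialized variable schedules):

#include <cstring>
#include <iostream>
#include <vector>

int main()
{
    std::vector<char> buffer;

    // Reserve the first 4 bytes for the (not yet known) entry count
    int nEntries = 0;
    buffer.resize(sizeof(int));
    std::memcpy(buffer.data(), &nEntries, sizeof(int));

    // Append a few fixed-size entries
    for (int value : {10, 20, 30})
    {
        const size_t pos = buffer.size();
        buffer.resize(pos + sizeof(int));
        std::memcpy(buffer.data() + pos, &value, sizeof(int));
        nEntries++;
    }

    // Patch the actual count back into the first 4 bytes
    std::memcpy(buffer.data(), &nEntries, sizeof(int));

    std::cout << "serialized " << nEntries << " entries in " << buffer.size()
              << " bytes" << std::endl;
    return 0;
}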
@@ -182,18 +184,20 @@ int GetNumberOfRequestsInWriteScheduleMap(WriteScheduleMap &map) noexcept
return n;
}

WriteScheduleMap
DeserializeReadSchedule(const std::vector<std::vector<char>> &buffers) noexcept
WriteScheduleMap DeserializeReadSchedule(
const std::map<int, std::vector<char>> &buffers) noexcept
{
WriteScheduleMap map;

for (int i = 0; i < buffers.size(); i++)
for (const auto &bufferPair : buffers)
{
const auto &buffer = buffers[i];
const auto peerID = bufferPair.first;
const auto &buffer = bufferPair.second;

LocalReadScheduleMap lrsm = DeserializeReadSchedule(buffer);
for (const auto &varSchedule : lrsm)
{
map[varSchedule.first][i] = varSchedule.second;
map[varSchedule.first][peerID] = varSchedule.second;
}
}
return map;
6 changes: 3 additions & 3 deletions source/adios2/engine/insitumpi/InSituMPISchedules.h
@@ -45,7 +45,7 @@ void FixSeeksToZeroOffset(helper::SubFileInfo &record,
// one serialized buffer per writer rank from which this reader requests data
// there is one step; lrs is the vector of SubFileInfos
// SerializeLocalReadSchedule(variable, lrs)
std::vector<std::vector<char>>
std::map<int, std::vector<char>>
SerializeLocalReadSchedule(const int nWriters,
const std::map<std::string, helper::SubFileInfoMap>
&variablesSubFileInfo) noexcept;
@@ -87,8 +87,8 @@ using LocalReadScheduleMap =
int GetNumberOfRequestsInWriteScheduleMap(WriteScheduleMap &map) noexcept;

// Deserialize buffers from all readers
WriteScheduleMap
DeserializeReadSchedule(const std::vector<std::vector<char>> &buffers) noexcept;
WriteScheduleMap DeserializeReadSchedule(
const std::map<int, std::vector<char>> &buffers) noexcept;

// Deserialize buffer from one reader
LocalReadScheduleMap
