Skip to content

Commit

Permalink
Merge pull request #1011 from shawnyang610/ndcopy
Browse files Browse the repository at this point in the history
adding memStart, memCount into NdCopy, fixing rev-major function
  • Loading branch information
JasonRuonanWang committed Nov 26, 2018
2 parents 6cf0e2c + 57ea4fd commit 3a2df8b
Show file tree
Hide file tree
Showing 20 changed files with 908 additions and 77 deletions.
2 changes: 2 additions & 0 deletions source/adios2/core/Engine.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ void Engine::CommonChecks(Variable<T> &variable, const T *data,
data, "for data argument in non-zero count block, " + hint);
}

/* Commented out by Jason for allowing SetMemorySelection on reader
if (!variable.m_MemoryStart.empty() && m_OpenMode == adios2::Mode::Read)
{
throw std::invalid_argument("ERROR: from Engine " + m_Name +
Expand All @@ -194,6 +195,7 @@ void Engine::CommonChecks(Variable<T> &variable, const T *data,
"mode, " +
hint + "\n");
}
*/
}

} // end namespace core
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/dataman/DataManCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ namespace engine
DataManCommon::DataManCommon(const std::string engineType, IO &io,
const std::string &name, const Mode mode,
MPI_Comm mpiComm)
: Engine(engineType, io, name, mode, mpiComm)
: Engine(engineType, io, name, mode, mpiComm),
m_FileTransport(mpiComm, m_DebugMode)
{

// initialize parameters
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/engine/dataman/DataManCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "adios2/ADIOSConfig.h"
#include "adios2/ADIOSMacros.h"
#include "adios2/core/Engine.h"
#include "adios2/toolkit/transport/file/FileFStream.h"
#include "adios2/toolkit/transportman/dataman/DataMan.h"

namespace adios2
Expand Down Expand Up @@ -44,6 +45,9 @@ class DataManCommon : public Engine

bool m_IsLittleEndian;
bool m_IsRowMajor;
bool m_ContiguousMajor = false;

transport::FileFStream m_FileTransport;

std::vector<std::string> m_StreamNames;
std::vector<core::Operator *> m_Callbacks;
Expand Down
8 changes: 4 additions & 4 deletions source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace engine
DataManReader::DataManReader(IO &io, const std::string &name, const Mode mode,
MPI_Comm mpiComm)
: DataManCommon("DataManReader", io, name, mode, mpiComm),
m_DataManDeserializer(m_IsRowMajor, m_IsLittleEndian)
m_DataManDeserializer(m_IsRowMajor, m_ContiguousMajor, m_IsLittleEndian)
{
m_EndMessage = " in call to IO Open DataManReader " + m_Name + "\n";
Init();
Expand Down Expand Up @@ -150,10 +150,10 @@ void DataManReader::Flush(const int transportIndex) {}

void DataManReader::Init()
{
if (m_WorkflowMode != "subscribe" && m_WorkflowMode != "p2p")
if (m_WorkflowMode == "file")
{
throw(std::invalid_argument(
"[DataManReader::Init] invalid workflow mode " + m_WorkflowMode));
m_FileTransport.Open(m_Name, Mode::Read);
return;
}

// initialize transports
Expand Down
33 changes: 27 additions & 6 deletions source/adios2/engine/dataman/DataManReader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,41 @@ void DataManReader::GetSyncCommon(Variable<T> &variable, T *data)
template <class T>
void DataManReader::GetDeferredCommon(Variable<T> &variable, T *data)
{
if (m_WorkflowMode == "subscribe")
if (m_IsRowMajor)
{
while (m_DataManDeserializer.Get(data, variable.m_Name,
variable.m_Start, variable.m_Count,
m_CurrentStep) != 0)
m_CurrentStep, variable.m_MemoryStart,
variable.m_MemoryCount) != 0)
{
}
}
else if (m_WorkflowMode == "p2p")
else
{
while (m_DataManDeserializer.Get(data, variable.m_Name,
variable.m_Start, variable.m_Count,
m_CurrentStep) != 0)
if (m_ContiguousMajor)
{
Dims start = variable.m_Start;
Dims count = variable.m_Count;
Dims memstart = variable.m_MemoryStart;
Dims memcount = variable.m_MemoryCount;
std::reverse(start.begin(), start.end());
std::reverse(count.begin(), count.end());
std::reverse(memstart.begin(), memstart.end());
std::reverse(memcount.begin(), memcount.end());
while (m_DataManDeserializer.Get(data, variable.m_Name, start,
count, m_CurrentStep, memstart,
memcount) != 0)
{
}
}
else
{
while (m_DataManDeserializer.Get(
data, variable.m_Name, variable.m_Start,
variable.m_Count, m_CurrentStep, variable.m_MemoryStart,
variable.m_MemoryCount) != 0)
{
}
}
}
}
Expand Down
25 changes: 23 additions & 2 deletions source/adios2/engine/dataman/DataManWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ void DataManWriter::PerformPuts() {}

void DataManWriter::EndStep()
{

if (m_WorkflowMode == "file")
{
const std::shared_ptr<std::vector<char>> buf =
m_DataManSerializer[0]->Get();
m_FileTransport.Write(buf->data(), buf->size());
return;
}

if (m_Format == "dataman")
{
for (size_t i = 0; i < m_TransportChannels; ++i)
Expand Down Expand Up @@ -89,6 +98,12 @@ void DataManWriter::Flush(const int transportIndex) {}
void DataManWriter::Init()
{

if (m_WorkflowMode == "file")
{
m_FileTransport.Open(m_Name, Mode::Write);
return;
}

// initialize transports
m_DataMan = std::make_shared<transportman::DataMan>(m_MPIComm, m_DebugMode);
m_DataMan->OpenWANTransports(m_StreamNames, m_IO.m_TransportsParameters,
Expand All @@ -100,8 +115,8 @@ void DataManWriter::Init()
for (size_t i = 0; i < m_TransportChannels; ++i)
{
m_DataManSerializer.push_back(
std::make_shared<format::DataManSerializer>(m_IsRowMajor,
m_IsLittleEndian));
std::make_shared<format::DataManSerializer>(
m_IsRowMajor, m_ContiguousMajor, m_IsLittleEndian));
}
}
else if (m_Format == "binary")
Expand Down Expand Up @@ -130,6 +145,12 @@ ADIOS2_FOREACH_TYPE_1ARG(declare_type)

void DataManWriter::DoClose(const int transportIndex)
{
if (m_WorkflowMode == "file")
{
m_FileTransport.Close();
return;
}

if (m_Format == "dataman")
{
m_DataMan->WriteWAN(format::DataManSerializer::EndSignal(CurrentStep()),
Expand Down
43 changes: 29 additions & 14 deletions source/adios2/engine/dataman/DataManWriter.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,47 @@ void DataManWriter::PutDeferredCommon(Variable<T> &variable, const T *values)
variable.m_Start.assign(variable.m_Count.size(), 0);
}

if (m_Format == "dataman")
if (m_IsRowMajor)
{
for (size_t i = 0; i < m_TransportChannels; ++i)
{
if (m_WorkflowMode == "subscribe")
m_DataManSerializer[i]->Put(variable, m_Name, CurrentStep(),
m_MPIRank, "",
m_IO.m_TransportsParameters[i]);
}
}
else
{
if (m_ContiguousMajor)
{
Dims start = variable.m_Start;
Dims count = variable.m_Count;
Dims shape = variable.m_Shape;
Dims memstart = variable.m_MemoryStart;
Dims memcount = variable.m_MemoryCount;
std::reverse(start.begin(), start.end());
std::reverse(count.begin(), count.end());
std::reverse(shape.begin(), shape.end());
std::reverse(memstart.begin(), memstart.end());
std::reverse(memcount.begin(), memcount.end());
for (size_t i = 0; i < m_TransportChannels; ++i)
{
m_DataManSerializer[i]->Put(variable, m_Name, CurrentStep(),
m_MPIRank, "",
m_IO.m_TransportsParameters[i]);
m_DataManSerializer[i]->Put(
variable.m_Data, variable.m_Name, shape, start, count,
memstart, memcount, m_Name, CurrentStep(), m_MPIRank, "",
m_IO.m_TransportsParameters[i]);
}
else
}
else
{
for (size_t i = 0; i < m_TransportChannels; ++i)
{
m_DataManSerializer[i]->Put(variable, m_Name, CurrentStep(),
m_MPIRank, "",
m_IO.m_TransportsParameters[i]);
}
}
}
else if (m_Format == "binary")
{
}
else
{
throw(std::invalid_argument(
"[DataManWriter::PutSyncCommon] invalid format " + m_Format));
}
}

} // end namespace engine
Expand Down
13 changes: 10 additions & 3 deletions source/adios2/helper/adiosMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,27 @@ void Resize(std::vector<T> &vec, const size_t dataSize, const bool debugMode,
* @param inIsRowMaj specifies major for input
* @param inIsBigEndian specifies endianess for input
* @param out pointer to destination memory buffer
* @param outStart source data starting offset
* @param outCount destination data structure
* @param outStart destination request data starting offset
* @param outCount destination request data structure
* @param outIsRowMaj specifies major for output
* @param outIsBigEndian specifies endianess for output
* @param inMemStart source memory starting offset
* @param inMemCount source memory structure
* @param outMemStart destination request data starting offset
* @param outMemCount destination request data structure
* @param safeMode false:runs faster, the number of function stacks
* used by recursive algm is equal to the number of dimensions.
* true: runs a bit slower, same algorithm using the explicit
* stack/simulated stack which has more overhead for the algm.
*/

template <class T>
int NdCopy(const char *in, const Dims &inStart, const Dims &inCount,
const bool inIsRowMajor, const bool inIsLittleEndian, char *out,
const Dims &outStart, const Dims &outCount, const bool outIsRowMajor,
const bool outIsLittleEndian, const bool safeMode = false);
const bool outIsLittleEndian, const Dims &inMemStart = Dims(),
const Dims &inMemCount = Dims(), const Dims &outMemStart = Dims(),
const Dims &outMemCount = Dims(), const bool safeMode = false);

template <class T>
size_t PayloadSize(const T *data, const Dims &count) noexcept;
Expand Down

0 comments on commit 3a2df8b

Please sign in to comment.