Skip to content

Commit

Permalink
Merge pull request #3313 from JasonRuonanWang/ssc
Browse files Browse the repository at this point in the history
Enable memory selection in ssc reader
  • Loading branch information
JasonRuonanWang committed Aug 15, 2022
2 parents b3c3fa2 + 597e3c7 commit c91d852
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 13 deletions.
8 changes: 6 additions & 2 deletions source/adios2/engine/ssc/SscHelper.cpp
Expand Up @@ -299,7 +299,7 @@ void DeserializeVariable(const Buffer &input, const ShapeID shapeId,
else
{
helper::Throw<std::runtime_error>("Engine", "SscHelper",
"Deserialize",
"DeserializeVariable",
"unknown variable data type");
}
}
Expand Down Expand Up @@ -416,7 +416,7 @@ void DeserializeAttribute(const Buffer &input, uint64_t &pos, IO &io,
else
{
helper::Throw<std::runtime_error>(
"Engine", "SscHelper", "Deserialize",
"Engine", "SscHelper", "DeserializeAttribute",
"unknown attribute data type");
}
}
Expand All @@ -428,6 +428,10 @@ void SerializeStructDefinitions(
const std::unordered_map<std::string, StructDefinition> &definitions,
Buffer &output)
{
if (definitions.empty())
{
return;
}
uint64_t pos = output.value<uint64_t>();
output[pos] = 65;
++pos;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/ssc/SscHelper.h
Expand Up @@ -119,6 +119,8 @@ struct BlockInfo
Dims shape;
Dims start;
Dims count;
Dims memStart;
Dims memCount;
size_t bufferStart;
size_t bufferCount;
std::vector<char> value;
Expand Down
32 changes: 24 additions & 8 deletions source/adios2/engine/ssc/SscReaderGeneric.cpp
Expand Up @@ -299,7 +299,11 @@ void SscReaderGeneric::PerformGets()
reinterpret_cast<char *>(br.data),
helper::CoreDims(br.start),
helper::CoreDims(br.count), true, true,
static_cast<int>(b.elementSize));
static_cast<int>(b.elementSize),
helper::CoreDims(b.start),
helper::CoreDims(b.count),
helper::CoreDims(br.memStart),
helper::CoreDims(br.memCount));
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
Expand Down Expand Up @@ -442,12 +446,16 @@ void SscReaderGeneric::GetDeferredDeltaCommon(VariableBase &variable,
Dims vStart = variable.m_Start;
Dims vCount = variable.m_Count;
Dims vShape = variable.m_Shape;
Dims vMemStart = variable.m_MemoryStart;
Dims vMemCount = variable.m_MemoryCount;

if (m_IO.m_ArrayOrder != ArrayOrdering::RowMajor)
{
std::reverse(vStart.begin(), vStart.end());
std::reverse(vCount.begin(), vCount.end());
std::reverse(vShape.begin(), vShape.end());
std::reverse(vMemStart.begin(), vMemStart.end());
std::reverse(vMemCount.begin(), vMemCount.end());
}

m_LocalReadPattern.emplace_back();
Expand All @@ -459,6 +467,8 @@ void SscReaderGeneric::GetDeferredDeltaCommon(VariableBase &variable,
b.start = vStart;
b.count = vCount;
b.shape = vShape;
b.memStart = vMemStart;
b.memCount = vMemCount;
b.bufferStart = 0;
b.bufferCount = 0;
b.data = data;
Expand Down Expand Up @@ -507,12 +517,16 @@ void SscReaderGeneric::GetDeferred(VariableBase &variable, void *data)
helper::DimsArray vStart(variable.m_Start);
helper::DimsArray vCount(variable.m_Count);
helper::DimsArray vShape(variable.m_Shape);
helper::DimsArray vMemStart(variable.m_MemoryStart);
helper::DimsArray vMemCount(variable.m_MemoryCount);

if (m_IO.m_ArrayOrder != ArrayOrdering::RowMajor)
{
std::reverse(vStart.begin(), vStart.end());
std::reverse(vCount.begin(), vCount.end());
std::reverse(vShape.begin(), vShape.end());
std::reverse(vMemStart.begin(), vMemStart.end());
std::reverse(vMemCount.begin(), vMemCount.end());
}

if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
Expand Down Expand Up @@ -546,13 +560,15 @@ void SscReaderGeneric::GetDeferred(VariableBase &variable, void *data)
if (b.shapeId == ShapeID::GlobalArray ||
b.shapeId == ShapeID::LocalArray)
{
helper::NdCopy(
m_Buffer.data<char>() + b.bufferStart,
helper::CoreDims(b.start),
helper::CoreDims(b.count), true, true,
reinterpret_cast<char *>(data), vStart, vCount,
true, true,
static_cast<int>(variable.m_ElementSize));
helper::NdCopy(m_Buffer.data<char>() + b.bufferStart,
helper::CoreDims(b.start),
helper::CoreDims(b.count), true, true,
reinterpret_cast<char *>(data), vStart,
vCount, true, true,
static_cast<int>(variable.m_ElementSize),
helper::CoreDims(b.start),
helper::CoreDims(b.count), vMemStart,
vMemCount);
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
Expand Down
10 changes: 7 additions & 3 deletions source/adios2/engine/ssc/SscReaderNaive.cpp
Expand Up @@ -182,13 +182,15 @@ void SscReaderNaive::GetDeferred(VariableBase &variable, void *data)

helper::DimsArray vStart(variable.m_Start);
helper::DimsArray vCount(variable.m_Count);
// Dims vShape = variable.m_Shape;
helper::DimsArray vMemStart(variable.m_MemoryStart);
helper::DimsArray vMemCount(variable.m_MemoryCount);

if (m_IO.m_ArrayOrder != ArrayOrdering::RowMajor)
{
std::reverse(vStart.begin(), vStart.end());
std::reverse(vCount.begin(), vCount.end());
// std::reverse(vShape.begin(), vShape.end());
std::reverse(vMemStart.begin(), vMemStart.end());
std::reverse(vMemCount.begin(), vMemCount.end());
}

for (const auto &b : m_BlockMap[variable.m_Name])
Expand All @@ -200,7 +202,9 @@ void SscReaderNaive::GetDeferred(VariableBase &variable, void *data)
helper::CoreDims(b.start), helper::CoreDims(b.count),
true, true, reinterpret_cast<char *>(data), vStart,
vCount, true, true,
static_cast<int>(variable.m_ElementSize));
static_cast<int>(variable.m_ElementSize),
helper::CoreDims(b.start), helper::CoreDims(b.count),
vMemStart, vMemCount);
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
Expand Down
3 changes: 3 additions & 0 deletions testing/adios2/engine/ssc/CMakeLists.txt
Expand Up @@ -9,6 +9,9 @@ if(ADIOS2_HAVE_MPI AND NOT MSVC)
gtest_add_tests_helper(Base MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscBase.MPI "" TRUE)

gtest_add_tests_helper(3DMemSelect MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSsc3DMemSelect.MPI "" TRUE)

gtest_add_tests_helper(Struct MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscStruct.MPI "" TRUE)

Expand Down
205 changes: 205 additions & 0 deletions testing/adios2/engine/ssc/TestSsc3DMemSelect.cpp
@@ -0,0 +1,205 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* TestSsc3DMemSelect.cpp
*
* Created on: Aug 12, 2022
* Author: Jason Wang
*/

#include <adios2.h>
#include <gtest/gtest.h>
#include <numeric>
#include <thread>

using namespace adios2;
int mpiRank = 0;
int mpiSize = 1;
MPI_Comm mpiComm;

size_t steps = 100;

Dims shape = {4, 4, 4};
std::vector<int> global_data = {0, 1, 2, 3, 10, 11, 12, 13,
20, 21, 22, 23, 30, 31, 32, 33,

100, 101, 102, 103, 110, 111, 112, 113,
120, 121, 122, 123, 130, 131, 132, 133,

200, 201, 202, 203, 210, 211, 212, 213,
220, 221, 222, 223, 230, 231, 232, 233,

300, 301, 302, 303, 310, 311, 312, 313,
320, 321, 322, 323, 330, 331, 332, 333};

Dims start = {1, 2, 1};
Dims count = {2, 1, 2};
std::vector<int> writer_data = {121, 122, 221, 222};

Dims memStart = {0, 1, 1};
Dims memCount = {3, 3, 3};
std::vector<int> reader_data = {11, 12, 13, 21, 22, 23, 31, 32, 33,

111, 112, 113, 121, 122, 123, 131, 132, 133,

211, 212, 213, 221, 222, 223, 231, 232, 233};

class SscEngineTest : public ::testing::Test
{
public:
SscEngineTest() = default;
};

template <class T>
void PrintData(const T *data, const size_t step, const Dims &start,
const Dims &count)
{
size_t size = std::accumulate(count.begin(), count.end(), 1,
std::multiplies<size_t>());
std::cout << "Step: " << step << " Size:" << size << "\n";
size_t printsize = 128;

if (size < printsize)
{
printsize = size;
}
int s = 0;
for (size_t i = 0; i < printsize; ++i)
{
++s;
std::cout << data[i] << " ";
if (s == count[1])
{
std::cout << std::endl;
s = 0;
}
}

std::cout << "]" << std::endl;
}

void VerifyData(const int *data, size_t step, const Dims &start,
const Dims &count, const Dims &shape)
{
size_t size = std::accumulate(count.begin(), count.end(), 1,
std::multiplies<size_t>());
bool compressed = false;
for (size_t i = 0; i < size; ++i)
{
if (!compressed)
{
ASSERT_EQ(data[i], reader_data[i]);
}
}
}

void Writer(const Params &engineParams)
{
adios2::ADIOS adios(mpiComm);
adios2::IO io = adios.DeclareIO("io");
io.SetEngine("ssc");
io.SetParameters(engineParams);
auto varInts = io.DefineVariable<int>("varInts", shape, start, count);
adios2::Engine engine = io.Open("stream", adios2::Mode::Write);
for (size_t i = 0; i < steps; ++i)
{
engine.BeginStep();
engine.Put(varInts, writer_data.data(), adios2::Mode::Sync);
engine.EndStep();
}
engine.Close();
}

void Reader(const Params &engineParams)
{
adios2::ADIOS adios(mpiComm);
adios2::IO io = adios.DeclareIO("io");
io.SetEngine("ssc");
io.SetParameters(engineParams);
adios2::Engine engine = io.Open("stream", adios2::Mode::Read);
std::vector<int> myInts = reader_data;
while (true)
{
adios2::StepStatus status = engine.BeginStep(StepMode::Read, 5);
if (status == adios2::StepStatus::OK)
{
const auto &vars = io.AvailableVariables();
ASSERT_EQ(vars.size(), 1);
size_t currentStep = engine.CurrentStep();
adios2::Variable<int> varInts = io.InquireVariable<int>("varInts");
varInts.SetSelection({start, count});
varInts.SetMemorySelection({memStart, memCount});
engine.Get(varInts, myInts.data(), adios2::Mode::Sync);
VerifyData(myInts.data(), currentStep, memStart, memCount, shape);
engine.EndStep();
}
else if (status == adios2::StepStatus::EndOfStream)
{
break;
}
else if (status == adios2::StepStatus::NotReady)
{
continue;
}
}
engine.Close();
}

TEST_F(SscEngineTest, TestSsc3DMemSelect)
{
{
adios2::Params engineParams = {{"Verbose", "0"}};
int worldRank, worldSize;
MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
int mpiGroup = worldRank / (worldSize / 2);
MPI_Comm_split(MPI_COMM_WORLD, mpiGroup, worldRank, &mpiComm);
MPI_Comm_rank(mpiComm, &mpiRank);
MPI_Comm_size(mpiComm, &mpiSize);
if (mpiGroup == 0)
{
Writer(engineParams);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (mpiGroup == 1)
{
Reader(engineParams);
}
MPI_Barrier(MPI_COMM_WORLD);
}
{
adios2::Params engineParams = {{"Verbose", "0"},
{"EngineMode", "naive"}};
int worldRank, worldSize;
MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
int mpiGroup = worldRank / (worldSize / 2);
MPI_Comm_split(MPI_COMM_WORLD, mpiGroup, worldRank, &mpiComm);
MPI_Comm_rank(mpiComm, &mpiRank);
MPI_Comm_size(mpiComm, &mpiSize);
if (mpiGroup == 0)
{
Writer(engineParams);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (mpiGroup == 1)
{
Reader(engineParams);
}
MPI_Barrier(MPI_COMM_WORLD);
}
}

int main(int argc, char **argv)
{
MPI_Init(&argc, &argv);
int worldRank, worldSize;
MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS();

MPI_Finalize();
return result;
}

0 comments on commit c91d852

Please sign in to comment.