Skip to content

Commit

Permalink
Merge pull request #982 from JasonRuonanWang/dataman-query
Browse files Browse the repository at this point in the history
refined dataman metadata as preparation for the new staging engine
  • Loading branch information
JasonRuonanWang committed Nov 2, 2018
2 parents dadff59 + 2ddb52e commit 128736b
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 228 deletions.
12 changes: 6 additions & 6 deletions source/adios2/engine/dataman/DataManWriter.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ void DataManWriter::PutDeferredCommon(Variable<T> &variable, const T *values)
{
if (m_WorkflowMode == "subscribe")
{
m_DataManSerializer[i]->Put(
variable, m_Name, CurrentStep(), m_MPIRank,
m_IO.m_TransportsParameters[i], false);
m_DataManSerializer[i]->Put(variable, m_Name, CurrentStep(),
m_MPIRank, "",
m_IO.m_TransportsParameters[i]);
}
else
{
m_DataManSerializer[i]->Put(
variable, m_Name, CurrentStep(), m_MPIRank,
m_IO.m_TransportsParameters[i], true);
m_DataManSerializer[i]->Put(variable, m_Name, CurrentStep(),
m_MPIRank, "",
m_IO.m_TransportsParameters[i]);
}
}
}
Expand Down
205 changes: 81 additions & 124 deletions source/adios2/toolkit/format/dataman/DataManDeserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,145 +50,102 @@ int DataManDeserializer::Put(
key = rand();
}
m_BufferMap[key] = data;
size_t position = 0;
while (position < data->capacity())
{
uint32_t metasize;
std::memcpy(&metasize, data->data() + position, sizeof(metasize));
position += sizeof(metasize);
if (position + metasize > data->size())
{
break;
}
DataManVar var;
try
{
nlohmann::json metaj =
nlohmann::json::from_msgpack(data->data() + position, metasize);
position += metasize;

// compulsory properties
var.name = metaj["N"].get<std::string>();
var.start = metaj["O"].get<Dims>();
var.count = metaj["C"].get<Dims>();
var.step = metaj["T"].get<size_t>();
var.size = metaj["I"].get<size_t>();
uint64_t metaPosition =
(reinterpret_cast<const uint64_t *>(data->data()))[0];
uint64_t metaSize = (reinterpret_cast<const uint64_t *>(data->data()))[1];

// optional properties
nlohmann::json metaJ =
nlohmann::json::from_msgpack(data->data() + metaPosition, metaSize);

auto itMap = m_VarDefaultsMap.find(var.name);
auto itJson = metaj.find("D");
if (itJson != metaj.end())
{
var.doid = itJson->get<std::string>();
m_VarDefaultsMap[var.name].doid = var.doid;
}
else
for (auto stepMapIt = metaJ.begin(); stepMapIt != metaJ.end(); ++stepMapIt)
{
for (auto rankMapIt = stepMapIt.value().begin();
rankMapIt != stepMapIt.value().end(); ++rankMapIt)
{
for (const auto &varBlock : rankMapIt.value())
{
if (itMap != m_VarDefaultsMap.end())
DataManVar var;
try
{
var.doid = itMap->second.doid;
}
}
// compulsory properties
var.name = varBlock["N"].get<std::string>();
var.start = varBlock["O"].get<Dims>();
var.count = varBlock["C"].get<Dims>();
var.step = varBlock["T"].get<size_t>();
var.size = varBlock["I"].get<size_t>();

itJson = metaj.find("M");
if (itJson != metaj.end())
{
var.isRowMajor = itJson->get<bool>();
m_VarDefaultsMap[var.name].isRowMajor = var.isRowMajor;
}
else
{
if (itMap != m_VarDefaultsMap.end())
{
var.isRowMajor = itMap->second.isRowMajor;
}
}
// optional properties

itJson = metaj.find("E");
if (itJson != metaj.end())
{
var.isLittleEndian = itJson->get<bool>();
m_VarDefaultsMap[var.name].isLittleEndian = var.isLittleEndian;
}
else
{
if (itMap != m_VarDefaultsMap.end())
{
var.isLittleEndian = itMap->second.isLittleEndian;
}
}
auto itJson = varBlock.find("D");
if (itJson != varBlock.end())
{
var.doid = itJson->get<std::string>();
}

itJson = metaj.find("Y");
if (itJson != metaj.end())
{
var.type = itJson->get<std::string>();
m_VarDefaultsMap[var.name].type = var.type;
}
else
{
if (itMap != m_VarDefaultsMap.end())
{
var.type = itMap->second.type;
}
}
itJson = varBlock.find("M");
if (itJson != varBlock.end())
{
var.isRowMajor = itJson->get<bool>();
}

itJson = metaj.find("S");
if (itJson != metaj.end())
{
var.shape = itJson->get<Dims>();
m_VarDefaultsMap[var.name].shape = var.shape;
}
else
{
if (itMap != m_VarDefaultsMap.end())
{
var.shape = itMap->second.shape;
}
}
itJson = varBlock.find("E");
if (itJson != varBlock.end())
{
var.isLittleEndian = itJson->get<bool>();
}

var.position = position;
var.index = key;
itJson = varBlock.find("Y");
if (itJson != varBlock.end())
{
var.type = itJson->get<std::string>();
}

auto it = metaj.find("Z");
if (it != metaj.end())
{
var.compression = it->get<std::string>();
}
itJson = varBlock.find("S");
if (itJson != varBlock.end())
{
var.shape = itJson->get<Dims>();
}

for (auto i = metaj.begin(); i != metaj.end(); ++i)
{
auto pos = i.key().find(":");
if (pos != std::string::npos)
var.position = varBlock["P"].get<size_t>();
var.index = key;

auto it = varBlock.find("Z");
if (it != varBlock.end())
{
var.compression = it->get<std::string>();
}

for (auto i = varBlock.begin(); i != varBlock.end(); ++i)
{
auto pos = i.key().find(":");
if (pos != std::string::npos)
{
var.params[i.key().substr(pos + 1)] = i.value();
}
}

if (m_MetaDataMap[var.step] == nullptr)
{
m_MetaDataMap[var.step] =
std::make_shared<std::vector<DataManVar>>();
}
m_MetaDataMap[var.step]->emplace_back(std::move(var));
}
catch (std::exception &e)
{
var.params[i.key().substr(pos + 1)] = i.value();
std::cout << e.what() << std::endl;
return -1;
}
if (m_MaxStep < var.step)
{
m_MaxStep = var.step;
}
if (m_MinStep > var.step)
{
m_MinStep = var.step;
}
}

if (position + var.size > data->capacity())
{
break;
}
if (m_MetaDataMap[var.step] == nullptr)
{
m_MetaDataMap[var.step] =
std::make_shared<std::vector<DataManVar>>();
}
m_MetaDataMap[var.step]->push_back(std::move(var));
position += var.size;
}
catch (std::exception &e)
{
std::cout << e.what() << std::endl;
return -1;
}
if (m_MaxStep < var.step)
{
m_MaxStep = var.step;
}
if (m_MinStep > var.step)
{
m_MinStep = var.step;
}
}
return 0;
Expand Down
10 changes: 0 additions & 10 deletions source/adios2/toolkit/format/dataman/DataManDeserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,6 @@ class DataManDeserializer
bool m_IsLittleEndian;

std::mutex m_Mutex;

// Most recently received optional metadata properties of one variable.
// DataManDeserializer::Put stores an entry per variable name whenever a
// metadata record carries one of these optional keys, and reads the entry
// back when a later record for the same variable omits that key.
struct VarDefaults
{
// Value of the "D" metadata key.
std::string doid;
// Value of the "M" metadata key.
bool isRowMajor;
// Value of the "E" metadata key.
bool isLittleEndian;
// Value of the "Y" metadata key.
std::string type;
// Value of the "S" metadata key.
Dims shape;
};
std::map<std::string, VarDefaults> m_VarDefaultsMap;
};

} // end namespace format
Expand Down
15 changes: 9 additions & 6 deletions source/adios2/toolkit/format/dataman/DataManSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,24 @@ void DataManSerializer::New(size_t size)
// still be alive and needed somewhere in the workflow, for example the
// queue in transport manager. It will be automatically released when the
// entire workflow finishes using it.
m_Metadata = nullptr;
m_Buffer = std::make_shared<std::vector<char>>();
m_Buffer->reserve(size);
m_Position = 0;
m_Position = sizeof(uint64_t) * 2;
}

const std::shared_ptr<std::vector<char>> DataManSerializer::Get()
{
std::vector<uint8_t> metacbor = nlohmann::json::to_msgpack(m_Metadata);
size_t metasize = metacbor.size();
m_Buffer->resize(m_Position + metasize);
(reinterpret_cast<uint64_t *>(m_Buffer->data()))[0] = m_Position;
(reinterpret_cast<uint64_t *>(m_Buffer->data()))[1] = metasize;
std::memcpy(m_Buffer->data() + m_Position, metacbor.data(), metasize);
return m_Buffer;
}

float DataManSerializer::GetMetaRatio()
{
return static_cast<float>(m_TotalMetadataSize) /
static_cast<float>(m_TotalMetadataSize + m_TotalDataSize);
}
float DataManSerializer::GetMetaRatio() { return 0; }

std::shared_ptr<std::vector<char>> DataManSerializer::EndSignal(size_t step)
{
Expand Down
36 changes: 20 additions & 16 deletions source/adios2/toolkit/format/dataman/DataManSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@
#include <mutex>
#include <unordered_map>

// A - Address
// C - Count
// D - Data Object ID or File Name
// E - Endian
// H - Metadata Hash
// I - Data Size
// M - Major
// N - Variable Name
// O - Start
// P - Position of Memory Block
// S - Shape
// X - Index (Used only in deserializer)
// Y - Data Type
// Z - Compression Method
// ZP - Compression Parameters

namespace adios2
{
namespace format
Expand All @@ -33,17 +49,18 @@ class DataManSerializer
void Put(const T *inputData, const std::string &varName,
const Dims &varShape, const Dims &varStart, const Dims &varCount,
const std::string &doid, const size_t step, const int rank,
const Params &params, const bool optimizeMetadata);
std::string address, const Params &params);
template <class T>
void Put(const core::Variable<T> &variable, const std::string &doid,
const size_t step, const int rank, const Params &params,
const bool optimizeMetadata);
const size_t step, const int rank, std::string address,
const Params &params);
const std::shared_ptr<std::vector<char>> Get();
float GetMetaRatio();
static std::shared_ptr<std::vector<char>> EndSignal(size_t step);

private:
std::shared_ptr<std::vector<char>> m_Buffer;
nlohmann::json m_Metadata;
std::vector<char> m_CompressBuffer;
size_t m_Position = 0;
bool m_IsRowMajor;
Expand All @@ -60,19 +77,6 @@ class DataManSerializer

bool IsCompressionAvailable(const std::string &method,
const std::string &type, const Dims &count);

size_t m_TotalDataSize;
size_t m_TotalMetadataSize;

// Optional metadata properties remembered for one variable.
// NOTE(review): presumably the last values serialized for the variable, kept
// so that unchanged optional properties can be left out of later metadata
// records -- the code that uses this struct is not visible here; confirm.
struct VarDefaults
{
// Data object ID or file name ("D" in the key legend above).
std::string doid;
// Major ("M" in the key legend above).
bool isRowMajor;
// Endian ("E" in the key legend above).
bool isLittleEndian;
// Data type ("Y" in the key legend above).
std::string type;
// Shape ("S" in the key legend above).
Dims shape;
};
std::map<std::string, VarDefaults> m_VarDefaultsMap;
};

} // end namespace format
Expand Down

0 comments on commit 128736b

Please sign in to comment.