Skip to content

Commit

Permalink
Merge pull request #1016 from JasonRuonanWang/dataman-callback
Browse files Browse the repository at this point in the history
temporarily disabled dataman callback
  • Loading branch information
JasonRuonanWang committed Nov 27, 2018
2 parents a97eabe + 8ecf381 commit c54e0b1
Show file tree
Hide file tree
Showing 7 changed files with 4 additions and 272 deletions.
9 changes: 0 additions & 9 deletions source/adios2/engine/dataman/DataManCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ DataManCommon::DataManCommon(const std::string engineType, IO &io,
{
m_StreamNames.push_back(m_Name + std::to_string(i));
}

// register callbacks
for (auto &j : m_IO.m_Operations)
{
if (j.Op->m_Type == "Signature2")
{
m_Callbacks.push_back(j.Op);
}
}
}

bool DataManCommon::GetStringParameter(Params &params, std::string key,
Expand Down
2 changes: 0 additions & 2 deletions source/adios2/engine/dataman/DataManCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class DataManCommon : public Engine
transport::FileFStream m_FileTransport;

std::vector<std::string> m_StreamNames;
std::vector<core::Operator *> m_Callbacks;
std::mutex m_CallbackMutex;

std::shared_ptr<transportman::DataMan> m_DataMan;
std::shared_ptr<std::thread> m_DataThread;
Expand Down
68 changes: 4 additions & 64 deletions source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,66 +180,6 @@ void DataManReader::IOThread(std::shared_ptr<transportman::DataMan> man)
m_FinalStep = ret;
}
}
RunCallback();
}
}

void DataManReader::RunCallback()
{
std::lock_guard<std::mutex> l(m_CallbackMutex);
if (m_Callbacks.empty() == false)
{
for (size_t step = m_DataManDeserializer.MinStep();
step <= m_DataManDeserializer.MaxStep(); ++step)
{
auto varList = m_DataManDeserializer.GetMetaData(step);
if (varList == nullptr)
{
continue;
}
for (const auto &i : *varList)
{
if (i.type == "compound")
{
throw("Compound type is not supported yet.");
}
#define declare_type(T) \
else if (i.type == helper::GetType<T>()) \
{ \
CheckIOVariable<T>(i.name, i.shape, i.start, i.count); \
size_t datasize = \
std::accumulate(i.count.begin(), i.count.end(), sizeof(T), \
std::multiplies<size_t>()); \
std::vector<T> varData(datasize, std::numeric_limits<T>::quiet_NaN()); \
m_DataManDeserializer.Get(varData.data(), i.name, i.start, i.count, \
step); \
for (auto &j : m_Callbacks) \
{ \
if (j->m_Type == "Signature1") \
{ \
j->RunCallback2(reinterpret_cast<T *>(varData.data()), i.doid, \
i.name, i.type, step, i.shape, i.start, \
i.count); \
} \
else if (j->m_Type == "Signature2") \
{ \
j->RunCallback2(varData.data(), i.doid, i.name, i.type, step, \
i.shape, i.start, i.count); \
} \
else \
{ \
throw(std::runtime_error( \
"[DataManReader::RunCallback] Callback funtion " \
"registered is not of Signatrue2. It might be modified " \
"from outside DataMan Engine.")); \
} \
} \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
}
m_DataManDeserializer.Erase(step);
}
}
}

Expand Down Expand Up @@ -270,12 +210,12 @@ void DataManReader::DoClose(const int transportIndex)
if (transportIndex == -1)
{
m_Listening = false;
m_CallbackMutex.lock();
m_Callbacks.resize(0);
m_CallbackMutex.unlock();
if (m_DataThread != nullptr)
{
m_DataThread->join();
if (m_DataThread->joinable())
{
m_DataThread->join();
}
}
}
m_DataMan = nullptr;
Expand Down
1 change: 0 additions & 1 deletion source/adios2/engine/dataman/DataManReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class DataManReader : public DataManCommon

void Init();
void IOThread(std::shared_ptr<transportman::DataMan> man);
void RunCallback();
void DoClose(const int transportIndex = -1) final;

#define declare_type(T) \
Expand Down
6 changes: 0 additions & 6 deletions testing/adios2/engine/dataman/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,4 @@ if(ADIOS2_HAVE_MPI)
endif()
gtest_add_tests(TARGET TestDataManSubscribe1D)

add_executable(TestDataManCallback1D TestDataManCallback1D.cpp)
target_link_libraries(TestDataManCallback1D adios2 gtest)
if(ADIOS2_HAVE_MPI)
target_link_libraries(TestDataManCallback1D MPI::MPI_C)
endif()
gtest_add_tests(TARGET TestDataManCallback1D)

118 changes: 0 additions & 118 deletions testing/adios2/engine/dataman/TestDataMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,94 +111,6 @@ void VerifyData(const std::vector<T> &data, const size_t step,
VerifyData(data.data(), data.size(), step, transParams);
}

void UserCallBack2(float *data, const std::string &doid, const std::string &var,
const std::string &dtype, const size_t step,
const adios2::Dims &varshape, const adios2::Dims &start,
const adios2::Dims &count)
{
std::cout << "Object : " << doid << ", ";
std::cout << "Variable :" << var << ", ";
std::cout << "Type : " << dtype << ", ";
std::cout << "Shape : [";
for (size_t i = 0; i < varshape.size(); ++i)
{
std::cout << varshape[i];
if (i != varshape.size() - 1)
{
std::cout << ", ";
}
}
std::cout << "]"
<< ". ";

size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1,
std::multiplies<std::size_t>());

size_t dumpsize = 128;
if (varsize < dumpsize)
{
dumpsize = varsize;
}

std::cout << "Printing data for the first " << dumpsize << " elements: ";

for (size_t i = 0; i < dumpsize; ++i)
{
std::cout << data[i] << " ";
}
std::cout << std::endl;
}

void UserCallBack1(void *data, const std::string &doid, const std::string &var,
const std::string &dtype, const size_t step,
const adios2::Dims &varshape, const adios2::Dims &start,
const adios2::Dims &count)
{
std::cout << "Object : " << doid << ", ";
std::cout << "Variable :" << var << ", ";
std::cout << "Type : " << dtype << ", ";
std::cout << "Shape : [";
for (size_t i = 0; i < varshape.size(); ++i)
{
std::cout << varshape[i];
if (i != varshape.size() - 1)
{
std::cout << ", ";
}
}
std::cout << "]"
<< ". ";

size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1,
std::multiplies<std::size_t>());

size_t dumpsteps = 10;
size_t dumpsize = 128;
if (varsize < dumpsize)
{
dumpsize = varsize;
}

if (step < dumpsteps)
{

std::cout << "Printing data for the first " << dumpsize
<< " elements: ";

#define declare_type(T) \
if (dtype == adios2::helper::GetType<T>()) \
{ \
for (size_t i = 0; i < dumpsize; ++i) \
{ \
std::cout << (reinterpret_cast<T *>(data))[i] << " "; \
} \
std::cout << std::endl; \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
}
}

void DataManWriter(const Dims &shape, const Dims &start, const Dims &count,
const size_t steps, const adios2::Params &engineParams,
const std::vector<adios2::Params> &transParams)
Expand Down Expand Up @@ -398,36 +310,6 @@ void DataManReaderP2P(const Dims &shape, const Dims &start, const Dims &count,
print_lines = 0;
}

void DataManReaderCallback(const Dims &shape, const Dims &start,
const Dims &count, const size_t steps,
const adios2::Params &engineParams,
const std::vector<adios2::Params> &transParams,
const size_t timeout)
{
#ifdef ADIOS2_HAVE_MPI
adios2::ADIOS adios(MPI_COMM_SELF, adios2::DebugON);
#else
adios2::ADIOS adios(adios2::DebugON);
#endif
adios2::Operator callback = adios.DefineOperator(
"Print all variables callback void",
std::function<void(void *, const std::string &, const std::string &,
const std::string &, const size_t,
const adios2::Dims &, const adios2::Dims &,
const adios2::Dims &)>(UserCallBack1));
adios2::IO dataManIO = adios.DeclareIO("WAN");
dataManIO.SetEngine("DataMan");
dataManIO.SetParameters(engineParams);
for (const auto &params : transParams)
{
dataManIO.AddTransport("WAN", params);
}
dataManIO.AddOperation(callback);
adios2::Engine dataManReader = dataManIO.Open("stream", adios2::Mode::Read);
std::this_thread::sleep_for(std::chrono::seconds(timeout));
dataManReader.Close();
}

void DataManReaderSubscribe(const Dims &shape, const Dims &start,
const Dims &count, const size_t steps,
const adios2::Params &engineParams,
Expand Down
72 changes: 0 additions & 72 deletions testing/adios2/engine/dataman/TestDataManCallback1D.cpp

This file was deleted.

0 comments on commit c54e0b1

Please sign in to comment.