Skip to content

Commit

Permalink
Reorg Remote as base class in preparation for more remote subclasses (#…
Browse files Browse the repository at this point in the history
…4162)

Reorg Remote as base class in preparation for more remote implementations.
  • Loading branch information
eisenhauer committed May 13, 2024
1 parent c8b12cd commit 54313f9
Show file tree
Hide file tree
Showing 12 changed files with 390 additions and 249 deletions.
2 changes: 1 addition & 1 deletion source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ endif()

if (ADIOS2_HAVE_SST)
# EVPath-enabled remote file transport
target_sources(adios2_core PRIVATE toolkit/remote/remote_common.cpp toolkit/transport/file/FileRemote.cpp)
target_sources(adios2_core PRIVATE toolkit/remote/remote_common.cpp toolkit/transport/file/FileRemote.cpp toolkit/remote/EVPathRemote.cpp)
target_link_libraries(adios2_core PRIVATE adios2::thirdparty::EVPath)
add_subdirectory(toolkit/remote)
endif()
Expand Down
23 changes: 18 additions & 5 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "BP5Reader.tcc"

#include "adios2/helper/adiosMath.h" // SetWithinLimit
#include "adios2/toolkit/remote/EVPathRemote.h"
#include "adios2/toolkit/transport/file/FileFStream.h"
#include <adios2-perfstubs-interface.h>

Expand Down Expand Up @@ -283,15 +284,27 @@ void BP5Reader::PerformGets()
if (m_BP5Deserializer->PendingGetRequests.size() == 0)
return;

std::string RemoteName;
if (!m_Parameters.RemoteDataPath.empty())
{
m_Remote.Open("localhost", RemoteCommon::ServerPort, m_Parameters.RemoteDataPath,
m_OpenMode, RowMajorOrdering);
RemoteName = m_Parameters.RemoteDataPath;
}
else if (getenv("DoRemote"))
{
m_Remote.Open("localhost", RemoteCommon::ServerPort, m_Name, m_OpenMode,
RowMajorOrdering);
RemoteName = m_Name;
}
(void)RowMajorOrdering; // Use in case no remotes available
#ifdef ADIOS2_HAVE_SST
m_Remote = std::unique_ptr<EVPathRemote>(new EVPathRemote());
m_Remote->Open("localhost", EVPathRemoteCommon::ServerPort, RemoteName, m_OpenMode,
RowMajorOrdering);
#endif
if (m_Remote == nullptr)
{
helper::Throw<std::ios_base::failure>(
"Engine", "BP5Reader", "OpenFiles",
"Remote file " + m_Name +
" cannot be opened. Possible server or file specification error.");
}
if (!m_Remote)
{
Expand Down Expand Up @@ -324,7 +337,7 @@ void BP5Reader::PerformRemoteGets()
auto GetRequests = m_BP5Deserializer->PendingGetRequests;
for (auto &Req : GetRequests)
{
m_Remote.Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
m_Remote->Get(Req.VarName, Req.RelStep, Req.BlockID, Req.Count, Req.Start, Req.Data);
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class BP5Reader : public BP5Engine, public Engine
/* transport manager for managing the active flag file */
transportman::TransportMan m_ActiveFlagFileManager;
bool m_dataIsRemote = false;
Remote m_Remote;
std::unique_ptr<Remote> m_Remote;
bool m_WriterIsActive = true;
adios2::profiling::JSONProfiler m_JSONProfiler;

Expand Down
219 changes: 219 additions & 0 deletions source/adios2/toolkit/remote/EVPathRemote.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
*/
#include "EVPathRemote.h"
#include "Remote.h"
#include "adios2/core/ADIOS.h"
#include "adios2/helper/adiosLog.h"
#include "adios2/helper/adiosString.h"
#include "adios2/helper/adiosSystem.h"
#ifdef _MSC_VER
#define strdup(x) _strdup(x)
#endif

#define ThrowUp(x) \
helper::Throw<std::invalid_argument>("Core", "Engine", "ThrowUp", \
"Non-overridden function " + std::string(x) + \
" called in Remote")

namespace adios2
{

EVPathRemote::EVPathRemote() {}

#ifdef ADIOS2_HAVE_SST
EVPathRemote::~EVPathRemote()
{
if (m_conn)
CMConnection_close(m_conn);
}

void OpenResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
EVPathRemoteCommon::OpenResponseMsg open_response_msg =
static_cast<EVPathRemoteCommon::OpenResponseMsg>(vevent);

void *obj = CMCondition_get_client_data(cm, open_response_msg->OpenResponseCondition);
static_cast<EVPathRemote *>(obj)->m_ID = open_response_msg->FileHandle;
CMCondition_signal(cm, open_response_msg->OpenResponseCondition);
return;
};

void OpenSimpleResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
EVPathRemoteCommon::OpenSimpleResponseMsg open_response_msg =
static_cast<EVPathRemoteCommon::OpenSimpleResponseMsg>(vevent);

void *obj = CMCondition_get_client_data(cm, open_response_msg->OpenResponseCondition);
static_cast<EVPathRemote *>(obj)->m_ID = open_response_msg->FileHandle;
static_cast<EVPathRemote *>(obj)->m_Size = open_response_msg->FileSize;
CMCondition_signal(cm, open_response_msg->OpenResponseCondition);
return;
};

void ReadResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
EVPathRemoteCommon::ReadResponseMsg read_response_msg =
static_cast<EVPathRemoteCommon::ReadResponseMsg>(vevent);
memcpy(read_response_msg->Dest, read_response_msg->ReadData, read_response_msg->Size);
CMCondition_signal(cm, read_response_msg->ReadResponseCondition);
return;
};

CManagerSingleton &CManagerSingleton::Instance(EVPathRemoteCommon::Remote_evpath_state &ev_state)
{
std::mutex mtx;
const std::lock_guard<std::mutex> lock(mtx);
static CManagerSingleton instance;
ev_state = instance.internalEvState;
return instance;
}

void EVPathRemote::InitCMData()
{
(void)CManagerSingleton::Instance(ev_state);
static std::once_flag flag;
std::call_once(flag, [&]() {
CMregister_handler(ev_state.OpenResponseFormat, (CMHandlerFunc)OpenResponseHandler,
&ev_state);
CMregister_handler(ev_state.ReadResponseFormat, (CMHandlerFunc)ReadResponseHandler,
&ev_state);
CMregister_handler(ev_state.OpenSimpleResponseFormat,
(CMHandlerFunc)OpenSimpleResponseHandler, &ev_state);
CMregister_handler(ev_state.ReadResponseFormat, (CMHandlerFunc)ReadResponseHandler,
&ev_state);
});
}

void EVPathRemote::Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering)
{

EVPathRemoteCommon::_OpenFileMsg open_msg;
InitCMData();
attr_list contact_list = create_attr_list();
atom_t CM_IP_PORT = -1;
atom_t CM_IP_HOSTNAME = -1;
CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
CM_IP_PORT = attr_atom_from_string("IP_PORT");
add_attr(contact_list, CM_IP_HOSTNAME, Attr_String, (attr_value)strdup(hostname.c_str()));
add_attr(contact_list, CM_IP_PORT, Attr_Int4, (attr_value)port);
m_conn = CMinitiate_conn(ev_state.cm, contact_list);
free_attr_list(contact_list);
if (!m_conn)
return;

memset(&open_msg, 0, sizeof(open_msg));
open_msg.FileName = (char *)filename.c_str();
switch (mode)
{
case Mode::Read:
open_msg.Mode = EVPathRemoteCommon::RemoteFileMode::RemoteOpen;
break;
case Mode::ReadRandomAccess:
open_msg.Mode = EVPathRemoteCommon::RemoteFileMode::RemoteOpenRandomAccess;
break;
default:
break;
}
open_msg.OpenResponseCondition = CMCondition_get(ev_state.cm, m_conn);
open_msg.RowMajorOrder = RowMajorOrdering;
CMCondition_set_client_data(ev_state.cm, open_msg.OpenResponseCondition, (void *)this);
CMwrite(m_conn, ev_state.OpenFileFormat, &open_msg);
CMCondition_wait(ev_state.cm, open_msg.OpenResponseCondition);
m_Active = true;
}

void EVPathRemote::OpenSimpleFile(const std::string hostname, const int32_t port,
const std::string filename)
{

EVPathRemoteCommon::_OpenSimpleFileMsg open_msg;
InitCMData();
attr_list contact_list = create_attr_list();
atom_t CM_IP_PORT = -1;
atom_t CM_IP_HOSTNAME = -1;
CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
CM_IP_PORT = attr_atom_from_string("IP_PORT");
add_attr(contact_list, CM_IP_HOSTNAME, Attr_String, (attr_value)strdup(hostname.c_str()));
add_attr(contact_list, CM_IP_PORT, Attr_Int4, (attr_value)port);
m_conn = CMinitiate_conn(ev_state.cm, contact_list);
free_attr_list(contact_list);
if (!m_conn)
return;

memset(&open_msg, 0, sizeof(open_msg));
open_msg.FileName = (char *)filename.c_str();
open_msg.OpenResponseCondition = CMCondition_get(ev_state.cm, m_conn);
CMCondition_set_client_data(ev_state.cm, open_msg.OpenResponseCondition, (void *)this);
CMwrite(m_conn, ev_state.OpenSimpleFileFormat, &open_msg);
CMCondition_wait(ev_state.cm, open_msg.OpenResponseCondition);
m_Active = true;
}

EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
{
EVPathRemoteCommon::_GetRequestMsg GetMsg;
memset(&GetMsg, 0, sizeof(GetMsg));
GetMsg.GetResponseCondition = CMCondition_get(ev_state.cm, m_conn);
GetMsg.FileHandle = m_ID;
GetMsg.VarName = VarName;
GetMsg.Step = Step;
GetMsg.BlockID = BlockID;
GetMsg.DimCount = (int)Count.size();
GetMsg.Count = Count.data();
GetMsg.Start = Start.data();
GetMsg.Dest = dest;
CMwrite(m_conn, ev_state.GetRequestFormat, &GetMsg);
CMCondition_wait(ev_state.cm, GetMsg.GetResponseCondition);
return GetMsg.GetResponseCondition;
}

EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest)
{
EVPathRemoteCommon::_ReadRequestMsg ReadMsg;
memset(&ReadMsg, 0, sizeof(ReadMsg));
ReadMsg.ReadResponseCondition = CMCondition_get(ev_state.cm, m_conn);
ReadMsg.FileHandle = m_ID;
ReadMsg.Offset = Start;
ReadMsg.Size = Size;
ReadMsg.Dest = Dest;
CMwrite(m_conn, ev_state.ReadRequestFormat, &ReadMsg);
CMCondition_wait(ev_state.cm, ReadMsg.ReadResponseCondition);
return ReadMsg.ReadResponseCondition;
}

bool EVPathRemote::WaitForGet(GetHandle handle)
{
return CMCondition_wait(ev_state.cm, (int)handle);
}
#else

void EVPathRemote::Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering){};

void EVPathRemote::OpenSimpleFile(const std::string hostname, const int32_t port,
const std::string filename){};

EVPathRemote::GetHandle EVPathRemote::Get(char *VarName, size_t Step, size_t BlockID, Dims &Count,
Dims &Start, void *dest)
{
return static_cast<GetHandle>(0);
};

bool EVPathRemote::WaitForGet(GetHandle handle) { return false; }

EVPathRemote::GetHandle EVPathRemote::Read(size_t Start, size_t Size, void *Dest)
{
return static_cast<GetHandle>(0);
};
EVPathRemote::~EVPathRemote() {}
#endif

} // end namespace adios2
89 changes: 89 additions & 0 deletions source/adios2/toolkit/remote/EVPathRemote.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*/

#ifndef ADIOS2_TOOLKIT_REMOTE_EVPATHREMOTE_H_
#define ADIOS2_TOOLKIT_REMOTE_EVPATHREMOTE_H_

/// \cond EXCLUDE_FROM_DOXYGEN
#include <mutex>
#include <string>
#include <vector>
/// \endcond

#include "adios2/toolkit/profiling/iochrono/IOChrono.h"

#include "Remote.h"
#include "adios2/common/ADIOSConfig.h"

#include "remote_common.h"

namespace adios2
{

class EVPathRemote : public Remote
{

public:
profiling::IOChrono m_Profiler; ///< profiles Open, Write/Read, Close

/**
* Base constructor that all derived classes pass
* @param type from derived class
* @param comm passed to m_Comm
*/
EVPathRemote();
~EVPathRemote();

explicit operator bool() const { return m_Active; }

void Open(const std::string hostname, const int32_t port, const std::string filename,
const Mode mode, bool RowMajorOrdering);

void OpenSimpleFile(const std::string hostname, const int32_t port, const std::string filename);

typedef int GetHandle;

GetHandle Get(char *VarName, size_t Step, size_t BlockID, Dims &Count, Dims &Start, void *dest);

bool WaitForGet(GetHandle handle);

GetHandle Read(size_t Start, size_t Size, void *Dest);

int64_t m_ID;

private:
#ifdef ADIOS2_HAVE_SST
void InitCMData();
EVPathRemoteCommon::Remote_evpath_state ev_state;
CMConnection m_conn = NULL;
std::mutex m_CMInitMutex;
#endif
bool m_Active = false;
};

#ifdef ADIOS2_HAVE_SST
class CManagerSingleton
{
public:
static CManagerSingleton &Instance(EVPathRemoteCommon::Remote_evpath_state &ev_state);

private:
CManager m_cm = NULL;
EVPathRemoteCommon::Remote_evpath_state internalEvState;
CManagerSingleton()
{
m_cm = CManager_create();
internalEvState.cm = m_cm;
RegisterFormats(internalEvState);
CMfork_comm_thread(internalEvState.cm);
}

~CManagerSingleton() { CManager_close(m_cm); }
};
#endif

} // end namespace adios2

#endif /* ADIOS2_TOOLKIT_EVPATHREMOTE_REMOTE_H_ */

0 comments on commit 54313f9

Please sign in to comment.