Skip to content

Commit

Permalink
Add prototype testing of remote functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Sep 28, 2023
1 parent c503940 commit f79044f
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 42 deletions.
4 changes: 2 additions & 2 deletions source/adios2/toolkit/remote/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ void Remote::InitCMData()
{
std::lock_guard<std::mutex> lockGuard(m_CMInitMutex);
bool first = true;
auto &CM = CManagerSingleton::Instance(first);
ev_state.cm = CM.m_cm;
auto CM = CManagerSingleton::Instance(first);
ev_state.cm = CM->m_cm;
RegisterFormats(ev_state);
if (first)
{
Expand Down
20 changes: 3 additions & 17 deletions source/adios2/toolkit/remote/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,15 @@ class CManagerSingleton
#ifdef ADIOS2_HAVE_SST
CManager m_cm = NULL;
#endif
static CManagerSingleton &Instance(bool &first)
static CManagerSingleton *Instance(bool &first)
{
// Since it's a static variable, if the class has already been created,
// it won't be created again.
// And it **is** thread-safe in C++11.
static CManagerSingleton myInstance;
static CManagerSingleton *ptr = new CManagerSingleton();
static bool internal_first = true;
// Return a reference to our instance.

first = internal_first;
internal_first = false;
return myInstance;
return ptr;
}

// delete copy and move constructors and assign operators
CManagerSingleton(CManagerSingleton const &) = delete; // Copy construct
CManagerSingleton(CManagerSingleton &&) = delete; // Move construct
CManagerSingleton &operator=(CManagerSingleton const &) = delete; // Copy assign
CManagerSingleton &operator=(CManagerSingleton &&) = delete; // Move assign

// Any other public methods.

protected:
#ifdef ADIOS2_HAVE_SST
CManagerSingleton() { m_cm = CManager_create(); }
Expand All @@ -101,7 +88,6 @@ class CManagerSingleton

~CManagerSingleton() {}
#endif
// And any other protected methods.
};

} // end namespace adios2
Expand Down
19 changes: 19 additions & 0 deletions source/adios2/toolkit/remote/remote_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ FMField CloseFileList[] = {
FMStructDescRec CloseFileStructs[] = {{"Close", CloseFileList, sizeof(struct _CloseFileMsg), NULL},
{NULL, NULL, 0, NULL}};

FMField KillServerList[] = {{"KillResponseCondition", "integer", sizeof(long),
FMOffset(KillServerMsg, KillResponseCondition)},
{NULL, NULL, 0, 0}};

FMStructDescRec KillServerStructs[] = {
{"KillServer", KillServerList, sizeof(struct _KillServerMsg), NULL}, {NULL, NULL, 0, NULL}};

FMField KillResponseList[] = {
{"KillResponseCondition", "integer", sizeof(long),
FMOffset(KillResponseMsg, KillResponseCondition)},
{"Status", "string", sizeof(char *), FMOffset(KillResponseMsg, Status)},
{NULL, NULL, 0, 0}};

FMStructDescRec KillResponseStructs[] = {
{"KillResponse", KillResponseList, sizeof(struct _KillResponseMsg), NULL},
{NULL, NULL, 0, NULL}};

void RegisterFormats(RemoteCommon::Remote_evpath_state &ev_state)
{
ev_state.OpenFileFormat = CMregister_format(ev_state.cm, RemoteCommon::OpenFileStructs);
Expand All @@ -108,6 +125,8 @@ void RegisterFormats(RemoteCommon::Remote_evpath_state &ev_state)
ev_state.ReadRequestFormat = CMregister_format(ev_state.cm, RemoteCommon::ReadRequestStructs);
ev_state.ReadResponseFormat = CMregister_format(ev_state.cm, RemoteCommon::ReadResponseStructs);
ev_state.CloseFileFormat = CMregister_format(ev_state.cm, RemoteCommon::CloseFileStructs);
ev_state.KillServerFormat = CMregister_format(ev_state.cm, RemoteCommon::KillServerStructs);
ev_state.KillResponseFormat = CMregister_format(ev_state.cm, RemoteCommon::KillResponseStructs);
}
}
}
13 changes: 13 additions & 0 deletions source/adios2/toolkit/remote/remote_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ typedef struct _CloseFileMsg
void *FileHandle;
} *CloseFileMsg;

typedef struct _KillServerMsg
{
int KillResponseCondition;
} *KillServerMsg;

typedef struct _KillResponseMsg
{
int KillResponseCondition;
char *Status;
} *KillResponseMsg;

enum VerbosityLevel
{
NoVerbose = 0, // Generally no output (but not absolutely quiet?)
Expand All @@ -111,6 +122,8 @@ struct Remote_evpath_state
CMFormat ReadRequestFormat;
CMFormat ReadResponseFormat;
CMFormat CloseFileFormat;
CMFormat KillServerFormat;
CMFormat KillResponseFormat;
};

void RegisterFormats(struct Remote_evpath_state &ev_state);
Expand Down
158 changes: 135 additions & 23 deletions source/adios2/toolkit/remote/remote_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ using namespace adios2;
int verbose = 1;
ADIOS adios("C++");

size_t TotalSimpleBytesSent = 0;
size_t TotalGetBytesSent = 0;
size_t TotalSimpleReads = 0;
size_t TotalGets = 0;
size_t SimpleFilesOpened = 0;
size_t ADIOSFilesOpened = 0;

std::string readable_size(uint64_t size)
{
constexpr const char FILE_SIZE_UNITS[8][3]{"B ", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"};
Expand Down Expand Up @@ -178,6 +185,7 @@ static void OpenHandler(CManager cm, CMConnection conn, void *vevent, void *clie
CMconn_register_close_handler(conn, ConnCloseHandler, NULL);
ADIOSFileMap[f->m_ID] = f;
ConnToFileMap.emplace(conn, f->m_ID);
ADIOSFilesOpened++;
}

static void OpenSimpleHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
Expand All @@ -198,6 +206,7 @@ static void OpenSimpleHandler(CManager cm, CMConnection conn, void *vevent, void
CMconn_register_close_handler(conn, ConnCloseHandler, NULL);
SimpleFileMap[f->m_ID] = f;
ConnToFileMap.emplace(conn, f->m_ID);
SimpleFilesOpened++;
}

static void GetRequestHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
Expand Down Expand Up @@ -263,6 +272,8 @@ static void GetRequestHandler(CManager cm, CMConnection conn, void *vevent, void
<< " for Get<" << TypeOfVar << ">(" << VarName << ")" << b << std::endl; \
f->m_BytesSent += Response.Size; \
f->m_OperationCount++; \
TotalGetBytesSent += Response.Size; \
TotalGets++; \
CMwrite(conn, ev_state->ReadResponseFormat, &Response); \
}
ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(GET)
Expand Down Expand Up @@ -300,16 +311,78 @@ static void ReadRequestHandler(CManager cm, CMConnection conn, void *vevent, voi
std::cout << "Returning " << readable_size(Response.Size) << " for Read " << std::endl;
f->m_BytesSent += Response.Size;
f->m_OperationCount++;
TotalSimpleBytesSent += Response.Size;
TotalSimpleReads++;
CMwrite(conn, ev_state->ReadResponseFormat, &Response);
free(tmp);
}

void REVPServerRegisterHandlers(struct Remote_evpath_state &ev_state)
static void KillServerHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
KillServerMsg kill_msg = static_cast<KillServerMsg>(vevent);
struct Remote_evpath_state *ev_state = static_cast<struct Remote_evpath_state *>(client_data);
_KillResponseMsg kill_response_msg;
memset(&kill_response_msg, 0, sizeof(kill_response_msg));
kill_response_msg.KillResponseCondition = kill_msg->KillResponseCondition;
std::stringstream Status;
Status << "ADIOS files Opened: " << ADIOSFilesOpened << " (" << TotalGets << " gets for "
<< readable_size(TotalGetBytesSent) << ") Simple files opened: " << SimpleFilesOpened
<< " (" << TotalSimpleReads << " reads for " << readable_size(TotalSimpleBytesSent)
<< ")";
kill_response_msg.Status = strdup(Status.str().c_str());
CMwrite(conn, ev_state->KillResponseFormat, &kill_response_msg);
free(kill_response_msg.Status);
exit(0);
}

static void KillResponseHandler(CManager cm, CMConnection conn, void *vevent, void *client_data,
attr_list attrs)
{
KillResponseMsg kill_response_msg = static_cast<KillResponseMsg>(vevent);
std::cout << "Server final status: " << kill_response_msg->Status << std::endl;
exit(0);
}

void ServerRegisterHandlers(struct Remote_evpath_state &ev_state)
{
CMregister_handler(ev_state.OpenFileFormat, OpenHandler, &ev_state);
CMregister_handler(ev_state.OpenSimpleFileFormat, OpenSimpleHandler, &ev_state);
CMregister_handler(ev_state.GetRequestFormat, GetRequestHandler, &ev_state);
CMregister_handler(ev_state.ReadRequestFormat, ReadRequestHandler, &ev_state);
CMregister_handler(ev_state.KillServerFormat, KillServerHandler, &ev_state);
CMregister_handler(ev_state.KillResponseFormat, KillResponseHandler, &ev_state);
}

static const char *hostname = "localhost";

void connect_and_kill(int ServerPort)
{
CManager cm = CManager_create();
_KillServerMsg kill_msg;
struct Remote_evpath_state ev_state;
attr_list contact_list = create_attr_list();
atom_t CM_IP_PORT = -1;
atom_t CM_IP_HOSTNAME = -1;
CM_IP_HOSTNAME = attr_atom_from_string("IP_HOST");
CM_IP_PORT = attr_atom_from_string("IP_PORT");
add_attr(contact_list, CM_IP_HOSTNAME, Attr_String, (attr_value)hostname);
add_attr(contact_list, CM_IP_PORT, Attr_Int4, (attr_value)ServerPort);
CMConnection conn = CMinitiate_conn(cm, contact_list);
if (!conn)
return;

ev_state.cm = cm;

RegisterFormats(ev_state);

ServerRegisterHandlers(ev_state);

memset(&kill_msg, 0, sizeof(kill_msg));
kill_msg.KillResponseCondition = CMCondition_get(ev_state.cm, conn);
CMwrite(conn, ev_state.KillServerFormat, &kill_msg);
CMCondition_wait(ev_state.cm, kill_msg.KillResponseCondition);
exit(0);
}

static atom_t CM_IP_PORT = -1;
Expand All @@ -318,9 +391,67 @@ int main(int argc, char **argv)
{
CManager cm;
struct Remote_evpath_state ev_state;
int background = 0;
int kill_server = 0;

for (int i = 1; i < argc; i++)
{
if (strcmp(argv[i], "-background") == 0)
{
background++;
}
else if (strcmp(argv[i], "-kill_server") == 0)
{
kill_server++;
}
if (argv[i][0] == '-')
{
size_t j = 1;
while (argv[i][j] != 0)
{
if (argv[i][j] == 'v')
{
verbose++;
}
else if (argv[i][j] == 'q')
{
verbose--;
}
j++;
}
}
else
{
fprintf(stderr, "Unknown argument \"%s\"\n", argv[i]);
fprintf(stderr, "Usage: remote_server [-background] [-kill_server] [-v] [-q]\n");
exit(1);
}
}

if (kill_server)
{
connect_and_kill(ServerPort);
exit(0);
}
if (background)
{
if (verbose)
{
printf("Forking server to background\n");
}
if (fork() != 0)
{
/* I'm the parent, wait a sec to let the child start, then exit */
sleep(1);
exit(0);
}
/* I'm the child, close IO FDs so that ctest continues. No verbosity here */
verbose = 0;
close(0);
close(1);
close(2);
}

(void)argc;
(void)argv;
cm = CManager_create();
CM_IP_PORT = attr_atom_from_string("IP_PORT");
attr_list listen_list = NULL;
Expand All @@ -339,28 +470,9 @@ int main(int argc, char **argv)
}
ev_state.cm = cm;

while (argv[1] && (argv[1][0] == '-'))
{
size_t i = 1;
while (argv[1][i] != 0)
{
if (argv[1][i] == 'v')
{
verbose++;
}
else if (argv[1][i] == 'q')
{
verbose--;
}
i++;
}
argv++;
argc--;
}

RegisterFormats(ev_state);

REVPServerRegisterHandlers(ev_state);
ServerRegisterHandlers(ev_state);

std::cout << "doing Run Network" << std::endl;
CMrun_network(cm);
Expand Down
19 changes: 19 additions & 0 deletions testing/adios2/engine/bp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,25 @@ bp_gtest_add_tests_helper(LargeMetadata MPI_ALLOW)

set(BP5LargeMeta "Engine.BP.BPLargeMetadata.BPWrite1D_LargeMetadata.BP5.Serial")

if ((NOT WIN32) AND ADIOS2_HAVE_SST)
# prototype for remote server testing
# (we don't really use SST here, just EVPath, but ADIOS2_HAVE_SST is the most relevant conditional)
macro(add_remote_tests_helper testname)
add_test(NAME "Remote.BP${testname}.GetRemote" COMMAND Test.Engine.BP.${testname}.Serial bp5)
set_tests_properties(Remote.BP${testname}.GetRemote PROPERTIES FIXTURES_REQUIRED Server ENVIRONMENT "DoRemote=1")
endmacro()

add_test(NAME remoteServerSetup COMMAND remote_server -background)
set_tests_properties(remoteServerSetup PROPERTIES FIXTURES_SETUP Server)

add_test(NAME remoteServerCleanup COMMAND remote_server -kill_server)
set_tests_properties(remoteServerCleanup PROPERTIES FIXTURES_CLEANUP Server)

#add remote tests below this line
add_remote_tests_helper(WriteReadADIOS2stdio)
add_remote_tests_helper(WriteMemorySelectionRead)
endif()

if(ADIOS2_HAVE_MPI)
list(APPEND BP5LargeMeta "Engine.BP.BPLargeMetadata.BPWrite1D_LargeMetadata.BP5.MPI" "Engine.BP.BPLargeMetadata.ManyLongStrings.BP5.MPI")
endif()
Expand Down

0 comments on commit f79044f

Please sign in to comment.