diff --git a/cmake/DetectOptions.cmake b/cmake/DetectOptions.cmake index efecb57624..d66e096bda 100644 --- a/cmake/DetectOptions.cmake +++ b/cmake/DetectOptions.cmake @@ -450,6 +450,14 @@ if(ADIOS2_USE_SST AND NOT WIN32) if(CrayDRC_FOUND) set(ADIOS2_SST_HAVE_CRAY_DRC TRUE) endif() + + try_compile(ADIOS2_SST_HAVE_CRAY_CXI + ${ADIOS2_BINARY_DIR}/check_libfabric_cxi + ${ADIOS2_SOURCE_DIR}/cmake/check_libfabric_cxi.c + CMAKE_FLAGS + "-DINCLUDE_DIRECTORIES=${LIBFABRIC_INCLUDE_DIRS}" + "-DLINK_DIRECTORIES=${LIBFABRIC_LIBRARIES}") + message(STATUS "Libfabric support for the HPE CXI provider: ${ADIOS2_SST_HAVE_CRAY_CXI}") endif() if(ADIOS2_HAVE_MPI) set(CMAKE_REQUIRED_LIBRARIES "MPI::MPI_C;Threads::Threads") diff --git a/cmake/check_libfabric_cxi.c b/cmake/check_libfabric_cxi.c new file mode 100644 index 0000000000..c2f2c80878 --- /dev/null +++ b/cmake/check_libfabric_cxi.c @@ -0,0 +1,5 @@ +#include +#include +#include + +int main() {} diff --git a/source/adios2/toolkit/sst/CMakeLists.txt b/source/adios2/toolkit/sst/CMakeLists.txt index 070c8f41a1..c77fc0459b 100644 --- a/source/adios2/toolkit/sst/CMakeLists.txt +++ b/source/adios2/toolkit/sst/CMakeLists.txt @@ -65,6 +65,7 @@ set(SST_CONFIG_OPTS UCX FI_GNI CRAY_DRC + CRAY_CXI NVStream MPI ) diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index e61c6f5f3f..e3800f4ff0 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -19,6 +19,13 @@ #include #include +#ifdef SST_HAVE_CRAY_CXI +#include +// This comment prevents clang-format from reordering these includes. +// The CXI extension header requires the bool header, but does not include it on its own. +#include +#endif + #if defined(__has_feature) #if __has_feature(thread_sanitizer) #define NO_SANITIZE_THREAD __attribute__((no_sanitize("thread"))) @@ -53,6 +60,66 @@ static pthread_mutex_t fabric_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t wsr_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t ts_mutex = PTHREAD_MUTEX_INITIALIZER; +/* + * Wrapper for fi_mr_reg() with additional parameters endpoint and mr_mode. + * If mr_mode includes FI_MR_ENDPOINT, the memory region must be bound to the + * endpoint and enabled before use. + */ +int sst_fi_mr_reg( + /* first two parameters for verbose logging */ + CP_Services Svcs, void *CP_Stream, + /* regular fi_mir_reg() parameters*/ + struct fid_domain *domain, const void *buf, size_t len, uint64_t acs, uint64_t offset, + uint64_t requested_key, uint64_t flags, struct fid_mr **mr, void *context, + /* additional parameters for binding the mr to the endpoint*/ + struct fid_ep *endpoint, int mr_mode) +{ + int res = fi_mr_reg(domain, buf, len, acs, offset, requested_key, flags, mr, context); + int is_mr_endpoint = (mr_mode & FI_MR_ENDPOINT) != 0; + if (!is_mr_endpoint) + { + return res; + } + if (res != FI_SUCCESS) + { + Svcs->verbose(CP_Stream, DPCriticalVerbose, "fi_mr_reg failed with %ul (%s)\n", res, + fi_strerror(res)); + return res; + } + + /* + * When the domain_attr->mr_mode includes FI_MR_ENDPOINT, the memory region + * needs to be bound to the endpoint and explicitly enabled after that. 
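Note on the new probe file: the #include lines of cmake/check_libfabric_cxi.c lost their header names in the patch text above. Judging from the CXI-related includes added to rdma_dp.c further down, the probe presumably looks roughly like the sketch below; the exact header paths are an assumption, not taken from the original patch.

/* Sketch of the compile probe: it builds only if libfabric ships the HPE CXI
 * extension header. Header names below are assumed. */
#include <rdma/fabric.h>
#include <stdbool.h> /* the CXI extension header uses bool without including it */
#include <rdma/fi_cxi_ext.h>

int main() {}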
+ */ + res = fi_mr_bind(*mr, &endpoint->fid, 0); + if (res != FI_SUCCESS) + { + Svcs->verbose(CP_Stream, DPCriticalVerbose, "fi_mr_bind failed with %ul (%s)\n", res, + fi_strerror(res)); + return res; + } + res = fi_mr_enable(*mr); + if (res != FI_SUCCESS) + { + Svcs->verbose(CP_Stream, DPCriticalVerbose, "fi_mr_enable failed with %ul (%s)\n", res, + fi_strerror(res)); + return res; + } + return res; +} + +/* + * Simple wrapper to create a log entry upon failing fi_*() function calls. + */ +int guard_fi_return(int code, CP_Services Svcs, CManager cm, char const *msg) +{ + if (code != FI_SUCCESS) + { + Svcs->verbose(cm, DPCriticalVerbose, "%s: %s (%lu)\n", msg, fi_strerror(code), code); + } + return code; +} + struct fabric_state { struct fi_context *ctx; @@ -60,6 +127,7 @@ struct fabric_state struct fi_info *info; // struct fi_info *linfo; int local_mr_req; + int mr_virt_addr; /* Stores if the mr_mode includes FI_MR_VIRT_ADDR */ int rx_cq_data; size_t addr_len; size_t msg_prefix_size; @@ -69,6 +137,9 @@ struct fabric_state struct fid_cq *cq_signal; struct fid_av *av; pthread_t listener; +#ifdef SST_HAVE_CRAY_CXI + struct cxi_auth_key *cxi_auth_key; +#endif #ifdef SST_HAVE_CRAY_DRC drc_info_handle_t drc_info; uint32_t credential; @@ -112,13 +183,24 @@ struct fabric_state * plane would replace one or both of these with RDMA functionality. */ +static char const *get_preferred_domain(struct _SstParams *Params) +{ + if (Params->DataInterface) + { + return Params->DataInterface; + } + else + { + return getenv("FABRIC_IFACE"); + } +} + static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, CP_Services Svcs, - void *CP_Stream) + void *CP_Stream, char const *ifname) { struct fi_info *hints, *info, *originfo, *useinfo; struct fi_av_attr av_attr = {FI_AV_UNSPEC}; struct fi_cq_attr cq_attr = {0}; - char *ifname; int result; hints = fi_allocinfo(); @@ -126,24 +208,76 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, FI_MSG | FI_SEND | FI_RECV | FI_REMOTE_READ | FI_REMOTE_WRITE | FI_RMA | FI_READ | FI_WRITE; hints->mode = FI_CONTEXT | FI_LOCAL_MR | FI_CONTEXT2 | FI_MSG_PREFIX | FI_ASYNC_IOV | FI_RX_CQ_DATA; - hints->domain_attr->mr_mode = FI_MR_BASIC; - hints->domain_attr->control_progress = FI_PROGRESS_AUTO; - hints->domain_attr->data_progress = FI_PROGRESS_AUTO; hints->ep_attr->type = FI_EP_RDM; - if (Params->DataInterface) + uint32_t fi_version; +#ifdef SST_HAVE_CRAY_CXI + if (fabric->cxi_auth_key) { - ifname = Params->DataInterface; + fi_version = FI_VERSION(1, 11); + + hints->domain_attr->mr_mode = FI_MR_ENDPOINT; + hints->domain_attr->control_progress = FI_PROGRESS_MANUAL; + hints->domain_attr->data_progress = FI_PROGRESS_MANUAL; + + // Authentication is needed + // TODO: the first ID in SLINGSHOT_SVC_IDS is chosen, but we should + // rather choose the one corresponding with the FABRIC_IFACE + // example: + // SLINGSHOT_SVC_IDS=5,5,5,5 + // SLINGSHOT_VNIS=1310,1271 + // SLINGSHOT_DEVICES=cxi0,cxi1,cxi2,cxi3 + // FABRIC_IFACE=cxi2 (user specified) + + hints->ep_attr->auth_key = malloc(sizeof(struct cxi_auth_key)); + memcpy(hints->ep_attr->auth_key, fabric->cxi_auth_key, sizeof(struct cxi_auth_key)); + hints->ep_attr->auth_key_size = sizeof(struct cxi_auth_key); + + hints->domain_attr->auth_key = malloc(sizeof(struct cxi_auth_key)); + memcpy(hints->domain_attr->auth_key, fabric->cxi_auth_key, sizeof(struct cxi_auth_key)); + hints->domain_attr->auth_key_size = sizeof(struct cxi_auth_key); } else { - ifname = getenv("FABRIC_IFACE"); + 
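For readers unfamiliar with FI_MR_ENDPOINT, here is a condensed, hypothetical view of the sequence the sst_fi_mr_reg() wrapper above performs; domain/ep/buf/len are placeholders, not identifiers from the data plane.

#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_errno.h>

/* Register a buffer and, if the selected provider reported FI_MR_ENDPOINT
 * (the CXI case), bind the region to the endpoint and enable it; only then
 * is the key returned by fi_mr_key() usable by peers. */
static int register_mr(struct fid_domain *domain, struct fid_ep *ep, int mr_mode, void *buf,
                       size_t len, uint64_t acs, struct fid_mr **mr)
{
    int rc = fi_mr_reg(domain, buf, len, acs, 0, 0, 0, mr, NULL);
    if (rc != FI_SUCCESS || !(mr_mode & FI_MR_ENDPOINT))
        return rc;
    rc = fi_mr_bind(*mr, &ep->fid, 0);
    if (rc == FI_SUCCESS)
        rc = fi_mr_enable(*mr);
    return rc;
}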
fi_version = FI_VERSION(1, 5); + + hints->domain_attr->mr_mode = FI_MR_BASIC; + hints->domain_attr->control_progress = FI_PROGRESS_AUTO; + hints->domain_attr->data_progress = FI_PROGRESS_AUTO; + } +#else + fi_version = FI_VERSION(1, 5); + + // Alternatively, one could set mr_mode to + // FI_MR_VIRT_ADDR | FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_LOCAL + // here. These flags are equivalent to FI_MR_BASIC, but unlike basic + // registration, providers are not forced to keep those flags when they + // think that not using the flags is better. + // The RDMA DP is able to deal with this appropriately, and does so right + // before calling fi_fabric() further below in this function. + // The main reason for keeping FI_MR_BASIC here is backward compatibility. + hints->domain_attr->mr_mode = FI_MR_BASIC; + hints->domain_attr->control_progress = FI_PROGRESS_AUTO; + hints->domain_attr->data_progress = FI_PROGRESS_AUTO; +#endif + + /* + * ifname is passed as a function parameter of init_fabric() if + * a provider-specific key was configured and sent to the reader. + * Since the key is generally domain-specific, we must use that one in this + * case. + * The preferred domain is already considered upon key configuration, + * so this is fine. + */ + if (!ifname) + { + ifname = get_preferred_domain(Params); } fabric->info = NULL; pthread_mutex_lock(&fabric_mutex); - fi_getinfo(FI_VERSION(1, 5), NULL, NULL, 0, hints, &info); + fi_getinfo(fi_version, NULL, NULL, 0, hints, &info); pthread_mutex_unlock(&fabric_mutex); if (!info) { @@ -167,7 +301,8 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, break; } if ((((strcmp(prov_name, "verbs") == 0) && info->src_addr) || - (strcmp(prov_name, "gni") == 0) || (strcmp(prov_name, "psm2") == 0)) && + (strcmp(prov_name, "gni") == 0) || (strcmp(prov_name, "psm2") == 0) || + (strcmp(prov_name, "cxi") == 0)) && (!useinfo || !ifname || (strcmp(useinfo->domain_attr->name, ifname) != 0))) { Svcs->verbose(CP_Stream, DPTraceVerbose, @@ -177,7 +312,7 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, useinfo = info; } else if (((strstr(prov_name, "verbs") && info->src_addr) || strstr(prov_name, "gni") || - strstr(prov_name, "psm2")) && + strstr(prov_name, "psm2") || strstr(prov_name, "cxi")) && !useinfo) { Svcs->verbose(CP_Stream, DPTraceVerbose, @@ -253,7 +388,30 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, fabric->addr_len = info->src_addrlen; - info->domain_attr->mr_mode = FI_MR_BASIC; + /* + * The libfabric data-plane of SST was originally programmed to use + * FI_MR_BASIC as mr_mode, which is equivalent to + * FI_MR_VIRT_ADDR | FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_LOCAL. + * + * However, HPE's CXI provider requires two changes to that: + * (1) It does not support FI_MR_VIRT_ADDR. + * (2) It requires use of FI_MR_ENDPOINT. + * + * So we propagate the bit value currently contained in the mr_mode + * for these flags. + */ + if (info->domain_attr->mr_mode != FI_MR_BASIC) + { + info->domain_attr->mr_mode = FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_LOCAL | + (FI_MR_ENDPOINT & info->domain_attr->mr_mode) | + (FI_MR_VIRT_ADDR & info->domain_attr->mr_mode); + fabric->mr_virt_addr = info->domain_attr->mr_mode & FI_MR_VIRT_ADDR ? 
1 : 0; + } + else + { + fabric->mr_virt_addr = 1; + } + #ifdef SST_HAVE_CRAY_DRC if (strstr(info->fabric_attr->prov_name, "gni") && fabric->auth_key) { @@ -411,6 +569,12 @@ static void fini_fabric(struct fabric_state *fabric, CP_Services Svcs, void *CP_ free(fabric->ctx); } +#ifdef SST_HAVE_CRAY_CXI + if (fabric->cxi_auth_key) + { + free(fabric->cxi_auth_key); + } +#endif #ifdef SST_HAVE_CRAY_DRC if (Fabric->auth_key) { @@ -599,6 +763,191 @@ static TimestepList GetStep(Rdma_WS_Stream Stream, long Timestep) return (Step); } +#ifdef SST_HAVE_CRAY_CXI +static int get_cxi_auth_key_from_env(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params, + struct cxi_auth_key *key, char **used_device) +{ + int vni, first_vni, second_vni, svc_id; + + // Just some safety against faulty strings in string processing. + size_t const no_infinite_loops = 10000; + + // struct cxi_auth_key { + // /* The CXI service assigned to the Domain and Endpoints. A CXI + // service + // * is associated with a set of local resource limits, VNIs, and + // Traffic + // * Classes. + // * + // * The svc_id used by an OFI Domain must match all Endpoints belonging + // * to the Domain. + // */ + // uint32_t svc_id; + + // /* The Virtual Network ID (VNI) assigned to the Endpoint. Two + // Endpoints + // * must use the same VNI in order to communicate. + // * + // * Note that while the CXI service may define one or more VNIs which a + // * process can access, an Endpoint is assigned to only one. + // */ + // uint16_t vni; + // }; + + // typical value SLINGSHOT_DEVICES=cxi0,cxi1,cxi2,cxi3 + char const *slingshot_devices = getenv("SLINGSHOT_DEVICES"); + char const *preferred_device = get_preferred_domain(Params); + + /* + * In the following loop, find out if the preferred_device is found within + * the slingshot_devices. + * If the preferred_device is NULL, just pick the first. + * Upon success, modifies the output parameter used_device and stores + * the retrieved device index. + */ + size_t device_index = 0; + for (size_t no_infinite_loop_counter = 0;; ++device_index, ++no_infinite_loop_counter) + { + if (no_infinite_loop_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + // Are we at the end of the environment variable? + int found_end = 0; + + // Find out the length of the current item in slingshot_devices. 
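The mr_virt_addr flag set above drives how RMA target addresses are formed in PostRead(), PushData(), PostPreload() and PullSelection() below. A hypothetical helper (not part of the patch) expressing that rule:

#include <stdint.h>

/* With FI_MR_VIRT_ADDR the RMA target is the peer's virtual address
 * (base + offset); without it, only the offset into the registered
 * region is passed to fi_read()/fi_write(). */
static uint64_t remote_rma_addr(int mr_virt_addr, const void *remote_base, uint64_t offset)
{
    return (mr_virt_addr ? (uint64_t)(uintptr_t)remote_base : 0) + offset;
}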
+ size_t find_end_of_current_string = 0; + for (size_t no_infinite_loop_inner_counter = 0;; + ++find_end_of_current_string, ++no_infinite_loop_inner_counter) + { + if (no_infinite_loop_inner_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + switch (slingshot_devices[find_end_of_current_string]) + { + case '\0': + found_end = 1; + goto break_first_loop; + case ',': + goto break_first_loop; + default: + break; + } + } + break_first_loop:; + int use_this_device = !preferred_device || (strncmp(preferred_device, slingshot_devices, + find_end_of_current_string) == 0); + if (use_this_device) + { + char *construct_used_device = malloc(find_end_of_current_string + 1); + memcpy(construct_used_device, slingshot_devices, find_end_of_current_string); + construct_used_device[find_end_of_current_string] = '\0'; + *used_device = construct_used_device; + break; + } + else if (found_end) + { + return EXIT_FAILURE; + } + else + { + // go to next iteration + slingshot_devices += find_end_of_current_string + 1; + } + } + + Svcs->verbose(CP_Stream, DPTraceVerbose, "Found device %s at index %zu\n", *used_device, + device_index); + + // typical value SLINGSHOT_VNIS=4576,4530 + char const *vni_env_str = getenv("SLINGSHOT_VNIS"); + if (!vni_env_str) + { + return EXIT_FAILURE; + } + + // typical value SLINGSHOT_SVC_IDS=5,5,5,5 + char const *svc_ids_env_str = getenv("SLINGSHOT_SVC_IDS"); + if (!svc_ids_env_str) + { + return EXIT_FAILURE; + } + + { + int num_vnis = sscanf(vni_env_str, "%d,%d", &first_vni, &second_vni); + switch (num_vnis) + { + // first VNI is the subjob's VNI + case 1: + Svcs->verbose(CP_Stream, DPTraceVerbose, "Using first vni.\n"); + vni = first_vni; + break; + // if present, the second VNI is the containing job's VNI + // the first VNI belongs to the subjob + case 2: + Svcs->verbose(CP_Stream, DPTraceVerbose, "Using second vni.\n"); + vni = second_vni; + break; + default: + return EXIT_FAILURE; + } + } + + { + // Pick the service ID according to the device_index found above. + for (size_t svc_id_index = 0; svc_id_index < device_index; ++svc_id_index) + { + for (size_t no_infinite_loop_counter = 0;; ++no_infinite_loop_counter) + { + if (no_infinite_loop_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + switch (*(svc_ids_env_str++)) + { + case ',': + goto break_second_loop; + case '\0': + return EXIT_FAILURE; + default: + continue; + } + } + break_second_loop:; + } + + int num_svc_ids = sscanf(svc_ids_env_str, "%d", &svc_id); + switch (num_svc_ids) + { + case 1: + break; + default: + return EXIT_FAILURE; + } + } + + key->vni = vni; + key->svc_id = svc_id; + + return EXIT_SUCCESS; +} + +static int get_cxi_auth_key_from_writer(struct cxi_auth_key *key, attr_list WriterContact) +{ + long vni; + if (!get_long_attr(WriterContact, attr_atom_from_string("vni"), &vni)) + { + return EXIT_FAILURE; + } + key->vni = (uint16_t)vni; + return EXIT_SUCCESS; +} +#endif + static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, struct _SstParams *Params, attr_list WriterContact, SstStats Stats) @@ -645,6 +994,38 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, void **Rea Stream->PreloadAvail = 0; } + char *required_device = NULL; +#ifdef SST_HAVE_CRAY_CXI + struct + { + struct cxi_auth_key key; + int valid; + } tagged_key; + + /* + * The svc_id of the key must match the device that this particular reader + * connects with. 
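The hand-rolled string scanning above avoids libc helpers; as a reading aid, a hypothetical and less defensive formulation of what get_cxi_auth_key_from_env() extracts, using POSIX strdup/strtok (which the patch itself does not use):

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Derive vni/svc_id from e.g. SLINGSHOT_DEVICES=cxi0,cxi1,cxi2,cxi3,
 * SLINGSHOT_VNIS=1310,1271 and SLINGSHOT_SVC_IDS=5,5,5,5. Returns 0 on success. */
static int parse_slingshot_key(const char *preferred, uint16_t *vni, uint32_t *svc_id)
{
    const char *devs = getenv("SLINGSHOT_DEVICES");
    const char *vnis = getenv("SLINGSHOT_VNIS");
    const char *svcs = getenv("SLINGSHOT_SVC_IDS");
    if (!devs || !vnis || !svcs)
        return 1;

    /* index of the preferred device in the device list (first entry if none given) */
    size_t index = 0;
    int found = 0;
    char *devs_copy = strdup(devs);
    for (char *tok = strtok(devs_copy, ","); tok; tok = strtok(NULL, ","), ++index)
        if (!preferred || strcmp(tok, preferred) == 0)
        {
            found = 1;
            break;
        }
    free(devs_copy);
    if (!found)
        return 1;

    /* the first VNI is the subjob's, the optional second one is the containing job's */
    char *rest = NULL;
    long v1 = strtol(vnis, &rest, 10);
    long v2 = (rest && *rest == ',') ? strtol(rest + 1, NULL, 10) : -1;
    *vni = (uint16_t)(v2 >= 0 ? v2 : v1);

    /* service ID at the same position as the chosen device */
    size_t i = 0;
    char *svcs_copy = strdup(svcs);
    for (char *tok = strtok(svcs_copy, ","); tok; tok = strtok(NULL, ","), ++i)
        if (i == index)
        {
            *svc_id = (uint32_t)strtol(tok, NULL, 10);
            free(svcs_copy);
            return 0;
        }
    free(svcs_copy);
    return 1;
}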
+ * The vni (virtual network ID) must be the same across all communicating + * instances (get this from the writer). + */ + + tagged_key.valid = + get_cxi_auth_key_from_env(Svcs, CP_Stream, Params, &tagged_key.key, &required_device); + + if (tagged_key.valid == EXIT_SUCCESS && + get_cxi_auth_key_from_writer(&tagged_key.key, WriterContact) == EXIT_SUCCESS) + { + Svcs->verbose(CP_Stream, DPSummaryVerbose, "Reader found CXI auth key: %d %d\n", + tagged_key.key.vni, tagged_key.key.svc_id); + Stream->Fabric->cxi_auth_key = calloc(1, sizeof(struct cxi_auth_key)); + memcpy(Stream->Fabric->cxi_auth_key, &tagged_key.key, sizeof(struct cxi_auth_key)); + } + else + { + Svcs->verbose(CP_Stream, DPSummaryVerbose, "Reader found no CXI auth key\n"); + } +#endif + #ifdef SST_HAVE_CRAY_DRC int attr_cred, try_left, rc; if (!get_int_attr(WriterContact, attr_atom_from_string("RDMA_DRC_KEY"), &attr_cred)) @@ -675,7 +1056,11 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, void **Rea #endif /* SST_HAVE_CRAY_DRC */ - init_fabric(Stream->Fabric, Stream->Params, Svcs, CP_Stream); + init_fabric(Stream->Fabric, Stream->Params, Svcs, CP_Stream, required_device); + if (required_device) + { + free(required_device); + } if (!Fabric->info) { Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not find a valid transport fabric.\n"); @@ -684,7 +1069,12 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, void **Rea ContactInfo->Length = Fabric->info->src_addrlen; ContactInfo->Address = malloc(ContactInfo->Length); - fi_getname((fid_t)Fabric->signal, ContactInfo->Address, &ContactInfo->Length); + if (guard_fi_return( + fi_getname((fid_t)Fabric->signal, ContactInfo->Address, &ContactInfo->Length), Svcs, + CP_Stream, "[RdmaInitReader] fi_getname() failed with:") != FI_SUCCESS) + { + return NULL; + } Stream->PreloadStep = -1; Stream->ContactInfo = ContactInfo; @@ -775,6 +1165,42 @@ static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, struct _Ss Stream->Fabric = calloc(1, sizeof(struct fabric_state)); Fabric = Stream->Fabric; + + char *required_device = NULL; +#ifdef SST_HAVE_CRAY_CXI + struct + { + struct cxi_auth_key key; + int valid; + } tagged_key; + + /* + * The svc_id of the key must match the device that this particular writer + * connects with. + * The vni (virtual network ID) must be the same across all communicating + * instances (use the one seen by rank 0). 
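Condensed from the reader and writer hunks around here: the service ID comes from each rank's own environment, while the VNI is published by writer rank 0 through the contact attribute list and adopted by every reader. The two fragments below quote the relevant calls from this patch.

/* Writer side (rank 0's value, broadcast to all writer ranks beforehand): */
set_long_attr(DPAttrs, attr_atom_from_string("vni"), tagged_key.key.vni);

/* Reader side: overwrite the environment-derived vni with the writer's value. */
long vni;
if (get_long_attr(WriterContact, attr_atom_from_string("vni"), &vni))
    key->vni = (uint16_t)vni;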
+ */ + tagged_key.valid = + get_cxi_auth_key_from_env(Svcs, CP_Stream, Params, &tagged_key.key, &required_device); + + // Ensure that all writers use the same virtual network ID + SMPI_Bcast(&tagged_key.key.vni, sizeof(tagged_key.key.vni), SMPI_BYTE, 0, comm); + + if (tagged_key.valid == EXIT_SUCCESS) + { + Svcs->verbose(CP_Stream, DPSummaryVerbose, "Writer found CXI auth key: %d %d\n", + tagged_key.key.vni, tagged_key.key.svc_id); + + set_long_attr(DPAttrs, attr_atom_from_string("vni"), tagged_key.key.vni); + Stream->Fabric->cxi_auth_key = calloc(1, sizeof(struct cxi_auth_key)); + memcpy(Stream->Fabric->cxi_auth_key, &tagged_key.key, sizeof(struct cxi_auth_key)); + } + else + { + Svcs->verbose(CP_Stream, DPSummaryVerbose, "Writer found no CXI auth key"); + } +#endif + #ifdef SST_HAVE_CRAY_DRC int try_left, rc; if (Stream->Rank == 0) @@ -818,7 +1244,11 @@ static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, struct _Ss set_long_attr(DPAttrs, attr_atom_from_string("RDMA_DRC_CRED"), attr_cred); #endif /* SST_HAVE_CRAY_DRC */ - init_fabric(Stream->Fabric, Params, Svcs, CP_Stream); + init_fabric(Stream->Fabric, Params, Svcs, CP_Stream, required_device); + if (required_device) + { + free(required_device); + } Fabric = Stream->Fabric; if (!Fabric->info) { @@ -872,8 +1302,15 @@ static DP_WSR_Stream RdmaInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_S for (i = 0; i < readerCohortSize; i++) { - fi_av_insert(Fabric->av, providedReaderInfo[i]->Address, 1, &WSR_Stream->ReaderAddr[i], 0, - NULL); + if (fi_av_insert(Fabric->av, providedReaderInfo[i]->Address, 1, &WSR_Stream->ReaderAddr[i], + 0, NULL) < 1) + { + + Svcs->verbose(WS_Stream->CP_Stream, DPCriticalVerbose, + "[RdmaInitWRiterPerReader] Failed inserting address " + "into vector\n"); + return NULL; + } Svcs->verbose(WS_Stream->CP_Stream, DPTraceVerbose, "Received contact info for RS_Stream %p, WSR Rank %d\n", providedReaderInfo[i]->RS_Stream, i); @@ -895,13 +1332,20 @@ static DP_WSR_Stream RdmaInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_S ContactInfo->Length = Fabric->info->src_addrlen; ContactInfo->Address = malloc(ContactInfo->Length); - fi_getname((fid_t)Fabric->signal, ContactInfo->Address, &ContactInfo->Length); + if (guard_fi_return( + fi_getname((fid_t)Fabric->signal, ContactInfo->Address, &ContactInfo->Length), Svcs, + WS_Stream->CP_Stream, + "[RdmaInitWriterPerReader] fi_getname() failed with") != FI_SUCCESS) + { + return NULL; + } ReaderRollHandle = &ContactInfo->ReaderRollHandle; ReaderRollHandle->Block = calloc(readerCohortSize, sizeof(struct _RdmaBuffer)); - fi_mr_reg(Fabric->domain, ReaderRollHandle->Block, - readerCohortSize * sizeof(struct _RdmaBuffer), FI_REMOTE_WRITE, 0, 0, 0, - &WSR_Stream->rrmr, Fabric->ctx); + sst_fi_mr_reg(Svcs, WS_Stream->CP_Stream, Fabric->domain, ReaderRollHandle->Block, + readerCohortSize * sizeof(struct _RdmaBuffer), FI_REMOTE_WRITE, 0, 0, 0, + &WSR_Stream->rrmr, Fabric->ctx, Fabric->signal, + Fabric->info->domain_attr->mr_mode); ReaderRollHandle->Key = fi_mr_key(WSR_Stream->rrmr); WSR_Stream->WriterContactInfo = ContactInfo; @@ -943,8 +1387,15 @@ static void RdmaProvideWriterDataToReader(CP_Services Svcs, DP_RS_Stream RS_Stre for (int i = 0; i < writerCohortSize; i++) { RS_Stream->WriterContactInfo[i].WS_Stream = providedWriterInfo[i]->WS_Stream; - fi_av_insert(Fabric->av, providedWriterInfo[i]->Address, 1, &RS_Stream->WriterAddr[i], 0, - NULL); + if (fi_av_insert(Fabric->av, providedWriterInfo[i]->Address, 1, &RS_Stream->WriterAddr[i], + 0, NULL) < 1) + { + 
Svcs->verbose(RS_Stream->CP_Stream, DPCriticalVerbose, + "[RdmaProvideWriterDataToReader] " + "Failed inserting address " + "into vector\n"); + return; + } RS_Stream->WriterRoll[i] = providedWriterInfo[i]->ReaderRollHandle; Svcs->verbose(RS_Stream->CP_Stream, DPTraceVerbose, "Received contact info for WS_stream %p, WSR Rank %d\n", @@ -1032,11 +1483,21 @@ static ssize_t PostRead(CP_Services Svcs, Rdma_RS_Stream RS_Stream, int Rank, lo if (Fabric->local_mr_req) { // register dest buffer - fi_mr_reg(Fabric->domain, Buffer, Length, FI_READ, 0, 0, 0, &ret->LocalMR, Fabric->ctx); + sst_fi_mr_reg(Svcs, RS_Stream->CP_Stream, Fabric->domain, Buffer, Length, FI_READ, 0, 0, 0, + &ret->LocalMR, Fabric->ctx, Fabric->signal, + Fabric->info->domain_attr->mr_mode); LocalDesc = fi_mr_desc(ret->LocalMR); } - Addr = Info->Block + Offset; + if (Fabric->mr_virt_addr) + { + Addr = Info->Block + Offset; + } + else + { + Addr = NULL; + Addr += Offset; + } Svcs->verbose(RS_Stream->CP_Stream, DPTraceVerbose, "Remote read target is Rank %d (Offset = %zi, Length = %zi)\n", Rank, Offset, @@ -1203,8 +1664,14 @@ static int DoPushWait(CP_Services Svcs, Rdma_RS_Stream Stream, RdmaCompletionHan rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); if (rc < 1) { - Svcs->verbose(Stream->CP_Stream, DPCriticalVerbose, - "failure while waiting for completions (%d).\n", rc); + struct fi_cq_err_entry error; + fi_cq_readerr(Fabric->cq_signal, &error, 0); + Svcs->verbose( + Stream->CP_Stream, DPCriticalVerbose, + "failure while waiting for completions inside " + "DoPushWait() (%d (%s - %s)).\n", + rc, fi_strerror(error.err), + fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data, NULL, error.len)); return 0; } else if (CQEntry.flags & FI_REMOTE_CQ_DATA) @@ -1276,8 +1743,14 @@ static int WaitForAnyPull(CP_Services Svcs, Rdma_RS_Stream Stream) rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); if (rc < 1) { - Svcs->verbose(Stream->CP_Stream, DPCriticalVerbose, - "failure while waiting for completions (%d).\n", rc); + struct fi_cq_err_entry error; + fi_cq_readerr(Fabric->cq_signal, &error, 0); + Svcs->verbose( + Stream->CP_Stream, DPCriticalVerbose, + "failure while waiting for completions inside " + "WaitForAnyPull() (%d (%s - %s)).\n", + rc, fi_strerror(error.err), + fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data, NULL, error.len)); return 0; } else @@ -1345,8 +1818,9 @@ static void RdmaProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, struct Entry->BufferSlot = -1; Entry->Desc = NULL; - fi_mr_reg(Fabric->domain, Data->block, Data->DataSize, FI_WRITE | FI_REMOTE_READ, 0, 0, 0, - &Entry->mr, Fabric->ctx); + sst_fi_mr_reg(Svcs, Stream->CP_Stream, Fabric->domain, Data->block, Data->DataSize, + FI_WRITE | FI_REMOTE_READ, 0, 0, 0, &Entry->mr, Fabric->ctx, Fabric->signal, + Fabric->info->domain_attr->mr_mode); Entry->Key = fi_mr_key(Entry->mr); if (Fabric->local_mr_req) { @@ -1608,7 +2082,7 @@ static struct _CP_DP_Interface RdmaDPInterface = {0}; static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params) { struct fi_info *hints, *info, *originfo; - char *ifname; + char const *ifname; char *forkunsafe; int Ret = -1; @@ -1617,20 +2091,41 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams FI_MSG | FI_SEND | FI_RECV | FI_REMOTE_READ | FI_REMOTE_WRITE | FI_RMA | FI_READ | FI_WRITE; hints->mode = FI_CONTEXT | FI_LOCAL_MR | FI_CONTEXT2 | FI_MSG_PREFIX | FI_ASYNC_IOV | FI_RX_CQ_DATA; - hints->domain_attr->mr_mode = 
FI_MR_BASIC; - hints->domain_attr->control_progress = FI_PROGRESS_AUTO; - hints->domain_attr->data_progress = FI_PROGRESS_AUTO; hints->ep_attr->type = FI_EP_RDM; - if (Params->DataInterface) + char const *vni_env_str = getenv("SLINGSHOT_VNIS"); + + uint32_t fi_version; + if (vni_env_str) { - ifname = Params->DataInterface; + // try fishing for the CXI provider + Svcs->verbose(CP_Stream, DPSummaryVerbose, + "RDMA Dataplane trying to check for an available CXI " + "provider since environment variable SLINGSHOT_VNIS is " + "defined (value: '%s').\n", + vni_env_str); + fi_version = FI_VERSION(1, 11); + + hints->domain_attr->mr_mode = FI_MR_ENDPOINT; + hints->domain_attr->control_progress = FI_PROGRESS_MANUAL; + hints->domain_attr->data_progress = FI_PROGRESS_MANUAL; } else { - ifname = getenv("FABRIC_IFACE"); + Svcs->verbose(CP_Stream, DPSummaryVerbose, + "RDMA Dataplane trying to check for an available non-CXI " + "provider since environment variable SLINGSHOT_VNIS is " + "not defined.\n"); + + fi_version = FI_VERSION(1, 5); + + hints->domain_attr->mr_mode = FI_MR_BASIC; + hints->domain_attr->control_progress = FI_PROGRESS_AUTO; + hints->domain_attr->data_progress = FI_PROGRESS_AUTO; } + ifname = get_preferred_domain(Params); + forkunsafe = getenv("FI_FORK_UNSAFE"); if (!forkunsafe) { @@ -1638,7 +2133,7 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams } pthread_mutex_lock(&fabric_mutex); - fi_getinfo(FI_VERSION(1, 5), NULL, NULL, 0, hints, &info); + fi_getinfo(fi_version, NULL, NULL, 0, hints, &info); pthread_mutex_unlock(&fabric_mutex); fi_freeinfo(hints); @@ -1656,6 +2151,10 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams prov_name = info->fabric_attr->prov_name; domain_name = info->domain_attr->name; + Svcs->verbose(CP_Stream, DPPerStepVerbose, + "[RdmaGetPriority] Seeing and evaluating fabric with " + "provider: '%s', domain: '%s'\n", + prov_name, domain_name); if (ifname && strcmp(ifname, domain_name) == 0) { Svcs->verbose(CP_Stream, DPPerStepVerbose, @@ -1666,7 +2165,7 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams break; } if ((strstr(prov_name, "verbs") && info->src_addr) || strstr(prov_name, "gni") || - strstr(prov_name, "psm2")) + strstr(prov_name, "psm2") || strstr(prov_name, "cxi")) { Svcs->verbose(CP_Stream, DPPerStepVerbose, @@ -1733,7 +2232,12 @@ static void PushData(CP_Services Svcs, Rdma_WSR_Stream Stream, TimestepList Step { rc = fi_writedata(Fabric->signal, StepBuffer + Req->Offset, Req->BufferLen, Step->Desc, Data, Stream->ReaderAddr[RankReq->Rank], - (uint64_t)Req->Handle.Block + + /* + * If mr_virt_addr is zero, we need just the offset, + * otherwise we need the remote virtual address composed by + * base pointer + offset. 
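RdmaGetPriority() above keys its hints off the presence of SLINGSHOT_VNIS because the negotiated auth key does not exist yet at probing time; init_fabric() makes the same split based on whether a CXI auth key was assembled. A hypothetical condensation of that decision:

#include <stdlib.h>
#include <rdma/fabric.h>
#include <rdma/fi_domain.h>

/* Pick the fi_getinfo() API version and MR/progress hints depending on
 * whether a Slingshot/CXI environment is detected. */
static uint32_t select_hints(struct fi_info *hints)
{
    if (getenv("SLINGSHOT_VNIS"))
    {
        hints->domain_attr->mr_mode = FI_MR_ENDPOINT;
        hints->domain_attr->control_progress = FI_PROGRESS_MANUAL;
        hints->domain_attr->data_progress = FI_PROGRESS_MANUAL;
        return FI_VERSION(1, 11); /* the CXI provider needs the newer API level */
    }
    hints->domain_attr->mr_mode = FI_MR_BASIC;
    hints->domain_attr->control_progress = FI_PROGRESS_AUTO;
    hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
    return FI_VERSION(1, 5);
}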
+ */ + Fabric->mr_virt_addr * (uint64_t)Req->Handle.Block + (BufferSlot * RankReq->PreloadBufferSize), RollBuffer->Offset, (void *)(Step->Timestep)); } while (rc == -EAGAIN); @@ -1822,15 +2326,17 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep) PreloadBuffer->BufferLen = 2 * StepLog->BufferSize; PreloadBuffer->Handle.Block = malloc(PreloadBuffer->BufferLen); - fi_mr_reg(Fabric->domain, PreloadBuffer->Handle.Block, PreloadBuffer->BufferLen, - FI_REMOTE_WRITE, 0, 0, 0, &Stream->pbmr, Fabric->ctx); + sst_fi_mr_reg(Svcs, Stream->CP_Stream, Fabric->domain, PreloadBuffer->Handle.Block, + PreloadBuffer->BufferLen, FI_REMOTE_WRITE, 0, 0, 0, &Stream->pbmr, Fabric->ctx, + Fabric->signal, Fabric->info->domain_attr->mr_mode); PreloadKey = fi_mr_key(Stream->pbmr); SBSize = sizeof(*SendBuffer) * StepLog->WRanks; SendBuffer = malloc(SBSize); if (Fabric->local_mr_req) { - fi_mr_reg(Fabric->domain, SendBuffer, SBSize, FI_WRITE, 0, 0, 0, &sbmr, Fabric->ctx); + sst_fi_mr_reg(Svcs, Stream->CP_Stream, Fabric->domain, SendBuffer, SBSize, FI_WRITE, 0, 0, + 0, &sbmr, Fabric->ctx, Fabric->signal, Fabric->info->domain_attr->mr_mode); sbdesc = fi_mr_desc(sbmr); } @@ -1838,8 +2344,9 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep) { RBLen = 2 * StepLog->Entries * DP_DATA_RECV_SIZE; Stream->RecvDataBuffer = malloc(RBLen); - fi_mr_reg(Fabric->domain, Stream->RecvDataBuffer, RBLen, FI_RECV, 0, 0, 0, &Stream->rbmr, - Fabric->ctx); + sst_fi_mr_reg(Svcs, Stream->CP_Stream, Fabric->domain, Stream->RecvDataBuffer, RBLen, + FI_RECV, 0, 0, 0, &Stream->rbmr, Fabric->ctx, Fabric->signal, + Fabric->info->domain_attr->mr_mode); Stream->rbdesc = fi_mr_desc(Stream->rbmr); RecvBuffer = (uint8_t *)Stream->RecvDataBuffer; for (i = 0; i < 2 * StepLog->Entries; i++) @@ -1862,9 +2369,10 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep) if (RankLog->Entries > 0) { RankLog->Buffer = (void *)RawPLBuffer; - fi_mr_reg(Fabric->domain, RankLog->ReqLog, - (sizeof(struct _RdmaBuffer) * RankLog->Entries) + sizeof(uint64_t), - FI_REMOTE_READ, 0, 0, 0, &RankLog->preqbmr, Fabric->ctx); + sst_fi_mr_reg(Svcs, Stream->CP_Stream, Fabric->domain, RankLog->ReqLog, + (sizeof(struct _RdmaBuffer) * RankLog->Entries) + sizeof(uint64_t), + FI_REMOTE_READ, 0, 0, 0, &RankLog->preqbmr, Fabric->ctx, Fabric->signal, + Fabric->info->domain_attr->mr_mode); for (j = 0; j < RankLog->Entries; j++) { ReqLog = &RankLog->ReqLog[j]; @@ -1883,11 +2391,17 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep) SendBuffer[WRidx].Offset = (uint64_t)PreloadKey; SendBuffer[WRidx].Handle.Block = (void *)RankLog->ReqLog; SendBuffer[WRidx].Handle.Key = fi_mr_key(RankLog->preqbmr); - RollDest = - (uint64_t)Stream->WriterRoll[i].Block + (sizeof(struct _RdmaBuffer) * Stream->Rank); - fi_write(Fabric->signal, &SendBuffer[WRidx], sizeof(struct _RdmaBuffer), sbdesc, - Stream->WriterAddr[i], RollDest, Stream->WriterRoll[i].Key, - &SendBuffer[WRidx]); + /* + * If mr_virt_addr is zero, we need just the offset, + * otherwise we need the remote virtual address composed by + * base pointer + offset. 
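The error reporting added to the completion-wait loops (DoPushWait and WaitForAnyPull above; PostPreload, PullSelection and CompletePush below) follows one pattern: when fi_cq_sread() fails, read the error entry so the log carries the provider's reason, not just the return code. A hypothetical helper showing the same calls:

#include <stdio.h>
#include <sys/types.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>

/* Report why a blocking completion-queue read returned rc < 1. */
static void report_cq_error(struct fid_cq *cq, ssize_t rc)
{
    struct fi_cq_err_entry error = {0};
    fi_cq_readerr(cq, &error, 0);
    fprintf(stderr, "cq wait failed: %zd (%s - %s)\n", rc, fi_strerror(error.err),
            fi_cq_strerror(cq, error.prov_errno, error.err_data, NULL, 0));
}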
+ */ + RollDest = Fabric->mr_virt_addr * (uint64_t)Stream->WriterRoll[i].Block + + (sizeof(struct _RdmaBuffer) * Stream->Rank); + guard_fi_return((int)fi_write(Fabric->signal, &SendBuffer[WRidx], + sizeof(struct _RdmaBuffer), sbdesc, Stream->WriterAddr[i], + RollDest, Stream->WriterRoll[i].Key, &SendBuffer[WRidx]), + Svcs, Stream->CP_Stream, "[PostPreload] fi_write failed with:"); RankLog->PreloadHandles = malloc(sizeof(void *) * 2); RankLog->PreloadHandles[0] = calloc(sizeof(struct _RdmaCompletionHandle), RankLog->Entries); @@ -1899,7 +2413,19 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep) while (WRidx > 0) { - fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + ssize_t rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + if (rc < 1) + { + struct fi_cq_err_entry error; + fi_cq_readerr(Fabric->cq_signal, &error, 0); + Svcs->verbose( + Stream->CP_Stream, DPCriticalVerbose, + "[PostPreload] failure while waiting for completions " + "(%d (%s - %s)).\n", + rc, fi_strerror(error.err), + fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data, NULL, error.len)); + return; + } CQBuffer = CQEntry.op_context; if (CQBuffer >= SendBuffer && CQBuffer < (SendBuffer + StepLog->WRanks)) { @@ -2000,24 +2526,45 @@ static void PullSelection(CP_Services Svcs, Rdma_WSR_Stream Stream) ReqBuffer.Handle.Block = ReadBuffer = malloc(ReqBuffer.BufferLen); if (Fabric->local_mr_req) { - fi_mr_reg(Fabric->domain, ReqBuffer.Handle.Block, ReqBuffer.BufferLen, FI_READ, 0, 0, 0, - &rrmr, Fabric->ctx); + sst_fi_mr_reg(Svcs, WS_Stream->CP_Stream, Fabric->domain, ReqBuffer.Handle.Block, + ReqBuffer.BufferLen, FI_READ, 0, 0, 0, &rrmr, Fabric->ctx, Fabric->signal, + Fabric->info->domain_attr->mr_mode); rrdesc = fi_mr_desc(rrmr); } for (RankReq = Stream->PreloadReq; RankReq; RankReq = RankReq->next) { RankReq->ReqLog = (RdmaBuffer)ReadBuffer; - fi_read(Fabric->signal, RankReq->ReqLog, RankReq->BufferSize, rrdesc, - Stream->ReaderAddr[RankReq->Rank], (uint64_t)ReaderRoll[RankReq->Rank].Handle.Block, - ReaderRoll[RankReq->Rank].Handle.Key, RankReq); + guard_fi_return( + (int)fi_read(Fabric->signal, RankReq->ReqLog, RankReq->BufferSize, rrdesc, + Stream->ReaderAddr[RankReq->Rank], + /* + * If mr_virt_addr is 0, then this is a simple + * null-pointer, indicating no offset. Otherwise, we + * need the remote virtual memory read address. 
+ */ + Fabric->mr_virt_addr * (uint64_t)ReaderRoll[RankReq->Rank].Handle.Block, + ReaderRoll[RankReq->Rank].Handle.Key, RankReq), + Svcs, WS_Stream->CP_Stream, "[PullSelection] fi_read() failed with:"); ReadBuffer += RankReq->BufferSize; } RankReq = Stream->PreloadReq; while (RankReq) { - fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + ssize_t rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + if (rc < 1) + { + struct fi_cq_err_entry error; + fi_cq_readerr(Fabric->cq_signal, &error, 0); + Svcs->verbose( + WS_Stream->CP_Stream, DPCriticalVerbose, + "[PullSelection] failure while waiting for completions " + "(%d (%s - %s)).\n", + rc, fi_strerror(error.err), + fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data, NULL, error.len)); + return; + } CQRankReq = CQEntry.op_context; if (CQEntry.flags & FI_READ) { @@ -2049,7 +2596,19 @@ static void CompletePush(CP_Services Svcs, Rdma_WSR_Stream Stream, TimestepList while (Step->OutstandingWrites > 0) { - fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + ssize_t rc = fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + if (rc < 1) + { + struct fi_cq_err_entry error; + fi_cq_readerr(Fabric->cq_signal, &error, 0); + Svcs->verbose( + WS_Stream->CP_Stream, DPCriticalVerbose, + "[CompletePush] failure while waiting for completions " + "(%d (%s - %s)).\n", + rc, fi_strerror(error.err), + fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data, NULL, error.len)); + return; + } if (CQEntry.flags & FI_WRITE) { CQTimestep = (long)CQEntry.op_context;
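One more pattern worth spelling out, since the wait loops above branch on it: completions drawn from the shared signal CQ are told apart by their flags, assuming the data CQ format this data plane configures. A short sketch with placeholder names:

#include <sys/types.h>
#include <rdma/fi_eq.h>

/* Read one completion and classify it: a remote write carrying immediate
 * data (a peer signalling pushed data) versus one of our own finished
 * fi_write()/fi_writedata() operations. */
static void classify_completion(struct fid_cq *cq)
{
    struct fi_cq_data_entry entry;
    ssize_t rc = fi_cq_sread(cq, &entry, 1, NULL, -1);
    if (rc < 1)
        return; /* see the report_cq_error()-style handling above */
    if (entry.flags & FI_REMOTE_CQ_DATA)
    {
        /* remote write with immediate data: entry.data carries the peer's tag */
    }
    else if (entry.flags & FI_WRITE)
    {
        /* a local RMA write completed; entry.op_context identifies it */
    }
}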