diff --git a/ompi/mca/mtl/ofi/mtl_ofi.h b/ompi/mca/mtl/ofi/mtl_ofi.h
index 0f999eeaba3..9f5ca214be9 100644
--- a/ompi/mca/mtl/ofi/mtl_ofi.h
+++ b/ompi/mca/mtl/ofi/mtl_ofi.h
@@ -61,8 +61,7 @@ __opal_attribute_always_inline__ static inline int
 ompi_mtl_ofi_progress(void)
 {
     ssize_t ret;
-    int count = 0;
-    struct fi_cq_tagged_entry wc = { 0 };
+    int count = 0, i, events_read;
     struct fi_cq_err_entry error = { 0 };
     ompi_mtl_ofi_request_t *ofi_req = NULL;
 
@@ -72,19 +71,28 @@ ompi_mtl_ofi_progress(void)
      * Call the request's callback.
      */
     while (true) {
-        ret = fi_cq_read(ompi_mtl_ofi.cq, (void *)&wc, 1);
+        /*
+         * NOTE(review): progress_entries is module-wide storage, unlike the
+         * stack-local entry it replaces. Confirm callers of this function
+         * are serialized; concurrent calls would overwrite each other's events.
+         */
+        ret = fi_cq_read(ompi_mtl_ofi.cq, ompi_mtl_ofi.progress_entries,
+                ompi_mtl_ofi.ofi_progress_event_count);
         if (ret > 0) {
-            count++;
-            if (NULL != wc.op_context) {
-                ofi_req = TO_OFI_REQ(wc.op_context);
-                assert(ofi_req);
-                ret = ofi_req->event_callback(&wc, ofi_req);
-                if (OMPI_SUCCESS != ret) {
-                    opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
-                                   "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
-                                   __FILE__, __LINE__, ret);
-                    fflush(stderr);
-                    exit(1);
+            events_read = (int) ret;    /* bounded by the int count passed to fi_cq_read */
+            count += events_read;
+            for (i = 0; i < events_read; i++) {
+                if (NULL != ompi_mtl_ofi.progress_entries[i].op_context) {
+                    ofi_req = TO_OFI_REQ(ompi_mtl_ofi.progress_entries[i].op_context);
+                    assert(ofi_req);
+                    ret = ofi_req->event_callback(&ompi_mtl_ofi.progress_entries[i], ofi_req);
+                    if (OMPI_SUCCESS != ret) {
+                        opal_output(0, "%s:%d: Error returned by request event callback: %zd.\n"
+                                       "*** The Open MPI OFI MTL is aborting the MPI job (via exit(3)).\n",
+                                       __FILE__, __LINE__, ret);
+                        fflush(stderr);
+                        exit(1);
+                    }
                 }
             }
         } else if (OPAL_UNLIKELY(ret == -FI_EAVAIL)) {
diff --git a/ompi/mca/mtl/ofi/mtl_ofi_component.c b/ompi/mca/mtl/ofi/mtl_ofi_component.c
index 0c70d5427e0..662fb38e796 100644
--- a/ompi/mca/mtl/ofi/mtl_ofi_component.c
+++ b/ompi/mca/mtl/ofi/mtl_ofi_component.c
@@ -98,6 +98,7 @@ ompi_mtl_ofi_component_register(void)
 {
     int ret;
     mca_base_var_enum_t *new_enum = NULL;
+    char *desc;
 
     param_priority = 25;   /* for now give a lower priority than the psm mtl */
     mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
@@ -125,6 +126,22 @@ ompi_mtl_ofi_component_register(void)
                                     MCA_BASE_VAR_SCOPE_READONLY,
                                     &prov_exclude);
 
+    ompi_mtl_ofi.ofi_progress_event_count = MTL_OFI_DEFAULT_PROGRESS_EVENT_COUNT;
+    if (0 > asprintf(&desc, "Max number of events to read each call to OFI progress (default: %d events will be read per OFI progress call)",
+                     ompi_mtl_ofi.ofi_progress_event_count)) {
+        /* asprintf leaves desc undefined on failure; do not use or free it */
+        return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+    mca_base_component_var_register(&mca_mtl_ofi_component.super.mtl_version,
+                                    "progress_event_cnt",
+                                    desc,
+                                    MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
+                                    OPAL_INFO_LVL_6,
+                                    MCA_BASE_VAR_SCOPE_READONLY,
+                                    &ompi_mtl_ofi.ofi_progress_event_count);
+
+    free(desc);
+
     ret = mca_base_var_enum_create ("control_prog_type", control_prog_type, &new_enum);
     if (OPAL_SUCCESS != ret) {
         return ret;
@@ -465,6 +482,32 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
      *     - dynamic memory-spanning memory region
      */
     cq_attr.format = FI_CQ_FORMAT_TAGGED;
+
+    /**
+     * progress_event_cnt is user-settable; a value below 1 cannot size the
+     * event array used by OFI progress, so fall back to the default.
+     */
+    if (ompi_mtl_ofi.ofi_progress_event_count < 1) {
+        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
+                            "%s:%d: invalid progress_event_cnt %d, using default %d\n",
+                            __FILE__, __LINE__,
+                            ompi_mtl_ofi.ofi_progress_event_count,
+                            MTL_OFI_DEFAULT_PROGRESS_EVENT_COUNT);
+        ompi_mtl_ofi.ofi_progress_event_count = MTL_OFI_DEFAULT_PROGRESS_EVENT_COUNT;
+    }
+
+    /**
+     * If a user has set an ofi_progress_event_count > the default, then
+     * the CQ size hint is set to the user's desired value such that
+     * the CQ created will have enough slots to store up to
+     * ofi_progress_event_count events. If a user has not set the
+     * ofi_progress_event_count, then the provider is trusted to set a
+     * default high CQ size and the CQ size hint is left unspecified.
+     */
+    if (ompi_mtl_ofi.ofi_progress_event_count > MTL_OFI_DEFAULT_PROGRESS_EVENT_COUNT) {
+        cq_attr.size = ompi_mtl_ofi.ofi_progress_event_count;
+    }
+
     ret = fi_cq_open(ompi_mtl_ofi.domain, &cq_attr, &ompi_mtl_ofi.cq, NULL);
     if (ret) {
         opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
@@ -473,6 +516,18 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
         goto error;
     }
 
+    /**
+     * Allocate memory for storing the CQ events read in OFI progress.
+     */
+    ompi_mtl_ofi.progress_entries = calloc((size_t) ompi_mtl_ofi.ofi_progress_event_count,
+                                           sizeof(*ompi_mtl_ofi.progress_entries));
+    if (OPAL_UNLIKELY(!ompi_mtl_ofi.progress_entries)) {
+        opal_output_verbose(1, ompi_mtl_base_framework.framework_output,
+                            "%s:%d: alloc of CQ event storage failed: %s\n",
+                            __FILE__, __LINE__, strerror(errno));
+        goto error;
+    }
+
     /**
      * The remote fi_addr will be stored in the ofi_endpoint struct.
      */
@@ -595,6 +650,10 @@ ompi_mtl_ofi_component_init(bool enable_progress_threads,
     if (ompi_mtl_ofi.fabric) {
         (void) fi_close((fid_t)ompi_mtl_ofi.fabric);
     }
+    /* free(NULL) is a no-op; clear the pointer so it cannot be freed twice */
+    free(ompi_mtl_ofi.progress_entries);
+    ompi_mtl_ofi.progress_entries = NULL;
+
     return NULL;
 }
 
@@ -626,6 +685,9 @@ ompi_mtl_ofi_finalize(struct mca_mtl_base_module_t *mtl)
         goto finalize_err;
     }
 
+    free(ompi_mtl_ofi.progress_entries);
+    ompi_mtl_ofi.progress_entries = NULL;
+
     return OMPI_SUCCESS;
 
 finalize_err:
diff --git a/ompi/mca/mtl/ofi/mtl_ofi_types.h b/ompi/mca/mtl/ofi/mtl_ofi_types.h
index 1b1bdb1e1c5..0b6a1fcc715 100644
--- a/ompi/mca/mtl/ofi/mtl_ofi_types.h
+++ b/ompi/mca/mtl/ofi/mtl_ofi_types.h
@@ -49,6 +49,15 @@ typedef struct mca_mtl_ofi_module_t {
     /** Maximum inject size */
     size_t max_inject_size;
 
+    /** Maximum number of CQ events to read in OFI Progress */
+    int ofi_progress_event_count;
+
+    /** CQ event storage */
+    struct fi_cq_tagged_entry *progress_entries;
+
 } mca_mtl_ofi_module_t;
 
+/** Default for ofi_progress_event_count (the progress_event_cnt MCA parameter) */
+#define MTL_OFI_DEFAULT_PROGRESS_EVENT_COUNT 100
+
 extern mca_mtl_ofi_module_t ompi_mtl_ofi;