diff --git a/ompi/mca/part/persist/part_persist.c b/ompi/mca/part/persist/part_persist.c index b90a180a601..01a21b3d693 100644 --- a/ompi/mca/part/persist/part_persist.c +++ b/ompi/mca/part/persist/part_persist.c @@ -45,3 +45,8 @@ OBJ_CLASS_INSTANCE(mca_part_persist_list_t, NULL, NULL); +OBJ_CLASS_INSTANCE(mca_part_persist_send_tag_list_t, + opal_list_item_t, + NULL, + NULL); + diff --git a/ompi/mca/part/persist/part_persist.h b/ompi/mca/part/persist/part_persist.h index febc1385376..9c0d8603b03 100644 --- a/ompi/mca/part/persist/part_persist.h +++ b/ompi/mca/part/persist/part_persist.h @@ -51,8 +51,14 @@ typedef struct mca_part_persist_list_t { mca_part_persist_request_t *item; } mca_part_persist_list_t; -OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_persist_list_t); +typedef struct mca_part_persist_send_tag_list_t { + opal_list_item_t super; + int32_t peer; /**< Peer value this counter is keyed on; compared against the peer argument of mca_part_persist_next_sent_tag. */ + int32_t tag; /**< Next unused starting send tag for that peer. */ +} mca_part_persist_send_tag_list_t; +OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_persist_list_t); +OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_persist_send_tag_list_t); struct ompi_part_persist_t { mca_part_base_module_t super; @@ -60,8 +66,8 @@ struct ompi_part_persist_t { int free_list_max; int free_list_inc; opal_list_t *progress_list; + opal_list_t *peer_send_tag_list; /* List of mca_part_persist_send_tag_list_t entries holding the next starting send tag per peer. Keeping one counter per peer reduces the chance of integer overflow. */ - int32_t next_send_tag; /**< This is a counter for send tags for the actual data transfer. */ int32_t next_recv_tag; ompi_communicator_t *part_comm; /* This approach requires a separate tag space, so we need a dedicated communicator. */ ompi_request_t *part_comm_req; @@ -84,6 +90,30 @@ typedef struct ompi_part_persist_t ompi_part_persist_t; extern ompi_part_persist_t ompi_part_persist; +/** + * Reserves num_tags consecutive send tags for the given peer and returns the first one: if ompi_part_persist.peer_send_tag_list already has an entry for peer, its counter is returned and then advanced by num_tags; 
+ * otherwise a new entry whose counter starts at num_tags is appended and 0 is returned. NOTE(review): the OBJ_NEW result is used without a NULL check, the counter addition is not guarded against int32_t overflow, and no lock is taken inside this function; confirm the callers account for this. + */ +__opal_attribute_always_inline__ static inline int mca_part_persist_next_sent_tag(int32_t peer, int32_t num_tags) +{ + mca_part_persist_send_tag_list_t *current; + + OPAL_LIST_FOREACH(current, ompi_part_persist.peer_send_tag_list, mca_part_persist_send_tag_list_t) { + if(current->peer == peer) { + int ret = current->tag; /* first tag of the block being reserved */ + current->tag += num_tags; + return ret; + } + } + + current = OBJ_NEW(mca_part_persist_send_tag_list_t); + current->peer = peer; + current->tag = num_tags; /* tags 0 .. num_tags-1 are reserved by this first call for the peer */ + opal_list_append(ompi_part_persist.peer_send_tag_list, (opal_list_item_t*)current); + return 0; +} + + /** * This is a helper function that frees a request. This requires ompi_part_persist.lock be held before calling. */ @@ -131,6 +161,7 @@ __opal_attribute_always_inline__ static inline void mca_part_persist_init_lists( ompi_part_persist.free_list_inc, NULL, 0, NULL, NULL, NULL); ompi_part_persist.progress_list = OBJ_NEW(opal_list_t); + ompi_part_persist.peer_send_tag_list = OBJ_NEW(opal_list_t); } __opal_attribute_always_inline__ static inline void @@ -434,7 +465,7 @@ mca_part_persist_psend_init(const void* buf, /* non-blocking send set-up data */ req->setup_info[0].world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm); - req->setup_info[0].start_tag = ompi_part_persist.next_send_tag; ompi_part_persist.next_send_tag += parts; + req->setup_info[0].start_tag = mca_part_persist_next_sent_tag(dst, parts); req->my_send_tag = req->setup_info[0].start_tag; req->setup_info[0].setup_tag = ompi_part_persist.next_recv_tag; ompi_part_persist.next_recv_tag++; req->my_recv_tag = req->setup_info[0].setup_tag; diff --git a/ompi/mca/part/persist/part_persist_component.c b/ompi/mca/part/persist/part_persist_component.c index 919284476b9..be309a89c53 100644 --- a/ompi/mca/part/persist/part_persist_component.c +++ b/ompi/mca/part/persist/part_persist_component.c @@ -93,7 +93,6 @@ mca_part_persist_component_open(void) { OBJ_CONSTRUCT(&ompi_part_persist.lock, opal_mutex_t); - 
ompi_part_persist.next_send_tag = 0; /**< This is a counter for send tags for the actual data transfer. */ ompi_part_persist.next_recv_tag = 0; mca_part_persist_init_lists();