Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ompi/mca/part/persist/part_persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,8 @@ OBJ_CLASS_INSTANCE(mca_part_persist_list_t,
NULL,
NULL);

OBJ_CLASS_INSTANCE(mca_part_persist_send_tag_list_t,
opal_list_item_t,
NULL,
NULL);

37 changes: 34 additions & 3 deletions ompi/mca/part/persist/part_persist.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,23 @@ typedef struct mca_part_persist_list_t {
mca_part_persist_request_t *item;
} mca_part_persist_list_t;

OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_persist_list_t);
typedef struct mca_part_persist_send_tag_list_t {
opal_list_item_t super;
int32_t peer;
int32_t tag;
} mca_part_persist_send_tag_list_t;

OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_persist_list_t);
OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_persist_send_tag_list_t);

struct ompi_part_persist_t {
mca_part_base_module_t super;
int free_list_num;
int free_list_max;
int free_list_inc;
opal_list_t *progress_list;
opal_list_t *peer_send_tag_list; /* This is a list of the next starting tags per peer. This reduces the chance of interger overflow. */

int32_t next_send_tag; /**< This is a counter for send tags for the actual data transfer. */
int32_t next_recv_tag;
ompi_communicator_t *part_comm; /* This approach requires a separate tag space, so we need a dedicated communicator. */
ompi_request_t *part_comm_req;
Expand All @@ -84,6 +90,30 @@ typedef struct ompi_part_persist_t ompi_part_persist_t;
extern ompi_part_persist_t ompi_part_persist;


/**
* This is a helper function that gets the next send tag for a given peer, and initializes a new counter
* if the peer hasn't been used yet.
*/
__opal_attribute_always_inline__ static inline int mca_part_persist_next_sent_tag(int32_t peer, int32_t num_tags)
{
mca_part_persist_send_tag_list_t *current;

OPAL_LIST_FOREACH(current, ompi_part_persist.peer_send_tag_list, mca_part_persist_send_tag_list_t) {
if(current->peer == peer) {
int ret = current->tag;
current->tag += num_tags;
return ret;
}
}

current = OBJ_NEW(mca_part_persist_send_tag_list_t);
current->peer = peer;
current->tag = num_tags;
opal_list_append(ompi_part_persist.peer_send_tag_list, (opal_list_item_t*)current);
return 0;
}


/**
* This is a helper function that frees a request. This requires ompi_part_persist.lock be held before calling.
*/
Expand Down Expand Up @@ -131,6 +161,7 @@ __opal_attribute_always_inline__ static inline void mca_part_persist_init_lists(
ompi_part_persist.free_list_inc,
NULL, 0, NULL, NULL, NULL);
ompi_part_persist.progress_list = OBJ_NEW(opal_list_t);
ompi_part_persist.peer_send_tag_list = OBJ_NEW(opal_list_t);
}

__opal_attribute_always_inline__ static inline void
Expand Down Expand Up @@ -434,7 +465,7 @@ mca_part_persist_psend_init(const void* buf,

/* non-blocking send set-up data */
req->setup_info[0].world_rank = ompi_comm_rank(&ompi_mpi_comm_world.comm);
req->setup_info[0].start_tag = ompi_part_persist.next_send_tag; ompi_part_persist.next_send_tag += parts;
req->setup_info[0].start_tag = mca_part_persist_next_sent_tag(dst, parts);
req->my_send_tag = req->setup_info[0].start_tag;
req->setup_info[0].setup_tag = ompi_part_persist.next_recv_tag; ompi_part_persist.next_recv_tag++;
req->my_recv_tag = req->setup_info[0].setup_tag;
Expand Down
1 change: 0 additions & 1 deletion ompi/mca/part/persist/part_persist_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ mca_part_persist_component_open(void)
{
OBJ_CONSTRUCT(&ompi_part_persist.lock, opal_mutex_t);

ompi_part_persist.next_send_tag = 0; /**< This is a counter for send tags for the actual data transfer. */
ompi_part_persist.next_recv_tag = 0;

mca_part_persist_init_lists();
Expand Down