From cc449df046d8b5dcab8b0641ef3128a0895b9625 Mon Sep 17 00:00:00 2001 From: Austen Lauria Date: Wed, 24 Mar 2021 17:51:23 -0400 Subject: [PATCH] btl/tcp: MPI_Put/Accumulate(): Fix pre-mature completeion callback. - Send an ack back to the origin letting it know when it is safe to invoke the completion callback. Otherwise it can be called before the data has arrived. Found with test accfence1.c. Signed-off-by: Austen Lauria --- ompi/mca/osc/rdma/osc_rdma_accumulate.c | 2 +- ompi/mca/osc/rdma/osc_rdma_comm.c | 2 +- opal/mca/btl/tcp/btl_tcp.c | 41 +++++++++++++++++++++++-- opal/mca/btl/tcp/btl_tcp.h | 5 +++ opal/mca/btl/tcp/btl_tcp_component.c | 9 ++++++ opal/mca/btl/tcp/btl_tcp_endpoint.c | 32 +++++++++++++++---- opal/mca/btl/tcp/btl_tcp_frag.c | 1 + opal/mca/btl/tcp/btl_tcp_hdr.h | 12 +++++--- 8 files changed, 90 insertions(+), 14 deletions(-) diff --git a/ompi/mca/osc/rdma/osc_rdma_accumulate.c b/ompi/mca/osc/rdma/osc_rdma_accumulate.c index 15f0a80714e..5d864518246 100644 --- a/ompi/mca/osc/rdma/osc_rdma_accumulate.c +++ b/ompi/mca/osc/rdma/osc_rdma_accumulate.c @@ -758,7 +758,7 @@ static inline int cas_rdma (ompi_osc_rdma_sync_t *sync, const void *source_addr, do { ret = btl->btl_put (btl, peer->data_endpoint, ptr, target_address, - local_handle, target_handle, len, 0, MCA_BTL_NO_ORDER, + local_handle, target_handle, len, MCA_BTL_TAG_OSC_RDMA, MCA_BTL_NO_ORDER, ompi_osc_rdma_cas_put_complete, (void *) &complete, NULL); if (OPAL_SUCCESS == ret || (OPAL_ERR_OUT_OF_RESOURCE != ret && OPAL_ERR_TEMP_OUT_OF_RESOURCE != ret)) { break; diff --git a/ompi/mca/osc/rdma/osc_rdma_comm.c b/ompi/mca/osc/rdma/osc_rdma_comm.c index 449bbea0641..5fe6acbe7cc 100644 --- a/ompi/mca/osc/rdma/osc_rdma_comm.c +++ b/ompi/mca/osc/rdma/osc_rdma_comm.c @@ -455,7 +455,7 @@ static int ompi_osc_rdma_put_real (ompi_osc_rdma_sync_t *sync, ompi_osc_rdma_pee do { ret = btl->btl_put (btl, peer->data_endpoint, ptr, target_address, - local_handle, target_handle, size, 0, MCA_BTL_NO_ORDER, + local_handle, target_handle, size, MCA_BTL_TAG_OSC_RDMA, MCA_BTL_NO_ORDER, cb, context, cbdata); if (OPAL_UNLIKELY(OMPI_SUCCESS == ret)) { return OMPI_SUCCESS; diff --git a/opal/mca/btl/tcp/btl_tcp.c b/opal/mca/btl/tcp/btl_tcp.c index 4fe37f566bb..8f4a3db69ed 100644 --- a/opal/mca/btl/tcp/btl_tcp.c +++ b/opal/mca/btl/tcp/btl_tcp.c @@ -60,6 +60,36 @@ mca_btl_tcp_module_t mca_btl_tcp_module = }, .tcp_endpoints_mutex = OPAL_MUTEX_STATIC_INIT}; +void mca_btl_tcp_put_response(mca_btl_base_module_t *btl, + const mca_btl_base_receive_descriptor_t *desc) { + + mca_btl_tcp_frag_t *frag = NULL; + + MCA_BTL_TCP_FRAG_ALLOC_USER(frag); + assert(NULL != frag); + + memset(frag, 0, sizeof(mca_btl_tcp_frag_t)); + + frag->endpoint = desc->endpoint; + frag->hdr.myself_on_origin = desc->cbdata; + + frag->btl = btl; + frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT_ACK; + frag->hdr.base.tag = MCA_BTL_TAG_BTL; + + frag->base.order = MCA_BTL_NO_ORDER; + frag->base.des_flags = MCA_BTL_DES_FLAGS_PRIORITY; + + frag->iov_cnt = 1; + frag->iov_ptr = frag->iov; + frag->iov[0].iov_base = (IOVBASE_TYPE *) &frag->hdr; + frag->iov[0].iov_len = sizeof(frag->hdr); + + mca_btl_tcp_endpoint_send(frag->endpoint, frag); +} + + + static int mca_btl_tcp_register_error_cb(struct mca_btl_base_module_t *btl, mca_btl_base_module_error_cb_fn_t cbfunc) { @@ -401,7 +431,15 @@ int mca_btl_tcp_put(mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t * frag->iov[i + 2].iov_base = (IOVBASE_TYPE *) frag->segments[i].seg_addr.pval; frag->iov_cnt++; } - frag->hdr.base.tag = MCA_BTL_TAG_BTL; + + if(flags & MCA_BTL_TAG_OSC_RDMA) { + frag->hdr.base.tag = MCA_BTL_TCP_TAG_PUT_RESP; + frag->hdr.myself_on_origin = frag; + } + else { + frag->hdr.base.tag = MCA_BTL_TAG_BTL; + } + frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT; frag->hdr.count = 1; if (endpoint->endpoint_nbo) { @@ -428,7 +466,6 @@ int mca_btl_tcp_get(mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t * MCA_BTL_TCP_FRAG_ALLOC_USER(frag); if (OPAL_UNLIKELY(NULL == frag)) { return OPAL_ERR_OUT_OF_RESOURCE; - ; } frag->endpoint = endpoint; diff --git a/opal/mca/btl/tcp/btl_tcp.h b/opal/mca/btl/tcp/btl_tcp.h index 55fed24398b..33c4a92aece 100644 --- a/opal/mca/btl/tcp/btl_tcp.h +++ b/opal/mca/btl/tcp/btl_tcp.h @@ -55,6 +55,8 @@ #define MCA_BTL_TCP_STATISTICS 0 BEGIN_C_DECLS +#define MCA_BTL_TCP_TAG_PUT_RESP (MCA_BTL_TAG_OSC_RDMA + 1) + extern opal_event_base_t *mca_btl_tcp_event_base; #define MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \ @@ -336,5 +338,8 @@ int mca_btl_tcp_send_blocking(int sd, const void *data, size_t size); */ int mca_btl_tcp_recv_blocking(int sd, void *data, size_t size); +void mca_btl_tcp_put_response(mca_btl_base_module_t *btl, + const mca_btl_base_receive_descriptor_t *desc); + END_C_DECLS #endif diff --git a/opal/mca/btl/tcp/btl_tcp_component.c b/opal/mca/btl/tcp/btl_tcp_component.c index 2c5318302d4..6b4d9ca61c2 100644 --- a/opal/mca/btl/tcp/btl_tcp_component.c +++ b/opal/mca/btl/tcp/btl_tcp_component.c @@ -1220,6 +1220,12 @@ static int mca_btl_tcp_component_exchange(void) return rc; } + +static void mca_btl_tcp_emu_init(void) +{ + mca_btl_base_active_message_trigger[MCA_BTL_TCP_TAG_PUT_RESP].cbfunc = mca_btl_tcp_put_response; +} + /* * TCP module initialization: * (1) read interface list from kernel and compare against module parameters @@ -1318,6 +1324,9 @@ mca_btl_base_module_t **mca_btl_tcp_component_init(int *num_btl_modules, memcpy(btls, mca_btl_tcp_component.tcp_btls, mca_btl_tcp_component.tcp_num_btls * sizeof(mca_btl_tcp_module_t *)); *num_btl_modules = mca_btl_tcp_component.tcp_num_btls; + + mca_btl_tcp_emu_init(); + return btls; } diff --git a/opal/mca/btl/tcp/btl_tcp_endpoint.c b/opal/mca/btl/tcp/btl_tcp_endpoint.c index 1aa7031c0ac..2b583f47394 100644 --- a/opal/mca/btl/tcp/btl_tcp_endpoint.c +++ b/opal/mca/btl/tcp/btl_tcp_endpoint.c @@ -1044,6 +1044,23 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void *user) .cbdata = reg->cbdata}; reg->cbfunc(&frag->btl->super, &desc); } + else if(MCA_BTL_TCP_HDR_TYPE_PUT == frag->hdr.type && (MCA_BTL_TCP_TAG_PUT_RESP == frag->hdr.base.tag)) { + mca_btl_active_message_callback_t *reg = mca_btl_base_active_message_trigger + + frag->hdr.base.tag; + const mca_btl_base_receive_descriptor_t desc = { + .endpoint = btl_endpoint, + .cbdata = frag->hdr.myself_on_origin + }; + reg->cbfunc(&frag->btl->super, &desc); + } + else if(MCA_BTL_TCP_HDR_TYPE_PUT_ACK == frag->hdr.type) { + mca_btl_tcp_frag_t *frag_back = (mca_btl_tcp_frag_t *) frag->hdr.myself_on_origin; + assert(frag_back->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK); + if (NULL != frag_back->base.des_cbfunc) { + frag_back->base.des_cbfunc(&frag_back->btl->super, frag_back->endpoint, &frag_back->base, frag_back->rc); + } + MCA_BTL_TCP_FRAG_RETURN(frag_back); + } #if MCA_BTL_TCP_ENDPOINT_CACHE if (0 != btl_endpoint->endpoint_cache_length) { /* If the cache still contain some data we can reuse the same fragment @@ -1113,12 +1130,15 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void *user) /* if required - update request status and release fragment */ OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); - assert(frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK); - if (NULL != frag->base.des_cbfunc) { - frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc); - } - if (btl_ownership) { - MCA_BTL_TCP_FRAG_RETURN(frag); + if(MCA_BTL_TCP_TAG_PUT_RESP != frag->hdr.base.tag) { + assert(frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK); + if (NULL != frag->base.des_cbfunc) { + frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc); + } + if (btl_ownership) { + MCA_BTL_TCP_FRAG_RETURN(frag); + } + } /* if we fail to take the lock simply return. In the worst case the * send_handler will be triggered once more, and as there will be diff --git a/opal/mca/btl/tcp/btl_tcp_frag.c b/opal/mca/btl/tcp/btl_tcp_frag.c index e401a48b81e..8d269c047d5 100644 --- a/opal/mca/btl/tcp/btl_tcp_frag.c +++ b/opal/mca/btl/tcp/btl_tcp_frag.c @@ -306,6 +306,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t *frag, int sd) goto repeat; } break; + case MCA_BTL_TCP_HDR_TYPE_PUT_ACK: case MCA_BTL_TCP_HDR_TYPE_GET: default: break; diff --git a/opal/mca/btl/tcp/btl_tcp_hdr.h b/opal/mca/btl/tcp/btl_tcp_hdr.h index 69ec22f6fdf..01fb29e0eaf 100644 --- a/opal/mca/btl/tcp/btl_tcp_hdr.h +++ b/opal/mca/btl/tcp/btl_tcp_hdr.h @@ -28,11 +28,14 @@ BEGIN_C_DECLS /** * TCP header. */ +typedef enum { + MCA_BTL_TCP_HDR_TYPE_SEND = 1, + MCA_BTL_TCP_HDR_TYPE_PUT, + MCA_BTL_TCP_HDR_TYPE_GET, + MCA_BTL_TCP_HDR_TYPE_FIN, + MCA_BTL_TCP_HDR_TYPE_PUT_ACK +} mca_btl_tcp_hdr_type_t; -#define MCA_BTL_TCP_HDR_TYPE_SEND 1 -#define MCA_BTL_TCP_HDR_TYPE_PUT 2 -#define MCA_BTL_TCP_HDR_TYPE_GET 3 -#define MCA_BTL_TCP_HDR_TYPE_FIN 4 /* The MCA_BTL_TCP_HDR_TYPE_FIN is a special kind of message sent during normal * connexion closing. Before the endpoint closes the socket, it performs a * 1-way handshake by sending a FIN message in the socket. This lets the other @@ -50,6 +53,7 @@ struct mca_btl_tcp_hdr_t { uint8_t type; uint16_t count; uint32_t size; + void *myself_on_origin; /* Pointer back to frag on Put initiator */ }; typedef struct mca_btl_tcp_hdr_t mca_btl_tcp_hdr_t;