diff --git a/ompi/mca/osc/rdma/osc_rdma_accumulate.c b/ompi/mca/osc/rdma/osc_rdma_accumulate.c index 15f0a80714e..5d864518246 100644 --- a/ompi/mca/osc/rdma/osc_rdma_accumulate.c +++ b/ompi/mca/osc/rdma/osc_rdma_accumulate.c @@ -758,7 +758,7 @@ static inline int cas_rdma (ompi_osc_rdma_sync_t *sync, const void *source_addr, do { ret = btl->btl_put (btl, peer->data_endpoint, ptr, target_address, - local_handle, target_handle, len, 0, MCA_BTL_NO_ORDER, + local_handle, target_handle, len, MCA_BTL_TAG_OSC_RDMA, MCA_BTL_NO_ORDER, ompi_osc_rdma_cas_put_complete, (void *) &complete, NULL); if (OPAL_SUCCESS == ret || (OPAL_ERR_OUT_OF_RESOURCE != ret && OPAL_ERR_TEMP_OUT_OF_RESOURCE != ret)) { break; diff --git a/ompi/mca/osc/rdma/osc_rdma_comm.c b/ompi/mca/osc/rdma/osc_rdma_comm.c index 449bbea0641..5fe6acbe7cc 100644 --- a/ompi/mca/osc/rdma/osc_rdma_comm.c +++ b/ompi/mca/osc/rdma/osc_rdma_comm.c @@ -455,7 +455,7 @@ static int ompi_osc_rdma_put_real (ompi_osc_rdma_sync_t *sync, ompi_osc_rdma_pee do { ret = btl->btl_put (btl, peer->data_endpoint, ptr, target_address, - local_handle, target_handle, size, 0, MCA_BTL_NO_ORDER, + local_handle, target_handle, size, MCA_BTL_TAG_OSC_RDMA, MCA_BTL_NO_ORDER, cb, context, cbdata); if (OPAL_UNLIKELY(OMPI_SUCCESS == ret)) { return OMPI_SUCCESS; diff --git a/opal/mca/btl/tcp/btl_tcp.c b/opal/mca/btl/tcp/btl_tcp.c index 4fe37f566bb..8f4a3db69ed 100644 --- a/opal/mca/btl/tcp/btl_tcp.c +++ b/opal/mca/btl/tcp/btl_tcp.c @@ -60,6 +60,36 @@ mca_btl_tcp_module_t mca_btl_tcp_module = }, .tcp_endpoints_mutex = OPAL_MUTEX_STATIC_INIT}; +void mca_btl_tcp_put_response(mca_btl_base_module_t *btl, + const mca_btl_base_receive_descriptor_t *desc) { + + mca_btl_tcp_frag_t *frag = NULL; + + MCA_BTL_TCP_FRAG_ALLOC_USER(frag); + assert(NULL != frag); + + memset(frag, 0, sizeof(mca_btl_tcp_frag_t)); + + frag->endpoint = desc->endpoint; + frag->hdr.myself_on_origin = desc->cbdata; + + frag->btl = btl; + frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT_ACK; + frag->hdr.base.tag = MCA_BTL_TAG_BTL; + + frag->base.order = MCA_BTL_NO_ORDER; + frag->base.des_flags = MCA_BTL_DES_FLAGS_PRIORITY; + + frag->iov_cnt = 1; + frag->iov_ptr = frag->iov; + frag->iov[0].iov_base = (IOVBASE_TYPE *) &frag->hdr; + frag->iov[0].iov_len = sizeof(frag->hdr); + + mca_btl_tcp_endpoint_send(frag->endpoint, frag); +} + + + static int mca_btl_tcp_register_error_cb(struct mca_btl_base_module_t *btl, mca_btl_base_module_error_cb_fn_t cbfunc) { @@ -401,7 +431,15 @@ int mca_btl_tcp_put(mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t * frag->iov[i + 2].iov_base = (IOVBASE_TYPE *) frag->segments[i].seg_addr.pval; frag->iov_cnt++; } - frag->hdr.base.tag = MCA_BTL_TAG_BTL; + + if(flags & MCA_BTL_TAG_OSC_RDMA) { + frag->hdr.base.tag = MCA_BTL_TCP_TAG_PUT_RESP; + frag->hdr.myself_on_origin = frag; + } + else { + frag->hdr.base.tag = MCA_BTL_TAG_BTL; + } + frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT; frag->hdr.count = 1; if (endpoint->endpoint_nbo) { @@ -428,7 +466,6 @@ int mca_btl_tcp_get(mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t * MCA_BTL_TCP_FRAG_ALLOC_USER(frag); if (OPAL_UNLIKELY(NULL == frag)) { return OPAL_ERR_OUT_OF_RESOURCE; - ; } frag->endpoint = endpoint; diff --git a/opal/mca/btl/tcp/btl_tcp.h b/opal/mca/btl/tcp/btl_tcp.h index 55fed24398b..33c4a92aece 100644 --- a/opal/mca/btl/tcp/btl_tcp.h +++ b/opal/mca/btl/tcp/btl_tcp.h @@ -55,6 +55,8 @@ #define MCA_BTL_TCP_STATISTICS 0 BEGIN_C_DECLS +#define MCA_BTL_TCP_TAG_PUT_RESP (MCA_BTL_TAG_OSC_RDMA + 1) + extern opal_event_base_t *mca_btl_tcp_event_base; #define MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \ @@ -336,5 +338,8 @@ int mca_btl_tcp_send_blocking(int sd, const void *data, size_t size); */ int mca_btl_tcp_recv_blocking(int sd, void *data, size_t size); +void mca_btl_tcp_put_response(mca_btl_base_module_t *btl, + const mca_btl_base_receive_descriptor_t *desc); + END_C_DECLS #endif diff --git a/opal/mca/btl/tcp/btl_tcp_component.c b/opal/mca/btl/tcp/btl_tcp_component.c index 2c5318302d4..6b4d9ca61c2 100644 --- a/opal/mca/btl/tcp/btl_tcp_component.c +++ b/opal/mca/btl/tcp/btl_tcp_component.c @@ -1220,6 +1220,12 @@ static int mca_btl_tcp_component_exchange(void) return rc; } + +static void mca_btl_tcp_emu_init(void) +{ + mca_btl_base_active_message_trigger[MCA_BTL_TCP_TAG_PUT_RESP].cbfunc = mca_btl_tcp_put_response; +} + /* * TCP module initialization: * (1) read interface list from kernel and compare against module parameters @@ -1318,6 +1324,9 @@ mca_btl_base_module_t **mca_btl_tcp_component_init(int *num_btl_modules, memcpy(btls, mca_btl_tcp_component.tcp_btls, mca_btl_tcp_component.tcp_num_btls * sizeof(mca_btl_tcp_module_t *)); *num_btl_modules = mca_btl_tcp_component.tcp_num_btls; + + mca_btl_tcp_emu_init(); + return btls; } diff --git a/opal/mca/btl/tcp/btl_tcp_endpoint.c b/opal/mca/btl/tcp/btl_tcp_endpoint.c index 1aa7031c0ac..2b583f47394 100644 --- a/opal/mca/btl/tcp/btl_tcp_endpoint.c +++ b/opal/mca/btl/tcp/btl_tcp_endpoint.c @@ -1044,6 +1044,23 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void *user) .cbdata = reg->cbdata}; reg->cbfunc(&frag->btl->super, &desc); } + else if(MCA_BTL_TCP_HDR_TYPE_PUT == frag->hdr.type && (MCA_BTL_TCP_TAG_PUT_RESP == frag->hdr.base.tag)) { + mca_btl_active_message_callback_t *reg = mca_btl_base_active_message_trigger + + frag->hdr.base.tag; + const mca_btl_base_receive_descriptor_t desc = { + .endpoint = btl_endpoint, + .cbdata = frag->hdr.myself_on_origin + }; + reg->cbfunc(&frag->btl->super, &desc); + } + else if(MCA_BTL_TCP_HDR_TYPE_PUT_ACK == frag->hdr.type) { + mca_btl_tcp_frag_t *frag_back = (mca_btl_tcp_frag_t *) frag->hdr.myself_on_origin; + assert(frag_back->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK); + if (NULL != frag_back->base.des_cbfunc) { + frag_back->base.des_cbfunc(&frag_back->btl->super, frag_back->endpoint, &frag_back->base, frag_back->rc); + } + MCA_BTL_TCP_FRAG_RETURN(frag_back); + } #if MCA_BTL_TCP_ENDPOINT_CACHE if (0 != btl_endpoint->endpoint_cache_length) { /* If the cache still contain some data we can reuse the same fragment @@ -1113,12 +1130,15 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void *user) /* if required - update request status and release fragment */ OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock); - assert(frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK); - if (NULL != frag->base.des_cbfunc) { - frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc); - } - if (btl_ownership) { - MCA_BTL_TCP_FRAG_RETURN(frag); + if(MCA_BTL_TCP_TAG_PUT_RESP != frag->hdr.base.tag) { + assert(frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK); + if (NULL != frag->base.des_cbfunc) { + frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc); + } + if (btl_ownership) { + MCA_BTL_TCP_FRAG_RETURN(frag); + } + } /* if we fail to take the lock simply return. In the worst case the * send_handler will be triggered once more, and as there will be diff --git a/opal/mca/btl/tcp/btl_tcp_frag.c b/opal/mca/btl/tcp/btl_tcp_frag.c index e401a48b81e..8d269c047d5 100644 --- a/opal/mca/btl/tcp/btl_tcp_frag.c +++ b/opal/mca/btl/tcp/btl_tcp_frag.c @@ -306,6 +306,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t *frag, int sd) goto repeat; } break; + case MCA_BTL_TCP_HDR_TYPE_PUT_ACK: case MCA_BTL_TCP_HDR_TYPE_GET: default: break; diff --git a/opal/mca/btl/tcp/btl_tcp_hdr.h b/opal/mca/btl/tcp/btl_tcp_hdr.h index 69ec22f6fdf..01fb29e0eaf 100644 --- a/opal/mca/btl/tcp/btl_tcp_hdr.h +++ b/opal/mca/btl/tcp/btl_tcp_hdr.h @@ -28,11 +28,14 @@ BEGIN_C_DECLS /** * TCP header. */ +typedef enum { + MCA_BTL_TCP_HDR_TYPE_SEND = 1, + MCA_BTL_TCP_HDR_TYPE_PUT, + MCA_BTL_TCP_HDR_TYPE_GET, + MCA_BTL_TCP_HDR_TYPE_FIN, + MCA_BTL_TCP_HDR_TYPE_PUT_ACK +} mca_btl_tcp_hdr_type_t; -#define MCA_BTL_TCP_HDR_TYPE_SEND 1 -#define MCA_BTL_TCP_HDR_TYPE_PUT 2 -#define MCA_BTL_TCP_HDR_TYPE_GET 3 -#define MCA_BTL_TCP_HDR_TYPE_FIN 4 /* The MCA_BTL_TCP_HDR_TYPE_FIN is a special kind of message sent during normal * connexion closing. Before the endpoint closes the socket, it performs a * 1-way handshake by sending a FIN message in the socket. This lets the other @@ -50,6 +53,7 @@ struct mca_btl_tcp_hdr_t { uint8_t type; uint16_t count; uint32_t size; + void *myself_on_origin; /* Pointer back to frag on Put initiator */ }; typedef struct mca_btl_tcp_hdr_t mca_btl_tcp_hdr_t;