Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ompi/mca/osc/rdma/osc_rdma_accumulate.c
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ static inline int cas_rdma (ompi_osc_rdma_sync_t *sync, const void *source_addr,

do {
ret = btl->btl_put (btl, peer->data_endpoint, ptr, target_address,
local_handle, target_handle, len, 0, MCA_BTL_NO_ORDER,
local_handle, target_handle, len, MCA_BTL_TAG_OSC_RDMA, MCA_BTL_NO_ORDER,
ompi_osc_rdma_cas_put_complete, (void *) &complete, NULL);
if (OPAL_SUCCESS == ret || (OPAL_ERR_OUT_OF_RESOURCE != ret && OPAL_ERR_TEMP_OUT_OF_RESOURCE != ret)) {
break;
Expand Down
2 changes: 1 addition & 1 deletion ompi/mca/osc/rdma/osc_rdma_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ static int ompi_osc_rdma_put_real (ompi_osc_rdma_sync_t *sync, ompi_osc_rdma_pee

do {
ret = btl->btl_put (btl, peer->data_endpoint, ptr, target_address,
local_handle, target_handle, size, 0, MCA_BTL_NO_ORDER,
local_handle, target_handle, size, MCA_BTL_TAG_OSC_RDMA, MCA_BTL_NO_ORDER,
cb, context, cbdata);
if (OPAL_UNLIKELY(OMPI_SUCCESS == ret)) {
return OMPI_SUCCESS;
Expand Down
41 changes: 39 additions & 2 deletions opal/mca/btl/tcp/btl_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,36 @@ mca_btl_tcp_module_t mca_btl_tcp_module =
},
.tcp_endpoints_mutex = OPAL_MUTEX_STATIC_INIT};

void mca_btl_tcp_put_response(mca_btl_base_module_t *btl,
const mca_btl_base_receive_descriptor_t *desc) {

mca_btl_tcp_frag_t *frag = NULL;

MCA_BTL_TCP_FRAG_ALLOC_USER(frag);
assert(NULL != frag);

memset(frag, 0, sizeof(mca_btl_tcp_frag_t));

frag->endpoint = desc->endpoint;
frag->hdr.myself_on_origin = desc->cbdata;

frag->btl = btl;
frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT_ACK;
frag->hdr.base.tag = MCA_BTL_TAG_BTL;

frag->base.order = MCA_BTL_NO_ORDER;
frag->base.des_flags = MCA_BTL_DES_FLAGS_PRIORITY;

frag->iov_cnt = 1;
frag->iov_ptr = frag->iov;
frag->iov[0].iov_base = (IOVBASE_TYPE *) &frag->hdr;
frag->iov[0].iov_len = sizeof(frag->hdr);

mca_btl_tcp_endpoint_send(frag->endpoint, frag);
}



static int mca_btl_tcp_register_error_cb(struct mca_btl_base_module_t *btl,
mca_btl_base_module_error_cb_fn_t cbfunc)
{
Expand Down Expand Up @@ -401,7 +431,15 @@ int mca_btl_tcp_put(mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *
frag->iov[i + 2].iov_base = (IOVBASE_TYPE *) frag->segments[i].seg_addr.pval;
frag->iov_cnt++;
}
frag->hdr.base.tag = MCA_BTL_TAG_BTL;

if(flags & MCA_BTL_TAG_OSC_RDMA) {
frag->hdr.base.tag = MCA_BTL_TCP_TAG_PUT_RESP;
frag->hdr.myself_on_origin = frag;
}
else {
frag->hdr.base.tag = MCA_BTL_TAG_BTL;
}

frag->hdr.type = MCA_BTL_TCP_HDR_TYPE_PUT;
frag->hdr.count = 1;
if (endpoint->endpoint_nbo) {
Expand All @@ -428,7 +466,6 @@ int mca_btl_tcp_get(mca_btl_base_module_t *btl, struct mca_btl_base_endpoint_t *
MCA_BTL_TCP_FRAG_ALLOC_USER(frag);
if (OPAL_UNLIKELY(NULL == frag)) {
return OPAL_ERR_OUT_OF_RESOURCE;
;
}

frag->endpoint = endpoint;
Expand Down
5 changes: 5 additions & 0 deletions opal/mca/btl/tcp/btl_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
#define MCA_BTL_TCP_STATISTICS 0
BEGIN_C_DECLS

#define MCA_BTL_TCP_TAG_PUT_RESP (MCA_BTL_TAG_OSC_RDMA + 1)

extern opal_event_base_t *mca_btl_tcp_event_base;

#define MCA_BTL_TCP_COMPLETE_FRAG_SEND(frag) \
Expand Down Expand Up @@ -336,5 +338,8 @@ int mca_btl_tcp_send_blocking(int sd, const void *data, size_t size);
*/
int mca_btl_tcp_recv_blocking(int sd, void *data, size_t size);

void mca_btl_tcp_put_response(mca_btl_base_module_t *btl,
const mca_btl_base_receive_descriptor_t *desc);

END_C_DECLS
#endif
9 changes: 9 additions & 0 deletions opal/mca/btl/tcp/btl_tcp_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,12 @@ static int mca_btl_tcp_component_exchange(void)
return rc;
}


static void mca_btl_tcp_emu_init(void)
{
mca_btl_base_active_message_trigger[MCA_BTL_TCP_TAG_PUT_RESP].cbfunc = mca_btl_tcp_put_response;
}

/*
* TCP module initialization:
* (1) read interface list from kernel and compare against module parameters
Expand Down Expand Up @@ -1318,6 +1324,9 @@ mca_btl_base_module_t **mca_btl_tcp_component_init(int *num_btl_modules,
memcpy(btls, mca_btl_tcp_component.tcp_btls,
mca_btl_tcp_component.tcp_num_btls * sizeof(mca_btl_tcp_module_t *));
*num_btl_modules = mca_btl_tcp_component.tcp_num_btls;

mca_btl_tcp_emu_init();

return btls;
}

Expand Down
32 changes: 26 additions & 6 deletions opal/mca/btl/tcp/btl_tcp_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,23 @@ static void mca_btl_tcp_endpoint_recv_handler(int sd, short flags, void *user)
.cbdata = reg->cbdata};
reg->cbfunc(&frag->btl->super, &desc);
}
else if(MCA_BTL_TCP_HDR_TYPE_PUT == frag->hdr.type && (MCA_BTL_TCP_TAG_PUT_RESP == frag->hdr.base.tag)) {
mca_btl_active_message_callback_t *reg = mca_btl_base_active_message_trigger
+ frag->hdr.base.tag;
const mca_btl_base_receive_descriptor_t desc = {
.endpoint = btl_endpoint,
.cbdata = frag->hdr.myself_on_origin
};
reg->cbfunc(&frag->btl->super, &desc);
}
else if(MCA_BTL_TCP_HDR_TYPE_PUT_ACK == frag->hdr.type) {
mca_btl_tcp_frag_t *frag_back = (mca_btl_tcp_frag_t *) frag->hdr.myself_on_origin;
assert(frag_back->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK);
if (NULL != frag_back->base.des_cbfunc) {
frag_back->base.des_cbfunc(&frag_back->btl->super, frag_back->endpoint, &frag_back->base, frag_back->rc);
}
MCA_BTL_TCP_FRAG_RETURN(frag_back);
}
#if MCA_BTL_TCP_ENDPOINT_CACHE
if (0 != btl_endpoint->endpoint_cache_length) {
/* If the cache still contain some data we can reuse the same fragment
Expand Down Expand Up @@ -1113,12 +1130,15 @@ static void mca_btl_tcp_endpoint_send_handler(int sd, short flags, void *user)

/* if required - update request status and release fragment */
OPAL_THREAD_UNLOCK(&btl_endpoint->endpoint_send_lock);
assert(frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK);
if (NULL != frag->base.des_cbfunc) {
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
}
if (btl_ownership) {
MCA_BTL_TCP_FRAG_RETURN(frag);
if(MCA_BTL_TCP_TAG_PUT_RESP != frag->hdr.base.tag) {
assert(frag->base.des_flags & MCA_BTL_DES_SEND_ALWAYS_CALLBACK);
if (NULL != frag->base.des_cbfunc) {
frag->base.des_cbfunc(&frag->btl->super, frag->endpoint, &frag->base, frag->rc);
}
if (btl_ownership) {
MCA_BTL_TCP_FRAG_RETURN(frag);
}

}
/* if we fail to take the lock simply return. In the worst case the
* send_handler will be triggered once more, and as there will be
Expand Down
1 change: 1 addition & 0 deletions opal/mca/btl/tcp/btl_tcp_frag.c
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ bool mca_btl_tcp_frag_recv(mca_btl_tcp_frag_t *frag, int sd)
goto repeat;
}
break;
case MCA_BTL_TCP_HDR_TYPE_PUT_ACK:
case MCA_BTL_TCP_HDR_TYPE_GET:
default:
break;
Expand Down
12 changes: 8 additions & 4 deletions opal/mca/btl/tcp/btl_tcp_hdr.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ BEGIN_C_DECLS
/**
* TCP header.
*/
typedef enum {
MCA_BTL_TCP_HDR_TYPE_SEND = 1,
MCA_BTL_TCP_HDR_TYPE_PUT,
MCA_BTL_TCP_HDR_TYPE_GET,
MCA_BTL_TCP_HDR_TYPE_FIN,
MCA_BTL_TCP_HDR_TYPE_PUT_ACK
} mca_btl_tcp_hdr_type_t;

#define MCA_BTL_TCP_HDR_TYPE_SEND 1
#define MCA_BTL_TCP_HDR_TYPE_PUT 2
#define MCA_BTL_TCP_HDR_TYPE_GET 3
#define MCA_BTL_TCP_HDR_TYPE_FIN 4
/* The MCA_BTL_TCP_HDR_TYPE_FIN is a special kind of message sent during normal
* connexion closing. Before the endpoint closes the socket, it performs a
* 1-way handshake by sending a FIN message in the socket. This lets the other
Expand All @@ -50,6 +53,7 @@ struct mca_btl_tcp_hdr_t {
uint8_t type;
uint16_t count;
uint32_t size;
void *myself_on_origin; /* Pointer back to frag on Put initiator */
};
typedef struct mca_btl_tcp_hdr_t mca_btl_tcp_hdr_t;

Expand Down