From b8af310efae3358f2b6f7e6eb618744ab099562f Mon Sep 17 00:00:00 2001 From: Nathan Hjelm Date: Tue, 6 Oct 2015 12:47:09 -0600 Subject: [PATCH] btl/openib: remove extra threads This commit removes the service and async event threads from the openib btl. Both threads are replaced by opal progress thread support. The run_in_main function is now supported by allocating an event and adding it to the sync event base. This ensures that the requested function is called as part of opal_progress. Signed-off-by: Nathan Hjelm --- opal/mca/btl/openib/Makefile.am | 2 - opal/mca/btl/openib/btl_openib.c | 24 +- opal/mca/btl/openib/btl_openib.h | 16 +- opal/mca/btl/openib/btl_openib_async.c | 510 ++++--------- opal/mca/btl/openib/btl_openib_async.h | 47 +- opal/mca/btl/openib/btl_openib_component.c | 98 ++- opal/mca/btl/openib/btl_openib_fd.c | 693 ------------------ opal/mca/btl/openib/btl_openib_fd.h | 81 -- .../connect/btl_openib_connect_rdmacm.c | 190 ++--- .../openib/connect/btl_openib_connect_udcm.c | 124 ++-- 10 files changed, 386 insertions(+), 1399 deletions(-) delete mode 100644 opal/mca/btl/openib/btl_openib_fd.c delete mode 100644 opal/mca/btl/openib/btl_openib_fd.h diff --git a/opal/mca/btl/openib/Makefile.am b/opal/mca/btl/openib/Makefile.am index 8ec0d4398ff..24a8f9800e0 100644 --- a/opal/mca/btl/openib/Makefile.am +++ b/opal/mca/btl/openib/Makefile.am @@ -55,8 +55,6 @@ sources = \ btl_openib_async.h \ btl_openib_xrc.c \ btl_openib_xrc.h \ - btl_openib_fd.h \ - btl_openib_fd.c \ btl_openib_ip.h \ btl_openib_ip.c \ btl_openib_put.c \ diff --git a/opal/mca/btl/openib/btl_openib.c b/opal/mca/btl/openib/btl_openib.c index 9f82110cef1..57310af0fd1 100644 --- a/opal/mca/btl/openib/btl_openib.c +++ b/opal/mca/btl/openib/btl_openib.c @@ -638,28 +638,10 @@ static int prepare_device_for_use (mca_btl_openib_device_t *device) OBJ_CONSTRUCT(&device->qps[qp_index].recv_free, opal_free_list_t); } - if(mca_btl_openib_component.use_async_event_thread) { - mca_btl_openib_async_cmd_t async_command = {.a_cmd = OPENIB_ASYNC_CMD_FD_ADD, - .fd = device->ib_dev_context->async_fd, - .qp = NULL}; + device->got_fatal_event = false; + device->got_port_event = false; + mca_btl_openib_async_add_device (device); - /* start the async even thread if it is not already started */ - if (start_async_event_thread() != OPAL_SUCCESS) - return OPAL_ERROR; - - device->got_fatal_event = false; - device->got_port_event = false; - if (write(mca_btl_openib_component.async_pipe[1], - &async_command, sizeof(mca_btl_openib_async_cmd_t))<0){ - BTL_ERROR(("Failed to write to pipe [%d]",errno)); - return OPAL_ERROR; - } - /* wait for ok from thread */ - if (OPAL_SUCCESS != - btl_openib_async_command_done(device->ib_dev_context->async_fd)) { - return OPAL_ERROR; - } - } #if OPAL_ENABLE_PROGRESS_THREADS == 1 /* Prepare data for thread, but not starting it */ OBJ_CONSTRUCT(&device->thread, opal_thread_t); diff --git a/opal/mca/btl/openib/btl_openib.h b/opal/mca/btl/openib/btl_openib.h index 6e1c5ca50ff..a9c13e3d130 100644 --- a/opal/mca/btl/openib/btl_openib.h +++ b/opal/mca/btl/openib/btl_openib.h @@ -48,6 +48,7 @@ #include "opal/mca/mpool/mpool.h" #include "opal/mca/btl/base/btl_base_error.h" #include "opal/mca/btl/base/base.h" +#include "opal/runtime/opal_progress_threads.h" #include "connect/connect.h" @@ -227,9 +228,7 @@ struct mca_btl_openib_component_t { int apm_ports; unsigned int buffer_alignment; /**< Preferred communication buffer alignment in Bytes (must be power of two) */ int32_t error_counter; /**< Counts number on error events that we got on all devices */ - int async_pipe[2]; /**< Pipe for comunication with async event thread */ - int async_comp_pipe[2]; /**< Pipe for async thread comunication with main thread */ - pthread_t async_thread; /**< Async thread that will handle fatal errors */ + opal_event_base_t *async_evbase; /**< Async event base */ bool use_async_event_thread; /**< Use the async event handler */ mca_btl_openib_srq_manager_t srq_manager; /**< Hash table for all BTL SRQs */ #if BTL_OPENIB_FAILOVER_ENABLED @@ -410,6 +409,8 @@ typedef struct mca_btl_openib_device_t { uint64_t mem_reg_max, mem_reg_active; /* Device is ready for use */ bool ready_for_use; + /* Async event */ + opal_event_t async_event; } mca_btl_openib_device_t; OBJ_CLASS_DECLARATION(mca_btl_openib_device_t); @@ -907,6 +908,15 @@ static inline int qp_cq_prio(const int qp) #define BTL_OPENIB_RDMA_QP(QP) \ ((QP) == mca_btl_openib_component.rdma_qp) +/** + * Run function as part of opal_progress() + * + * @param[in] fn function to run + * @param[in] arg function data + */ +int mca_btl_openib_run_in_main (void *(*fn)(void *), void *arg); + + END_C_DECLS #endif /* MCA_BTL_IB_H */ diff --git a/opal/mca/btl/openib/btl_openib_async.c b/opal/mca/btl/openib/btl_openib_async.c index d5cc5cc303f..3662624292e 100644 --- a/opal/mca/btl/openib/btl_openib_async.c +++ b/opal/mca/btl/openib/btl_openib_async.c @@ -36,6 +36,10 @@ #include "btl_openib_proc.h" #include "btl_openib_endpoint.h" +static opal_list_t ignore_qp_err_list; +static opal_mutex_t ignore_qp_err_list_lock; +static int32_t btl_openib_async_device_count = 0; + struct mca_btl_openib_async_poll { int active_poll_size; int poll_size; @@ -50,14 +54,7 @@ typedef struct { OBJ_CLASS_INSTANCE(mca_btl_openib_qp_list, opal_list_item_t, NULL, NULL); -static int return_status = OPAL_ERROR; - -static int btl_openib_async_poll_init(struct mca_btl_openib_async_poll *hcas_poll); -static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *hcas_poll, opal_list_t *ignore_qp_err_list); -static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *hcas_poll, int index, - opal_list_t *ignore_qp_err_list); static const char *openib_event_to_str (enum ibv_event_type event); -static int send_command_comp(int in); /* Function converts event to string (name) * Open Fabris don't have function that do this job :( @@ -138,132 +135,6 @@ static mca_btl_openib_endpoint_t * xrc_qp2endpoint(uint32_t qp_num, mca_btl_open #endif /* Function inits mca_btl_openib_async_poll */ -static int btl_openib_async_poll_init(struct mca_btl_openib_async_poll *devices_poll) -{ - devices_poll->active_poll_size = 1; - devices_poll->poll_size = 4; - devices_poll->async_pollfd = malloc(sizeof(struct pollfd) * devices_poll->poll_size); - if (NULL == devices_poll->async_pollfd) { - BTL_ERROR(("Failed malloc: %s:%d" - , __FILE__, __LINE__)); - return OPAL_ERROR; - } - /* Creating comunication channel with the main thread */ - devices_poll->async_pollfd[0].fd = mca_btl_openib_component.async_pipe[0]; - devices_poll->async_pollfd[0].events = POLLIN; - devices_poll->async_pollfd[0].revents = 0; - return OPAL_SUCCESS; -} - -/* Send command completion to main thread */ -static int send_command_comp(int in) -{ - if (write(mca_btl_openib_component.async_comp_pipe[1], &in, sizeof(int)) < 0) { - BTL_ERROR(("Write failed [%d]",errno)); - return OPAL_ERROR; - } - return OPAL_SUCCESS; -} - -/* Function handle async thread commands */ -static int btl_openib_async_commandh(struct mca_btl_openib_async_poll *devices_poll, opal_list_t *ignore_qp_err_list) -{ - struct pollfd *async_pollfd_tmp; - mca_btl_openib_async_cmd_t cmd; - int fd,flags,j,ret; - /* Got command from main thread */ - ret = read(devices_poll->async_pollfd[0].fd, &cmd, sizeof(mca_btl_openib_async_cmd_t)); - if (sizeof(mca_btl_openib_async_cmd_t) != ret) { - BTL_ERROR(("Read failed [%d]",errno)); - return OPAL_ERROR; - } - - BTL_VERBOSE(("Got cmd %d", cmd.a_cmd)); - if (OPENIB_ASYNC_CMD_FD_ADD == cmd.a_cmd) { - fd = cmd.fd; - BTL_VERBOSE(("Got fd %d", fd)); - BTL_VERBOSE(("Adding device [%d] to async event poll[%d]", - fd, devices_poll->active_poll_size)); - flags = fcntl(fd, F_GETFL); - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { - BTL_ERROR(("Failed to change file descriptor of async event")); - return OPAL_ERROR; - } - if ((devices_poll->active_poll_size + 1) > devices_poll->poll_size) { - devices_poll->poll_size+=devices_poll->poll_size; - async_pollfd_tmp = malloc(sizeof(struct pollfd) * devices_poll->poll_size); - if (NULL == async_pollfd_tmp) { - BTL_ERROR(("Failed malloc: %s:%d. " - "Fatal error, stoping asynch event thread" - , __FILE__, __LINE__)); - return OPAL_ERROR; - } - memcpy (async_pollfd_tmp,devices_poll->async_pollfd, - sizeof(struct pollfd) * (devices_poll->active_poll_size)); - free(devices_poll->async_pollfd); - devices_poll->async_pollfd = async_pollfd_tmp; - } - devices_poll->async_pollfd[devices_poll->active_poll_size].fd = fd; - devices_poll->async_pollfd[devices_poll->active_poll_size].events = POLLIN; - devices_poll->async_pollfd[devices_poll->active_poll_size].revents = 0; - devices_poll->active_poll_size++; - if (OPAL_SUCCESS != send_command_comp(fd)) { - return OPAL_ERROR; - } - } else if (OPENIB_ASYNC_CMD_FD_REMOVE == cmd.a_cmd) { - bool fd_found = false; - - fd = cmd.fd; - BTL_VERBOSE(("Got fd %d", fd)); - - /* Removing device from poll */ - BTL_VERBOSE(("Removing device [%d] from async event poll [%d]", - fd, devices_poll->active_poll_size)); - if (devices_poll->active_poll_size > 1) { - for (j=0; (j < devices_poll->active_poll_size || !fd_found); j++) { - if (devices_poll->async_pollfd[j].fd == fd) { - devices_poll->async_pollfd[j].fd = - devices_poll->async_pollfd[devices_poll->active_poll_size-1].fd; - devices_poll->async_pollfd[j].events = - devices_poll->async_pollfd[devices_poll->active_poll_size-1].events; - devices_poll->async_pollfd[j].revents = - devices_poll->async_pollfd[devices_poll->active_poll_size-1].revents; - fd_found = true; - } - } - if (!fd_found) { - BTL_ERROR(("Requested FD[%d] was not found in poll array",fd)); - return OPAL_ERROR; - } - } - devices_poll->active_poll_size--; - if (OPAL_SUCCESS != send_command_comp(fd)) { - return OPAL_ERROR; - } - } else if (OPENIB_ASYNC_IGNORE_QP_ERR == cmd.a_cmd) { - mca_btl_openib_qp_list *new_qp; - new_qp = OBJ_NEW(mca_btl_openib_qp_list); - BTL_VERBOSE(("Ignore errors on QP %p", (void *)cmd.qp)); - new_qp->qp = cmd.qp; - opal_list_append(ignore_qp_err_list, (opal_list_item_t *)new_qp); - send_command_comp(OPENIB_ASYNC_IGNORE_QP_ERR); - - } else if (OPENIB_ASYNC_THREAD_EXIT == cmd.a_cmd) { - /* Got 0 - command to close the thread */ - opal_list_item_t *item; - BTL_VERBOSE(("Async event thread exit")); - free(devices_poll->async_pollfd); - return_status = OPAL_SUCCESS; - - while ((item = opal_list_remove_first(ignore_qp_err_list))) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(ignore_qp_err_list); - - pthread_exit(&return_status); - } - return OPAL_SUCCESS; -} /* The main idea of resizing SRQ algorithm - We create a SRQ with size = rd_num, but for efficient usage of resources @@ -323,235 +194,118 @@ static int btl_openib_async_srq_limit_event(struct ibv_srq* srq) } /* Function handle async device events */ -static int btl_openib_async_deviceh(struct mca_btl_openib_async_poll *devices_poll, int index, - opal_list_t *ignore_qp_err_list) +static void btl_openib_async_device (int fd, short flags, void *arg) { - int j; - mca_btl_openib_device_t *device = NULL; + mca_btl_openib_device_t *device = (mca_btl_openib_device_t *) arg; struct ibv_async_event event; int event_type; - /* We need to find correct device and process this event */ - for (j=0; j < mca_btl_openib_component.ib_num_btls; j++) { - if (mca_btl_openib_component.openib_btls[j]->device->ib_dev_context->async_fd == - devices_poll->async_pollfd[index].fd ) { - device = mca_btl_openib_component.openib_btls[j]->device; - break; + if (ibv_get_async_event((struct ibv_context *)device->ib_dev_context,&event) < 0) { + if (EWOULDBLOCK != errno) { + BTL_ERROR(("Failed to get async event")); } + + return; } - if (NULL != device) { - if (ibv_get_async_event((struct ibv_context *)device->ib_dev_context,&event) < 0) { - if (EWOULDBLOCK == errno) { - /* No event found ? - * It was handled by somebody other */ - return OPAL_SUCCESS; - } else { - BTL_ERROR(("Failed to get async event")); - return OPAL_ERROR; - } - } - event_type = event.event_type; + event_type = event.event_type; #if OPAL_HAVE_CONNECTX_XRC - /* is it XRC event ?*/ - bool xrc_event = false; - if (IBV_XRC_QP_EVENT_FLAG & event.event_type) { - xrc_event = true; - /* Clean the bitnd handel as usual */ - event_type ^= IBV_XRC_QP_EVENT_FLAG; - } + /* is it XRC event ?*/ + bool xrc_event = false; + if (IBV_XRC_QP_EVENT_FLAG & event.event_type) { + xrc_event = true; + /* Clean the bitnd handel as usual */ + event_type ^= IBV_XRC_QP_EVENT_FLAG; + } #endif - switch(event_type) { - case IBV_EVENT_PATH_MIG: - BTL_ERROR(("Alternative path migration event reported")); - if (APM_ENABLED) { - BTL_ERROR(("Trying to find additional path...")); + switch(event_type) { + case IBV_EVENT_PATH_MIG: + BTL_ERROR(("Alternative path migration event reported")); + if (APM_ENABLED) { + BTL_ERROR(("Trying to find additional path...")); #if OPAL_HAVE_CONNECTX_XRC - if (xrc_event) - mca_btl_openib_load_apm_xrc_rcv(event.element.xrc_qp_num, - xrc_qp2endpoint(event.element.xrc_qp_num, device)); - else -#endif - mca_btl_openib_load_apm(event.element.qp, - qp2endpoint(event.element.qp, device)); - } - break; - case IBV_EVENT_DEVICE_FATAL: - /* Set the flag to fatal */ - device->got_fatal_event = true; - /* It is not critical to protect the counter */ - OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1); - /* fall through */ - case IBV_EVENT_CQ_ERR: - case IBV_EVENT_QP_FATAL: - if (event_type == IBV_EVENT_QP_FATAL) { - opal_list_item_t *item; - mca_btl_openib_qp_list *qp_item; - bool in_ignore_list = false; - - BTL_VERBOSE(("QP is in err state %p", (void *)event.element.qp)); - - /* look through ignore list */ - for (item = opal_list_get_first(ignore_qp_err_list); - item != opal_list_get_end(ignore_qp_err_list); - item = opal_list_get_next(item)) { - qp_item = (mca_btl_openib_qp_list *)item; - if (qp_item->qp == event.element.qp) { - BTL_VERBOSE(("QP %p is in error ignore list", - (void *)event.element.qp)); - in_ignore_list = true; - break; - } - } - if (in_ignore_list) - break; - } - - case IBV_EVENT_QP_REQ_ERR: - case IBV_EVENT_QP_ACCESS_ERR: - case IBV_EVENT_PATH_MIG_ERR: - case IBV_EVENT_SRQ_ERR: - opal_show_help("help-mpi-btl-openib.txt", "of error event", - true,opal_process_info.nodename, (int)getpid(), - event_type, - openib_event_to_str((enum ibv_event_type)event_type)); - break; - case IBV_EVENT_PORT_ERR: - opal_show_help("help-mpi-btl-openib.txt", "of error event", - true,opal_process_info.nodename, (int)getpid(), - event_type, - openib_event_to_str((enum ibv_event_type)event_type)); - /* Set the flag to indicate port error */ - device->got_port_event = true; - OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1); - break; - case IBV_EVENT_COMM_EST: - case IBV_EVENT_PORT_ACTIVE: - case IBV_EVENT_SQ_DRAINED: - case IBV_EVENT_LID_CHANGE: - case IBV_EVENT_PKEY_CHANGE: - case IBV_EVENT_SM_CHANGE: - case IBV_EVENT_QP_LAST_WQE_REACHED: -#if HAVE_DECL_IBV_EVENT_CLIENT_REREGISTER - case IBV_EVENT_CLIENT_REREGISTER: + if (xrc_event) + mca_btl_openib_load_apm_xrc_rcv(event.element.xrc_qp_num, + xrc_qp2endpoint(event.element.xrc_qp_num, device)); + else #endif - break; - /* The event is signaled when number of prepost receive WQEs is going - under predefined threshold - srq_limit */ - case IBV_EVENT_SRQ_LIMIT_REACHED: - if(OPAL_SUCCESS != - btl_openib_async_srq_limit_event(event.element.srq)) { - return OPAL_ERROR; + mca_btl_openib_load_apm(event.element.qp, + qp2endpoint(event.element.qp, device)); + } + break; + case IBV_EVENT_DEVICE_FATAL: + /* Set the flag to fatal */ + device->got_fatal_event = true; + /* It is not critical to protect the counter */ + OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1); + /* fall through */ + case IBV_EVENT_CQ_ERR: + case IBV_EVENT_QP_FATAL: + if (event_type == IBV_EVENT_QP_FATAL) { + mca_btl_openib_qp_list *qp_item; + bool in_ignore_list = false; + + BTL_VERBOSE(("QP is in err state %p", (void *)event.element.qp)); + + /* look through ignore list */ + opal_mutex_lock (&ignore_qp_err_list_lock); + OPAL_LIST_FOREACH(qp_item, &ignore_qp_err_list, mca_btl_openib_qp_list) { + if (qp_item->qp == event.element.qp) { + BTL_VERBOSE(("QP %p is in error ignore list", + (void *)event.element.qp)); + in_ignore_list = true; + break; } + } + opal_mutex_unlock (&ignore_qp_err_list_lock); + if (in_ignore_list) { break; - default: - opal_show_help("help-mpi-btl-openib.txt", "of unknown event", - true,opal_process_info.nodename, (int)getpid(), - event_type); - } - ibv_ack_async_event(&event); - } else { - /* if (device == NULL), then failed to locate the device! - This should never happen... */ - BTL_ERROR(("Failed to find device with FD %d. " - "Fatal error, stoping asynch event thread", - devices_poll->async_pollfd[index].fd)); - return OPAL_ERROR; - } - return OPAL_SUCCESS; -} - -/* This Async event thread is handling all async event of - * all btls/devices in openib component - */ -static void* btl_openib_async_thread(void * async) -{ - int rc; - int i; - struct mca_btl_openib_async_poll devices_poll; - opal_list_t ignore_qp_err_list; - - OBJ_CONSTRUCT(&ignore_qp_err_list, opal_list_t); - - if (OPAL_SUCCESS != btl_openib_async_poll_init(&devices_poll)) { - BTL_ERROR(("Fatal error, stoping asynch event thread")); - pthread_exit(&return_status); - } - - while(1) { - rc = poll(devices_poll.async_pollfd, devices_poll.active_poll_size, -1); - if (rc < 0) { - if (errno != EINTR) { - BTL_ERROR(("Poll failed. Fatal error, stoping asynch event thread")); - pthread_exit(&return_status); - } else { - /* EINTR - we got interupt */ - continue; } } - for(i = 0; i < devices_poll.active_poll_size; i++) { - switch (devices_poll.async_pollfd[i].revents) { - case 0: - /* no events */ - break; - case POLLIN: -#if defined(__SVR4) && defined(__sun) - /* - * Need workaround for Solaris IB user verbs since - * "Poll on IB async fd returns POLLRDNORM revent even though it is masked out" - */ - case POLLIN | POLLRDNORM: + /* fall through */ + case IBV_EVENT_QP_REQ_ERR: + case IBV_EVENT_QP_ACCESS_ERR: + case IBV_EVENT_PATH_MIG_ERR: + case IBV_EVENT_SRQ_ERR: + opal_show_help("help-mpi-btl-openib.txt", "of error event", + true,opal_process_info.nodename, (int)getpid(), + event_type, + openib_event_to_str((enum ibv_event_type)event_type)); + break; + case IBV_EVENT_PORT_ERR: + opal_show_help("help-mpi-btl-openib.txt", "of error event", + true,opal_process_info.nodename, (int)getpid(), + event_type, + openib_event_to_str((enum ibv_event_type)event_type)); + /* Set the flag to indicate port error */ + device->got_port_event = true; + OPAL_THREAD_ADD32(&mca_btl_openib_component.error_counter, 1); + break; + case IBV_EVENT_COMM_EST: + case IBV_EVENT_PORT_ACTIVE: + case IBV_EVENT_SQ_DRAINED: + case IBV_EVENT_LID_CHANGE: + case IBV_EVENT_PKEY_CHANGE: + case IBV_EVENT_SM_CHANGE: + case IBV_EVENT_QP_LAST_WQE_REACHED: +#if HAVE_DECL_IBV_EVENT_CLIENT_REREGISTER + case IBV_EVENT_CLIENT_REREGISTER: #endif - /* Processing our event */ - if (0 == i) { - /* 0 poll we use for comunication with main thread */ - if (OPAL_SUCCESS != btl_openib_async_commandh(&devices_poll, - &ignore_qp_err_list)) { - free(devices_poll.async_pollfd); - BTL_ERROR(("Failed to process async thread process. " - "Fatal error, stoping asynch event thread")); - pthread_exit(&return_status); - } - } else { - /* We get device event */ - if (btl_openib_async_deviceh(&devices_poll, i, - &ignore_qp_err_list)) { - free(devices_poll.async_pollfd); - BTL_ERROR(("Failed to process async thread process. " - "Fatal error, stoping asynch event thread")); - pthread_exit(&return_status); - } - } - break; - default: - /* Get event other than POLLIN - * this case should not never happend */ - BTL_ERROR(("Got unexpected event %d. " - "Fatal error, stoping asynch event thread", - devices_poll.async_pollfd[i].revents)); - free(devices_poll.async_pollfd); - pthread_exit(&return_status); - } - } - } - return PTHREAD_CANCELED; -} + break; + /* The event is signaled when number of prepost receive WQEs is going + under predefined threshold - srq_limit */ + case IBV_EVENT_SRQ_LIMIT_REACHED: + (void) btl_openib_async_srq_limit_event (event.element.srq); -int btl_openib_async_command_done(int exp) -{ - int comp; - if (read(mca_btl_openib_component.async_comp_pipe[0], &comp, - sizeof(int)) < (int) sizeof (int)){ - BTL_ERROR(("Failed to read from pipe")); - return OPAL_ERROR; - } - if (exp != comp){ - BTL_ERROR(("Get wrong completion on async command. Waiting for %d and got %d", - exp, comp)); - return OPAL_ERROR; + break; + default: + opal_show_help("help-mpi-btl-openib.txt", "of unknown event", + true,opal_process_info.nodename, (int)getpid(), + event_type); } - return OPAL_SUCCESS; + + ibv_ack_async_event(&event); } static void apm_update_attr(struct ibv_qp_attr *attr, enum ibv_qp_attr_mask *mask) @@ -685,34 +439,70 @@ void mca_btl_openib_load_apm_xrc_rcv(uint32_t qp_num, mca_btl_openib_endpoint_t } #endif -int start_async_event_thread(void) +int mca_btl_openib_async_init (void) { - if (0 != mca_btl_openib_component.async_thread) { + if (!mca_btl_openib_component.use_async_event_thread || + mca_btl_openib_component.async_evbase) { return OPAL_SUCCESS; } + mca_btl_openib_component.async_evbase = opal_progress_thread_init (NULL); + + OBJ_CONSTRUCT(&ignore_qp_err_list, opal_list_t); + OBJ_CONSTRUCT(&ignore_qp_err_list_lock, opal_mutex_t); + /* Set the error counter to zero */ mca_btl_openib_component.error_counter = 0; - /* Create pipe for communication with async event thread */ - if (pipe(mca_btl_openib_component.async_pipe)) { - BTL_ERROR(("Failed to create pipe for communication with " - "async event thread")); - return OPAL_ERROR; + return OPAL_SUCCESS; +} + +void mca_btl_openib_async_fini (void) +{ + if (mca_btl_openib_component.async_evbase) { + OPAL_LIST_DESTRUCT(&ignore_qp_err_list); + OBJ_DESTRUCT(&ignore_qp_err_list_lock); + opal_progress_thread_finalize (NULL); + mca_btl_openib_component.async_evbase = NULL; } +} - if (pipe(mca_btl_openib_component.async_comp_pipe)) { - BTL_ERROR(("Failed to create comp pipe for communication with " - "main thread")); - return OPAL_ERROR; +void mca_btl_openib_async_add_device (mca_btl_openib_device_t *device) +{ + if (mca_btl_openib_component.async_evbase) { + if (1 == OPAL_THREAD_ADD32 (&btl_openib_async_device_count, 1)) { + mca_btl_openib_async_init (); + } + opal_event_set (mca_btl_openib_component.async_evbase, &device->async_event, + device->ib_dev_context->async_fd, OPAL_EV_READ | OPAL_EV_PERSIST, + btl_openib_async_device, device); + opal_event_add (&device->async_event, 0); } +} - /* Starting async event thread for the component */ - if (pthread_create(&mca_btl_openib_component.async_thread, NULL, - (void*(*)(void*)) btl_openib_async_thread, NULL)) { - BTL_ERROR(("Failed to create async event thread")); - return OPAL_ERROR; +void mca_btl_openib_async_rem_device (mca_btl_openib_device_t *device) +{ + if (mca_btl_openib_component.async_evbase) { + opal_event_del (&device->async_event); + if (0 == OPAL_THREAD_ADD32 (&btl_openib_async_device_count, -1)) { + mca_btl_openib_async_fini (); + } } +} - return OPAL_SUCCESS; +void mca_btl_openib_async_add_qp_ignore (struct ibv_qp *qp) +{ + if (mca_btl_openib_component.async_evbase) { + mca_btl_openib_qp_list *new_qp = OBJ_NEW(mca_btl_openib_qp_list); + if (OPAL_UNLIKELY(NULL == new_qp)) { + /* can allocate a small object. not much more can be done */ + return; + } + + BTL_VERBOSE(("Ignoring errors on QP %p", (void *) qp)); + new_qp->qp = qp; + opal_mutex_lock (&ignore_qp_err_list_lock); + opal_list_append (&ignore_qp_err_list, (opal_list_item_t *) new_qp); + opal_mutex_unlock (&ignore_qp_err_list_lock); + } } diff --git a/opal/mca/btl/openib/btl_openib_async.h b/opal/mca/btl/openib/btl_openib_async.h index 33137546a46..b62fdbec3fb 100644 --- a/opal/mca/btl/openib/btl_openib_async.h +++ b/opal/mca/btl/openib/btl_openib_async.h @@ -3,6 +3,8 @@ * Copyright (c) 2014 Bull SAS. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * received. * $COPYRIGHT$ * * Additional copyrights may follow @@ -16,29 +18,42 @@ #define MCA_BTL_OPENIB_ASYNC_H #include "btl_openib_endpoint.h" -int start_async_event_thread(void); void mca_btl_openib_load_apm(struct ibv_qp *qp, mca_btl_openib_endpoint_t *ep); -int btl_openib_async_command_done(int exp); #if OPAL_HAVE_CONNECTX_XRC void mca_btl_openib_load_apm_xrc_rcv(uint32_t qp_num, mca_btl_openib_endpoint_t *ep); #endif #define APM_ENABLED (0 != mca_btl_openib_component.apm_lmc || 0 != mca_btl_openib_component.apm_ports) -/* - * Command types for communicating with the async thread +/** + * Initialize the async event base + */ +int mca_btl_openib_async_init (void); + +/** + * Finalize the async event base + */ +void mca_btl_openib_async_fini (void); + +/** + * Register a device with the async event base + * + * @param[in] device device to register + */ +void mca_btl_openib_async_add_device (mca_btl_openib_device_t *device); + +/** + * Deregister a device with the async event base + * + * @param[in] device device to deregister + */ +void mca_btl_openib_async_rem_device (mca_btl_openib_device_t *device); + +/** + * Ignore error events on a queue pair + * + * @param[in] qp queue pair to ignore */ -typedef enum { - OPENIB_ASYNC_CMD_FD_ADD, - OPENIB_ASYNC_CMD_FD_REMOVE, - OPENIB_ASYNC_IGNORE_QP_ERR, - OPENIB_ASYNC_THREAD_EXIT -} btl_openib_async_cmd_type_t; - -typedef struct { - btl_openib_async_cmd_type_t a_cmd; - int fd; - struct ibv_qp *qp; -} mca_btl_openib_async_cmd_t; +void mca_btl_openib_async_add_qp_ignore (struct ibv_qp *qp); #endif diff --git a/opal/mca/btl/openib/btl_openib_component.c b/opal/mca/btl/openib/btl_openib_component.c index 4bd09e23d78..446a48afe15 100644 --- a/opal/mca/btl/openib/btl_openib_component.c +++ b/opal/mca/btl/openib/btl_openib_component.c @@ -97,7 +97,6 @@ #include "btl_openib_ini.h" #include "btl_openib_mca.h" #include "btl_openib_xrc.h" -#include "btl_openib_fd.h" #if BTL_OPENIB_FAILOVER_ENABLED #include "btl_openib_failover.h" #endif @@ -245,32 +244,13 @@ static int btl_openib_component_close(void) { int rc = OPAL_SUCCESS; - /* Tell the async thread to shutdown */ - if (mca_btl_openib_component.use_async_event_thread && - 0 != mca_btl_openib_component.async_thread) { - mca_btl_openib_async_cmd_t async_command = {.a_cmd = OPENIB_ASYNC_THREAD_EXIT, - .fd = -1, .qp = NULL}; - if (write(mca_btl_openib_component.async_pipe[1], &async_command, - sizeof(mca_btl_openib_async_cmd_t)) < 0) { - BTL_ERROR(("Failed to communicate with async event thread")); - rc = OPAL_ERROR; - } else { - if (pthread_join(mca_btl_openib_component.async_thread, NULL)) { - BTL_ERROR(("Failed to stop OpenIB async event thread")); - rc = OPAL_ERROR; - } - } - close(mca_btl_openib_component.async_pipe[0]); - close(mca_btl_openib_component.async_pipe[1]); - close(mca_btl_openib_component.async_comp_pipe[0]); - close(mca_btl_openib_component.async_comp_pipe[1]); - } + /* remove the async event from the event base */ + mca_btl_openib_async_fini (); OBJ_DESTRUCT(&mca_btl_openib_component.srq_manager.lock); OBJ_DESTRUCT(&mca_btl_openib_component.srq_manager.srq_addr_table); opal_btl_openib_connect_base_finalize(); - opal_btl_openib_fd_finalize(); opal_btl_openib_ini_finalize(); if (NULL != mca_btl_openib_component.default_recv_qps) { @@ -906,7 +886,7 @@ static void device_construct(mca_btl_openib_device_t *device) device->ib_dev_context = NULL; device->ib_pd = NULL; device->mpool = NULL; -#if OPAL_ENABLE_PROGRESS_THREADS +#if OPAL_ENABLE_PROGRESS_THREADS == 1 device->ib_channel = NULL; #endif device->btls = 0; @@ -926,10 +906,6 @@ static void device_construct(mca_btl_openib_device_t *device) device->xrc_fd = -1; #endif device->qps = NULL; - mca_btl_openib_component.async_pipe[0] = - mca_btl_openib_component.async_pipe[1] = -1; - mca_btl_openib_component.async_comp_pipe[0] = - mca_btl_openib_component.async_comp_pipe[1] = -1; OBJ_CONSTRUCT(&device->device_lock, opal_mutex_t); OBJ_CONSTRUCT(&device->send_free_control, opal_free_list_t); device->max_inline_data = 0; @@ -940,8 +916,8 @@ static void device_destruct(mca_btl_openib_device_t *device) { int i; -#if OPAL_ENABLE_PROGRESS_THREADS - if(device->progress) { +#if OPAL_ENABLE_PROGRESS_THREADS == 1 + if (device->progress) { device->progress = false; if (pthread_cancel(device->thread.t_handle)) { BTL_ERROR(("Failed to cancel OpenIB progress thread")); @@ -949,27 +925,15 @@ static void device_destruct(mca_btl_openib_device_t *device) } opal_thread_join(&device->thread, NULL); } + if (ibv_destroy_comp_channel(device->ib_channel)) { BTL_VERBOSE(("Failed to close comp_channel")); goto device_error; } #endif + /* signaling to async_tread to stop poll for this device */ - if (mca_btl_openib_component.use_async_event_thread && - -1 != mca_btl_openib_component.async_pipe[1]) { - mca_btl_openib_async_cmd_t async_command = {.a_cmd = OPENIB_ASYNC_CMD_FD_REMOVE, - .fd = device->ib_dev_context->async_fd, - .qp = NULL}; - if (write(mca_btl_openib_component.async_pipe[1], &async_command, - sizeof(mca_btl_openib_async_cmd_t)) < 0){ - BTL_ERROR(("Failed to write to pipe")); - goto device_error; - } - /* wait for ok from thread */ - if (OPAL_SUCCESS != btl_openib_async_command_done(device->ib_dev_context->async_fd)){ - goto device_error; - } - } + mca_btl_openib_async_rem_device (device); if(device->eager_rdma_buffers) { int i; @@ -2562,11 +2526,6 @@ btl_openib_component_init(int *num_btl_modules, goto no_btls; } - /* Initialize FD listening */ - if (OPAL_SUCCESS != opal_btl_openib_fd_init()) { - goto no_btls; - } - /* If we are using ptmalloc2 and there are no posix threads available, this will cause memory corruption. Refuse to run. Right now, ptmalloc2 is the only memory manager that we have on @@ -2741,7 +2700,7 @@ btl_openib_component_init(int *num_btl_modules, OBJ_CONSTRUCT(&btl_list, opal_list_t); OBJ_CONSTRUCT(&mca_btl_openib_component.ib_lock, opal_mutex_t); - mca_btl_openib_component.async_thread = 0; + distance = dev_sorted[0].distance; for (found = false, i = 0; i < num_devs && (-1 == mca_btl_openib_component.ib_max_btls || @@ -3925,3 +3884,42 @@ int mca_btl_openib_post_srr(mca_btl_openib_module_t* openib_btl, const int qp) return OPAL_ERROR; } + +struct mca_btl_openib_event_t { + opal_event_t super; + void *(*fn)(void *); + void *arg; + opal_event_t *event; +}; + +typedef struct mca_btl_openib_event_t mca_btl_openib_event_t; + +static void *mca_btl_openib_run_once_cb (int fd, int flags, void *context) +{ + mca_btl_openib_event_t *event = (mca_btl_openib_event_t *) context; + void *ret; + + ret = event->fn (event->arg); + opal_event_del (&event->super); + free (event); + return ret; +} + +int mca_btl_openib_run_in_main (void *(*fn)(void *), void *arg) +{ + mca_btl_openib_event_t *event = malloc (sizeof (mca_btl_openib_event_t)); + + if (OPAL_UNLIKELY(NULL == event)) { + return OPAL_ERR_OUT_OF_RESOURCE; + } + + event->fn = fn; + event->arg = arg; + + opal_event_set (opal_sync_event_base, &event->super, -1, OPAL_EV_READ, + mca_btl_openib_run_once_cb, event); + + opal_event_active (&event->super, OPAL_EV_READ, 1); + + return OPAL_SUCCESS; +} diff --git a/opal/mca/btl/openib/btl_openib_fd.c b/opal/mca/btl/openib/btl_openib_fd.c deleted file mode 100644 index 98a9e4799fa..00000000000 --- a/opal/mca/btl/openib/btl_openib_fd.c +++ /dev/null @@ -1,693 +0,0 @@ -/* - * Copyright (c) 2008-2013 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2009 Sandia National Laboratories. All rights reserved. - * Copyright (c) 2015 Research Organization for Information Science - * and Technology (RIST). All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -/** - * Note: this file is a little fast-n-loose -- - * it uses this value in run-time "if" conditionals (vs. compile-time - * #if conditionals). We also don't protect including . - * That's because this component currently only compiles on Linux and - * Solaris, and both of these OS's have pthreads. Using the run-time - * conditionals gives us better compile-time checking, even of code - * that isn't activated. - * - * Note, too, that the functionality in this file does *not* require - * all the heavyweight OMPI thread infrastructure (e.g., from - * --enable-mpi-thread-multiple or --enable-progress-threads). All work that - * is done in a separate progress thread is very carefully segregated - * from that of the main thread, and communication back to the main - * thread - */ - -#include "opal_config.h" - -#include -#include -#include -#include - -#include "opal/class/opal_list.h" -#include "opal/mca/event/event.h" -#include "opal/util/output.h" -#include "opal/util/fd.h" -#include "opal/threads/threads.h" - -#include "btl_openib_fd.h" - - -typedef union { - opal_btl_openib_fd_event_callback_fn_t *event; - opal_btl_openib_fd_main_callback_fn_t *main; -} callback_u_t; - -/* - * Data for each registered item - */ -typedef struct { - opal_list_item_t super; - bool ri_event_used; - opal_event_t ri_event; - int ri_fd; - int ri_flags; - callback_u_t ri_callback; - void *ri_context; -} registered_item_t; - -static OBJ_CLASS_INSTANCE(registered_item_t, opal_list_item_t, NULL, NULL); - -/* - * Command types - */ -typedef enum { - /* Read by service thread */ - CMD_TIME_TO_QUIT, - CMD_ADD_FD, - CMD_REMOVE_FD, - ACK_RAN_FUNCTION, - - /* Read by service and main threads */ - CMD_CALL_FUNCTION, - CMD_MAX -} cmd_type_t; - -/* - * Commands. Fields ordered to avoid memory holes (and valgrind warnings). - */ -typedef struct { - callback_u_t pc_fn; - void *pc_context; - int pc_fd; - int pc_flags; - cmd_type_t pc_cmd; - char end; -} cmd_t; - -/* - * Queued up list of commands to send to the main thread - */ -typedef struct { - opal_list_item_t super; - cmd_t cli_cmd; -} cmd_list_item_t; - -static OBJ_CLASS_INSTANCE(cmd_list_item_t, opal_list_item_t, NULL, NULL); - -static bool initialized = false; -static int cmd_size = 0; -static fd_set read_fds, write_fds; -static int max_fd; -static opal_list_t registered_items; - -/* These items are only used in the threaded version */ -/* Owned by the main thread */ -static pthread_t thread; -static opal_event_t main_thread_event; -static int pipe_to_service_thread[2] = { -1, -1 }; - -/* Owned by the service thread */ -static int pipe_to_main_thread[2] = { -1, -1 }; -static const size_t max_outstanding_to_main_thread = 32; -static size_t waiting_for_ack_from_main_thread = 0; -static opal_list_t pending_to_main_thread; - - -/* - * Write a command to the main thread, or queue it up if the pipe is full - */ -static int write_to_main_thread(cmd_t *cmd) -{ - /* Note that if we write too much to the main thread pipe and the - main thread doesn't check it often, we could fill up the pipe - and cause this thread to block. Bad! So we do some simple - counting here and ensure that we don't fill the pipe. If we - are in danger of that, then queue up the commands here in the - service thread. The main thread will ACK every CALL_FUNCTION - command, so we have a built-in mechanism to wake up the service - thread to drain any queued-up commands. */ - if (opal_list_get_size(&pending_to_main_thread) > 0 || - waiting_for_ack_from_main_thread >= max_outstanding_to_main_thread) { - cmd_list_item_t *cli = OBJ_NEW(cmd_list_item_t); - if (NULL == cli) { - return OPAL_ERR_OUT_OF_RESOURCE; - } - memcpy(&cli->cli_cmd, cmd, cmd_size); - opal_list_append(&pending_to_main_thread, &(cli->super)); - } else { - OPAL_OUTPUT((-1, "fd: writing to main thread")); - opal_fd_write(pipe_to_main_thread[1], cmd_size, cmd); - ++waiting_for_ack_from_main_thread; - } - - return OPAL_SUCCESS; -} - -static void service_fd_callback(int fd, short event, void *context) -{ - registered_item_t *ri = (registered_item_t*) context; - ri->ri_callback.event(fd, event, ri->ri_context); -} - - -/* - * Add an fd to the listening set - */ -static int service_pipe_cmd_add_fd(bool use_libevent, cmd_t *cmd) -{ - registered_item_t *ri = OBJ_NEW(registered_item_t); - if (NULL == ri) { - return OPAL_ERR_OUT_OF_RESOURCE; - } - ri->ri_event_used = false; - ri->ri_fd = cmd->pc_fd; - ri->ri_flags = cmd->pc_flags; - ri->ri_callback.event = cmd->pc_fn.event; - ri->ri_context = cmd->pc_context; - - if (use_libevent) { - /* Make an event for this fd */ - ri->ri_event_used = true; - opal_event_set(opal_sync_event_base, &ri->ri_event, ri->ri_fd, - ri->ri_flags | OPAL_EV_PERSIST, service_fd_callback, - ri); - opal_event_add(&ri->ri_event, 0); - } else { - /* Add the fd to the relevant fd local sets and update max_fd */ - if (OPAL_EV_READ & ri->ri_flags) { - FD_SET(ri->ri_fd, &read_fds); - } - if (OPAL_EV_WRITE & cmd->pc_flags) { - FD_SET(ri->ri_fd, &write_fds); - } - max_fd = (max_fd > ri->ri_fd) ? max_fd : ri->ri_fd + 1; - } - - opal_list_append(®istered_items, &ri->super); - return OPAL_SUCCESS; -} - -/* - * Run a function - */ -static int service_pipe_cmd_call_function(cmd_t *cmd) -{ - cmd_t local_cmd; - - OPAL_OUTPUT((-1, "fd service thread: calling function!")); - /* Call the function */ - if (NULL != cmd->pc_fn.main) { - cmd->pc_fn.main(cmd->pc_context); - } - - /* Now ACK that we ran the function */ - memset(&local_cmd, 0, cmd_size); - local_cmd.pc_cmd = ACK_RAN_FUNCTION; - opal_fd_write(pipe_to_main_thread[1], cmd_size, &local_cmd); - - /* Done */ - return OPAL_SUCCESS; -} - -/* - * Remove an fd from the listening set - */ -static int service_pipe_cmd_remove_fd(cmd_t *cmd) -{ - int i; - opal_list_item_t *item; - registered_item_t *ri; - - OPAL_OUTPUT((-1, "service thread got unmonitor fd %d", cmd->pc_fd)); - /* Go through the list of registered fd's and find the fd to - remove */ - for (item = opal_list_get_first(®istered_items); - NULL != opal_list_get_end(®istered_items); - item = opal_list_get_next(item)) { - ri = (registered_item_t*) item; - if (cmd->pc_fd == ri->ri_fd) { - /* Found it. The item knows if it was used as a libevent - event or an entry in the local fd sets. */ - if (ri->ri_event_used) { - /* Remove this event from libevent */ - opal_event_del(&ri->ri_event); - } else { - /* Remove this item from the fd_sets and recalculate - MAX_FD */ - FD_CLR(cmd->pc_fd, &read_fds); - FD_CLR(cmd->pc_fd, &write_fds); - for (max_fd = i = pipe_to_service_thread[0]; i < FD_SETSIZE; ++i) { - if (FD_ISSET(i, &read_fds) || FD_ISSET(i, &write_fds)) { - max_fd = i + 1; - } - } - } - - /* Let the caller know that we have stopped monitoring - this fd (if they care) */ - if (NULL != cmd->pc_fn.event) { - cmd->pc_fn.event(cmd->pc_fd, 0, cmd->pc_context); - } - - /* Remove this item from the list of registered items and - release it */ - opal_list_remove_item(®istered_items, item); - OBJ_RELEASE(item); - return OPAL_SUCCESS; - } - } - - /* This shouldn't happen */ - return OPAL_ERR_NOT_FOUND; -} - - -/* - * Call a function and ACK that we ran it - */ -static int main_pipe_cmd_call_function(cmd_t *cmd) -{ - cmd_t local_cmd; - - OPAL_OUTPUT((-1, "fd main thread: calling function!")); - /* Call the function */ - if (NULL != cmd->pc_fn.main) { - cmd->pc_fn.main(cmd->pc_context); - } - - /* Now ACK that we ran the function */ - memset(&local_cmd, 0, cmd_size); - local_cmd.pc_cmd = ACK_RAN_FUNCTION; - opal_fd_write(pipe_to_service_thread[1], cmd_size, &local_cmd); - - /* Done */ - return OPAL_SUCCESS; -} - - -/* - * Act on pipe commands - */ -static bool service_pipe_cmd(void) -{ - bool ret = false; - cmd_t cmd; - cmd_list_item_t *cli; - - opal_fd_read(pipe_to_service_thread[0], cmd_size, &cmd); - switch (cmd.pc_cmd) { - case CMD_ADD_FD: - OPAL_OUTPUT((-1, "fd service thread: CMD_ADD_FD")); - if (OPAL_SUCCESS != service_pipe_cmd_add_fd(false, &cmd)) { - ret = true; - } - break; - - case CMD_REMOVE_FD: - OPAL_OUTPUT((-1, "fd service thread: CMD_REMOVE_FD")); - if (OPAL_SUCCESS != service_pipe_cmd_remove_fd(&cmd)) { - ret = true; - } - break; - - case CMD_CALL_FUNCTION: - OPAL_OUTPUT((-1, "fd service thread: CMD_RUN_FUNCTION")); - if (OPAL_SUCCESS != service_pipe_cmd_call_function(&cmd)) { - ret = true; - } - break; - - case CMD_TIME_TO_QUIT: - OPAL_OUTPUT((-1, "fd service thread: CMD_TIME_TO_QUIT")); - ret = true; - break; - - case ACK_RAN_FUNCTION: - /* We don't have a guarantee that the main thread will check - its pipe frequently, so we do some simple counting to - ensure we just don't have too many outstanding commands to - the main thread at any given time. The main thread will - ACK every CALL_FUNCTION command, so this thread will always - wake up and continue to drain any queued up functions. */ - cli = (cmd_list_item_t*) opal_list_remove_first(&pending_to_main_thread); - if (NULL != cli) { - OPAL_OUTPUT((-1, "sending queued up cmd function to main thread")); - opal_fd_write(pipe_to_main_thread[1], cmd_size, &(cli->cli_cmd)); - OBJ_RELEASE(cli); - } else { - --waiting_for_ack_from_main_thread; - } - break; - - default: - OPAL_OUTPUT((-1, "fd service thread: unknown pipe command!")); - break; - } - - return ret; -} - - -/* - * Main thread logic - */ -static void *service_thread_start(void *context) -{ - int rc, flags; - fd_set read_fds_copy, write_fds_copy; - opal_list_item_t *item; - registered_item_t *ri; - - /* Make an fd set that we can select() on */ - FD_ZERO(&write_fds); - FD_ZERO(&read_fds); - FD_SET(pipe_to_service_thread[0], &read_fds); - max_fd = pipe_to_service_thread[0] + 1; - - OPAL_OUTPUT((-1, "fd service thread running")); - - /* Main loop waiting for commands over the fd's */ - while (1) { - memcpy(&read_fds_copy, &read_fds, sizeof(read_fds)); - memcpy(&write_fds_copy, &write_fds, sizeof(write_fds)); - OPAL_OUTPUT((-1, "fd service thread blocking on select...")); - rc = select(max_fd, &read_fds_copy, &write_fds_copy, NULL, NULL); - if (0 != rc && EAGAIN == errno) { - continue; - } - - OPAL_OUTPUT((-1, "fd service thread woke up!")); - - if (0 > rc) { - if (EBADF == errno) { - /* We are assuming we lost a socket so set rc to 1 so we'll - * try to read a command off the service pipe to receive a - * rm command (corresponding to the socket that went away). - * If the EBADF is from the service pipe then the error - * condition will be handled by the service_pipe_cmd(). - */ - OPAL_OUTPUT((-1,"fd service thread: non-EAGAIN from select %d", errno)); - rc = 1; - } - } - if (rc > 0) { - if (FD_ISSET(pipe_to_service_thread[0], &read_fds_copy)) { - OPAL_OUTPUT((-1, "fd service thread: pipe command")); - if (service_pipe_cmd()) { - break; - } - OPAL_OUTPUT((-1, "fd service thread: back from pipe command")); - /* Continue to the top of the loop to see if there are more - * commands on the pipe. This is done to reset the fds - * list just in case the last select incurred an EBADF. - * Please do not remove this continue thinking one is trying - * to enforce a fairness of reading the sockets or we'll - * end up with segv's below when select incurs an EBADF. - */ - continue; - } - - /* Go through all the registered events and see who had - activity */ - if (!opal_list_is_empty(®istered_items)) { - for (item = opal_list_get_first(®istered_items); - item != opal_list_get_end(®istered_items); - item = opal_list_get_next(item)) { - ri = (registered_item_t*) item; - flags = 0; - - /* See if this fd was ready for reading or writing - (fd's will only be in the read_fds or write_fds - set depending on what they registered for) */ - if (FD_ISSET(ri->ri_fd, &read_fds_copy)) { - flags |= OPAL_EV_READ; - } - if (FD_ISSET(ri->ri_fd, &write_fds_copy)) { - flags |= OPAL_EV_WRITE; - } - - /* If either was ready, invoke the callback */ - if (0 != flags) { - OPAL_OUTPUT((-1, "fd service thread: invoking callback for registered fd %d", ri->ri_fd)); - ri->ri_callback.event(ri->ri_fd, flags, - ri->ri_context); - OPAL_OUTPUT((-1, "fd service thread: back from callback for registered fd %d", ri->ri_fd)); - } - } - } - } - } - - /* All done */ - OPAL_OUTPUT((-1, "fd service thread: exiting")); - opal_atomic_wmb(); - return NULL; -} - - -static void main_thread_event_callback(int fd, short event, void *context) -{ - cmd_t cmd; - - OPAL_OUTPUT((-1, "main thread -- reading command")); - opal_fd_read(pipe_to_main_thread[0], cmd_size, &cmd); - switch (cmd.pc_cmd) { - case CMD_CALL_FUNCTION: - OPAL_OUTPUT((-1, "fd main thread: calling command")); - main_pipe_cmd_call_function(&cmd); - break; - - default: - OPAL_OUTPUT((-1, "fd main thread: unknown pipe command: %d", - cmd.pc_cmd)); - break; - } -} - -/****************************************************************** - * Main interface calls - ******************************************************************/ - -/* - * Initialize - * Called by main thread - */ -int opal_btl_openib_fd_init(void) -{ - if (!initialized) { - cmd_t bogus; - - OBJ_CONSTRUCT(®istered_items, opal_list_t); - - /* Calculate the real size of the cmd struct */ - cmd_size = (int) (&(bogus.end) - ((char*) &bogus)); - - OBJ_CONSTRUCT(&pending_to_main_thread, opal_list_t); - - /* Create pipes to communicate between the two threads */ - if (0 != pipe(pipe_to_service_thread)) { - return OPAL_ERR_IN_ERRNO; - } - if (0 != pipe(pipe_to_main_thread)) { - return OPAL_ERR_IN_ERRNO; - } - - /* Create a libevent event that is used in the main thread - to watch its pipe */ - opal_event_set(opal_sync_event_base, &main_thread_event, pipe_to_main_thread[0], - OPAL_EV_READ | OPAL_EV_PERSIST, - main_thread_event_callback, NULL); - opal_event_add(&main_thread_event, 0); - - /* Start the service thread */ - if (0 != pthread_create(&thread, NULL, service_thread_start, - NULL)) { - int errno_save = errno; - opal_event_del(&main_thread_event); - close(pipe_to_service_thread[0]); - close(pipe_to_service_thread[1]); - close(pipe_to_main_thread[0]); - close(pipe_to_main_thread[1]); - errno = errno_save; - return OPAL_ERR_IN_ERRNO; - } - - initialized = true; - } - return OPAL_SUCCESS; -} - - -/* - * Start monitoring an fd - * Called by main or service thread; callback will be in service thread - */ -int opal_btl_openib_fd_monitor(int fd, int flags, - opal_btl_openib_fd_event_callback_fn_t *callback, - void *context) -{ - cmd_t cmd; - - /* Sanity check */ - if (fd < 0 || 0 == flags || NULL == callback) { - return OPAL_ERR_BAD_PARAM; - } - - cmd.pc_cmd = CMD_ADD_FD; - cmd.pc_fd = fd; - cmd.pc_flags = flags; - cmd.pc_fn.event = callback; - cmd.pc_context = context; - /* For the threaded version, write a command down the pipe */ - OPAL_OUTPUT((-1, "main thread sending monitor fd %d", fd)); - opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd); - - return OPAL_SUCCESS; -} - - -/* - * Stop monitoring an fd - * Called by main or service thread; callback will be in service thread - */ -int opal_btl_openib_fd_unmonitor(int fd, - opal_btl_openib_fd_event_callback_fn_t *callback, - void *context) -{ - cmd_t cmd; - - /* Sanity check */ - if (fd < 0) { - return OPAL_ERR_BAD_PARAM; - } - - cmd.pc_cmd = CMD_REMOVE_FD; - cmd.pc_fd = fd; - cmd.pc_flags = 0; - cmd.pc_fn.event = callback; - cmd.pc_context = context; - /* For the threaded version, write a command down the pipe */ - OPAL_OUTPUT((-1, "main thread sending unmonitor fd %d", fd)); - opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd); - - return OPAL_SUCCESS; -} - -/* - * Run in the service thread - * Called by main thread; callback will be in service thread - */ -int opal_btl_openib_fd_run_in_service(opal_btl_openib_fd_main_callback_fn_t *callback, - void *context) -{ - cmd_t cmd; - - cmd.pc_cmd = CMD_CALL_FUNCTION; - cmd.pc_fd = -1; - cmd.pc_flags = 0; - cmd.pc_fn.main = callback; - cmd.pc_context = context; - /* For the threaded version, write a command down the pipe */ - OPAL_OUTPUT((-1, "main thread sending 'run in service'")); - opal_fd_write(pipe_to_service_thread[1], cmd_size, &cmd); - - return OPAL_SUCCESS; -} - -/* - * Run a function in the main thread - * Called by service thread - */ -int opal_btl_openib_fd_run_in_main(opal_btl_openib_fd_main_callback_fn_t *callback, - void *context) -{ - cmd_t cmd; - - OPAL_OUTPUT((-1, "run in main -- sending command")); - /* For the threaded version, write a command down the pipe */ - cmd.pc_cmd = CMD_CALL_FUNCTION; - cmd.pc_fd = -1; - cmd.pc_flags = 0; - cmd.pc_fn.main = callback; - cmd.pc_context = context; - write_to_main_thread(&cmd); - - return OPAL_SUCCESS; -} - - -int -opal_btl_openib_fd_main_thread_drain(void) -{ - int nfds, ret; - fd_set rfds; - struct timeval tv; - - while (1) { - FD_ZERO(&rfds); - FD_SET(pipe_to_main_thread[0], &rfds); - nfds = pipe_to_main_thread[0] + 1; - - tv.tv_sec = 0; - tv.tv_usec = 0; - - ret = select(nfds, &rfds, NULL, NULL, &tv); - if (ret > 0) { - main_thread_event_callback(pipe_to_main_thread[0], 0, NULL); - return 0; - } else { - return ret; - } - } -} - - -/* - * Finalize - * Called by main thread - */ -int opal_btl_openib_fd_finalize(void) -{ - if (initialized) { - /* For the threaded version, send a command down the pipe */ - cmd_t cmd; - OPAL_OUTPUT((-1, "shutting down openib fd")); - /* Check if the thread exists before asking it to quit */ - if (ESRCH != pthread_kill(thread, 0)) { - memset(&cmd, 0, cmd_size); - cmd.pc_cmd = CMD_TIME_TO_QUIT; - if (OPAL_SUCCESS != opal_fd_write(pipe_to_service_thread[1], - cmd_size, &cmd)) { - /* We cancel the thread if there's an error - * sending the "quit" cmd. This only ever happens on - * a "restart" which could result in dangling - * fds. OMPI must not rely on the checkpointer to - * save/restore any fds or connections - */ - pthread_cancel(thread); - } - - pthread_join(thread, NULL); - opal_atomic_rmb(); - } - - opal_event_del(&main_thread_event); - - close(pipe_to_service_thread[0]); - close(pipe_to_service_thread[1]); - close(pipe_to_main_thread[0]); - close(pipe_to_main_thread[1]); - OBJ_DESTRUCT(&pending_to_main_thread); - OBJ_DESTRUCT(®istered_items); - } - initialized = false; - - return OPAL_SUCCESS; -} diff --git a/opal/mca/btl/openib/btl_openib_fd.h b/opal/mca/btl/openib/btl_openib_fd.h deleted file mode 100644 index 6dc63a2468e..00000000000 --- a/opal/mca/btl/openib/btl_openib_fd.h +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (c) 2008 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2009 Sandia National Laboratories. All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#ifndef OPAL_BTL_OPENIB_FD_H_ -#define OPAL_BTL_OPENIB_FD_H_ - -#include "opal_config.h" - -BEGIN_C_DECLS - -/** - * Typedef for fd callback function - */ -typedef void *(opal_btl_openib_fd_event_callback_fn_t)(int fd, int flags, - void *context); - -/** - * Typedef for generic callback function - */ -typedef void *(opal_btl_openib_fd_main_callback_fn_t)(void *context); - -/** - * Initialize fd monitoring. - * Called by the main thread. - */ -int opal_btl_openib_fd_init(void); - -/** - * Start monitoring an fd. - * Called by main or service thread; callback will be in service thread. - */ -int opal_btl_openib_fd_monitor(int fd, int flags, - opal_btl_openib_fd_event_callback_fn_t *callback, - void *context); - -/** - * Stop monitoring an fd. - * Called by main or service thread; callback will be in service thread. - */ -int opal_btl_openib_fd_unmonitor(int fd, - opal_btl_openib_fd_event_callback_fn_t *callback, - void *context); - -/** - * Run a function in the service thread. - * Called by the main thread. - */ -int opal_btl_openib_fd_run_in_service(opal_btl_openib_fd_main_callback_fn_t callback, - void *context); - -/** - * Run a function in the main thread. - * Called by the service thread. - */ -int opal_btl_openib_fd_run_in_main(opal_btl_openib_fd_main_callback_fn_t callback, - void *context); - -/** - * Drain all pending messages from the main thread's pipe. - * Likely only useful during finalize, when the event library - * won't fire callbacks. - */ -int opal_btl_openib_fd_main_thread_drain(void); - -/** - * Finalize fd monitoring. - * Called by the main thread. - */ -int opal_btl_openib_fd_finalize(void); - -END_C_DECLS - -#endif /* OPAL_BTL_OPENIB_FD_H_ */ diff --git a/opal/mca/btl/openib/connect/btl_openib_connect_rdmacm.c b/opal/mca/btl/openib/connect/btl_openib_connect_rdmacm.c index c1df5159ee4..67a4fb2954d 100644 --- a/opal/mca/btl/openib/connect/btl_openib_connect_rdmacm.c +++ b/opal/mca/btl/openib/connect/btl_openib_connect_rdmacm.c @@ -1,10 +1,11 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2007-2013 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2007-2008 Chelsio, Inc. All rights reserved. * Copyright (c) 2008 Mellanox Technologies. All rights reserved. * Copyright (c) 2009 Sandia National Laboratories. All rights reserved. * Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved. - * Copyright (c) 2012-2013 Los Alamos National Security, LLC. All rights + * Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2013-2014 Intel, Inc. All rights reserved * Copyright (c) 2014 The University of Tennessee and The University @@ -54,8 +55,8 @@ #include "opal/util/error.h" #include "opal/util/show_help.h" #include "opal/util/proc.h" +#include "opal/runtime/opal_progress_threads.h" -#include "btl_openib_fd.h" #include "btl_openib_proc.h" #include "btl_openib_endpoint.h" #include "connect/connect.h" @@ -211,8 +212,12 @@ static uint32_t rdmacm_addr = 0; static int rdmacm_resolve_timeout = 30000; static int rdmacm_resolve_max_retry_count = 20; static bool rdmacm_reject_causes_connect_error = false; +static pthread_cond_t rdmacm_disconnect_cond; +static pthread_mutex_t rdmacm_disconnect_lock; static volatile int disconnect_callbacks = 0; static bool rdmacm_component_initialized = false; +static opal_event_base_t *rdmacm_event_base = NULL; +static opal_event_t rdmacm_event; /* Calculate the *real* length of the message (not aligned/rounded up) */ @@ -1011,7 +1016,7 @@ static int handle_connect_request(struct rdma_cm_event *event) ((struct sockaddr_in *)peeraddr)->sin_addr.s_addr; c->peer_tcp_port = rdma_get_dst_port(event->id); } - opal_btl_openib_fd_run_in_main(show_help_cant_find_endpoint, c); + show_help_cant_find_endpoint (c); #else BTL_ERROR(("Cannot find endpoint.")); #endif @@ -1160,19 +1165,6 @@ static int handle_connect_request(struct rdma_cm_event *event) return OPAL_ERROR; } -/* - * Invoked by service thread - */ -static void *rdmacm_unmonitor(int fd, int flags, void *context) -{ - volatile int *barrier = (volatile int *) context; - - OPAL_OUTPUT((-1, "SERVICE rdmacm unlocking main thread")); - *barrier = 1; - - return NULL; -} - /* * Runs in service thread * @@ -1181,24 +1173,36 @@ static void *rdmacm_unmonitor(int fd, int flags, void *context) * in the service thread while rdma_disconnect() is still running in * the main thread (which causes all manner of Bad Things to occur). */ -static void *call_disconnect_callback(void *v) +static void *call_disconnect_callback(int fd, int flags, void *v) { + rdmacm_contents_t *contents = (rdmacm_contents_t *) v; void *tmp = NULL; id_context_t *context = (id_context_t*) v; - OPAL_OUTPUT((-1, "SERVICE Service thread calling disconnect on ID %p", - (void*) context->id)); + opal_list_item_t *item; + + pthread_mutex_lock (&rdmacm_disconnect_lock); + while (NULL != (item = opal_list_remove_first(&contents->ids))) { + context = (id_context_t *) item; + + OPAL_OUTPUT((-1, "RDMACM Event thread calling disconnect on ID %p", + (void*) context->id)); + + if (!context->already_disconnected) { + tmp = context->id; + rdma_disconnect(context->id); + context->already_disconnected = true; + } + + OBJ_RELEASE(context); - if (!context->already_disconnected) { - tmp = context->id; - rdma_disconnect(context->id); - context->already_disconnected = true; + OPAL_OUTPUT((-1, "RDMACM Event thread disconnect on ID %p done", + (void*) tmp)); } - OBJ_RELEASE(context); /* Tell the main thread that we're done */ - (void)opal_atomic_add(&disconnect_callbacks, 1); - OPAL_OUTPUT((-1, "SERVICE Service thread disconnect on ID %p done; count=%d", - (void*) tmp, disconnect_callbacks)); + pthread_cond_signal(&rdmacm_disconnect_cond); + pthread_mutex_unlock(&rdmacm_disconnect_lock); + return NULL; } @@ -1212,8 +1216,8 @@ static void *call_disconnect_callback(void *v) */ static int rdmacm_endpoint_finalize(struct mca_btl_base_endpoint_t *endpoint) { - int num_to_wait_for; - opal_list_item_t *item, *item2; + rdmacm_contents_t *contents; + opal_event_t event; BTL_VERBOSE(("Start disconnecting...")); OPAL_OUTPUT((-1, "MAIN Endpoint finalizing")); @@ -1232,35 +1236,28 @@ static int rdmacm_endpoint_finalize(struct mca_btl_base_endpoint_t *endpoint) * main thread and service thread. */ opal_mutex_lock(&client_list_lock); - num_to_wait_for = disconnect_callbacks = 0; - for (item = opal_list_get_first(&client_list); - item != opal_list_get_end(&client_list); - item = opal_list_get_next(item)) { - rdmacm_contents_t *contents = (rdmacm_contents_t *) item; - + OPAL_LIST_FOREACH(contents, &client_list, rdmacm_contents_t) { if (endpoint == contents->endpoint) { - while (NULL != - (item2 = opal_list_remove_first(&(contents->ids)))) { - /* Fun race condition: we cannot call - rdma_disconnect() here in the main thread, because - if we do, there is a nonzero chance that the - DISCONNECT event will be delivered and get executed - in the service thread immediately. If this all - happens before rdma_disconnect() returns, all - manner of Bad Things can/will occur. So just - invoke rdma_disconnect() in the service thread - where we guarantee that we won't be processing an - event when it is called. */ - OPAL_OUTPUT((-1, "MAIN Main thread calling disconnect on ID %p", - (void*) ((id_context_t*) item2)->id)); - ++num_to_wait_for; - opal_btl_openib_fd_run_in_service(call_disconnect_callback, - item2); - } + opal_list_remove_item(&client_list, (opal_list_item_t *) contents); + contents->on_client_list = false; + + /* Fun race condition: we cannot call + rdma_disconnect() in this thread, because + if we do, there is a nonzero chance that the + DISCONNECT event will be delivered and get executed + in the rdcm event thread immediately. If this all + happens before rdma_disconnect() returns, all + manner of Bad Things can/will occur. So just + invoke rdma_disconnect() in the rdmacm event thread + where we guarantee that we won't be processing an + event when it is called. */ + + opal_event_set (rdmacm_event_base, &event, -1, OPAL_EV_READ, + call_disconnect_callback, contents); + opal_event_active (&event, OPAL_EV_READ, 1); + /* remove_item returns the item before the item removed, meaning that the for list is still safe */ - item = opal_list_remove_item(&client_list, item); - contents->on_client_list = false; break; } } @@ -1270,10 +1267,11 @@ static int rdmacm_endpoint_finalize(struct mca_btl_base_endpoint_t *endpoint) opal_mutex_unlock(&client_list_lock); /* Now wait for all the disconnect callbacks to occur */ - while (num_to_wait_for != disconnect_callbacks) { - opal_btl_openib_fd_main_thread_drain(); - sched_yield(); + pthread_mutex_lock(&rdmacm_disconnect_lock); + while (opal_list_get_size (&contents->ids)) { + pthread_cond_wait (&rdmacm_disconnect_cond, &rdmacm_disconnect_lock); } + pthread_mutex_unlock(&rdmacm_disconnect_lock); OPAL_OUTPUT((-1, "MAIN Endpoint finished finalizing")); return OPAL_SUCCESS; @@ -1355,7 +1353,7 @@ static int rdmacm_connect_endpoint(id_context_t *context, /* Ensure that all the writes back to the endpoint and associated data structures have completed */ opal_atomic_wmb(); - opal_btl_openib_fd_run_in_main(local_endpoint_cpc_complete, endpoint); + mca_btl_openib_run_in_main (local_endpoint_cpc_complete, endpoint); return OPAL_SUCCESS; } @@ -1668,9 +1666,8 @@ static int finish_connect(id_context_t *context) /* * Runs in main thread */ -static void *show_help_rdmacm_event_error(void *c) +static void *show_help_rdmacm_event_error (struct rdma_cm_event *event) { - struct rdma_cm_event *event = (struct rdma_cm_event*) c; id_context_t *context = (id_context_t*) event->id->context; if (RDMA_CM_EVENT_DEVICE_REMOVAL == event->event) { @@ -1802,7 +1799,7 @@ static int event_handler(struct rdma_cm_event *event) case RDMA_CM_EVENT_CONNECT_RESPONSE: case RDMA_CM_EVENT_ADDR_ERROR: case RDMA_CM_EVENT_DEVICE_REMOVAL: - opal_btl_openib_fd_run_in_main(show_help_rdmacm_event_error, event); + show_help_rdmacm_event_error (event); rc = OPAL_ERROR; break; @@ -1817,7 +1814,7 @@ static int event_handler(struct rdma_cm_event *event) rc = resolve_route(context); break; } - opal_btl_openib_fd_run_in_main(show_help_rdmacm_event_error, event); + show_help_rdmacm_event_error (event); rc = OPAL_ERROR; break; @@ -1833,7 +1830,7 @@ static int event_handler(struct rdma_cm_event *event) } /* - * Runs in service thread + * Runs in event thread */ static inline void rdmamcm_event_error(struct rdma_cm_event *event) { @@ -1843,12 +1840,12 @@ static inline void rdmamcm_event_error(struct rdma_cm_event *event) endpoint = ((id_context_t *)event->id->context)->contents->endpoint; } - opal_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, - endpoint); + mca_btl_openib_run_in_main (mca_btl_openib_endpoint_invoke_error, + endpoint); } /* - * Runs in service thread + * Runs in event thread */ static void *rdmacm_event_dispatch(int fd, int flags, void *context) { @@ -2051,6 +2048,11 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, opal_btl_ rc = OPAL_ERR_NOT_SUPPORTED; goto out; } + if (!BTL_OPENIB_QP_TYPE_PP(0)) { + BTL_VERBOSE(("rdmacm CPC only supported when the first QP is a PP QP; skipped")); + rc = OPAL_ERR_NOT_SUPPORTED; + goto out; + } BTL_VERBOSE(("rdmacm_component_query")); @@ -2072,6 +2074,7 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, opal_btl_ selected if QP 0 is PP */ (*cpc)->cbm_uses_cts = true; + /* Start monitoring the fd associated with the cm_device */ server = OBJ_NEW(rdmacm_contents_t); if (NULL == server) { rc = OPAL_ERR_OUT_OF_RESOURCE; @@ -2220,9 +2223,7 @@ static int rdmacm_component_query(mca_btl_openib_module_t *openib_btl, opal_btl_ */ static int rdmacm_component_finalize(void) { - volatile int barrier = 0; opal_list_item_t *item, *item2; - int rc; BTL_VERBOSE(("rdmacm_component_finalize")); @@ -2232,36 +2233,20 @@ static int rdmacm_component_finalize(void) return OPAL_SUCCESS; } - if (NULL != event_channel) { - rc = opal_btl_openib_fd_unmonitor(event_channel->fd, - rdmacm_unmonitor, (void*) &barrier); - if (OPAL_SUCCESS != rc) { - BTL_ERROR(("Error disabling fd monitor")); - } - - /* Wait for the service thread to stop monitoring the fd */ - OPAL_OUTPUT((-1, "MAIN rdmacm_component_finalize: waiting for thread to finish")); - while (0 == barrier) { - sched_yield(); - } - OPAL_OUTPUT((-1, "MAIN rdmacm_component_finalize: thread finished")); + if (rdmacm_event_base) { + opal_event_del (&rdmacm_event); + opal_progress_thread_finalize (NULL); + rdmacm_event_base = NULL; } - /* The service thread is no longer running; no need to lock access + /* The event thread is no longer running; no need to lock access to the client_list */ - for (item = opal_list_remove_first(&client_list); - NULL != item; - item = opal_list_remove_first(&client_list)) { - OBJ_RELEASE(item); - } - OBJ_DESTRUCT(&client_list); + OPAL_LIST_DESTRUCT(&client_list); /* For each of the items in the server list, there's only one item in the "ids" list -- the server listener. So explicitly destroy its RDMA ID context. */ - for (item = opal_list_remove_first(&server_listener_list); - NULL != item; - item = opal_list_remove_first(&server_listener_list)) { + while (NULL != (item = opal_list_remove_first(&server_listener_list))) { rdmacm_contents_t *contents = (rdmacm_contents_t*) item; item2 = opal_list_remove_first(&(contents->ids)); OBJ_RELEASE(item2); @@ -2277,6 +2262,9 @@ static int rdmacm_component_finalize(void) mca_btl_openib_free_rdma_addr_list(); + pthread_cond_destroy (&rdmacm_disconnect_cond); + pthread_mutex_destroy (&rdmacm_disconnect_lock); + return OPAL_SUCCESS; } @@ -2326,10 +2314,22 @@ static int rdmacm_component_init(void) return OPAL_ERR_UNREACH; } - /* Start monitoring the fd associated with the cm_device */ - opal_btl_openib_fd_monitor(event_channel->fd, OPAL_EV_READ, - rdmacm_event_dispatch, NULL); + rdmacm_event_base = opal_progress_thread_init (NULL); + if (NULL == rdmacm_event_base) { + opal_output_verbose (5, opal_btl_base_framework.framework_output, + "openib BTL: could not create rdmacm event thread"); + return OPAL_ERR_UNREACH; + } + + opal_event_set (rdmacm_event_base, &rdmacm_event, event_channel->fd, + OPAL_EV_READ | OPAL_EV_PERSIST, rdmacm_event_dispatch, NULL); + + opal_event_add (&rdmacm_event, 0); + + pthread_cond_init (&rdmacm_disconnect_cond, NULL); + pthread_mutex_init (&rdmacm_disconnect_lock, NULL); rdmacm_component_initialized = true; + return OPAL_SUCCESS; } diff --git a/opal/mca/btl/openib/connect/btl_openib_connect_udcm.c b/opal/mca/btl/openib/connect/btl_openib_connect_udcm.c index ffa0fee8302..f2cde4b3a28 100644 --- a/opal/mca/btl/openib/connect/btl_openib_connect_udcm.c +++ b/opal/mca/btl/openib/connect/btl_openib_connect_udcm.c @@ -66,10 +66,10 @@ #include "opal/util/error.h" #include "opal/util/alfg.h" #include "opal_stdint.h" +#include "opal/class/opal_fifo.h" #include "btl_openib_endpoint.h" #include "btl_openib_proc.h" -#include "btl_openib_fd.h" #include "btl_openib_async.h" #include "connect/connect.h" @@ -149,9 +149,7 @@ typedef struct udcm_module { opal_mutex_t cm_send_lock; /* Receive queue */ - opal_mutex_t cm_recv_msg_queue_lock; - opal_list_t cm_recv_msg_queue; - bool cm_message_event_active; + opal_fifo_t cm_recv_msg_fifo; /* The associated BTL */ struct mca_btl_openib_module_t *btl; @@ -159,8 +157,20 @@ typedef struct udcm_module { /* This module's modex message */ modex_msg_t modex; - /** The channel is being monitored */ - bool channel_monitored; + /* channel monitoring */ + + /** channel event base */ + opal_event_base_t *channel_evbase; + + /** channel monitoring event */ + opal_event_t channel_event; + + /* message processing */ + /** mesage event is active */ + int32_t cm_message_event_active; + + /** message event */ + opal_event_t cm_message_event; } udcm_module_t; /* @@ -303,7 +313,7 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl, opal_btl_openib_connect_base_module_t *cpc); static void *udcm_cq_event_dispatch(int fd, int flags, void *context); -static void *udcm_message_callback (void *context); +static void *udcm_message_callback (int fd, int flags, void *context); static void udcm_set_message_timeout (udcm_message_sent_t *message); static void udcm_cancel_message_timeout (udcm_message_sent_t *message); @@ -660,8 +670,7 @@ static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl) OBJ_CONSTRUCT(&m->cm_lock, opal_mutex_t); OBJ_CONSTRUCT(&m->cm_send_lock, opal_mutex_t); - OBJ_CONSTRUCT(&m->cm_recv_msg_queue, opal_list_t); - OBJ_CONSTRUCT(&m->cm_recv_msg_queue_lock, opal_mutex_t); + OBJ_CONSTRUCT(&m->cm_recv_msg_fifo, opal_fifo_t); OBJ_CONSTRUCT(&m->flying_messages, opal_list_t); OBJ_CONSTRUCT(&m->cm_timeout_lock, opal_mutex_t); @@ -733,15 +742,23 @@ static int udcm_module_init (udcm_module_t *m, mca_btl_openib_module_t *btl) m->cm_exiting = false; /* Monitor the fd associated with the completion channel */ - opal_btl_openib_fd_monitor(m->cm_channel->fd, OPAL_EV_READ, - udcm_cq_event_dispatch, m); - m->channel_monitored = true; + m->channel_evbase = opal_progress_thread_init (NULL); + + opal_event_set (m->channel_evbase, &m->channel_event, + m->cm_channel->fd, OPAL_EV_READ | OPAL_EV_PERSIST, + udcm_cq_event_dispatch, m); + + opal_event_add (&m->channel_event, 0); udcm_timeout_tv.tv_sec = udcm_timeout / 1000000; udcm_timeout_tv.tv_usec = udcm_timeout - 1000000 * udcm_timeout_tv.tv_sec; - m->cm_message_event_active = false; + m->cm_message_event_active = 0; + + /* set up the message event */ + opal_event_set (opal_sync_event_base, &m->cm_message_event, -1, + OPAL_EV_READ, udcm_message_callback, m); /* Finally, request CQ notification */ if (0 != ibv_req_notify_cq (m->cm_recv_cq, 0)) { @@ -804,21 +821,11 @@ udcm_module_start_connect(opal_btl_openib_connect_base_module_t *cpc, return rc; } -static void *udcm_unmonitor(int fd, int flags, void *context) -{ - volatile int *barrier = (volatile int *)context; - - *barrier = 1; - - return NULL; -} - static int udcm_module_finalize(mca_btl_openib_module_t *btl, opal_btl_openib_connect_base_module_t *cpc) { udcm_module_t *m = (udcm_module_t *) cpc; opal_list_item_t *item; - volatile int barrier = 0; if (NULL == m) { return OPAL_SUCCESS; @@ -826,27 +833,19 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl, m->cm_exiting = true; - if (m->channel_monitored) { - /* stop monitoring the channel's fd before destroying the listen qp */ - opal_btl_openib_fd_unmonitor(m->cm_channel->fd, udcm_unmonitor, (void *)&barrier); - - while (0 == barrier) { - sched_yield(); - } + if (m->channel_evbase) { + opal_event_del (&m->channel_event); + opal_progress_thread_finalize (NULL); } opal_mutex_lock (&m->cm_lock); - opal_mutex_lock (&m->cm_recv_msg_queue_lock); - /* clear message queue */ - while ((item = opal_list_remove_first(&m->cm_recv_msg_queue))) { + while (NULL != (item = opal_fifo_pop_atomic (&m->cm_recv_msg_fifo))) { OBJ_RELEASE(item); } - opal_mutex_unlock (&m->cm_recv_msg_queue_lock); - - OBJ_DESTRUCT(&m->cm_recv_msg_queue); + OBJ_DESTRUCT(&m->cm_recv_msg_fifo); opal_mutex_lock (&m->cm_timeout_lock); while ((item = opal_list_remove_first(&m->flying_messages))) { @@ -890,7 +889,6 @@ static int udcm_module_finalize(mca_btl_openib_module_t *btl, opal_mutex_unlock (&m->cm_lock); OBJ_DESTRUCT(&m->cm_send_lock); OBJ_DESTRUCT(&m->cm_lock); - OBJ_DESTRUCT(&m->cm_recv_msg_queue_lock); OBJ_DESTRUCT(&m->cm_timeout_lock); return OPAL_SUCCESS; @@ -979,24 +977,7 @@ static void udcm_module_destroy_listen_qp (udcm_module_t *m) return; } - if (mca_btl_openib_component.use_async_event_thread && - -1 != mca_btl_openib_component.async_pipe[1]) { - /* Tell the openib async thread to ignore ERR state on the QP - we are about to manually set the ERR state on */ - mca_btl_openib_async_cmd_t async_command; - async_command.a_cmd = OPENIB_ASYNC_IGNORE_QP_ERR; - async_command.qp = m->listen_qp; - if (write(mca_btl_openib_component.async_pipe[1], - &async_command, sizeof(mca_btl_openib_async_cmd_t))<0){ - BTL_ERROR(("Failed to write to pipe [%d]",errno)); - return; - } - /* wait for ok from thread */ - if (OPAL_SUCCESS != - btl_openib_async_command_done(OPENIB_ASYNC_IGNORE_QP_ERR)) { - BTL_ERROR(("Command to openib async thread to ignore QP ERR state failed")); - } - } + mca_btl_openib_async_add_qp_ignore (m->listen_qp); do { /* Move listen QP into the ERR state to cancel all outstanding @@ -2078,9 +2059,7 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m) /* Copy just the message header */ memcpy (&item->msg_hdr, &message->hdr, sizeof (message->hdr)); - opal_mutex_lock(&m->cm_recv_msg_queue_lock); - opal_list_append (&m->cm_recv_msg_queue, &item->super); - opal_mutex_unlock(&m->cm_recv_msg_queue_lock); + opal_fifo_push_atomic (&m->cm_recv_msg_fifo, &item->super); udcm_send_ack (lcl_ep, message->hdr.rem_ctx); @@ -2088,13 +2067,9 @@ static int udcm_process_messages (struct ibv_cq *event_cq, udcm_module_t *m) udcm_module_post_one_recv (m, msg_num); } - opal_mutex_lock (&m->cm_recv_msg_queue_lock); - if (opal_list_get_size (&m->cm_recv_msg_queue) && - !m->cm_message_event_active) { - m->cm_message_event_active = true; - opal_btl_openib_fd_run_in_main (udcm_message_callback, (void *) m); + if (0 == opal_atomic_swap_32 (&m->cm_message_event_active, 1)) { + opal_event_active (&m->cm_message_event, OPAL_EV_READ, 1); } - opal_mutex_unlock (&m->cm_recv_msg_queue_lock); return count; } @@ -2142,18 +2117,15 @@ static void *udcm_cq_event_dispatch(int fd, int flags, void *context) return NULL; } -static void *udcm_message_callback (void *context) +static void *udcm_message_callback (int fd, int flags, void *context) { udcm_module_t *m = (udcm_module_t *) context; udcm_message_recv_t *item; BTL_VERBOSE(("running message thread")); - opal_mutex_lock(&m->cm_recv_msg_queue_lock); - while ((item = (udcm_message_recv_t *) - opal_list_remove_first (&m->cm_recv_msg_queue))) { + while ((item = (udcm_message_recv_t *) opal_fifo_pop_atomic (&m->cm_recv_msg_fifo))) { mca_btl_openib_endpoint_t *lcl_ep = item->msg_hdr.lcl_ep; - opal_mutex_unlock(&m->cm_recv_msg_queue_lock); OPAL_THREAD_LOCK(&lcl_ep->endpoint_lock); @@ -2189,14 +2161,11 @@ static void *udcm_message_callback (void *context) } OBJ_RELEASE (item); - - opal_mutex_lock(&m->cm_recv_msg_queue_lock); } BTL_VERBOSE(("exiting message thread")); - m->cm_message_event_active = false; - opal_mutex_unlock(&m->cm_recv_msg_queue_lock); + opal_atomic_swap_32 (&m->cm_message_event_active, 0); return NULL; } @@ -2262,9 +2231,9 @@ static void udcm_send_timeout (evutil_socket_t fd, short event, void *arg) UDCM_ENDPOINT_REM_MODEX(lcl_ep)->mm_qp_num); /* We are running in the timeout thread. Invoke the error in the - main thread */ - opal_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, - lcl_ep); + * "main thread" because it may call up into the pml or another + * component that may not have threading support enabled. */ + mca_btl_openib_run_in_main (mca_btl_openib_endpoint_invoke_error, lcl_ep); break; } @@ -2274,8 +2243,7 @@ static void udcm_send_timeout (evutil_socket_t fd, short event, void *arg) if (0 != udcm_post_send (lcl_ep, msg->data, msg->length, 0)) { BTL_VERBOSE(("error reposting message")); - opal_btl_openib_fd_run_in_main(mca_btl_openib_endpoint_invoke_error, - lcl_ep); + mca_btl_openib_run_in_main (mca_btl_openib_endpoint_invoke_error, lcl_ep); break; } } while (0);