From ea35e472284a2b3a44383b86744367de2eeb01c8 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Fri, 29 May 2015 14:28:26 -0700 Subject: [PATCH] Fat SMPs (i.e., systems with nodes containing large numbers of CPUs) were failing to start due to connection failures of the opal/pmix support. The root cause was that (a) we were setting the client socket to non-blocking before calling connect, and (b) the server was using the event library to harvest the accepts, and also did the handshake while in that event. So the server would back up beyond the connection backlog limit, and we would fail. Changing the client to leave its socket as blocking during the connect doesn't solve the problem by itself - you also have to introduce a sleep delay once the backlog is hit to avoid simply machine-gunning your way through retries. This gets somewhat difficult to adjust as you don't want to unnecessarily prolong startup time. We've solved this before by adding a listening thread that simply reaps accepts and shoves them into the event library for subsequent processing. This would resolve the problem, but meant yet another daemon-level thread. So I centralized the listening thread support and let multiple elements register listeners on it. Thus, each daemon now has a single listening thread that reaps accepts from multiple sources - for now, the orte/pmix server and the oob/usock support are using it. I'll add in the oob/tcp component later. This still didn't fully resolve the SMP problem, especially on coprocessor cards (e.g., KNC). Removing the shared memory dstore support helped further improve the behavior - it looks like there is some kind of memory paging issue there that needs further understanding. Given that the shared memory support was about to be lost when I bring over the PMIx integration (until it is restored in that library), it seemed like a reasonable thing to just remove it at this point. 
--- opal/mca/dstore/base/dstore_base_frame.c | 3 +- opal/mca/dstore/dstore.h | 3 - opal/mca/dstore/sm/Makefile.am | 36 -- opal/mca/dstore/sm/dstore_sm.c | 427 ------------------- opal/mca/dstore/sm/dstore_sm.h | 49 --- opal/mca/dstore/sm/dstore_sm_component.c | 176 -------- opal/mca/dstore/sm/owner.txt | 7 - opal/mca/pmix/native/pmix_native.c | 308 ++++--------- opal/mca/pmix/native/pmix_native_component.c | 16 +- opal/mca/pmix/native/usock.c | 175 ++++---- orte/mca/odls/default/odls_default_module.c | 5 - orte/mca/oob/usock/Makefile.am | 4 +- orte/mca/oob/usock/oob_usock_component.c | 28 +- orte/mca/oob/usock/oob_usock_component.h | 4 - orte/orted/pmix/pmix_server.c | 391 ++--------------- orte/orted/pmix/pmix_server.h | 4 +- orte/orted/pmix/pmix_server_process_msgs.c | 384 ++++------------- orte/runtime/orte_finalize.c | 9 +- orte/runtime/orte_init.c | 21 +- orte/util/Makefile.am | 8 +- orte/util/listener.c | 381 +++++++++++++++++ orte/util/listener.h | 66 +++ 22 files changed, 787 insertions(+), 1718 deletions(-) delete mode 100644 opal/mca/dstore/sm/Makefile.am delete mode 100644 opal/mca/dstore/sm/dstore_sm.c delete mode 100644 opal/mca/dstore/sm/dstore_sm.h delete mode 100644 opal/mca/dstore/sm/dstore_sm_component.c delete mode 100644 opal/mca/dstore/sm/owner.txt create mode 100644 orte/util/listener.c create mode 100644 orte/util/listener.h diff --git a/opal/mca/dstore/base/dstore_base_frame.c b/opal/mca/dstore/base/dstore_base_frame.c index 6294c2608a6..47d2db5313e 100644 --- a/opal/mca/dstore/base/dstore_base_frame.c +++ b/opal/mca/dstore/base/dstore_base_frame.c @@ -1,7 +1,7 @@ /* * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. 
* $COPYRIGHT$ @@ -43,7 +43,6 @@ opal_dstore_base_API_t opal_dstore = { opal_dstore_base_t opal_dstore_base = {0}; int opal_dstore_internal = -1; -int opal_dstore_modex = -1; static int opal_dstore_base_frame_close(void) { diff --git a/opal/mca/dstore/dstore.h b/opal/mca/dstore/dstore.h index 30245f94d66..198a56b4ed3 100644 --- a/opal/mca/dstore/dstore.h +++ b/opal/mca/dstore/dstore.h @@ -43,9 +43,6 @@ BEGIN_C_DECLS * as someone figures out how to separate the various * datastore channels */ -OPAL_DECLSPEC extern int opal_dstore_internal; -OPAL_DECLSPEC extern int opal_dstore_modex; - OPAL_DECLSPEC extern int opal_dstore_peer; OPAL_DECLSPEC extern int opal_dstore_internal; OPAL_DECLSPEC extern int opal_dstore_nonpeer; diff --git a/opal/mca/dstore/sm/Makefile.am b/opal/mca/dstore/sm/Makefile.am deleted file mode 100644 index e582d69b923..00000000000 --- a/opal/mca/dstore/sm/Makefile.am +++ /dev/null @@ -1,36 +0,0 @@ -# -# Copyright (c) 2014 Mellanox Technologies, Inc. -# All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# - -sources = \ - dstore_sm.h \ - dstore_sm_component.c \ - dstore_sm.c - -# Make the output library in this directory, and name it either -# mca__.la (for DSO builds) or libmca__.la -# (for static builds). 
- -if MCA_BUILD_opal_dstore_sm_DSO -component_noinst = -component_install = mca_dstore_sm.la -else -component_noinst = libmca_dstore_sm.la -component_install = -endif - -mcacomponentdir = $(opallibdir) -mcacomponent_LTLIBRARIES = $(component_install) -mca_dstore_sm_la_SOURCES = $(sources) -mca_dstore_sm_la_LDFLAGS = -module -avoid-version -mca_dstore_sm_la_LIBADD = $(dstore_sm_LIBS) - -noinst_LTLIBRARIES = $(component_noinst) -libmca_dstore_sm_la_SOURCES =$(sources) -libmca_dstore_sm_la_LDFLAGS = -module -avoid-version diff --git a/opal/mca/dstore/sm/dstore_sm.c b/opal/mca/dstore/sm/dstore_sm.c deleted file mode 100644 index e62d1738c7f..00000000000 --- a/opal/mca/dstore/sm/dstore_sm.c +++ /dev/null @@ -1,427 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* Copyright (c) 2014 Mellanox Technologies, Inc. - * All rights reserved. - * Copyright (c) 2014 Research Organization for Information Science - * and Technology (RIST). All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - * - */ - -#include "opal_config.h" -#include "opal/constants.h" - -#include -#include - -#include "opal_stdint.h" -#include "opal/dss/dss_types.h" -#include "opal/util/error.h" -#include "opal/util/output.h" -#include "opal/util/show_help.h" - -#include "opal/mca/dstore/base/base.h" -#include "dstore_sm.h" -#include "opal/mca/pmix/pmix.h" -#include "opal/mca/shmem/base/base.h" - -static uint32_t cur_offset = 0; -static int32_t cur_seg_index = -1; - -static int init(struct opal_dstore_base_module_t *imod); -static void finalize(struct opal_dstore_base_module_t *imod); -static int store(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *proc, - opal_value_t *val); -static int fetch(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *proc, - const char *key, - opal_list_t *kvs); -static int remove_data(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *proc, const char *key); 
- -static void smtrkcon(opal_sm_tracker_t *p) -{ - p->jobid = 0; - p->addr = NULL; -} -static void smtrkdes(opal_sm_tracker_t *p) -{ -} - -OBJ_CLASS_INSTANCE(opal_sm_tracker_t, - opal_list_item_t, - smtrkcon, smtrkdes); - -#define SHARED_SEGMENT_SIZE (1<<22) - -mca_dstore_sm_module_t opal_dstore_sm_module = { - { - init, - finalize, - store, - fetch, - remove_data - } -}; - -segment_info *segments = NULL; -static int max_segment_num; - -/* Initialize our sm region */ -static int init(struct opal_dstore_base_module_t *imod) -{ - int i; - mca_dstore_sm_module_t *mod; - mod = (mca_dstore_sm_module_t*)imod; - - max_segment_num = META_OFFSET/sizeof(seg_info_short); - segments = malloc(max_segment_num * sizeof(segment_info)); - for (i = 0; i < max_segment_num; i++) { - segments[i].addr = NULL; - segments[i].seg_ds = NULL; - } - OBJ_CONSTRUCT(&mod->tracklist, opal_list_t); - return OPAL_SUCCESS; -} - -static void finalize(struct opal_dstore_base_module_t *imod) -{ - mca_dstore_sm_module_t *mod; - opal_sm_tracker_t *trk; - opal_list_item_t *item; - - mod = (mca_dstore_sm_module_t*)imod; - - int i; - for (i = 0; i < max_segment_num; i++) { - if (NULL != segments[i].seg_ds) { - if (segments[i].seg_ds->seg_cpid == getpid()) { - opal_shmem_unlink (segments[i].seg_ds); - } - opal_shmem_segment_detach (segments[i].seg_ds); - free(segments[i].seg_ds); - } - } - free(segments); - - /* release tracker object */ - for (item = opal_list_remove_first(&mod->tracklist); - NULL != item; - item = opal_list_remove_first(&mod->tracklist)) { - trk = (opal_sm_tracker_t*) item; - opal_shmem_segment_detach (&trk->seg_ds); - if (trk->seg_ds.seg_cpid == getpid()) { - opal_shmem_unlink (&trk->seg_ds); - } - OBJ_RELEASE(trk); - } - OPAL_LIST_DESTRUCT(&mod->tracklist); -} - - - -static int store(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *uid, - opal_value_t *val) -{ - mca_dstore_sm_module_t *mod; - void *addr; - int32_t data_size; - opal_shmem_ds_t *seg_ds; - meta_info 
my_info; - seg_info_short sinfo; - char* seg_addr; - char *sm_file = NULL; - char *ch, *path; - int idx; - opal_sm_tracker_t *trk; - bool found_trk = false; - if (OPAL_BYTE_OBJECT != val->type) { - return OPAL_ERROR; - } - mod = (mca_dstore_sm_module_t*)imod; - data_size = val->data.bo.size; - - idx = uid->vpid; - /* look for segment info for target jobid */ - OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { - if (trk->jobid == uid->jobid) { - found_trk = true; - break; - } - } - if (!found_trk) { - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: tracker object wasn't found for job id %u, proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - uid->jobid, - OPAL_NAME_PRINT(*uid)); - return OPAL_ERROR; - } - /* look for data for this process in meta_info segment */ - addr = ((uint8_t*)trk->addr + META_OFFSET + idx * sizeof(meta_info)); - memcpy(&my_info, addr, sizeof(meta_info)); - if (0 < my_info.data_size && 0 <= my_info.seg_index) { - /* we should replace existing data for this process - * by new ones */ - if (my_info.data_size >= data_size) { - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: replace existing data for proc %s be the new ones", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - /* we can just simply replace the old data with the new ones */ - /* get existing segment from the list */ - seg_addr = segments[my_info.seg_index].addr; - seg_ds = segments[my_info.seg_index].seg_ds; - /* store data in this segment */ - addr = seg_addr + my_info.offset; - memset((uint8_t*)addr, 0, my_info.data_size); - memcpy((uint8_t*)addr, val->data.bo.bytes, val->data.bo.size); - /* update information about data size in meta info segment */ - my_info.data_size = data_size; - memcpy((uint8_t*)trk->addr + META_OFFSET + idx*sizeof(meta_info), &my_info, sizeof(meta_info)); - return OPAL_SUCCESS; - } - } - /* there is no data for this process, or there is data for new 
process - * but their size is smaller than the size of new data, so - * store them in the separate slot*/ - - /* store in another segment */ - if (0 > cur_seg_index || (cur_offset + data_size) > SHARED_SEGMENT_SIZE) { - if (max_segment_num == cur_seg_index+1) { - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: exceeded limit on number of segments %d. This value is managed by META_OFFSET macro.", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - max_segment_num); - return OPAL_ERROR; - } - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: create new segment to store data for proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - /* create new segment, attach to it and add it to the list of segments */ - cur_seg_index++; - cur_offset = 0; - if (0 < strlen(trk->seg_ds.seg_name)) { - path = strdup(trk->seg_ds.seg_name); - ch = strrchr(path, OPAL_PATH_SEP[0]) + 1; - if (NULL != ch) { - *ch = '\0'; - (void)asprintf(&sm_file, "%sdstore_segment.%d", path, cur_seg_index); - } - free(path); - } - if (NULL == sm_file) { - (void)asprintf(&sm_file, "%s", "noname"); - } - if (NULL != sm_file) { - seg_ds = (opal_shmem_ds_t*)malloc(sizeof(opal_shmem_ds_t)); - memset(seg_ds, 0, sizeof(opal_shmem_ds_t)); - if (OPAL_SUCCESS != opal_shmem_segment_create (seg_ds, sm_file, SHARED_SEGMENT_SIZE)) { - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: couldn't create new shared segment to store key %s on proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - (NULL == val->key) ? 
"NULL" : val->key, - OPAL_NAME_PRINT(*uid)); - free(seg_ds); - if (NULL != sm_file) { - free (sm_file); - } - return OPAL_ERROR; - } - if (NULL != sm_file) { - free (sm_file); - } - } else { - return OPAL_ERROR; - } - seg_addr = opal_shmem_segment_attach (seg_ds); - if (NULL == seg_addr) { - opal_shmem_unlink (seg_ds); - free(seg_ds); - return OPAL_ERROR; - } - segments[cur_seg_index].seg_ds = seg_ds; - segments[cur_seg_index].addr = seg_addr; - /* store information about new created segment in header section. */ - sinfo.seg_cpid = seg_ds->seg_cpid; - sinfo.seg_id = seg_ds->seg_id; - sinfo.seg_size = seg_ds->seg_size; - if (0 < strlen(seg_ds->seg_name)) { - ch = strrchr(seg_ds->seg_name, OPAL_PATH_SEP[0]) + 1; - memcpy(sinfo.file_name, ch, strlen(ch)+1); - } else { - memcpy(sinfo.file_name, "noname", strlen("noname")+1); - } - memcpy((uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), &sinfo, sizeof(seg_info_short)); - } else { - /* get existing segment from the array */ - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: getting current segment info to store data for proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - seg_addr = segments[cur_seg_index].addr; - seg_ds = segments[cur_seg_index].seg_ds; - memcpy(&sinfo, (uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), sizeof(seg_info_short)); - if (sinfo.seg_cpid != seg_ds->seg_cpid) { - /* store information about new created segment in header section. 
*/ - sinfo.seg_cpid = seg_ds->seg_cpid; - sinfo.seg_id = seg_ds->seg_id; - sinfo.seg_size = seg_ds->seg_size; - if (0 < strlen(seg_ds->seg_name)) { - ch = strrchr(seg_ds->seg_name, OPAL_PATH_SEP[0]) + 1; - memcpy(sinfo.file_name, ch, strlen(ch)+1); - } else { - memcpy(sinfo.file_name, "noname", strlen("noname")+1); - } - memcpy((uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), &sinfo, sizeof(seg_info_short)); - } - } - /* store data in this segment */ - addr = seg_addr + cur_offset; - memcpy((uint8_t*)addr, val->data.bo.bytes, val->data.bo.size); - - /* store segment index and offset for this process - * in meta info segment. */ - my_info.seg_index = cur_seg_index; - my_info.offset = cur_offset; - my_info.data_size = data_size; - memcpy((uint8_t*)trk->addr + META_OFFSET + idx*sizeof(meta_info), &my_info, sizeof(meta_info)); - cur_offset += data_size; - - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: data for proc %s stored successfully", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - return OPAL_SUCCESS; -} - -static int fetch(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *uid, - const char *key, opal_list_t *kvs) -{ - int rc; - int32_t size; - mca_dstore_sm_module_t *mod; - void *addr, *ptr; - opal_buffer_t *bptr, buf; - int32_t cnt; - opal_value_t *kp; - opal_shmem_ds_t *seg_ds; - meta_info my_info; - seg_info_short sinfo; - char* seg_addr; - int found = 0; - int32_t seg_index; - char *ch, *path; - opal_sm_tracker_t *trk; - bool found_trk = false; - int idx; - - mod = (mca_dstore_sm_module_t*)imod; - /* look for segment info for target jobid */ - OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { - if (trk->jobid == uid->jobid) { - found_trk = true; - break; - } - } - if (!found_trk) { - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:fetch: tracker object wasn't found for job id %u, proc %s", - 
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - uid->jobid, - OPAL_NAME_PRINT(*uid)); - return OPAL_ERROR; - } - /* look for data for this process in meta_info segment */ - idx = uid->vpid; - addr = ((uint8_t*)trk->addr + META_OFFSET + idx * sizeof(meta_info)); - memcpy(&my_info, addr, sizeof(meta_info)); - if (0 == my_info.data_size) { - /* there is no data for this process */ - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:fetch: data for proc %s wasn't found.", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - return OPAL_ERROR; - } - seg_index = my_info.seg_index; - /* look for this seg index in array of attached segments. - * If not found, attach to this segment and - * store it in the array. */ - if (NULL != segments[seg_index].addr) { - seg_addr = segments[seg_index].addr; - } else { - seg_ds = (opal_shmem_ds_t*)malloc(sizeof(opal_shmem_ds_t)); - memset(seg_ds, 0, sizeof(opal_shmem_ds_t)); - memcpy(&sinfo, (uint8_t*)trk->addr + seg_index * sizeof(seg_info_short), sizeof(seg_info_short)); - seg_ds->seg_cpid = sinfo.seg_cpid; - seg_ds->seg_id = sinfo.seg_id; - seg_ds->seg_size = sinfo.seg_size; - if (0 < strlen(trk->seg_ds.seg_name)) { - path = strdup(trk->seg_ds.seg_name); - ch = strrchr(path, OPAL_PATH_SEP[0]) + 1; - if (NULL != ch) { - *ch = '\0'; - sprintf(seg_ds->seg_name, "%s%s", path, sinfo.file_name); - } - free(path); - } - seg_addr = opal_shmem_segment_attach (seg_ds); - if (NULL == seg_addr) { - return OPAL_ERROR; - } - segments[seg_index].addr = seg_addr; - segments[seg_index].seg_ds = seg_ds; - } - - size = my_info.data_size; - ptr = (uint8_t*)seg_addr + my_info.offset; - - cnt = 1; - OBJ_CONSTRUCT(&buf, opal_buffer_t); - opal_dss.load(&buf, ptr, size); - while (OPAL_SUCCESS == (rc = opal_dss.unpack(&buf, &bptr, &cnt, OPAL_BUFFER))) { - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - if (0 == strcmp(key, kp->key)) { - opal_list_append(kvs, &kp->super); - found = 1; - } 
else { - OBJ_RELEASE(kp); - } - } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } - OBJ_RELEASE(bptr); - cnt = 1; - } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } else { - if (1 == found) { - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:fetch: data for proc %s successfully fetched.", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - rc = OPAL_SUCCESS; - } - } - /* protect the data */ - buf.base_ptr = NULL; - OBJ_DESTRUCT(&buf); - return rc; -} - -static int remove_data(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *uid, const char *key) -{ - return OPAL_ERR_NOT_IMPLEMENTED; -} - diff --git a/opal/mca/dstore/sm/dstore_sm.h b/opal/mca/dstore/sm/dstore_sm.h deleted file mode 100644 index 89f5d7fc16a..00000000000 --- a/opal/mca/dstore/sm/dstore_sm.h +++ /dev/null @@ -1,49 +0,0 @@ -/* Copyright (c) 2014 Mellanox Technologies, Inc. - * All rights reserved. 
- * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#ifndef OPAL_DSTORE_SM_H -#define OPAL_DSTORE_SM_H - -#include "opal/mca/dstore/dstore.h" -#include "opal/mca/shmem/shmem_types.h" - -BEGIN_C_DECLS - -OPAL_MODULE_DECLSPEC extern opal_dstore_base_component_t mca_dstore_sm_component; - -typedef struct { - opal_shmem_ds_t *seg_ds; - char* addr; -} segment_info; - -typedef struct { - opal_list_item_t super; - uint32_t jobid; - opal_shmem_ds_t seg_ds; - uint8_t *addr; -} opal_sm_tracker_t; -OBJ_CLASS_DECLARATION(opal_sm_tracker_t); - -typedef struct { - opal_dstore_base_module_t api; - opal_list_t tracklist; -} mca_dstore_sm_module_t; -OPAL_MODULE_DECLSPEC extern mca_dstore_sm_module_t opal_dstore_sm_module; - -typedef struct { - pid_t seg_cpid; - int seg_id; - size_t seg_size; - char file_name[256]; -} seg_info_short; - - -END_C_DECLS - -#endif /* OPAL_DSTORE_SM_H */ diff --git a/opal/mca/dstore/sm/dstore_sm_component.c b/opal/mca/dstore/sm/dstore_sm_component.c deleted file mode 100644 index 55eddc4b62c..00000000000 --- a/opal/mca/dstore/sm/dstore_sm_component.c +++ /dev/null @@ -1,176 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* Copyright (c) 2014 Mellanox Technologies, Inc. - * All rights reserved. - * Copyright (c) 2015 Intel, Inc. All rights reserved. - * Copyright (c) 2015 Los Alamos National Security, LLC. All rights - * reserved. 
- * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "opal_config.h" -#include "opal/constants.h" - -#include "opal/mca/base/base.h" -#include "opal/util/argv.h" - -#include "opal/mca/dstore/dstore.h" -#include "opal/mca/dstore/base/base.h" -#include "dstore_sm.h" -#include "opal/mca/shmem/base/base.h" - -static int opal_dstore_sm_enable = 0; -static int dstore_sm_query(mca_base_module_t **module, int *priority); -static opal_dstore_base_module_t *component_create(opal_list_t *attrs); -static int component_update(int hdl, opal_list_t *attributes); -static int add_trk(opal_dstore_base_module_t *imod, - uint32_t jid, char* seg_info); -static int component_register(void); - -/* - * Instantiate the public struct with all of our public information - * and pointers to our public functions in it - */ -opal_dstore_base_component_t mca_dstore_sm_component = { - .base_version = { - OPAL_DSTORE_BASE_VERSION_2_0_0, - - /* Component name and version */ - .mca_component_name = "sm", - MCA_BASE_MAKE_VERSION(component, OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION, - OPAL_RELEASE_VERSION), - - .mca_query_component = dstore_sm_query, - .mca_register_component_params = component_register - }, - .base_data = { - /* The component is checkpoint ready */ - MCA_BASE_METADATA_PARAM_CHECKPOINT - }, - .create_handle = component_create, - .update_handle = component_update, -}; - -static int dstore_sm_query(mca_base_module_t **module, int *priority) -{ - *priority = 0; - *module = NULL; - return OPAL_SUCCESS; -} - -static int component_register(void) -{ - opal_dstore_sm_enable = 0; - (void)mca_base_component_var_register(&mca_dstore_sm_component.base_version, "enable", - "Enable/disable dstore sm component (default: disabled)", - MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, - OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, - &opal_dstore_sm_enable); - return OPAL_SUCCESS; -} - -static opal_dstore_base_module_t *component_create(opal_list_t *attrs) -{ - 
mca_dstore_sm_module_t *mod; - - if (0 == opal_dstore_sm_enable) { - return NULL; - } - mod = (mca_dstore_sm_module_t*)malloc(sizeof(mca_dstore_sm_module_t)); - if (NULL == mod) { - OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); - return NULL; - } - /* copy the APIs across */ - memcpy(mod, &opal_dstore_sm_module.api, sizeof(opal_dstore_base_module_t)); - /* let the module init itself */ - if (OPAL_SUCCESS != mod->api.init((struct opal_dstore_base_module_t*)mod)) { - /* release the module and return the error */ - free(mod); - return NULL; - } - return (opal_dstore_base_module_t*)mod; -} - -static int component_update(int hdl, opal_list_t *attributes) -{ - opal_dstore_handle_t *handle; - opal_dstore_base_module_t *mod; - int rc; - - if (hdl < 0) { - return OPAL_ERR_NOT_INITIALIZED; - } - - if (NULL == (handle = (opal_dstore_handle_t*)opal_pointer_array_get_item(&opal_dstore_base.handles, hdl))) { - OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); - return OPAL_ERR_NOT_FOUND; - } - - if (NULL == attributes) { - return OPAL_SUCCESS; - } - - mod = handle->module; - opal_dstore_attr_t *attr = (opal_dstore_attr_t *)opal_list_get_last(attributes); - rc = add_trk(mod, attr->jobid, attr->connection_info); - return rc; -} - -static int add_trk(opal_dstore_base_module_t *imod, - uint32_t jid, char* seg_info) -{ - int i; - char** tokens; - int num_tokens; - opal_sm_tracker_t *trk; - bool found_trk = false; - num_tokens = 0; - mca_dstore_sm_module_t *mod; - - mod = (mca_dstore_sm_module_t*)imod; - if (NULL == seg_info) { - return OPAL_ERROR; - } - OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { - if (trk->jobid == jid) { - found_trk = true; - break; - } - } - if (!found_trk) { - trk = OBJ_NEW(opal_sm_tracker_t); - tokens = opal_argv_split(seg_info, ':'); - for (i = 0; NULL != tokens[i]; i++) { - num_tokens++; - } - memset(&trk->seg_ds, 0, sizeof(opal_shmem_ds_t)); - trk->seg_ds.seg_cpid = atoi(tokens[0]); - trk->seg_ds.seg_id = atoi(tokens[1]); - trk->seg_ds.seg_size = 
strtoul(tokens[2], NULL, 10); - trk->seg_ds.seg_base_addr = (unsigned char*)strtoul(tokens[3], NULL, 16); - if (5 == num_tokens && NULL != tokens[4]) { - strncpy(trk->seg_ds.seg_name, tokens[4], strlen(tokens[4])+1); - } - opal_argv_free(tokens); - - trk->jobid = jid; - trk->addr = opal_shmem_segment_attach (&trk->seg_ds); - if (NULL == trk->addr) { - if (trk->seg_ds.seg_cpid == getpid()) { - opal_shmem_unlink (&trk->seg_ds); - } - OBJ_RELEASE(trk); - return OPAL_ERROR; - } - if (trk->seg_ds.seg_cpid == getpid()) { - memset(trk->addr, 0, trk->seg_ds.seg_size); - } - opal_list_append(&mod->tracklist, &trk->super); - } - return OPAL_SUCCESS; -} - diff --git a/opal/mca/dstore/sm/owner.txt b/opal/mca/dstore/sm/owner.txt deleted file mode 100644 index ebba68c6f37..00000000000 --- a/opal/mca/dstore/sm/owner.txt +++ /dev/null @@ -1,7 +0,0 @@ -# -# owner/status file -# owner: institution that is responsible for this package -# status: e.g. active, maintenance, unmaintained -# -owner: INTEL/MELLANOX -status: active diff --git a/opal/mca/pmix/native/pmix_native.c b/opal/mca/pmix/native/pmix_native.c index 7f98c911257..4fa06d7ed98 100644 --- a/opal/mca/pmix/native/pmix_native.c +++ b/opal/mca/pmix/native/pmix_native.c @@ -103,32 +103,6 @@ const opal_pmix_base_module_t opal_pmix_native_module = { // local variables static int init_cntr = 0; opal_process_name_t native_pname = {0}; -static uint32_t sm_flag; - -static void unpack_segment_info(opal_buffer_t *buf, opal_process_name_t *id, char** seg_info) -{ - int cnt; - int rc; - char *sinfo; - opal_process_name_t uid; - *seg_info = NULL; - /* extract the id of the contributor from the blob */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &uid, &cnt, OPAL_NAME))) { - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - return; - } - OPAL_ERROR_LOG(rc); - return; - } - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sinfo, &cnt, OPAL_STRING))) { - OPAL_ERROR_LOG(rc); - return; - } - *id = uid; - *seg_info 
= sinfo; -} /* callback for wait completion */ @@ -152,24 +126,6 @@ static void wait_cbfunc(opal_buffer_t *buf, void *cbdata) cb->active = false; } -static int pmix_sm_attach(uint32_t jid, char *seg_info) -{ - int rc; - opal_dstore_attr_t *attr; - opal_list_t attrs; - OBJ_CONSTRUCT(&attrs, opal_list_t); - attr = OBJ_NEW(opal_dstore_attr_t); - attr->jobid = jid; - attr->connection_info = strdup(seg_info); - opal_list_append(&attrs, &attr->super); - - rc = opal_dstore.update(opal_dstore_modex, &attrs); - opal_list_remove_item(&attrs, &attr->super); - OBJ_RELEASE(attr); - OPAL_LIST_DESTRUCT(&attrs); - return rc; -} - static int native_init(void) { char **uri, *srv; @@ -225,6 +181,9 @@ static int native_init(void) } /* if the rendezvous file doesn't exist, that's an error */ if (0 != access(uri[1], R_OK)) { + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s pmix:native rendezvous file %s does not exist", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), uri[1]); opal_argv_free(uri); return OPAL_ERR_NOT_FOUND; } @@ -240,29 +199,6 @@ static int native_init(void) } } - char* seg_info; - void *hdl; - int rc; - /* check if shared memory region is supported */ - opal_dstore.get_handle(opal_dstore_modex, &hdl); - if(0 == strcmp("sm", ((opal_dstore_handle_t *)hdl)->storage_component->base_version.mca_component_name)) { - sm_flag = 1; - } else { - sm_flag = 0; - } - /* if shared memory segment is available, then attach to shared memory region created by pmix server */ - if (1 == sm_flag) { - if (NULL == (seg_info = getenv("PMIX_SEG_INFO"))) { - /* error out - should have been here, but isn't */ - return OPAL_ERROR; - } - rc = pmix_sm_attach(OPAL_PROC_MY_NAME.jobid, seg_info); - if (OPAL_SUCCESS != rc) { - /* error out - should have shared memory segment attached */ - return OPAL_ERROR; - } - } - /* we will connect on first send */ return OPAL_SUCCESS; @@ -460,7 +396,6 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) opal_process_name_t id; size_t 
i; uint64_t np; - char *seg_info; opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s pmix:native executing fence on %u procs", @@ -492,13 +427,6 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) } } - /* pack 1 if we have sm dstore enabled, 0 otherwise */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(msg, &sm_flag, 1, OPAL_UINT32))) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(msg); - return rc; - } - /* if we haven't already done it, ensure we have committed our values */ if (NULL != mca_pmix_native_component.cache_local) { scope = PMIX_LOCAL; @@ -564,53 +492,46 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) /* if data was returned, unpack and store it */ for (i=0; i < np; i++) { - if (0 == sm_flag) { - /* get the buffer that contains the data for the next proc */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&cb->data, &msg, &cnt, OPAL_BUFFER))) { - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - break; - } - OPAL_ERROR_LOG(rc); - return rc; + /* get the buffer that contains the data for the next proc */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&cb->data, &msg, &cnt, OPAL_BUFFER))) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { + break; } - /* extract the id of the contributor from the blob */ + OPAL_ERROR_LOG(rc); + return rc; + } + /* extract the id of the contributor from the blob */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_NAME))) { + OPAL_ERROR_LOG(rc); + return rc; + } + /* extract all blobs from this proc, starting with the scope */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { + /* extract the blob for this scope */ cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_NAME))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { OPAL_ERROR_LOG(rc); return rc; } - /* extract all blobs from this proc, starting with the scope */ + /* now 
unpack and store the values - everything goes into our internal store */ cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { - /* extract the blob for this scope */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { - OPAL_ERROR_LOG(rc); - return rc; - } - /* now unpack and store the values - everything goes into our internal store */ - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { - OPAL_ERROR_LOG(ret); - } - OBJ_RELEASE(kp); - cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { + OPAL_ERROR_LOG(ret); } - OBJ_RELEASE(bptr); + OBJ_RELEASE(kp); cnt = 1; } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } - OBJ_RELEASE(msg); - } else { - unpack_segment_info(&cb->data, &id, &seg_info); - if (NULL != seg_info) { - pmix_sm_attach(id.jobid, seg_info); - } + OBJ_RELEASE(bptr); + cnt = 1; } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } + OBJ_RELEASE(msg); if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); } else { @@ -638,7 +559,6 @@ static void fencenb_cbfunc(opal_buffer_t *buf, void *cbdata) opal_process_name_t id; size_t i; uint64_t np; - char *seg_info; /* get the number of contributors */ cnt = 1; @@ -648,56 +568,49 @@ static void fencenb_cbfunc(opal_buffer_t *buf, void *cbdata) } /* if data was returned, unpack and store it */ for (i=0; i < np; i++) { - if (0 == sm_flag) { - /* get the buffer that contains the data for the next proc */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &msg, &cnt, OPAL_BUFFER))) { - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - break; - } - OPAL_ERROR_LOG(rc); - return; + /* get the buffer 
that contains the data for the next proc */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &msg, &cnt, OPAL_BUFFER))) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { + break; } - /* extract the id of the contributor from the blob */ + OPAL_ERROR_LOG(rc); + return; + } + /* extract the id of the contributor from the blob */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_NAME))) { + OPAL_ERROR_LOG(rc); + return; + } + /* extract all blobs from this proc, starting with the scope */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { + /* extract the blob for this scope */ cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_NAME))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { OPAL_ERROR_LOG(rc); return; } - /* extract all blobs from this proc, starting with the scope */ + /* now unpack and store the values - everything goes into our internal store */ cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { - /* extract the blob for this scope */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { - OPAL_ERROR_LOG(rc); - return; - } - /* now unpack and store the values - everything goes into our internal store */ - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { - OPAL_ERROR_LOG(ret); - } - OBJ_RELEASE(kp); - cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { + OPAL_ERROR_LOG(ret); } - OBJ_RELEASE(bptr); + OBJ_RELEASE(kp); cnt = 1; } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } - OBJ_RELEASE(msg); - } else { - unpack_segment_info(buf, &id, &seg_info); - if (NULL != seg_info) 
{ - pmix_sm_attach(id.jobid, seg_info); - } + OBJ_RELEASE(bptr); + cnt = 1; } - if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); } + OBJ_RELEASE(msg); + } + if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); } /* if a callback was provided, execute it */ @@ -747,13 +660,6 @@ static int native_fence_nb(opal_process_name_t *procs, size_t nprocs, } } - /* pack 1 if we have sm dstore enabled, 0 otherwise */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(msg, &sm_flag, 1, OPAL_UINT32))) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(msg); - return rc; - } - /* if we haven't already done it, ensure we have committed our values */ if (NULL != mca_pmix_native_component.cache_local) { scope = PMIX_LOCAL; @@ -823,9 +729,7 @@ static int native_get(const opal_process_name_t *id, opal_list_t vals; opal_value_t *kp; bool found; - int handle; - char *seg_info; - + opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s pmix:native getting value for proc %s key %s", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), @@ -833,22 +737,13 @@ static int native_get(const opal_process_name_t *id, /* first see if we already have the info in our dstore */ OBJ_CONSTRUCT(&vals, opal_list_t); - if (1 == sm_flag) { - handle = opal_dstore_modex; - } else { - handle = opal_dstore_internal; - } - opal_proc_t *myproc = opal_proc_local_get(); - if (0 == opal_compare_proc(myproc->proc_name, *id)) { - handle = opal_dstore_internal; - } - if (OPAL_SUCCESS == opal_dstore.fetch(handle, id, + if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_internal, id, key, &vals)) { *kv = (opal_value_t*)opal_list_remove_first(&vals); OPAL_LIST_DESTRUCT(&vals); opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native value retrieved from dstore", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); + "%s pmix:native value retrieved from dstore", + 
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); return OPAL_SUCCESS; } @@ -894,61 +789,40 @@ static int native_get(const opal_process_name_t *id, return rc; } found = false; - if (1 == sm_flag) { - opal_process_name_t uid; - unpack_segment_info(&cb->data, &uid, &seg_info); - if (NULL != seg_info) { - pmix_sm_attach(uid.jobid, seg_info); - } - OBJ_CONSTRUCT(&vals, opal_list_t); - if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_modex, id, - key, &vals)) { - *kv = (opal_value_t*)opal_list_remove_first(&vals); - OPAL_LIST_DESTRUCT(&vals); + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&cb->data, &bptr, &cnt, OPAL_BUFFER))) { + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native value retrieved from dstore", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - found = true; - rc = OPAL_SUCCESS; - } else { - rc = OPAL_ERROR; - } - } else { - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(&cb->data, &bptr, &cnt, OPAL_BUFFER))) { - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native retrieved %s (%s) from server for proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, - (OPAL_STRING == kp->type) ? kp->data.string : "NS", - OPAL_NAME_PRINT(*id)); - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, id, kp))) { - OPAL_ERROR_LOG(ret); - } - if (0 == strcmp(key, kp->key)) { - *kv = kp; - found = true; - } else { - OBJ_RELEASE(kp); - } + "%s pmix:native retrieved %s (%s) from server for proc %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, + (OPAL_STRING == kp->type) ? 
kp->data.string : "NS", + OPAL_NAME_PRINT(*id)); + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, id, kp))) { + OPAL_ERROR_LOG(ret); } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); + if (0 == strcmp(key, kp->key)) { + *kv = kp; + found = true; + } else { + OBJ_RELEASE(kp); } - OBJ_RELEASE(bptr); - cnt = 1; } if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); - } else { - rc = OPAL_SUCCESS; } + OBJ_RELEASE(bptr); + cnt = 1; + } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } else { + rc = OPAL_SUCCESS; } OBJ_RELEASE(cb); opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native get completed", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); + "%s pmix:native get completed", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); if (found) { return OPAL_SUCCESS; } diff --git a/opal/mca/pmix/native/pmix_native_component.c b/opal/mca/pmix/native/pmix_native_component.c index 486426f53e0..b048be8871d 100644 --- a/opal/mca/pmix/native/pmix_native_component.c +++ b/opal/mca/pmix/native/pmix_native_component.c @@ -79,21 +79,25 @@ opal_pmix_native_component_t mca_pmix_native_component = { static int pmix_native_open(void) { /* construct the component fields */ - mca_pmix_native_component.uri = NULL; - mca_pmix_native_component.id = opal_name_invalid; mca_pmix_native_component.cache_local = NULL; mca_pmix_native_component.cache_remote = NULL; mca_pmix_native_component.cache_global = NULL; + mca_pmix_native_component.evbase = NULL; + mca_pmix_native_component.id = opal_name_invalid; + mca_pmix_native_component.server = opal_name_invalid; + mca_pmix_native_component.uri = NULL; + memset(&mca_pmix_native_component.address, 0, sizeof(struct sockaddr_un)); mca_pmix_native_component.sd = -1; + mca_pmix_native_component.max_retries = 10; mca_pmix_native_component.state = PMIX_USOCK_UNCONNECTED; mca_pmix_native_component.tag = 0; - 
OBJ_CONSTRUCT(&mca_pmix_native_component.send_queue, opal_list_t); - OBJ_CONSTRUCT(&mca_pmix_native_component.posted_recvs, opal_list_t); - mca_pmix_native_component.send_msg = NULL; - mca_pmix_native_component.recv_msg = NULL; mca_pmix_native_component.send_ev_active = false; mca_pmix_native_component.recv_ev_active = false; mca_pmix_native_component.timer_ev_active = false; + OBJ_CONSTRUCT(&mca_pmix_native_component.send_queue, opal_list_t); + mca_pmix_native_component.send_msg = NULL; + mca_pmix_native_component.recv_msg = NULL; + OBJ_CONSTRUCT(&mca_pmix_native_component.posted_recvs, opal_list_t); return OPAL_SUCCESS; } diff --git a/opal/mca/pmix/native/usock.c b/opal/mca/pmix/native/usock.c index f1a10f1f31b..9f6e1353a65 100644 --- a/opal/mca/pmix/native/usock.c +++ b/opal/mca/pmix/native/usock.c @@ -52,7 +52,6 @@ static int usock_send_blocking(char *ptr, size_t size); static void pmix_usock_try_connect(int fd, short args, void *cbdata); -static int usock_create_socket(void); /* State machine for internal operations */ typedef struct { @@ -195,122 +194,68 @@ void pmix_usock_process_msg(int fd, short flags, void *cbdata) OBJ_RELEASE(msg); } -static int usock_create_socket(void) -{ - int flags; - - if (mca_pmix_native_component.sd > 0) { - return OPAL_SUCCESS; - } - - OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output, - "%s pmix:usock:peer creating socket to server", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME))); - - mca_pmix_native_component.sd = socket(PF_UNIX, SOCK_STREAM, 0); - - if (mca_pmix_native_component.sd < 0) { - opal_output(0, "%s usock_peer_create_socket: socket() failed: %s (%d)\n", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - strerror(opal_socket_errno), - opal_socket_errno); - return OPAL_ERR_UNREACH; - } - - /* setup the socket as non-blocking */ - if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) { - opal_output(0, "%s usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - 
strerror(opal_socket_errno), - opal_socket_errno); - } else { - flags |= O_NONBLOCK; - if(fcntl(mca_pmix_native_component.sd, F_SETFL, flags) < 0) - opal_output(0, "%s usock_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - strerror(opal_socket_errno), - opal_socket_errno); - } - - /* setup event callbacks */ - opal_event_set(mca_pmix_native_component.evbase, - &mca_pmix_native_component.recv_event, - mca_pmix_native_component.sd, - OPAL_EV_READ|OPAL_EV_PERSIST, - pmix_usock_recv_handler, NULL); - opal_event_set_priority(&mca_pmix_native_component.recv_event, OPAL_EV_MSG_LO_PRI); - mca_pmix_native_component.recv_ev_active = false; - - opal_event_set(mca_pmix_native_component.evbase, - &mca_pmix_native_component.send_event, - mca_pmix_native_component.sd, - OPAL_EV_WRITE|OPAL_EV_PERSIST, - pmix_usock_send_handler, NULL); - opal_event_set_priority(&mca_pmix_native_component.send_event, OPAL_EV_MSG_LO_PRI); - mca_pmix_native_component.send_ev_active = false; - - return OPAL_SUCCESS; -} - - /* * Try connecting to a peer */ static void pmix_usock_try_connect(int fd, short args, void *cbdata) { - int rc; + int rc, flags; opal_socklen_t addrlen = 0; opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s usock_peer_try_connect: attempting to connect to server", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - if (OPAL_SUCCESS != usock_create_socket()) { - return; - } - - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s usock_peer_try_connect: attempting to connect to server on socket %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - mca_pmix_native_component.sd); - addrlen = sizeof(struct sockaddr_un); - retry_connect: - mca_pmix_native_component.retries++; - if (connect(mca_pmix_native_component.sd, (struct sockaddr *) &mca_pmix_native_component.address, addrlen) < 0) { - /* non-blocking so wait for completion */ - if (opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) { - 
opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s waiting for connect completion to server - activating send event", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - /* just ensure the send_event is active */ - if (!mca_pmix_native_component.send_ev_active) { - opal_event_add(&mca_pmix_native_component.send_event, 0); - mca_pmix_native_component.send_ev_active = true; - } - return; + + while (mca_pmix_native_component.retries < mca_pmix_native_component.max_retries) { + mca_pmix_native_component.retries++; + /* Create the new socket */ + mca_pmix_native_component.sd = socket(PF_UNIX, SOCK_STREAM, 0); + if (mca_pmix_native_component.sd < 0) { + opal_output(0, "pmix:create_socket: socket() failed: %s (%d)\n", + strerror(opal_socket_errno), + opal_socket_errno); + continue; } + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "usock_peer_try_connect: attempting to connect to server on socket %d", + mca_pmix_native_component.sd); + /* try to connect */ + if (connect(mca_pmix_native_component.sd, &mca_pmix_native_component.address, addrlen) < 0) { + if (opal_socket_errno == ETIMEDOUT) { + /* The server may be too busy to accept new connections */ + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "timeout connecting to server"); + CLOSE_THE_SOCKET(mca_pmix_native_component.sd); + continue; + } - /* Some kernels (Linux 2.6) will automatically software - abort a connection that was ECONNREFUSED on the last - attempt, without even trying to establish the - connection. Handle that case in a semi-rational - way by trying twice before giving up */ - if (ECONNABORTED == opal_socket_errno) { - if (mca_pmix_native_component.retries < mca_pmix_native_component.max_retries) { + /* Some kernels (Linux 2.6) will automatically software + abort a connection that was ECONNREFUSED on the last + attempt, without even trying to establish the + connection. 
Handle that case in a semi-rational + way by trying twice before giving up */ + if (ECONNABORTED == opal_socket_errno) { opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s connection to server aborted by OS - retrying", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - goto retry_connect; - } else { - /* We were unsuccessful in establishing this connection, and are - * not likely to suddenly become successful, - */ - mca_pmix_native_component.state = PMIX_USOCK_FAILED; + "connection to server aborted by OS - retrying"); CLOSE_THE_SOCKET(mca_pmix_native_component.sd); - return; + continue; } } + /* otherwise, the connect succeeded - so break out of the loop */ + break; + } + + if (mca_pmix_native_component.retries == mca_pmix_native_component.max_retries || + mca_pmix_native_component.sd < 0){ + /* We were unsuccessful in establishing this connection, and are + * not likely to suddenly become successful */ + opal_output(0, "pmix:create_socket: connection to server failed"); + if (0 <= mca_pmix_native_component.sd) { + CLOSE_THE_SOCKET(mca_pmix_native_component.sd); + } + return; } /* connection succeeded */ @@ -320,7 +265,37 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata) "%s sock_peer_try_connect: Connection across to server succeeded", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - /* setup our recv to catch the return ack call */ + /* setup event callbacks */ + opal_event_set(mca_pmix_native_component.evbase, + &mca_pmix_native_component.recv_event, + mca_pmix_native_component.sd, + OPAL_EV_READ|OPAL_EV_PERSIST, + pmix_usock_recv_handler, NULL); + opal_event_set_priority(&mca_pmix_native_component.recv_event, OPAL_EV_MSG_LO_PRI); + mca_pmix_native_component.recv_ev_active = false; + + opal_event_set(mca_pmix_native_component.evbase, + &mca_pmix_native_component.send_event, + mca_pmix_native_component.sd, + OPAL_EV_WRITE|OPAL_EV_PERSIST, + pmix_usock_send_handler, NULL); + opal_event_set_priority(&mca_pmix_native_component.send_event, 
OPAL_EV_MSG_LO_PRI); + mca_pmix_native_component.send_ev_active = false; + + /* setup the socket as non-blocking */ + if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) { + opal_output(0, "usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n", + strerror(opal_socket_errno), + opal_socket_errno); + } else { + flags |= O_NONBLOCK; + if (fcntl(mca_pmix_native_component.sd, F_SETFL, flags) < 0) + opal_output(0, "usock_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n", + strerror(opal_socket_errno), + opal_socket_errno); + } + + /* setup our recv to catch the return ack call */ if (!mca_pmix_native_component.recv_ev_active) { opal_event_add(&mca_pmix_native_component.recv_event, 0); mca_pmix_native_component.recv_ev_active = true; diff --git a/orte/mca/odls/default/odls_default_module.c b/orte/mca/odls/default/odls_default_module.c index 213b4ee8065..f79fe63eba0 100644 --- a/orte/mca/odls/default/odls_default_module.c +++ b/orte/mca/odls/default/odls_default_module.c @@ -724,11 +724,6 @@ int orte_odls_default_launch_local_procs(opal_buffer_t *data) /* launch the local procs */ ORTE_ACTIVATE_LOCAL_LAUNCH(job, odls_default_fork_local_proc); - opal_dstore_attr_t *attr; - attr = pmix_server_create_shared_segment(job); - if (NULL != attr) { - opal_setenv("PMIX_SEG_INFO", attr->connection_info, true, &orte_launch_environ); - } return ORTE_SUCCESS; } diff --git a/orte/mca/oob/usock/Makefile.am b/orte/mca/oob/usock/Makefile.am index 61efc4c331d..307a61693ec 100644 --- a/orte/mca/oob/usock/Makefile.am +++ b/orte/mca/oob/usock/Makefile.am @@ -12,7 +12,7 @@ # Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2012-2013 Los Alamos National Security, LLC. # All rights reserved -# Copyright (c) 2013-2014 Intel, Inc. All rights reserved. +# Copyright (c) 2013-2015 Intel, Inc. All rights reserved. 
# $COPYRIGHT$ # # Additional copyrights may follow @@ -23,7 +23,6 @@ sources = \ oob_usock_component.h \ oob_usock.h \ - oob_usock_listener.h \ oob_usock_component.c \ oob_usock_connection.h \ oob_usock_sendrecv.h \ @@ -31,7 +30,6 @@ sources = \ oob_usock_peer.h \ oob_usock_ping.h \ oob_usock.c \ - oob_usock_listener.c \ oob_usock_connection.c \ oob_usock_sendrecv.c diff --git a/orte/mca/oob/usock/oob_usock_component.c b/orte/mca/oob/usock/oob_usock_component.c index 5c962a8dc6a..d42e8322dc8 100644 --- a/orte/mca/oob/usock/oob_usock_component.c +++ b/orte/mca/oob/usock/oob_usock_component.c @@ -63,6 +63,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ess/ess.h" #include "orte/mca/state/state.h" +#include "orte/util/listener.h" #include "orte/util/name_fns.h" #include "orte/util/parse_options.h" #include "orte/util/session_dir.h" @@ -185,6 +186,22 @@ static int component_available(void) return ORTE_SUCCESS; } +/* + * Handler for accepting connections from the event library + */ +static void connection_event_handler(int incoming_sd, short flags, void* cbdata) +{ + orte_pending_connection_t *pending = (orte_pending_connection_t*)cbdata; + int sd; + + sd = pending->fd; + pending->fd = -1; + OBJ_RELEASE(pending); + + /* process the connection */ + mca_oob_usock_module.api.accept_connection(sd, NULL); +} + /* Start the module */ static int component_startup(void) { @@ -205,11 +222,10 @@ static int component_startup(void) opal_output_verbose(2, orte_oob_base_framework.framework_output, "SUNPATH: %s", mca_oob_usock_component.address.sun_path); - /* if we are a daemon/HNP, start the listening event - this will create - * the rendezvous link - */ + /* if we are a daemon/HNP, register our listener */ if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { - if (ORTE_SUCCESS != (rc = orte_oob_usock_start_listening())) { + if (ORTE_SUCCESS != (rc = orte_register_listener((struct sockaddr*)&mca_oob_usock_component.address, sizeof(struct sockaddr_un), + orte_event_base, 
connection_event_handler))) { ORTE_ERROR_LOG(rc); } } else { @@ -235,10 +251,6 @@ static void component_shutdown(void) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { - if (mca_oob_usock_component.listener_ev_active) { - opal_event_del(&mca_oob_usock_component.listener_event); - mca_oob_usock_component.listener_ev_active = false; - } /* delete the rendezvous file */ unlink(mca_oob_usock_component.address.sun_path); } diff --git a/orte/mca/oob/usock/oob_usock_component.h b/orte/mca/oob/usock/oob_usock_component.h index 6da9c92594e..d0f0161f1ea 100644 --- a/orte/mca/oob/usock/oob_usock_component.h +++ b/orte/mca/oob/usock/oob_usock_component.h @@ -51,10 +51,6 @@ typedef struct { mca_oob_base_component_t super; /**< base OOB component */ int max_retries; /**< max number of retries before declaring peer gone */ struct sockaddr_un address; /**< address of our rendezvous point */ - /* connection support */ - opal_event_t listener_event; /**< my listener event */ - bool listener_ev_active; - int listener_socket; } mca_oob_usock_component_t; ORTE_MODULE_DECLSPEC extern mca_oob_usock_component_t mca_oob_usock_component; diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index 167df0e1e52..7847136df83 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -67,6 +67,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/grpcomm/grpcomm.h" #include "orte/mca/rml/rml.h" +#include "orte/util/listener.h" #include "orte/util/name_fns.h" #include "orte/util/session_dir.h" #include "orte/util/show_help.h" @@ -116,7 +117,6 @@ static OBJ_CLASS_INSTANCE(pmix_server_trk_t, * Local utility functions */ static void connection_handler(int incoming_sd, short flags, void* cbdata); -static int pmix_server_start_listening(struct sockaddr_un *address); static void pmix_server_recv(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tg, void *cbdata); @@ -137,86 +137,10 @@ int 
pmix_server_output = -1; int pmix_server_local_handle = -1; int pmix_server_remote_handle = -1; int pmix_server_global_handle = -1; -int pmix_segment_size = -1; opal_list_t pmix_server_pending_dmx_reqs = {{0}}; static bool initialized = false; static struct sockaddr_un address; -static int pmix_server_listener_socket = -1; -static bool pmix_server_listener_ev_active = false; -static opal_event_t pmix_server_listener_event; static opal_list_t collectives; -static opal_list_t meta_segments; - -static opal_dstore_attr_t *pmix_sm_attach(uint32_t jobid, char *seg_info) -{ - int rc; - opal_dstore_attr_t *attr; - attr = OBJ_NEW(opal_dstore_attr_t); - attr->jobid = jobid; - attr->connection_info = strdup(seg_info); - - opal_list_append(&meta_segments, &attr->super); - rc = opal_dstore.update(opal_dstore_modex, &meta_segments); - return (OPAL_SUCCESS == rc) ? attr : NULL; -} - -opal_dstore_attr_t *pmix_server_create_shared_segment(orte_jobid_t jid) -{ - int rc; - char *sm_file; - opal_shmem_ds_t seg_ds; - orte_job_t *jdata; - char *seg_info; - opal_dstore_attr_t *attr = NULL; - if (NULL == (jdata = orte_get_job_data_object(jid))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return NULL; - } - /* create a shared segment */ - pmix_segment_size = jdata->num_procs * sizeof(meta_info) + META_OFFSET; - rc = asprintf(&sm_file, "%s" OPAL_PATH_SEP "dstore_segment.meta.%u", orte_process_info.job_session_dir, jid); - if (0 <= rc && NULL != sm_file) { - rc = opal_shmem_segment_create (&seg_ds, sm_file, pmix_segment_size); - free (sm_file); - if (OPAL_SUCCESS == rc) { - rc = asprintf(&seg_info, "%d:%d:%lu:%p:%s", seg_ds.seg_cpid, seg_ds.seg_id, seg_ds.seg_size, seg_ds.seg_base_addr, seg_ds.seg_name); - attr = pmix_sm_attach(jid, seg_info); - free(seg_info); - } else { - opal_output_verbose(2, pmix_server_output, - "%s PMIX shared memory segment was not created: opal_shmem_segment_create failed.", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - } - } - return attr; -} - -int 
pack_segment_info(opal_process_name_t id, opal_buffer_t *reply) -{ - opal_dstore_attr_t *attr; - int rc; - bool found_trk = false; - OPAL_LIST_FOREACH(attr, &meta_segments, opal_dstore_attr_t) { - if (attr->jobid == id.jobid) { - found_trk = true; - break; - } - } - if (!found_trk) { - /* create new segment for this job id and attach to it*/ - attr = pmix_server_create_shared_segment(id.jobid); - } - /* pack proc id into reply buffer */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_NAME))) { - return OPAL_ERROR; - } - /* pack seg info into reply buffer */ - if (NULL != attr) { - rc = opal_dss.pack(reply, &attr->connection_info, 1, OPAL_STRING); - return rc; - } - return OPAL_ERROR; -} void pmix_server_register(void) { @@ -270,15 +194,9 @@ int pmix_server_init(void) ORTE_JOB_FAMILY_PRINT(ORTE_PROC_MY_NAME->jobid), "pmix"); /* add it to our launch environment so our children get it */ -#if 0 - (void)asprintf(&pmix_server_uri, "%s:%s", - OPAL_NAME_PRINT(orte_process_info.my_name), - address.sun_path); -#else (void)asprintf(&pmix_server_uri, "%"PRIu32".%"PRIu32":%s", orte_process_info.my_name.jobid, orte_process_info.my_name.vpid, address.sun_path); -#endif opal_output_verbose(2, pmix_server_output, "%s PMIX server uri: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_server_uri); @@ -297,10 +215,6 @@ int pmix_server_init(void) ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } - if (0 > (opal_dstore_modex = opal_dstore.open("MODEX", "sm,hash", NULL))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } /* setup recv for collecting local barriers */ orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLL, @@ -314,12 +228,13 @@ int pmix_server_init(void) orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP, ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL); - /* start listening for connection requests */ - if (ORTE_SUCCESS != (rc = 
pmix_server_start_listening(&address))) { - OBJ_DESTRUCT(&pmix_server_peers); + /* register for connection requests */ + if (ORTE_SUCCESS != (rc = orte_register_listener((struct sockaddr*)&address, sizeof(struct sockaddr_un), + orte_event_base, connection_handler))) { + ORTE_ERROR_LOG(rc); + /* memory cleanup will occur when finalize is called */ } - OBJ_CONSTRUCT(&meta_segments, opal_list_t); return rc; } @@ -337,10 +252,6 @@ void pmix_server_finalize(void) "%s Finalizing PMIX server", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /* stop listening */ - if (pmix_server_listener_ev_active) { - opal_event_del(&pmix_server_listener_event); - } /* stop receives */ orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLL); orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_RELEASE); @@ -384,138 +295,23 @@ void pmix_server_finalize(void) } } OBJ_RELEASE(pmix_server_peers); - opal_dstore_attr_t *attr; - opal_list_item_t *item; - for (item = opal_list_remove_first(&meta_segments); - NULL != item; - item = opal_list_remove_first(&meta_segments)) { - attr = (opal_dstore_attr_t*) item; - OBJ_RELEASE(attr); - } - OPAL_LIST_DESTRUCT(&meta_segments); -} - -/* - * start listening on our rendezvous file - */ -static int pmix_server_start_listening(struct sockaddr_un *address) -{ - int flags; - opal_socklen_t addrlen; - int sd = -1; - - /* create a listen socket for incoming connection attempts */ - sd = socket(PF_UNIX, SOCK_STREAM, 0); - if (sd < 0) { - if (EAFNOSUPPORT != opal_socket_errno) { - opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno); - } - return ORTE_ERR_IN_ERRNO; - } - /* Set the socket to close-on-exec so that no children inherit - this FD */ - if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) { - opal_output(0, "pmix_server: unable to set the " - "listening socket to CLOEXEC (%s:%d)\n", - strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - - 
- addrlen = sizeof(struct sockaddr_un); - if (bind(sd, (struct sockaddr*)address, addrlen) < 0) { - opal_output(0, "%s bind() failed on error %s (%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - strerror(opal_socket_errno), - opal_socket_errno ); - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - - /* setup listen backlog to maximum allowed by kernel */ - if (listen(sd, SOMAXCONN) < 0) { - opal_output(0, "pmix_server_component_init: listen(): %s (%d)", - strerror(opal_socket_errno), opal_socket_errno); - - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - - /* set socket up to be non-blocking, otherwise accept could block */ - if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { - opal_output(0, "pmix_server_component_init: fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - flags |= O_NONBLOCK; - if (fcntl(sd, F_SETFL, flags) < 0) { - opal_output(0, "pmix_server_component_init: fcntl(F_SETFL) failed: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - - /* record this socket */ - pmix_server_listener_socket = sd; - - /* setup to listen via the event lib */ - pmix_server_listener_ev_active = true; - opal_event_set(orte_event_base, &pmix_server_listener_event, - pmix_server_listener_socket, - OPAL_EV_READ|OPAL_EV_PERSIST, - connection_handler, - 0); - opal_event_set_priority(&pmix_server_listener_event, ORTE_MSG_PRI); - opal_event_add(&pmix_server_listener_event, 0); - - opal_output_verbose(2, pmix_server_output, - "%s pmix server listening on socket %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd); - - return ORTE_SUCCESS; } /* * Handler for accepting connections from the event library */ -static void connection_handler(int incoming_sd, short flags, void* cbdata) +static void connection_handler(int fd, short flags, void* cbdata) { - struct sockaddr addr; - opal_socklen_t addrlen = sizeof(struct sockaddr); - int sd, rc; + int rc; 
pmix_server_hdr_t hdr; pmix_server_peer_t *peer; + orte_pending_connection_t *pending = (orte_pending_connection_t*)cbdata; + int sd; + + sd = pending->fd; + pending->fd = -1; + OBJ_RELEASE(pending); - sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen); - opal_output_verbose(2, pmix_server_output, - "%s connection_event_handler: working connection " - "(%d, %d)\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - sd, opal_socket_errno); - if (sd < 0) { - if (EINTR == opal_socket_errno) { - return; - } - if (opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - if (EMFILE == opal_socket_errno) { - /* - * Close incoming_sd so that orte_show_help will have a file - * descriptor with which to open the help file. We will be - * exiting anyway, so we don't need to keep it open. - */ - CLOSE_THE_SOCKET(incoming_sd); - ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS); - orte_show_help("help-orterun.txt", "orterun:sys-limit-sockets", true); - } else { - opal_output(0, "pmix_server_accept: accept() failed: %s (%d).", - strerror(opal_socket_errno), opal_socket_errno); - } - } - return; - } /* Set the socket to close-on-exec so that no subsequent children inherit this FD */ if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) { @@ -529,6 +325,7 @@ static void connection_handler(int incoming_sd, short flags, void* cbdata) /* get the handshake */ if (ORTE_SUCCESS != (rc = pmix_server_recv_connect_ack(NULL, sd, &hdr))) { ORTE_ERROR_LOG(rc); + CLOSE_THE_SOCKET(sd); return; } @@ -540,6 +337,7 @@ static void connection_handler(int incoming_sd, short flags, void* cbdata) peer = OBJ_NEW(pmix_server_peer_t); if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(pmix_server_peers, sd, peer))) { OPAL_ERROR_LOG(rc); + CLOSE_THE_SOCKET(sd); return; } peer->name = hdr.id; @@ -911,32 +709,6 @@ static void pmix_server_release(int status, OPAL_ERROR_LOG(rc); } OBJ_RELEASE(msg); - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(id, reply_short); 
- if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_RELEASE(reply_short); - return; - } - opal_value_t kvf; - OBJ_CONSTRUCT(&kvf, opal_value_t); - kvf.key = strdup("finalval"); - kvf.type = OPAL_BYTE_OBJECT; - kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvf.data.bo.size = data->bytes_used; - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &id, &kvf))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&kvf); - return; - } - kvf.data.bo.bytes = NULL; - kvf.data.bo.size = 0; - OBJ_DESTRUCT(&kvf); /* get proc object for the target process */ memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t)); proc = orte_get_proc_object(&name); @@ -1161,11 +933,9 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, pmix_server_dmx_req_t *req, *nxt; int rc, ret; int32_t cnt; - opal_buffer_t *reply, xfer, *bptr, *data, *reply_short; + opal_buffer_t *reply, xfer, *bptr; opal_process_name_t target; opal_value_t kv; - orte_proc_t *proc, *proc_peer; - bool stored; opal_output_verbose(2, pmix_server_output, "%s dmdx:recv response from proc %s", @@ -1179,8 +949,6 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, return; } - proc = orte_get_proc_object(&target); - /* unpack the status */ cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) { @@ -1210,119 +978,42 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, OBJ_DESTRUCT(&xfer); } - stored = false; - data = NULL; /* check ALL reqs to see who requested this target - due to * async behavior, we may have requests from more than one * process */ - reply_short = NULL; OPAL_LIST_FOREACH_SAFE(req, nxt, &pmix_server_pending_dmx_reqs, pmix_server_dmx_req_t) { if (0 == opal_compare_proc(target, req->target)) { - /* get the proc object for the peer */ - proc_peer = orte_get_proc_object(&req->peer->name); - 
/* check if peer has access to shared memory dstore, - * if not, pack the reply and send. */ - if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { - if (!stored) { - /* prep the reply */ - reply = OBJ_NEW(opal_buffer_t); - /* pack the returned status */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(bptr); - return; - } - - /* pack the hostname blob */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(bptr); - return; - } - - /* pass across any returned blobs */ - opal_dss.copy_payload(reply, buffer); - stored = true; - } - OBJ_RETAIN(reply); - PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); - } else { - /* If peer has an access to shared memory dstore, check - * if we already stored data for the target process. - * If not, pack them into the data buffer. - * So we do it once. */ - if (NULL == reply_short) { - /* reply_short is used when we store all data into shared memory segment */ - reply_short = OBJ_NEW(opal_buffer_t); - /* pack the returned status */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply_short, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - OBJ_RELEASE(bptr); - return; - } - - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(target, reply_short); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - return; - } - } - if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM)) { - /* prepare data buffer to store it in shared memory dstore segment */ - data = OBJ_NEW(opal_buffer_t); - - /* pack the hostname blob */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - OBJ_RELEASE(bptr); - return; - } - - /* pass across any returned blobs */ - 
opal_dss.copy_payload(data, buffer); - /* create key-value object to store data for target process - * and put it into shared memory dstore */ - opal_value_t kvp; - OBJ_CONSTRUCT(&kvp, opal_value_t); - kvp.key = strdup("finalval"); - kvp.type = OPAL_BYTE_OBJECT; - kvp.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvp.data.bo.size = data->bytes_used; - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &target, &kvp))) { - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&kvp); - ORTE_ERROR_LOG(rc); - return; - } - kvp.data.bo.bytes = NULL; - kvp.data.bo.size = 0; - OBJ_DESTRUCT(&kvp); - /* mark that we put data for this proc into shared memory dstore */ - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); - } - OBJ_RETAIN(reply_short); - PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply_short); - } - if (NULL != bptr) { + /* prep the reply */ + reply = OBJ_NEW(opal_buffer_t); + /* pack the returned status */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); OBJ_RELEASE(bptr); + return; } - if (NULL != data) { - OBJ_RELEASE(data); + + /* pack the hostname blob */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(bptr); + return; } + + /* pass across any returned blobs */ + opal_dss.copy_payload(reply, buffer); + /* send it */ + OBJ_RETAIN(reply); + PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); + /* request is complete, so remove it */ opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super); OBJ_RELEASE(req); } } + if (NULL != bptr) { + OBJ_RELEASE(bptr); + } } diff --git a/orte/orted/pmix/pmix_server.h b/orte/orted/pmix/pmix_server.h index 0997e0b4d07..b7697ad89bc 100644 --- a/orte/orted/pmix/pmix_server.h +++ b/orte/orted/pmix/pmix_server.h @@ -12,7 +12,7 @@ * Copyright (c) 2006-2013 Los Alamos National Security, LLC. * All rights reserved. 
* Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -36,8 +36,6 @@ ORTE_DECLSPEC void pmix_server_register(void); /* provide access to the pmix server uri */ ORTE_DECLSPEC extern char *pmix_server_uri; -ORTE_DECLSPEC extern opal_dstore_attr_t *pmix_server_create_shared_segment(orte_jobid_t jid); - END_C_DECLS #endif /* PMIX_SERVER_H_ */ diff --git a/orte/orted/pmix/pmix_server_process_msgs.c b/orte/orted/pmix/pmix_server_process_msgs.c index fd46a7aaf67..a160a5e74a1 100644 --- a/orte/orted/pmix/pmix_server_process_msgs.c +++ b/orte/orted/pmix/pmix_server_process_msgs.c @@ -86,13 +86,11 @@ void pmix_server_process_message(pmix_server_peer_t *peer) int32_t cnt; pmix_cmd_t cmd; opal_buffer_t *reply, xfer, *bptr, buf, save, blocal, bremote; - opal_buffer_t *data; opal_value_t kv, *kvp, *kvp2, *kp; opal_process_name_t id, idreq; orte_process_name_t name; orte_job_t *jdata; orte_proc_t *proc; - orte_proc_t *proc_peer; opal_list_t values; uint32_t tag; opal_pmix_scope_t scope; @@ -100,7 +98,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) pmix_server_dmx_req_t *req, *nextreq; bool found; orte_grpcomm_signature_t *sig; - uint32_t sm_flag; /* xfer the message to a buffer for unpacking */ OBJ_CONSTRUCT(&xfer, opal_buffer_t); @@ -206,13 +203,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) (PMIX_FENCENB_CMD == cmd) ? 
"FENCE_NB" : "FENCE", tmp); free(tmp); } - /* unpack flag if sm dstore is supported by the client */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &sm_flag, &cnt, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&xfer); - return; - } /* if we are in a group collective mode, then we need to prep * the data as it should be included in the modex */ OBJ_CONSTRUCT(&save, opal_buffer_t); @@ -222,11 +212,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) opal_dss.copy_payload(&save, &xfer); } - /* mark if peer proc has access to shared memory region*/ - if (1 == sm_flag) { - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_SM_ACCESS); - } - /* if data was given, unpack and store it in the pmix dstore - it is okay * if there was no data, it's just a fence */ cnt = 1; @@ -348,8 +333,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) /* yes - deliver a copy */ reply = OBJ_NEW(opal_buffer_t); if (NULL == req->proxy) { - /* get the proc object for the peer */ - proc_peer = orte_get_proc_object(&req->peer->name); /* pack the status */ ret = OPAL_SUCCESS; if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { @@ -358,123 +341,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) OBJ_RELEASE(sig); return; } - /* check if the peer has an access to shared memory dstore segment */ - if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { - /* always pass the hostname */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(PMIX_HOSTNAME); - kv.type = OPAL_STRING; - kv.data.string = strdup(orte_process_info.nodename); - kp = &kv; - if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&kv); - OBJ_RELEASE(sig); - return; - } - OBJ_DESTRUCT(&kv); - /* pack the blob */ - bptr = &buf; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - 
OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(sig); - return; - } - OBJ_DESTRUCT(&buf); - /* pass the local blob(s) */ - opal_dss.copy_payload(reply, &blocal); - } else { - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(id, reply); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_RELEASE(sig); - return; - } - if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM)) { - data = OBJ_NEW(opal_buffer_t); - /* always pass the hostname */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(PMIX_HOSTNAME); - kv.type = OPAL_STRING; - kv.data.string = strdup(orte_process_info.nodename); - kp = &kv; - if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&kv); - OBJ_RELEASE(sig); - OBJ_DESTRUCT(&xfer); - return; - } - OBJ_DESTRUCT(&kv); - /* pack the blob */ - bptr = &buf; - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(sig); - return; - } - OBJ_DESTRUCT(&buf); - /* pass the local blob(s) */ - opal_dss.copy_payload(data, &blocal); - opal_value_t kvp; - OBJ_CONSTRUCT(&kvp, opal_value_t); - kvp.key = strdup("finalval"); - kvp.type = OPAL_BYTE_OBJECT; - kvp.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvp.data.bo.size = data->bytes_used; - kvp.data.bo.bytes = NULL; - kvp.data.bo.size = 0; - /* store data in the shared memory dstore segment */ - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &id, &kvp))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&kvp); - OBJ_RELEASE(sig); - return; - } - OBJ_DESTRUCT(&kvp); - /* mark that we put data for this proc to shared memory region */ - 
ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); - OBJ_RELEASE(data); - } - } - /* use the PMIX send to return the data */ - PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); - } else { - /* pack the id of the requested proc */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_NAME))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_RELEASE(sig); - return; - } - /* pack the status */ - ret = OPAL_SUCCESS; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - return; - } /* always pass the hostname */ OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&kv, opal_value_t); @@ -487,29 +353,76 @@ void pmix_server_process_message(pmix_server_peer_t *peer) OBJ_RELEASE(reply); OBJ_DESTRUCT(&buf); OBJ_DESTRUCT(&kv); + OBJ_RELEASE(sig); return; } OBJ_DESTRUCT(&kv); - /* pack the hostname blob */ + /* pack the blob */ bptr = &buf; if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(reply); OBJ_DESTRUCT(&xfer); OBJ_DESTRUCT(&buf); + OBJ_RELEASE(sig); return; } OBJ_DESTRUCT(&buf); - /* pass the remote blob(s) */ - opal_dss.copy_payload(reply, &bremote); - /* use RML to send the response */ - orte_rml.send_buffer_nb(&req->proxy->name, reply, - ORTE_RML_TAG_DIRECT_MODEX_RESP, - orte_rml_send_callback, NULL); + /* pass the local blob(s) */ + opal_dss.copy_payload(reply, &blocal); + } + /* use the PMIX send to return the data */ + PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); + } else { + /* pack the id of the requested proc */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_NAME))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_RELEASE(sig); + return; + } + /* pack the status */ + ret = OPAL_SUCCESS; + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + return; + } + /* always pass the hostname */ + 
OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(PMIX_HOSTNAME); + kv.type = OPAL_STRING; + kv.data.string = strdup(orte_process_info.nodename); + kp = &kv; + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&kv); + return; + } + OBJ_DESTRUCT(&kv); + /* pack the hostname blob */ + bptr = &buf; + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + return; } - opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super); - OBJ_RELEASE(req); + OBJ_DESTRUCT(&buf); + /* pass the remote blob(s) */ + opal_dss.copy_payload(reply, &bremote); + /* use RML to send the response */ + orte_rml.send_buffer_nb(&req->proxy->name, reply, + ORTE_RML_TAG_DIRECT_MODEX_RESP, + orte_rml_send_callback, NULL); } + opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super); + OBJ_RELEASE(req); } OBJ_DESTRUCT(&blocal); OBJ_DESTRUCT(&bremote); @@ -583,7 +496,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* get the proc object for the peer */ memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t)); - proc_peer = orte_get_proc_object(&name); /* unpack the id of the proc whose data is being requested */ cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &idreq, &cnt, OPAL_NAME))) { @@ -609,34 +521,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) return; } - sm_flag = 0; - if (ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { - sm_flag = 1; - } - /* if we have already stored data for this proc in shared memory region, - * then we just need to send a response */ - if (1 == sm_flag && ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM) && ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { - reply = OBJ_NEW(opal_buffer_t); - ret = OPAL_SUCCESS; - /* pack the 
error status */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&xfer); - OBJ_RELEASE(reply); - return; - } - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(idreq, reply); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&xfer); - OBJ_RELEASE(reply); - return; - } - PMIX_SERVER_QUEUE_SEND(peer, tag, reply); - OBJ_DESTRUCT(&xfer); - return; - } /* if we have not yet received data for this proc, then we just * need to track the request */ if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_RECVD)) { @@ -710,7 +594,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) ORTE_NAME_PRINT(&name)); kvp = NULL; kvp2 = NULL; - data = NULL; /* retrieve the local blob for that proc */ OBJ_CONSTRUCT(&values, opal_list_t); if (OPAL_SUCCESS == (ret = opal_dstore.fetch(pmix_server_local_handle, &idreq, "modex", &values))) { @@ -750,28 +633,13 @@ void pmix_server_process_message(pmix_server_peer_t *peer) OBJ_DESTRUCT(&kv); /* pack the blob */ bptr = &buf; - if (0 == sm_flag) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - return; - } - } else { - if (NULL == data) { - data = OBJ_NEW(opal_buffer_t); - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - return; - } + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + return; } - OBJ_DESTRUCT(&buf); /* local blob */ if (NULL != kvp) { @@ -787,29 +655,13 @@ void pmix_server_process_message(pmix_server_peer_t *peer) kvp->data.bo.bytes = NULL; kvp->data.bo.size = 0; bptr = &buf; - if (0 == sm_flag) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, 
&bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp); - return; - } - } else { - if (NULL == data) { - data = OBJ_NEW(opal_buffer_t); - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp); - return; - } + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp); + return; } OBJ_DESTRUCT(&buf); OBJ_RELEASE(kvp); @@ -828,66 +680,16 @@ void pmix_server_process_message(pmix_server_peer_t *peer) kvp2->data.bo.bytes = NULL; kvp2->data.bo.size = 0; bptr = &buf; - if (0 == sm_flag) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp2); - return; - } - } else { - if (NULL == data) { - data = OBJ_NEW(opal_buffer_t); - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp2); - return; - } - } - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp2); - } - if (1 == sm_flag) { - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(idreq, reply); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - return; - } - opal_value_t kvf; - OBJ_CONSTRUCT(&kvf, opal_value_t); - kvf.key = strdup("finalval"); - kvf.type = OPAL_BYTE_OBJECT; - kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvf.data.bo.size = data->bytes_used; - /* store data in the shared memory dstore segment */ - if (OPAL_SUCCESS != (rc = 
opal_dstore.store(opal_dstore_modex, &idreq, &kvf))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(reply); - OBJ_RELEASE(data); OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&kvf); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp2); return; } - /* protect the data */ - kvf.data.bo.bytes = NULL; - kvf.data.bo.size = 0; - OBJ_DESTRUCT(&kvf); - /* mark that we put data for this proc to shared memory region */ - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); - OBJ_RELEASE(data); - + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp2); } PMIX_SERVER_QUEUE_SEND(peer, tag, reply); OBJ_DESTRUCT(&xfer); @@ -929,44 +731,8 @@ void pmix_server_process_message(pmix_server_peer_t *peer) /* xfer the data - the blobs are in the buffer, * so don't repack them. They will include the remote * hostname, so don't add it again */ - if (0 == sm_flag) { - opal_dss.copy_payload(reply, &buf); - } else { - data = OBJ_NEW(opal_buffer_t); - opal_dss.copy_payload(data, &buf); - } + opal_dss.copy_payload(reply, &buf); OBJ_DESTRUCT(&buf); - if (1 == sm_flag) { - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(idreq, reply); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - return; - } - opal_value_t kvf; - OBJ_CONSTRUCT(&kvf, opal_value_t); - kvf.key = strdup("finalval"); - kvf.type = OPAL_BYTE_OBJECT; - kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvf.data.bo.size = data->bytes_used; - /* store data into shared memory dstore segment */ - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &idreq, &kvf))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&kvf); - return; - } - /* protect the data */ - kvf.data.bo.bytes = NULL; - kvf.data.bo.size = 0; - OBJ_DESTRUCT(&kvf); - /* mark that we put data for this proc to shared memory region */ - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); - OBJ_RELEASE(data); - } 
PMIX_SERVER_QUEUE_SEND(peer, tag, reply); return; } diff --git a/orte/runtime/orte_finalize.c b/orte/runtime/orte_finalize.c index 3a164b6fa3b..9ea60dee211 100644 --- a/orte/runtime/orte_finalize.c +++ b/orte/runtime/orte_finalize.c @@ -12,7 +12,7 @@ * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -33,6 +33,7 @@ #include "orte/runtime/orte_globals.h" #include "orte/runtime/runtime.h" #include "orte/runtime/orte_locks.h" +#include "orte/util/listener.h" #include "orte/util/name_fns.h" #include "orte/util/show_help.h" @@ -58,6 +59,12 @@ int orte_finalize(void) /* flag that we are finalizing */ orte_finalizing = true; + if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) { + /* stop listening for connections - will + * be ignored if no listeners were registered */ + orte_stop_listening(); + } + /* flush the show_help system */ orte_show_help_finalize(); diff --git a/orte/runtime/orte_init.c b/orte/runtime/orte_init.c index 11d2cbb3cb5..b2e12104785 100644 --- a/orte/runtime/orte_init.c +++ b/orte/runtime/orte_init.c @@ -13,7 +13,7 @@ * reserved. * Copyright (c) 2007-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2007-2008 Sun Microsystems, Inc. All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. 
* @@ -46,6 +46,7 @@ #include "orte/mca/ess/base/base.h" #include "orte/mca/ess/ess.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/util/listener.h" #include "orte/util/name_fns.h" #include "orte/util/proc_info.h" #include "orte/util/error_strings.h" @@ -203,14 +204,6 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags) goto error; } - if (ORTE_PROC_IS_APP) { - if (0 > (opal_dstore_modex = opal_dstore.open("MODEX", "sm,hash", NULL))) { - error = "opal dstore modex"; - ret = ORTE_ERR_FATAL; - goto error; - } - } - if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { /* let the pmix server register params */ pmix_server_register(); @@ -258,6 +251,16 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags) opal_timing_set_jobid(ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); #endif + if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) { + /* start listening - will be ignored if no listeners + * were registered */ + if (ORTE_SUCCESS != (ret = orte_start_listening())) { + ORTE_ERROR_LOG(ret); + error = "orte_start_listening"; + goto error; + } + } + /* All done */ return ORTE_SUCCESS; diff --git a/orte/util/Makefile.am b/orte/util/Makefile.am index d2b34893536..55ed763a12c 100644 --- a/orte/util/Makefile.am +++ b/orte/util/Makefile.am @@ -11,7 +11,7 @@ # All rights reserved. # Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved. # Copyright (c) 2014 Cisco Systems, Inc. All rights reserved. -# Copyright (c) 2014 Intel, Inc. All rights reserved. +# Copyright (c) 2014-2015 Intel, Inc. All rights reserved. 
# $COPYRIGHT$ # # Additional copyrights may follow @@ -56,7 +56,8 @@ headers += \ util/comm/comm.h \ util/nidmap.h \ util/regex.h \ - util/attr.h + util/attr.h \ + util/listener.h lib@ORTE_LIB_PREFIX@open_rte_la_SOURCES += \ util/error_strings.c \ @@ -74,7 +75,8 @@ lib@ORTE_LIB_PREFIX@open_rte_la_SOURCES += \ util/comm/comm.c \ util/nidmap.c \ util/regex.c \ - util/attr.c + util/attr.c \ + util/listener.c # Remove the generated man pages distclean-local: diff --git a/orte/util/listener.c b/orte/util/listener.c new file mode 100644 index 00000000000..ed66d550238 --- /dev/null +++ b/orte/util/listener.c @@ -0,0 +1,381 @@ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009-2015 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + * In windows, many of the socket functions return an EWOULDBLOCK + * instead of things like EAGAIN, EINPROGRESS, etc. 
It has been + * verified that this will not conflict with other error codes that + * are returned by these functions under UNIX/Linux environments + */ + +#include "orte_config.h" +#include "orte/types.h" +#include "opal/types.h" + +#ifdef HAVE_UNISTD_H +#include +#endif +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#include +#ifdef HAVE_NETINET_IN_H +#include +#endif +#ifdef HAVE_ARPA_INET_H +#include +#endif +#ifdef HAVE_NETDB_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H +#include +#endif + +#include + +#include "opal/util/error.h" +#include "opal/util/output.h" +#include "opal/opal_socket_errno.h" +#include "opal/util/if.h" +#include "opal/util/net.h" +#include "opal/util/fd.h" +#include "opal/class/opal_list.h" + +#include "orte/mca/errmgr/errmgr.h" +#include "orte/util/name_fns.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/show_help.h" + +#include "orte/util/listener.h" + +static void* listen_thread_fn(opal_object_t *obj); +static opal_list_t mylisteners; +static bool initialized = false; +static opal_thread_t listen_thread; +static volatile bool listen_thread_active = false; +static struct timeval listen_thread_tv; +static int stop_thread[2]; + +#define CLOSE_THE_SOCKET(socket) \ + do { \ + shutdown(socket, 2); \ + close(socket); \ + socket = -1; \ + } while(0) + + +int orte_register_listener(struct sockaddr* address, opal_socklen_t addrlen, + opal_event_base_t *evbase, + orte_listener_callback_fn_t handler) +{ + orte_listener_t *conn; + int flags; + int sd = -1; + + if (!initialized) { + OBJ_CONSTRUCT(&mylisteners, opal_list_t); + OBJ_CONSTRUCT(&listen_thread, opal_thread_t); + if (0 > pipe(stop_thread)) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + /* Make sure the pipe FDs are set to close-on-exec so that + they don't leak into children */ + if (opal_fd_set_cloexec(stop_thread[0]) != OPAL_SUCCESS || + opal_fd_set_cloexec(stop_thread[1]) != OPAL_SUCCESS) { + close(stop_thread[0]); + 
close(stop_thread[1]); + ORTE_ERROR_LOG(ORTE_ERR_IN_ERRNO); + return ORTE_ERR_IN_ERRNO; + } + listen_thread_tv.tv_sec = 3600; + listen_thread_tv.tv_usec = 0; + initialized = true; + } + + /* create a listen socket for incoming connection attempts */ + sd = socket(PF_UNIX, SOCK_STREAM, 0); + if (sd < 0) { + if (EAFNOSUPPORT != opal_socket_errno) { + opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno); + } + return ORTE_ERR_IN_ERRNO; + } + /* Set the socket to close-on-exec so that no children inherit + this FD */ + if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) { + opal_output(0, "pmix_server: unable to set the " + "listening socket to CLOEXEC (%s:%d)\n", + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + + + if (bind(sd, (struct sockaddr*)address, addrlen) < 0) { + opal_output(0, "%s bind() failed on error %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + strerror(opal_socket_errno), + opal_socket_errno ); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + + /* setup listen backlog to maximum allowed by kernel */ + if (listen(sd, SOMAXCONN) < 0) { + opal_output(0, "orte_listener: listen() failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + + /* set socket up to be non-blocking, otherwise accept could block */ + if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { + opal_output(0, "orte_listener: fcntl(F_GETFL) failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + flags |= O_NONBLOCK; + if (fcntl(sd, F_SETFL, flags) < 0) { + opal_output(0, "orte_listener: fcntl(F_SETFL) failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + + /* add this port to our connections */ + conn = OBJ_NEW(orte_listener_t); + conn->sd = sd; + conn->evbase = evbase; + conn->handler = 
handler;
+    opal_list_append(&mylisteners, &conn->item);
+
+    return ORTE_SUCCESS;
+}
+
+/*
+ * Start the listener thread.
+ *
+ * Does nothing and returns ORTE_SUCCESS if the listener support has not
+ * been initialized, if no listeners have been registered, or if the
+ * thread is already running.  Otherwise the thread is launched and the
+ * result of opal_thread_start() is returned.  Should the launch fail,
+ * listen_thread_active is reset so that a later orte_stop_listening()
+ * does not try to join a thread that was never created, and so that a
+ * later call to this function can try again.
+ */
+int orte_start_listening(void)
+{
+    int rc;
+
+    /* if we aren't initialized, or have nothing
+     * registered, or are already listening, then return SUCCESS */
+    if (!initialized || 0 == opal_list_get_size(&mylisteners) ||
+        listen_thread_active) {
+        return ORTE_SUCCESS;
+    }
+
+    /* start our listener thread - the flag must be set before the
+     * thread runs, as the thread loops on it */
+    listen_thread_active = true;
+    listen_thread.t_run = listen_thread_fn;
+    listen_thread.t_arg = NULL;
+    if (OPAL_SUCCESS != (rc = opal_thread_start(&listen_thread))) {
+        /* no thread exists, so do not claim that one is active */
+        listen_thread_active = false;
+        ORTE_ERROR_LOG(rc);
+        opal_output(0, "%s Unable to start listen thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
+    }
+    return rc;
+}
+
+/*
+ * Stop the listener thread and release the registered listeners.
+ *
+ * No-op if the thread is not marked active.  Otherwise clears
+ * listen_thread_active, writes to stop_thread[1] to wake the thread
+ * out of its select(), and joins it.  The stop_thread descriptors are
+ * closed here, after the join, rather than by the thread itself, so
+ * that the wake-up write can never land on a descriptor the thread has
+ * already closed (and that may since have been reused).
+ */
+void orte_stop_listening(void)
+{
+    int i=0;
+
+    if (!listen_thread_active) {
+        return;
+    }
+
+    listen_thread_active = false;
+    /* tell the thread to exit */
+    if (0 > write(stop_thread[1], &i, sizeof(int))) {
+        /* not fatal - the thread re-checks listen_thread_active each
+         * time its select() returns, including on timeout */
+        opal_output(0, "%s Unable to signal listen thread to stop",
+                    ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
+    }
+    opal_thread_join(&listen_thread, NULL);
+    /* the thread is gone, so nobody else is using these anymore */
+    close(stop_thread[0]);
+    close(stop_thread[1]);
+    OBJ_DESTRUCT(&listen_thread);
+    OPAL_LIST_DESTRUCT(&mylisteners);
+}
+
+/*
+ * The listen thread accepts incoming connections and places them
+ * in a queue for further processing
+ *
+ * Runs until listen_thread_active is set to false, or until accept
+ * fails because the process is out of file descriptors.  The
+ * stop_thread descriptors are left open for orte_stop_listening
+ * to close once it has joined this thread.
+ */
+static void* listen_thread_fn(opal_object_t *obj)
+{
+    int rc, max, accepted_connections, sd, err;
+    opal_socklen_t addrlen;
+    orte_pending_connection_t *pending_connection;
+    struct timeval timeout;
+    fd_set readfds;
+    orte_listener_t *listener;
+
+    while (listen_thread_active) {
+        FD_ZERO(&readfds);
+        max = -1;
+        OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) {
+            if (0 > listener->sd) {
+                /* this listener was closed after an accept failure */
+                continue;
+            }
+            FD_SET(listener->sd, &readfds);
+            max = (listener->sd > max) ? listener->sd : max;
+        }
+        /* add the stop_thread fd */
+        FD_SET(stop_thread[0], &readfds);
+        max = (stop_thread[0] > max) ? stop_thread[0] : max;
+
+        /* set timeout interval - select may modify the struct it is
+         * given, so hand it a fresh copy each time */
+        timeout.tv_sec = listen_thread_tv.tv_sec;
+        timeout.tv_usec = listen_thread_tv.tv_usec;
+
+        /* Block in a select to avoid hammering the cpu.  If a connection
+         * comes in, we'll get woken up right away.
+         */
+        rc = select(max + 1, &readfds, NULL, NULL, &timeout);
+        if (!listen_thread_active) {
+            /* we've been asked to terminate */
+            goto done;
+        }
+        if (rc < 0) {
+            if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) {
+                perror("select");
+            }
+            continue;
+        }
+
+        /* Spin accepting connections until all active listen sockets
+         * do not have any incoming connections, pushing each connection
+         * onto its respective event queue for processing
+         */
+        do {
+            accepted_connections = 0;
+            OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) {
+                sd = listener->sd;
+
+                /* according to the man pages, select replaces the given descriptor
+                 * set with a subset consisting of those descriptors that are ready
+                 * for the specified operation - in this case, a read. So we need to
+                 * first check to see if this file descriptor is included in the
+                 * returned subset. A listener that was closed earlier in this spin
+                 * still has its bit set, so check for that first
+                 */
+                if (0 > sd || 0 == FD_ISSET(sd, &readfds)) {
+                    /* this descriptor is not included */
+                    continue;
+                }
+
+                /* this descriptor is ready to be read, which means a connection
+                 * request has been received - so harvest it. All we want to do
+                 * here is accept the connection and push the info onto the event
+                 * library for subsequent processing - we don't want to actually
+                 * process the connection here as it takes too long, and so the
+                 * OS might start rejecting connections due to timeout.
+                 */
+                pending_connection = OBJ_NEW(orte_pending_connection_t);
+                opal_event_set(listener->evbase, &pending_connection->ev, -1,
+                               OPAL_EV_WRITE, listener->handler, pending_connection);
+                opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI);
+                /* addrlen is value-result: accept overwrites it with the size
+                 * of the peer address, so it must be reset before every call */
+                addrlen = sizeof(struct sockaddr_storage);
+                pending_connection->fd = accept(sd,
+                                                (struct sockaddr*)&(pending_connection->addr),
+                                                &addrlen);
+                if (pending_connection->fd < 0) {
+                    /* capture the error before anything else can overwrite it */
+                    err = opal_socket_errno;
+                    OBJ_RELEASE(pending_connection);
+
+                    /* Non-fatal errors */
+                    if (EAGAIN == err ||
+                        EWOULDBLOCK == err) {
+                        continue;
+                    }
+
+                    /* Every other failure closes the listening socket. Mark
+                     * the listener as closed so that it is neither handed to
+                     * select/accept again nor closed a second time when the
+                     * listener object is destructed */
+                    CLOSE_THE_SOCKET(sd);
+                    listener->sd = -1;
+
+                    /* If we run out of file descriptors, log an extra
+                       warning (so that the user can know to fix this
+                       problem) and abandon all hope. */
+                    if (EMFILE == err) {
+                        ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS);
+                        orte_show_help("help-oob-tcp.txt",
+                                       "accept failed",
+                                       true,
+                                       opal_process_info.nodename,
+                                       err,
+                                       strerror(err),
+                                       "Out of file descriptors");
+                        goto done;
+                    }
+
+                    /* For all other cases, print a
+                       warning but try to continue */
+                    orte_show_help("help-oob-tcp.txt",
+                                   "accept failed",
+                                   true,
+                                   opal_process_info.nodename,
+                                   err,
+                                   strerror(err),
+                                   "Unknown cause; job will try to continue");
+                    continue;
+                }
+
+                /* activate the event */
+                opal_event_active(&pending_connection->ev, OPAL_EV_WRITE, 1);
+                accepted_connections++;
+            }
+        } while (accepted_connections > 0);
+    }
+
+ done:
+    return NULL;
+}
+
+
+/* INSTANTIATE CLASSES */
+/* constructor: a new listener has no socket, event base or handler */
+static void lcons(orte_listener_t *p)
+{
+    p->sd = -1;
+    p->evbase = NULL;
+    p->handler = NULL;
+}
+/* destructor: close the listening socket if one is still open */
+static void ldes(orte_listener_t *p)
+{
+    if (0 <= p->sd) {
+        CLOSE_THE_SOCKET(p->sd);
+    }
+}
+OBJ_CLASS_INSTANCE(orte_listener_t,
+                   opal_list_item_t,
+                   lcons, ldes);
+
+OBJ_CLASS_INSTANCE(orte_pending_connection_t,
+                   opal_object_t,
+                   NULL,
+                   NULL);
diff --git a/orte/util/listener.h b/orte/util/listener.h
new file mode 100644
index 00000000000..010b26786bf
--- /dev/null
+++ b/orte/util/listener.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2006 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
+ *                         All rights reserved.
+ * Copyright (c) 2010-2011 Cisco Systems, Inc.  All rights reserved.
+ * Copyright (c) 2015      Intel, Inc.  All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef ORTE_LISTENER_H
+#define ORTE_LISTENER_H
+
+#include "orte_config.h"
+
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+#include "opal/class/opal_list.h"
+#include "opal/mca/event/event.h"
+
+/* callback prototype - the listener passes an orte_pending_connection_t as cbdata */
+typedef void (*orte_listener_callback_fn_t)(int sd, short args, void *cbdata);
+
+/*
+ * Data structure for accepting connections - one per registered listener.
+ */
+typedef struct orte_listener_t {
+    opal_list_item_t item;                /* list linkage */
+    int sd;                               /* listening socket, -1 when none is open */
+    opal_event_base_t *evbase;            /* event base the handler event is set on */
+    orte_listener_callback_fn_t handler;  /* called for each accepted connection */
+} orte_listener_t;
+OBJ_CLASS_DECLARATION(orte_listener_t);
+
+typedef struct {
+    opal_object_t super;
+    opal_event_t ev;                      /* event that delivers this connection to the handler */
+    int fd;                               /* descriptor returned by accept */
+    struct sockaddr_storage addr;         /* peer address filled in by accept */
+} orte_pending_connection_t;
+OBJ_CLASS_DECLARATION(orte_pending_connection_t);
+
+ORTE_DECLSPEC int orte_start_listening(void);
+ORTE_DECLSPEC void orte_stop_listening(void);
+ORTE_DECLSPEC int orte_register_listener(struct sockaddr* address, opal_socklen_t addrlen,
+                                         opal_event_base_t *evbase,
+                                         orte_listener_callback_fn_t handler);
+
+#endif /* ORTE_LISTENER_H */