diff --git a/opal/mca/dstore/base/dstore_base_frame.c b/opal/mca/dstore/base/dstore_base_frame.c index 6294c2608a6..47d2db5313e 100644 --- a/opal/mca/dstore/base/dstore_base_frame.c +++ b/opal/mca/dstore/base/dstore_base_frame.c @@ -1,7 +1,7 @@ /* * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -43,7 +43,6 @@ opal_dstore_base_API_t opal_dstore = { opal_dstore_base_t opal_dstore_base = {0}; int opal_dstore_internal = -1; -int opal_dstore_modex = -1; static int opal_dstore_base_frame_close(void) { diff --git a/opal/mca/dstore/dstore.h b/opal/mca/dstore/dstore.h index 30245f94d66..198a56b4ed3 100644 --- a/opal/mca/dstore/dstore.h +++ b/opal/mca/dstore/dstore.h @@ -43,9 +43,6 @@ BEGIN_C_DECLS * as someone figures out how to separate the various * datastore channels */ -OPAL_DECLSPEC extern int opal_dstore_internal; -OPAL_DECLSPEC extern int opal_dstore_modex; - OPAL_DECLSPEC extern int opal_dstore_peer; OPAL_DECLSPEC extern int opal_dstore_internal; OPAL_DECLSPEC extern int opal_dstore_nonpeer; diff --git a/opal/mca/dstore/sm/Makefile.am b/opal/mca/dstore/sm/Makefile.am deleted file mode 100644 index e582d69b923..00000000000 --- a/opal/mca/dstore/sm/Makefile.am +++ /dev/null @@ -1,36 +0,0 @@ -# -# Copyright (c) 2014 Mellanox Technologies, Inc. -# All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# - -sources = \ - dstore_sm.h \ - dstore_sm_component.c \ - dstore_sm.c - -# Make the output library in this directory, and name it either -# mca__.la (for DSO builds) or libmca__.la -# (for static builds). - -if MCA_BUILD_opal_dstore_sm_DSO -component_noinst = -component_install = mca_dstore_sm.la -else -component_noinst = libmca_dstore_sm.la -component_install = -endif - -mcacomponentdir = $(opallibdir) -mcacomponent_LTLIBRARIES = $(component_install) -mca_dstore_sm_la_SOURCES = $(sources) -mca_dstore_sm_la_LDFLAGS = -module -avoid-version -mca_dstore_sm_la_LIBADD = $(dstore_sm_LIBS) - -noinst_LTLIBRARIES = $(component_noinst) -libmca_dstore_sm_la_SOURCES =$(sources) -libmca_dstore_sm_la_LDFLAGS = -module -avoid-version diff --git a/opal/mca/dstore/sm/dstore_sm.c b/opal/mca/dstore/sm/dstore_sm.c deleted file mode 100644 index e62d1738c7f..00000000000 --- a/opal/mca/dstore/sm/dstore_sm.c +++ /dev/null @@ -1,427 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* Copyright (c) 2014 Mellanox Technologies, Inc. - * All rights reserved. - * Copyright (c) 2014 Research Organization for Information Science - * and Technology (RIST). All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - * - */ - -#include "opal_config.h" -#include "opal/constants.h" - -#include -#include - -#include "opal_stdint.h" -#include "opal/dss/dss_types.h" -#include "opal/util/error.h" -#include "opal/util/output.h" -#include "opal/util/show_help.h" - -#include "opal/mca/dstore/base/base.h" -#include "dstore_sm.h" -#include "opal/mca/pmix/pmix.h" -#include "opal/mca/shmem/base/base.h" - -static uint32_t cur_offset = 0; -static int32_t cur_seg_index = -1; - -static int init(struct opal_dstore_base_module_t *imod); -static void finalize(struct opal_dstore_base_module_t *imod); -static int store(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *proc, - opal_value_t *val); -static int fetch(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *proc, - const char *key, - opal_list_t *kvs); -static int remove_data(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *proc, const char *key); - -static void smtrkcon(opal_sm_tracker_t *p) -{ - p->jobid = 0; - p->addr = NULL; -} -static void smtrkdes(opal_sm_tracker_t *p) -{ -} - -OBJ_CLASS_INSTANCE(opal_sm_tracker_t, - opal_list_item_t, - smtrkcon, smtrkdes); - -#define SHARED_SEGMENT_SIZE (1<<22) - -mca_dstore_sm_module_t opal_dstore_sm_module = { - { - init, - finalize, - store, - fetch, - remove_data - } -}; - -segment_info *segments = NULL; -static int max_segment_num; - -/* Initialize our sm region */ -static int init(struct opal_dstore_base_module_t *imod) -{ - int i; - mca_dstore_sm_module_t *mod; - mod = (mca_dstore_sm_module_t*)imod; - - max_segment_num = META_OFFSET/sizeof(seg_info_short); - segments = malloc(max_segment_num * sizeof(segment_info)); - for (i = 0; i < max_segment_num; i++) { - segments[i].addr = NULL; - segments[i].seg_ds = NULL; - } - OBJ_CONSTRUCT(&mod->tracklist, opal_list_t); - return OPAL_SUCCESS; -} - -static void finalize(struct opal_dstore_base_module_t *imod) -{ - mca_dstore_sm_module_t *mod; - opal_sm_tracker_t *trk; - opal_list_item_t *item; - - mod = (mca_dstore_sm_module_t*)imod; - - int i; - for (i = 0; i < max_segment_num; i++) { - if (NULL != segments[i].seg_ds) { - if (segments[i].seg_ds->seg_cpid == getpid()) { - opal_shmem_unlink (segments[i].seg_ds); - } - opal_shmem_segment_detach (segments[i].seg_ds); - free(segments[i].seg_ds); - } - } - free(segments); - - /* release tracker object */ - for (item = opal_list_remove_first(&mod->tracklist); - NULL != item; - item = opal_list_remove_first(&mod->tracklist)) { - trk = (opal_sm_tracker_t*) item; - opal_shmem_segment_detach (&trk->seg_ds); - if (trk->seg_ds.seg_cpid == getpid()) { - opal_shmem_unlink (&trk->seg_ds); - } - OBJ_RELEASE(trk); - } - OPAL_LIST_DESTRUCT(&mod->tracklist); -} - - - -static int store(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *uid, - opal_value_t *val) -{ - mca_dstore_sm_module_t *mod; - void *addr; - int32_t data_size; - opal_shmem_ds_t *seg_ds; - meta_info my_info; - seg_info_short sinfo; - char* seg_addr; - char *sm_file = NULL; - char *ch, *path; - int idx; - opal_sm_tracker_t *trk; - bool found_trk = false; - if (OPAL_BYTE_OBJECT != val->type) { - return OPAL_ERROR; - } - mod = (mca_dstore_sm_module_t*)imod; - data_size = val->data.bo.size; - - idx = uid->vpid; - /* look for segment info for target jobid */ - OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { - if (trk->jobid == uid->jobid) { - found_trk = true; - break; - } - } - if (!found_trk) { - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: tracker object wasn't found for job id %u, proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - uid->jobid, - OPAL_NAME_PRINT(*uid)); - return OPAL_ERROR; - } - /* look for data for this process in meta_info segment */ - addr = ((uint8_t*)trk->addr + META_OFFSET + idx * sizeof(meta_info)); - memcpy(&my_info, addr, sizeof(meta_info)); - if (0 < my_info.data_size && 0 <= my_info.seg_index) { - /* we should replace existing data for this process - * by new ones */ - if (my_info.data_size >= data_size) { - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: replace existing data for proc %s be the new ones", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - /* we can just simply replace the old data with the new ones */ - /* get existing segment from the list */ - seg_addr = segments[my_info.seg_index].addr; - seg_ds = segments[my_info.seg_index].seg_ds; - /* store data in this segment */ - addr = seg_addr + my_info.offset; - memset((uint8_t*)addr, 0, my_info.data_size); - memcpy((uint8_t*)addr, val->data.bo.bytes, val->data.bo.size); - /* update information about data size in meta info segment */ - my_info.data_size = data_size; - memcpy((uint8_t*)trk->addr + META_OFFSET + idx*sizeof(meta_info), &my_info, sizeof(meta_info)); - return OPAL_SUCCESS; - } - } - /* there is no data for this process, or there is data for new process - * but their size is smaller than the size of new data, so - * store them in the separate slot*/ - - /* store in another segment */ - if (0 > cur_seg_index || (cur_offset + data_size) > SHARED_SEGMENT_SIZE) { - if (max_segment_num == cur_seg_index+1) { - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: exceeded limit on number of segments %d. This value is managed by META_OFFSET macro.", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - max_segment_num); - return OPAL_ERROR; - } - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: create new segment to store data for proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - /* create new segment, attach to it and add it to the list of segments */ - cur_seg_index++; - cur_offset = 0; - if (0 < strlen(trk->seg_ds.seg_name)) { - path = strdup(trk->seg_ds.seg_name); - ch = strrchr(path, OPAL_PATH_SEP[0]) + 1; - if (NULL != ch) { - *ch = '\0'; - (void)asprintf(&sm_file, "%sdstore_segment.%d", path, cur_seg_index); - } - free(path); - } - if (NULL == sm_file) { - (void)asprintf(&sm_file, "%s", "noname"); - } - if (NULL != sm_file) { - seg_ds = (opal_shmem_ds_t*)malloc(sizeof(opal_shmem_ds_t)); - memset(seg_ds, 0, sizeof(opal_shmem_ds_t)); - if (OPAL_SUCCESS != opal_shmem_segment_create (seg_ds, sm_file, SHARED_SEGMENT_SIZE)) { - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: couldn't create new shared segment to store key %s on proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - (NULL == val->key) ? "NULL" : val->key, - OPAL_NAME_PRINT(*uid)); - free(seg_ds); - if (NULL != sm_file) { - free (sm_file); - } - return OPAL_ERROR; - } - if (NULL != sm_file) { - free (sm_file); - } - } else { - return OPAL_ERROR; - } - seg_addr = opal_shmem_segment_attach (seg_ds); - if (NULL == seg_addr) { - opal_shmem_unlink (seg_ds); - free(seg_ds); - return OPAL_ERROR; - } - segments[cur_seg_index].seg_ds = seg_ds; - segments[cur_seg_index].addr = seg_addr; - /* store information about new created segment in header section. */ - sinfo.seg_cpid = seg_ds->seg_cpid; - sinfo.seg_id = seg_ds->seg_id; - sinfo.seg_size = seg_ds->seg_size; - if (0 < strlen(seg_ds->seg_name)) { - ch = strrchr(seg_ds->seg_name, OPAL_PATH_SEP[0]) + 1; - memcpy(sinfo.file_name, ch, strlen(ch)+1); - } else { - memcpy(sinfo.file_name, "noname", strlen("noname")+1); - } - memcpy((uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), &sinfo, sizeof(seg_info_short)); - } else { - /* get existing segment from the array */ - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: getting current segment info to store data for proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - seg_addr = segments[cur_seg_index].addr; - seg_ds = segments[cur_seg_index].seg_ds; - memcpy(&sinfo, (uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), sizeof(seg_info_short)); - if (sinfo.seg_cpid != seg_ds->seg_cpid) { - /* store information about new created segment in header section. */ - sinfo.seg_cpid = seg_ds->seg_cpid; - sinfo.seg_id = seg_ds->seg_id; - sinfo.seg_size = seg_ds->seg_size; - if (0 < strlen(seg_ds->seg_name)) { - ch = strrchr(seg_ds->seg_name, OPAL_PATH_SEP[0]) + 1; - memcpy(sinfo.file_name, ch, strlen(ch)+1); - } else { - memcpy(sinfo.file_name, "noname", strlen("noname")+1); - } - memcpy((uint8_t*)trk->addr + cur_seg_index * sizeof(seg_info_short), &sinfo, sizeof(seg_info_short)); - } - } - /* store data in this segment */ - addr = seg_addr + cur_offset; - memcpy((uint8_t*)addr, val->data.bo.bytes, val->data.bo.size); - - /* store segment index and offset for this process - * in meta info segment. */ - my_info.seg_index = cur_seg_index; - my_info.offset = cur_offset; - my_info.data_size = data_size; - memcpy((uint8_t*)trk->addr + META_OFFSET + idx*sizeof(meta_info), &my_info, sizeof(meta_info)); - cur_offset += data_size; - - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:store: data for proc %s stored successfully", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - return OPAL_SUCCESS; -} - -static int fetch(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *uid, - const char *key, opal_list_t *kvs) -{ - int rc; - int32_t size; - mca_dstore_sm_module_t *mod; - void *addr, *ptr; - opal_buffer_t *bptr, buf; - int32_t cnt; - opal_value_t *kp; - opal_shmem_ds_t *seg_ds; - meta_info my_info; - seg_info_short sinfo; - char* seg_addr; - int found = 0; - int32_t seg_index; - char *ch, *path; - opal_sm_tracker_t *trk; - bool found_trk = false; - int idx; - - mod = (mca_dstore_sm_module_t*)imod; - /* look for segment info for target jobid */ - OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { - if (trk->jobid == uid->jobid) { - found_trk = true; - break; - } - } - if (!found_trk) { - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:fetch: tracker object wasn't found for job id %u, proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - uid->jobid, - OPAL_NAME_PRINT(*uid)); - return OPAL_ERROR; - } - /* look for data for this process in meta_info segment */ - idx = uid->vpid; - addr = ((uint8_t*)trk->addr + META_OFFSET + idx * sizeof(meta_info)); - memcpy(&my_info, addr, sizeof(meta_info)); - if (0 == my_info.data_size) { - /* there is no data for this process */ - opal_output_verbose(0, opal_dstore_base_framework.framework_output, - "%s dstore:sm:fetch: data for proc %s wasn't found.", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - return OPAL_ERROR; - } - seg_index = my_info.seg_index; - /* look for this seg index in array of attached segments. - * If not found, attach to this segment and - * store it in the array. */ - if (NULL != segments[seg_index].addr) { - seg_addr = segments[seg_index].addr; - } else { - seg_ds = (opal_shmem_ds_t*)malloc(sizeof(opal_shmem_ds_t)); - memset(seg_ds, 0, sizeof(opal_shmem_ds_t)); - memcpy(&sinfo, (uint8_t*)trk->addr + seg_index * sizeof(seg_info_short), sizeof(seg_info_short)); - seg_ds->seg_cpid = sinfo.seg_cpid; - seg_ds->seg_id = sinfo.seg_id; - seg_ds->seg_size = sinfo.seg_size; - if (0 < strlen(trk->seg_ds.seg_name)) { - path = strdup(trk->seg_ds.seg_name); - ch = strrchr(path, OPAL_PATH_SEP[0]) + 1; - if (NULL != ch) { - *ch = '\0'; - sprintf(seg_ds->seg_name, "%s%s", path, sinfo.file_name); - } - free(path); - } - seg_addr = opal_shmem_segment_attach (seg_ds); - if (NULL == seg_addr) { - return OPAL_ERROR; - } - segments[seg_index].addr = seg_addr; - segments[seg_index].seg_ds = seg_ds; - } - - size = my_info.data_size; - ptr = (uint8_t*)seg_addr + my_info.offset; - - cnt = 1; - OBJ_CONSTRUCT(&buf, opal_buffer_t); - opal_dss.load(&buf, ptr, size); - while (OPAL_SUCCESS == (rc = opal_dss.unpack(&buf, &bptr, &cnt, OPAL_BUFFER))) { - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - if (0 == strcmp(key, kp->key)) { - opal_list_append(kvs, &kp->super); - found = 1; - } else { - OBJ_RELEASE(kp); - } - } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } - OBJ_RELEASE(bptr); - cnt = 1; - } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } else { - if (1 == found) { - opal_output_verbose(5, opal_dstore_base_framework.framework_output, - "%s dstore:sm:fetch: data for proc %s successfully fetched.", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - OPAL_NAME_PRINT(*uid)); - rc = OPAL_SUCCESS; - } - } - /* protect the data */ - buf.base_ptr = NULL; - OBJ_DESTRUCT(&buf); - return rc; -} - -static int remove_data(struct opal_dstore_base_module_t *imod, - const opal_process_name_t *uid, const char *key) -{ - return OPAL_ERR_NOT_IMPLEMENTED; -} - diff --git a/opal/mca/dstore/sm/dstore_sm.h b/opal/mca/dstore/sm/dstore_sm.h deleted file mode 100644 index 89f5d7fc16a..00000000000 --- a/opal/mca/dstore/sm/dstore_sm.h +++ /dev/null @@ -1,49 +0,0 @@ -/* Copyright (c) 2014 Mellanox Technologies, Inc. - * All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#ifndef OPAL_DSTORE_SM_H -#define OPAL_DSTORE_SM_H - -#include "opal/mca/dstore/dstore.h" -#include "opal/mca/shmem/shmem_types.h" - -BEGIN_C_DECLS - -OPAL_MODULE_DECLSPEC extern opal_dstore_base_component_t mca_dstore_sm_component; - -typedef struct { - opal_shmem_ds_t *seg_ds; - char* addr; -} segment_info; - -typedef struct { - opal_list_item_t super; - uint32_t jobid; - opal_shmem_ds_t seg_ds; - uint8_t *addr; -} opal_sm_tracker_t; -OBJ_CLASS_DECLARATION(opal_sm_tracker_t); - -typedef struct { - opal_dstore_base_module_t api; - opal_list_t tracklist; -} mca_dstore_sm_module_t; -OPAL_MODULE_DECLSPEC extern mca_dstore_sm_module_t opal_dstore_sm_module; - -typedef struct { - pid_t seg_cpid; - int seg_id; - size_t seg_size; - char file_name[256]; -} seg_info_short; - - -END_C_DECLS - -#endif /* OPAL_DSTORE_SM_H */ diff --git a/opal/mca/dstore/sm/dstore_sm_component.c b/opal/mca/dstore/sm/dstore_sm_component.c deleted file mode 100644 index 55eddc4b62c..00000000000 --- a/opal/mca/dstore/sm/dstore_sm_component.c +++ /dev/null @@ -1,176 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* Copyright (c) 2014 Mellanox Technologies, Inc. - * All rights reserved. - * Copyright (c) 2015 Intel, Inc. All rights reserved. - * Copyright (c) 2015 Los Alamos National Security, LLC. All rights - * reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include "opal_config.h" -#include "opal/constants.h" - -#include "opal/mca/base/base.h" -#include "opal/util/argv.h" - -#include "opal/mca/dstore/dstore.h" -#include "opal/mca/dstore/base/base.h" -#include "dstore_sm.h" -#include "opal/mca/shmem/base/base.h" - -static int opal_dstore_sm_enable = 0; -static int dstore_sm_query(mca_base_module_t **module, int *priority); -static opal_dstore_base_module_t *component_create(opal_list_t *attrs); -static int component_update(int hdl, opal_list_t *attributes); -static int add_trk(opal_dstore_base_module_t *imod, - uint32_t jid, char* seg_info); -static int component_register(void); - -/* - * Instantiate the public struct with all of our public information - * and pointers to our public functions in it - */ -opal_dstore_base_component_t mca_dstore_sm_component = { - .base_version = { - OPAL_DSTORE_BASE_VERSION_2_0_0, - - /* Component name and version */ - .mca_component_name = "sm", - MCA_BASE_MAKE_VERSION(component, OPAL_MAJOR_VERSION, OPAL_MINOR_VERSION, - OPAL_RELEASE_VERSION), - - .mca_query_component = dstore_sm_query, - .mca_register_component_params = component_register - }, - .base_data = { - /* The component is checkpoint ready */ - MCA_BASE_METADATA_PARAM_CHECKPOINT - }, - .create_handle = component_create, - .update_handle = component_update, -}; - -static int dstore_sm_query(mca_base_module_t **module, int *priority) -{ - *priority = 0; - *module = NULL; - return OPAL_SUCCESS; -} - -static int component_register(void) -{ - opal_dstore_sm_enable = 0; - (void)mca_base_component_var_register(&mca_dstore_sm_component.base_version, "enable", - "Enable/disable dstore sm component (default: disabled)", - MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, - OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, - &opal_dstore_sm_enable); - return OPAL_SUCCESS; -} - -static opal_dstore_base_module_t *component_create(opal_list_t *attrs) -{ - mca_dstore_sm_module_t *mod; - - if (0 == opal_dstore_sm_enable) { - return NULL; - } - mod = (mca_dstore_sm_module_t*)malloc(sizeof(mca_dstore_sm_module_t)); - if (NULL == mod) { - OPAL_ERROR_LOG(OPAL_ERR_OUT_OF_RESOURCE); - return NULL; - } - /* copy the APIs across */ - memcpy(mod, &opal_dstore_sm_module.api, sizeof(opal_dstore_base_module_t)); - /* let the module init itself */ - if (OPAL_SUCCESS != mod->api.init((struct opal_dstore_base_module_t*)mod)) { - /* release the module and return the error */ - free(mod); - return NULL; - } - return (opal_dstore_base_module_t*)mod; -} - -static int component_update(int hdl, opal_list_t *attributes) -{ - opal_dstore_handle_t *handle; - opal_dstore_base_module_t *mod; - int rc; - - if (hdl < 0) { - return OPAL_ERR_NOT_INITIALIZED; - } - - if (NULL == (handle = (opal_dstore_handle_t*)opal_pointer_array_get_item(&opal_dstore_base.handles, hdl))) { - OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND); - return OPAL_ERR_NOT_FOUND; - } - - if (NULL == attributes) { - return OPAL_SUCCESS; - } - - mod = handle->module; - opal_dstore_attr_t *attr = (opal_dstore_attr_t *)opal_list_get_last(attributes); - rc = add_trk(mod, attr->jobid, attr->connection_info); - return rc; -} - -static int add_trk(opal_dstore_base_module_t *imod, - uint32_t jid, char* seg_info) -{ - int i; - char** tokens; - int num_tokens; - opal_sm_tracker_t *trk; - bool found_trk = false; - num_tokens = 0; - mca_dstore_sm_module_t *mod; - - mod = (mca_dstore_sm_module_t*)imod; - if (NULL == seg_info) { - return OPAL_ERROR; - } - OPAL_LIST_FOREACH(trk, &mod->tracklist, opal_sm_tracker_t) { - if (trk->jobid == jid) { - found_trk = true; - break; - } - } - if (!found_trk) { - trk = OBJ_NEW(opal_sm_tracker_t); - tokens = opal_argv_split(seg_info, ':'); - for (i = 0; NULL != tokens[i]; i++) { - num_tokens++; - } - memset(&trk->seg_ds, 0, sizeof(opal_shmem_ds_t)); - trk->seg_ds.seg_cpid = atoi(tokens[0]); - trk->seg_ds.seg_id = atoi(tokens[1]); - trk->seg_ds.seg_size = strtoul(tokens[2], NULL, 10); - trk->seg_ds.seg_base_addr = (unsigned char*)strtoul(tokens[3], NULL, 16); - if (5 == num_tokens && NULL != tokens[4]) { - strncpy(trk->seg_ds.seg_name, tokens[4], strlen(tokens[4])+1); - } - opal_argv_free(tokens); - - trk->jobid = jid; - trk->addr = opal_shmem_segment_attach (&trk->seg_ds); - if (NULL == trk->addr) { - if (trk->seg_ds.seg_cpid == getpid()) { - opal_shmem_unlink (&trk->seg_ds); - } - OBJ_RELEASE(trk); - return OPAL_ERROR; - } - if (trk->seg_ds.seg_cpid == getpid()) { - memset(trk->addr, 0, trk->seg_ds.seg_size); - } - opal_list_append(&mod->tracklist, &trk->super); - } - return OPAL_SUCCESS; -} - diff --git a/opal/mca/dstore/sm/owner.txt b/opal/mca/dstore/sm/owner.txt deleted file mode 100644 index ebba68c6f37..00000000000 --- a/opal/mca/dstore/sm/owner.txt +++ /dev/null @@ -1,7 +0,0 @@ -# -# owner/status file -# owner: institution that is responsible for this package -# status: e.g. active, maintenance, unmaintained -# -owner: INTEL/MELLANOX -status: active diff --git a/opal/mca/pmix/native/pmix_native.c b/opal/mca/pmix/native/pmix_native.c index 7f98c911257..4fa06d7ed98 100644 --- a/opal/mca/pmix/native/pmix_native.c +++ b/opal/mca/pmix/native/pmix_native.c @@ -103,32 +103,6 @@ const opal_pmix_base_module_t opal_pmix_native_module = { // local variables static int init_cntr = 0; opal_process_name_t native_pname = {0}; -static uint32_t sm_flag; - -static void unpack_segment_info(opal_buffer_t *buf, opal_process_name_t *id, char** seg_info) -{ - int cnt; - int rc; - char *sinfo; - opal_process_name_t uid; - *seg_info = NULL; - /* extract the id of the contributor from the blob */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &uid, &cnt, OPAL_NAME))) { - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - return; - } - OPAL_ERROR_LOG(rc); - return; - } - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &sinfo, &cnt, OPAL_STRING))) { - OPAL_ERROR_LOG(rc); - return; - } - *id = uid; - *seg_info = sinfo; -} /* callback for wait completion */ @@ -152,24 +126,6 @@ static void wait_cbfunc(opal_buffer_t *buf, void *cbdata) cb->active = false; } -static int pmix_sm_attach(uint32_t jid, char *seg_info) -{ - int rc; - opal_dstore_attr_t *attr; - opal_list_t attrs; - OBJ_CONSTRUCT(&attrs, opal_list_t); - attr = OBJ_NEW(opal_dstore_attr_t); - attr->jobid = jid; - attr->connection_info = strdup(seg_info); - opal_list_append(&attrs, &attr->super); - - rc = opal_dstore.update(opal_dstore_modex, &attrs); - opal_list_remove_item(&attrs, &attr->super); - OBJ_RELEASE(attr); - OPAL_LIST_DESTRUCT(&attrs); - return rc; -} - static int native_init(void) { char **uri, *srv; @@ -225,6 +181,9 @@ static int native_init(void) } /* if the rendezvous file doesn't exist, that's an error */ if (0 != access(uri[1], R_OK)) { + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "%s pmix:native rendezvous file %s does not exist", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), uri[1]); opal_argv_free(uri); return OPAL_ERR_NOT_FOUND; } @@ -240,29 +199,6 @@ static int native_init(void) } } - char* seg_info; - void *hdl; - int rc; - /* check if shared memory region is supported */ - opal_dstore.get_handle(opal_dstore_modex, &hdl); - if(0 == strcmp("sm", ((opal_dstore_handle_t *)hdl)->storage_component->base_version.mca_component_name)) { - sm_flag = 1; - } else { - sm_flag = 0; - } - /* if shared memory segment is available, then attach to shared memory region created by pmix server */ - if (1 == sm_flag) { - if (NULL == (seg_info = getenv("PMIX_SEG_INFO"))) { - /* error out - should have been here, but isn't */ - return OPAL_ERROR; - } - rc = pmix_sm_attach(OPAL_PROC_MY_NAME.jobid, seg_info); - if (OPAL_SUCCESS != rc) { - /* error out - should have shared memory segment attached */ - return OPAL_ERROR; - } - } - /* we will connect on first send */ return OPAL_SUCCESS; @@ -460,7 +396,6 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) opal_process_name_t id; size_t i; uint64_t np; - char *seg_info; opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s pmix:native executing fence on %u procs", @@ -492,13 +427,6 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) } } - /* pack 1 if we have sm dstore enabled, 0 otherwise */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(msg, &sm_flag, 1, OPAL_UINT32))) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(msg); - return rc; - } - /* if we haven't already done it, ensure we have committed our values */ if (NULL != mca_pmix_native_component.cache_local) { scope = PMIX_LOCAL; @@ -564,53 +492,46 @@ static int native_fence(opal_process_name_t *procs, size_t nprocs) /* if data was returned, unpack and store it */ for (i=0; i < np; i++) { - if (0 == sm_flag) { - /* get the buffer that contains the data for the next proc */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&cb->data, &msg, &cnt, OPAL_BUFFER))) { - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - break; - } - OPAL_ERROR_LOG(rc); - return rc; + /* get the buffer that contains the data for the next proc */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&cb->data, &msg, &cnt, OPAL_BUFFER))) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { + break; } - /* extract the id of the contributor from the blob */ + OPAL_ERROR_LOG(rc); + return rc; + } + /* extract the id of the contributor from the blob */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_NAME))) { + OPAL_ERROR_LOG(rc); + return rc; + } + /* extract all blobs from this proc, starting with the scope */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { + /* extract the blob for this scope */ cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_NAME))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { OPAL_ERROR_LOG(rc); return rc; } - /* extract all blobs from this proc, starting with the scope */ + /* now unpack and store the values - everything goes into our internal store */ cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { - /* extract the blob for this scope */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { - OPAL_ERROR_LOG(rc); - return rc; - } - /* now unpack and store the values - everything goes into our internal store */ - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { - OPAL_ERROR_LOG(ret); - } - OBJ_RELEASE(kp); - cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { + OPAL_ERROR_LOG(ret); } - OBJ_RELEASE(bptr); + OBJ_RELEASE(kp); cnt = 1; } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } - OBJ_RELEASE(msg); - } else { - unpack_segment_info(&cb->data, &id, &seg_info); - if (NULL != seg_info) { - pmix_sm_attach(id.jobid, seg_info); - } + OBJ_RELEASE(bptr); + cnt = 1; } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } + OBJ_RELEASE(msg); if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); } else { @@ -638,7 +559,6 @@ static void fencenb_cbfunc(opal_buffer_t *buf, void *cbdata) opal_process_name_t id; size_t i; uint64_t np; - char *seg_info; /* get the number of contributors */ cnt = 1; @@ -648,56 +568,49 @@ static void fencenb_cbfunc(opal_buffer_t *buf, void *cbdata) } /* if data was returned, unpack and store it */ for (i=0; i < np; i++) { - if (0 == sm_flag) { - /* get the buffer that contains the data for the next proc */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &msg, &cnt, OPAL_BUFFER))) { - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { - break; - } - OPAL_ERROR_LOG(rc); - return; + /* get the buffer that contains the data for the next proc */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &msg, &cnt, OPAL_BUFFER))) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc) { + break; } - /* extract the id of the contributor from the blob */ + OPAL_ERROR_LOG(rc); + return; + } + /* extract the id of the contributor from the blob */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_NAME))) { + OPAL_ERROR_LOG(rc); + return; + } + /* extract all blobs from this proc, starting with the scope */ + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { + /* extract the blob for this scope */ cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &id, &cnt, OPAL_NAME))) { + if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { OPAL_ERROR_LOG(rc); return; } - /* extract all blobs from this proc, starting with the scope */ + /* now unpack and store the values - everything goes into our internal store */ cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(msg, &scope, &cnt, PMIX_SCOPE_T))) { - /* extract the blob for this scope */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(msg, &bptr, &cnt, OPAL_BUFFER))) { - OPAL_ERROR_LOG(rc); - return; - } - /* now unpack and store the values - everything goes into our internal store */ - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { - OPAL_ERROR_LOG(ret); - } - OBJ_RELEASE(kp); - cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, &id, kp))) { + OPAL_ERROR_LOG(ret); } - OBJ_RELEASE(bptr); + OBJ_RELEASE(kp); cnt = 1; } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); - } - OBJ_RELEASE(msg); - } else { - unpack_segment_info(buf, &id, &seg_info); - if (NULL != seg_info) { - pmix_sm_attach(id.jobid, seg_info); - } + OBJ_RELEASE(bptr); + cnt = 1; } - if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); } + OBJ_RELEASE(msg); + } + if (OPAL_SUCCESS != rc && OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); } /* if a callback was provided, execute it */ @@ -747,13 +660,6 @@ static int native_fence_nb(opal_process_name_t *procs, size_t nprocs, } } - /* pack 1 if we have sm dstore enabled, 0 otherwise */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(msg, &sm_flag, 1, OPAL_UINT32))) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(msg); - return rc; - } - /* if we haven't already done it, ensure we have committed our values */ if (NULL != mca_pmix_native_component.cache_local) { scope = PMIX_LOCAL; @@ -823,9 +729,7 @@ static int native_get(const opal_process_name_t *id, opal_list_t vals; opal_value_t *kp; bool found; - int handle; - char *seg_info; - + opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s pmix:native getting value for proc %s key %s", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), @@ -833,22 +737,13 @@ static int native_get(const opal_process_name_t *id, /* first see if we already have the info in our dstore */ OBJ_CONSTRUCT(&vals, opal_list_t); - if (1 == sm_flag) { - handle = opal_dstore_modex; - } else { - handle = opal_dstore_internal; - } - opal_proc_t *myproc = opal_proc_local_get(); - if (0 == opal_compare_proc(myproc->proc_name, *id)) { - handle = opal_dstore_internal; - } - if (OPAL_SUCCESS == opal_dstore.fetch(handle, id, + if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_internal, id, key, &vals)) { *kv = (opal_value_t*)opal_list_remove_first(&vals); OPAL_LIST_DESTRUCT(&vals); opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native value retrieved from dstore", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); + "%s pmix:native value retrieved from dstore", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); return OPAL_SUCCESS; } @@ -894,61 +789,40 @@ static int native_get(const opal_process_name_t *id, return rc; } found = false; - if (1 == sm_flag) { - opal_process_name_t uid; - unpack_segment_info(&cb->data, &uid, &seg_info); - if (NULL != seg_info) { - pmix_sm_attach(uid.jobid, seg_info); - } - OBJ_CONSTRUCT(&vals, opal_list_t); - if (OPAL_SUCCESS == opal_dstore.fetch(opal_dstore_modex, id, - key, &vals)) { - *kv = (opal_value_t*)opal_list_remove_first(&vals); - OPAL_LIST_DESTRUCT(&vals); + cnt = 1; + while (OPAL_SUCCESS == (rc = opal_dss.unpack(&cb->data, &bptr, &cnt, OPAL_BUFFER))) { + while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native value retrieved from dstore", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - found = true; - rc = OPAL_SUCCESS; - } else { - rc = OPAL_ERROR; - } - } else { - cnt = 1; - while (OPAL_SUCCESS == (rc = opal_dss.unpack(&cb->data, &bptr, &cnt, OPAL_BUFFER))) { - while (OPAL_SUCCESS == (rc = opal_dss.unpack(bptr, &kp, &cnt, OPAL_VALUE))) { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native retrieved %s (%s) from server for proc %s", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, - (OPAL_STRING == kp->type) ? kp->data.string : "NS", - OPAL_NAME_PRINT(*id)); - if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, id, kp))) { - OPAL_ERROR_LOG(ret); - } - if (0 == strcmp(key, kp->key)) { - *kv = kp; - found = true; - } else { - OBJ_RELEASE(kp); - } + "%s pmix:native retrieved %s (%s) from server for proc %s", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, + (OPAL_STRING == kp->type) ? kp->data.string : "NS", + OPAL_NAME_PRINT(*id)); + if (OPAL_SUCCESS != (ret = opal_dstore.store(opal_dstore_internal, id, kp))) { + OPAL_ERROR_LOG(ret); } - if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - OPAL_ERROR_LOG(rc); + if (0 == strcmp(key, kp->key)) { + *kv = kp; + found = true; + } else { + OBJ_RELEASE(kp); } - OBJ_RELEASE(bptr); - cnt = 1; } if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { OPAL_ERROR_LOG(rc); - } else { - rc = OPAL_SUCCESS; } + OBJ_RELEASE(bptr); + cnt = 1; + } + if (OPAL_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + OPAL_ERROR_LOG(rc); + } else { + rc = OPAL_SUCCESS; } OBJ_RELEASE(cb); opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s pmix:native get completed", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); + "%s pmix:native get completed", + OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); if (found) { return OPAL_SUCCESS; } diff --git a/opal/mca/pmix/native/pmix_native_component.c b/opal/mca/pmix/native/pmix_native_component.c index 486426f53e0..b048be8871d 100644 --- a/opal/mca/pmix/native/pmix_native_component.c +++ b/opal/mca/pmix/native/pmix_native_component.c @@ -79,21 +79,25 @@ opal_pmix_native_component_t mca_pmix_native_component = { static int pmix_native_open(void) { /* construct the component fields */ - mca_pmix_native_component.uri = NULL; - mca_pmix_native_component.id = opal_name_invalid; mca_pmix_native_component.cache_local = NULL; mca_pmix_native_component.cache_remote = NULL; mca_pmix_native_component.cache_global = NULL; + mca_pmix_native_component.evbase = NULL; + mca_pmix_native_component.id = opal_name_invalid; + mca_pmix_native_component.server = opal_name_invalid; + mca_pmix_native_component.uri = NULL; + memset(&mca_pmix_native_component.address, 0, sizeof(struct sockaddr_un)); mca_pmix_native_component.sd = -1; + mca_pmix_native_component.max_retries = 10; mca_pmix_native_component.state = PMIX_USOCK_UNCONNECTED; mca_pmix_native_component.tag = 0; - OBJ_CONSTRUCT(&mca_pmix_native_component.send_queue, opal_list_t); - OBJ_CONSTRUCT(&mca_pmix_native_component.posted_recvs, opal_list_t); - mca_pmix_native_component.send_msg = NULL; - mca_pmix_native_component.recv_msg = NULL; mca_pmix_native_component.send_ev_active = false; mca_pmix_native_component.recv_ev_active = false; mca_pmix_native_component.timer_ev_active = false; + OBJ_CONSTRUCT(&mca_pmix_native_component.send_queue, opal_list_t); + mca_pmix_native_component.send_msg = NULL; + mca_pmix_native_component.recv_msg = NULL; + OBJ_CONSTRUCT(&mca_pmix_native_component.posted_recvs, opal_list_t); return OPAL_SUCCESS; } diff --git a/opal/mca/pmix/native/usock.c b/opal/mca/pmix/native/usock.c index f1a10f1f31b..9f6e1353a65 100644 --- a/opal/mca/pmix/native/usock.c +++ b/opal/mca/pmix/native/usock.c @@ -52,7 +52,6 @@ static int usock_send_blocking(char *ptr, size_t size); static void pmix_usock_try_connect(int fd, short args, void *cbdata); -static int usock_create_socket(void); /* State machine for internal operations */ typedef struct { @@ -195,122 +194,68 @@ void pmix_usock_process_msg(int fd, short flags, void *cbdata) OBJ_RELEASE(msg); } -static int usock_create_socket(void) -{ - int flags; - - if (mca_pmix_native_component.sd > 0) { - return OPAL_SUCCESS; - } - - OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output, - "%s pmix:usock:peer creating socket to server", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME))); - - mca_pmix_native_component.sd = socket(PF_UNIX, SOCK_STREAM, 0); - - if (mca_pmix_native_component.sd < 0) { - opal_output(0, "%s usock_peer_create_socket: socket() failed: %s (%d)\n", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - strerror(opal_socket_errno), - opal_socket_errno); - return OPAL_ERR_UNREACH; - } - - /* setup the socket as non-blocking */ - if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) { - opal_output(0, "%s usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - strerror(opal_socket_errno), - opal_socket_errno); - } else { - flags |= O_NONBLOCK; - if(fcntl(mca_pmix_native_component.sd, F_SETFL, flags) < 0) - opal_output(0, "%s usock_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - strerror(opal_socket_errno), - opal_socket_errno); - } - - /* setup event callbacks */ - opal_event_set(mca_pmix_native_component.evbase, - &mca_pmix_native_component.recv_event, - mca_pmix_native_component.sd, - OPAL_EV_READ|OPAL_EV_PERSIST, - pmix_usock_recv_handler, NULL); - opal_event_set_priority(&mca_pmix_native_component.recv_event, OPAL_EV_MSG_LO_PRI); - mca_pmix_native_component.recv_ev_active = false; - - opal_event_set(mca_pmix_native_component.evbase, - &mca_pmix_native_component.send_event, - mca_pmix_native_component.sd, - OPAL_EV_WRITE|OPAL_EV_PERSIST, - pmix_usock_send_handler, NULL); - opal_event_set_priority(&mca_pmix_native_component.send_event, OPAL_EV_MSG_LO_PRI); - mca_pmix_native_component.send_ev_active = false; - - return OPAL_SUCCESS; -} - - /* * Try connecting to a peer */ static void pmix_usock_try_connect(int fd, short args, void *cbdata) { - int rc; + int rc, flags; opal_socklen_t addrlen = 0; opal_output_verbose(2, opal_pmix_base_framework.framework_output, "%s usock_peer_try_connect: attempting to connect to server", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - if (OPAL_SUCCESS != usock_create_socket()) { - return; - } - - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s usock_peer_try_connect: attempting to connect to server on socket %d", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), - mca_pmix_native_component.sd); - addrlen = sizeof(struct sockaddr_un); - retry_connect: - mca_pmix_native_component.retries++; - if (connect(mca_pmix_native_component.sd, (struct sockaddr *) &mca_pmix_native_component.address, addrlen) < 0) { - /* non-blocking so wait for completion */ - if (opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) { - opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s waiting for connect completion to server - activating send event", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - /* just ensure the send_event is active */ - if (!mca_pmix_native_component.send_ev_active) { - opal_event_add(&mca_pmix_native_component.send_event, 0); - mca_pmix_native_component.send_ev_active = true; - } - return; + + while (mca_pmix_native_component.retries < mca_pmix_native_component.max_retries) { + mca_pmix_native_component.retries++; + /* Create the new socket */ + mca_pmix_native_component.sd = socket(PF_UNIX, SOCK_STREAM, 0); + if (mca_pmix_native_component.sd < 0) { + opal_output(0, "pmix:create_socket: socket() failed: %s (%d)\n", + strerror(opal_socket_errno), + opal_socket_errno); + continue; } + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "usock_peer_try_connect: attempting to connect to server on socket %d", + mca_pmix_native_component.sd); + /* try to connect */ + if (connect(mca_pmix_native_component.sd, &mca_pmix_native_component.address, addrlen) < 0) { + if (opal_socket_errno == ETIMEDOUT) { + /* The server may be too busy to accept new connections */ + opal_output_verbose(2, opal_pmix_base_framework.framework_output, + "timeout connecting to server"); + CLOSE_THE_SOCKET(mca_pmix_native_component.sd); + continue; + } - /* Some kernels (Linux 2.6) will automatically software - abort a connection that was ECONNREFUSED on the last - attempt, without even trying to establish the - connection. Handle that case in a semi-rational - way by trying twice before giving up */ - if (ECONNABORTED == opal_socket_errno) { - if (mca_pmix_native_component.retries < mca_pmix_native_component.max_retries) { + /* Some kernels (Linux 2.6) will automatically software + abort a connection that was ECONNREFUSED on the last + attempt, without even trying to establish the + connection. Handle that case in a semi-rational + way by trying twice before giving up */ + if (ECONNABORTED == opal_socket_errno) { opal_output_verbose(2, opal_pmix_base_framework.framework_output, - "%s connection to server aborted by OS - retrying", - OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - goto retry_connect; - } else { - /* We were unsuccessful in establishing this connection, and are - * not likely to suddenly become successful, - */ - mca_pmix_native_component.state = PMIX_USOCK_FAILED; + "connection to server aborted by OS - retrying"); CLOSE_THE_SOCKET(mca_pmix_native_component.sd); - return; + continue; } } + /* otherwise, the connect succeeded - so break out of the loop */ + break; + } + + if (mca_pmix_native_component.retries == mca_pmix_native_component.max_retries || + mca_pmix_native_component.sd < 0){ + /* We were unsuccessful in establishing this connection, and are + * not likely to suddenly become successful */ + opal_output(0, "pmix:create_socket: connection to server failed"); + if (0 <= mca_pmix_native_component.sd) { + CLOSE_THE_SOCKET(mca_pmix_native_component.sd); + } + return; } /* connection succeeded */ @@ -320,7 +265,37 @@ static void pmix_usock_try_connect(int fd, short args, void *cbdata) "%s sock_peer_try_connect: Connection across to server succeeded", OPAL_NAME_PRINT(OPAL_PROC_MY_NAME)); - /* setup our recv to catch the return ack call */ + /* setup event callbacks */ + opal_event_set(mca_pmix_native_component.evbase, + &mca_pmix_native_component.recv_event, + mca_pmix_native_component.sd, + OPAL_EV_READ|OPAL_EV_PERSIST, + pmix_usock_recv_handler, NULL); + opal_event_set_priority(&mca_pmix_native_component.recv_event, OPAL_EV_MSG_LO_PRI); + mca_pmix_native_component.recv_ev_active = false; + + opal_event_set(mca_pmix_native_component.evbase, + &mca_pmix_native_component.send_event, + mca_pmix_native_component.sd, + OPAL_EV_WRITE|OPAL_EV_PERSIST, + pmix_usock_send_handler, NULL); + opal_event_set_priority(&mca_pmix_native_component.send_event, OPAL_EV_MSG_LO_PRI); + mca_pmix_native_component.send_ev_active = false; + + /* setup the socket as non-blocking */ + if ((flags = fcntl(mca_pmix_native_component.sd, F_GETFL, 0)) < 0) { + opal_output(0, "usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n", + strerror(opal_socket_errno), + opal_socket_errno); + } else { + flags |= O_NONBLOCK; + if (fcntl(mca_pmix_native_component.sd, F_SETFL, flags) < 0) + opal_output(0, "usock_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n", + strerror(opal_socket_errno), + opal_socket_errno); + } + + /* setup our recv to catch the return ack call */ if (!mca_pmix_native_component.recv_ev_active) { opal_event_add(&mca_pmix_native_component.recv_event, 0); mca_pmix_native_component.recv_ev_active = true; diff --git a/orte/mca/odls/default/odls_default_module.c b/orte/mca/odls/default/odls_default_module.c index 213b4ee8065..f79fe63eba0 100644 --- a/orte/mca/odls/default/odls_default_module.c +++ b/orte/mca/odls/default/odls_default_module.c @@ -724,11 +724,6 @@ int orte_odls_default_launch_local_procs(opal_buffer_t *data) /* launch the local procs */ ORTE_ACTIVATE_LOCAL_LAUNCH(job, odls_default_fork_local_proc); - opal_dstore_attr_t *attr; - attr = pmix_server_create_shared_segment(job); - if (NULL != attr) { - opal_setenv("PMIX_SEG_INFO", attr->connection_info, true, &orte_launch_environ); - } return ORTE_SUCCESS; } diff --git a/orte/mca/oob/usock/Makefile.am b/orte/mca/oob/usock/Makefile.am index 61efc4c331d..307a61693ec 100644 --- a/orte/mca/oob/usock/Makefile.am +++ b/orte/mca/oob/usock/Makefile.am @@ -12,7 +12,7 @@ # Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2012-2013 Los Alamos National Security, LLC. # All rights reserved -# Copyright (c) 2013-2014 Intel, Inc. All rights reserved. +# Copyright (c) 2013-2015 Intel, Inc. All rights reserved. # $COPYRIGHT$ # # Additional copyrights may follow @@ -23,7 +23,6 @@ sources = \ oob_usock_component.h \ oob_usock.h \ - oob_usock_listener.h \ oob_usock_component.c \ oob_usock_connection.h \ oob_usock_sendrecv.h \ @@ -31,7 +30,6 @@ sources = \ oob_usock_peer.h \ oob_usock_ping.h \ oob_usock.c \ - oob_usock_listener.c \ oob_usock_connection.c \ oob_usock_sendrecv.c diff --git a/orte/mca/oob/usock/oob_usock_component.c b/orte/mca/oob/usock/oob_usock_component.c index 5c962a8dc6a..d42e8322dc8 100644 --- a/orte/mca/oob/usock/oob_usock_component.c +++ b/orte/mca/oob/usock/oob_usock_component.c @@ -63,6 +63,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/ess/ess.h" #include "orte/mca/state/state.h" +#include "orte/util/listener.h" #include "orte/util/name_fns.h" #include "orte/util/parse_options.h" #include "orte/util/session_dir.h" @@ -185,6 +186,22 @@ static int component_available(void) return ORTE_SUCCESS; } +/* + * Handler for accepting connections from the event library + */ +static void connection_event_handler(int incoming_sd, short flags, void* cbdata) +{ + orte_pending_connection_t *pending = (orte_pending_connection_t*)cbdata; + int sd; + + sd = pending->fd; + pending->fd = -1; + OBJ_RELEASE(pending); + + /* process the connection */ + mca_oob_usock_module.api.accept_connection(sd, NULL); +} + /* Start the module */ static int component_startup(void) { @@ -205,11 +222,10 @@ static int component_startup(void) opal_output_verbose(2, orte_oob_base_framework.framework_output, "SUNPATH: %s", mca_oob_usock_component.address.sun_path); - /* if we are a daemon/HNP, start the listening event - this will create - * the rendezvous link - */ + /* if we are a daemon/HNP, register our listener */ if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { - if (ORTE_SUCCESS != (rc = orte_oob_usock_start_listening())) { + if (ORTE_SUCCESS != (rc = orte_register_listener((struct sockaddr*)&mca_oob_usock_component.address, sizeof(struct sockaddr_un), + orte_event_base, connection_event_handler))) { ORTE_ERROR_LOG(rc); } } else { @@ -235,10 +251,6 @@ static void component_shutdown(void) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { - if (mca_oob_usock_component.listener_ev_active) { - opal_event_del(&mca_oob_usock_component.listener_event); - mca_oob_usock_component.listener_ev_active = false; - } /* delete the rendezvous file */ unlink(mca_oob_usock_component.address.sun_path); } diff --git a/orte/mca/oob/usock/oob_usock_component.h b/orte/mca/oob/usock/oob_usock_component.h index 6da9c92594e..d0f0161f1ea 100644 --- a/orte/mca/oob/usock/oob_usock_component.h +++ b/orte/mca/oob/usock/oob_usock_component.h @@ -51,10 +51,6 @@ typedef struct { mca_oob_base_component_t super; /**< base OOB component */ int max_retries; /**< max number of retries before declaring peer gone */ struct sockaddr_un address; /**< address of our rendezvous point */ - /* connection support */ - opal_event_t listener_event; /**< my listener event */ - bool listener_ev_active; - int listener_socket; } mca_oob_usock_component_t; ORTE_MODULE_DECLSPEC extern mca_oob_usock_component_t mca_oob_usock_component; diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c index 167df0e1e52..7847136df83 100644 --- a/orte/orted/pmix/pmix_server.c +++ b/orte/orted/pmix/pmix_server.c @@ -67,6 +67,7 @@ #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/grpcomm/grpcomm.h" #include "orte/mca/rml/rml.h" +#include "orte/util/listener.h" #include "orte/util/name_fns.h" #include "orte/util/session_dir.h" #include "orte/util/show_help.h" @@ -116,7 +117,6 @@ static OBJ_CLASS_INSTANCE(pmix_server_trk_t, * Local utility functions */ static void connection_handler(int incoming_sd, short flags, void* cbdata); -static int pmix_server_start_listening(struct sockaddr_un *address); static void pmix_server_recv(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tg, void *cbdata); @@ -137,86 +137,10 @@ int pmix_server_output = -1; int pmix_server_local_handle = -1; int pmix_server_remote_handle = -1; int pmix_server_global_handle = -1; -int pmix_segment_size = -1; opal_list_t pmix_server_pending_dmx_reqs = {{0}}; static bool initialized = false; static struct sockaddr_un address; -static int pmix_server_listener_socket = -1; -static bool pmix_server_listener_ev_active = false; -static opal_event_t pmix_server_listener_event; static opal_list_t collectives; -static opal_list_t meta_segments; - -static opal_dstore_attr_t *pmix_sm_attach(uint32_t jobid, char *seg_info) -{ - int rc; - opal_dstore_attr_t *attr; - attr = OBJ_NEW(opal_dstore_attr_t); - attr->jobid = jobid; - attr->connection_info = strdup(seg_info); - - opal_list_append(&meta_segments, &attr->super); - rc = opal_dstore.update(opal_dstore_modex, &meta_segments); - return (OPAL_SUCCESS == rc) ? attr : NULL; -} - -opal_dstore_attr_t *pmix_server_create_shared_segment(orte_jobid_t jid) -{ - int rc; - char *sm_file; - opal_shmem_ds_t seg_ds; - orte_job_t *jdata; - char *seg_info; - opal_dstore_attr_t *attr = NULL; - if (NULL == (jdata = orte_get_job_data_object(jid))) { - ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); - return NULL; - } - /* create a shared segment */ - pmix_segment_size = jdata->num_procs * sizeof(meta_info) + META_OFFSET; - rc = asprintf(&sm_file, "%s" OPAL_PATH_SEP "dstore_segment.meta.%u", orte_process_info.job_session_dir, jid); - if (0 <= rc && NULL != sm_file) { - rc = opal_shmem_segment_create (&seg_ds, sm_file, pmix_segment_size); - free (sm_file); - if (OPAL_SUCCESS == rc) { - rc = asprintf(&seg_info, "%d:%d:%lu:%p:%s", seg_ds.seg_cpid, seg_ds.seg_id, seg_ds.seg_size, seg_ds.seg_base_addr, seg_ds.seg_name); - attr = pmix_sm_attach(jid, seg_info); - free(seg_info); - } else { - opal_output_verbose(2, pmix_server_output, - "%s PMIX shared memory segment was not created: opal_shmem_segment_create failed.", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - } - } - return attr; -} - -int pack_segment_info(opal_process_name_t id, opal_buffer_t *reply) -{ - opal_dstore_attr_t *attr; - int rc; - bool found_trk = false; - OPAL_LIST_FOREACH(attr, &meta_segments, opal_dstore_attr_t) { - if (attr->jobid == id.jobid) { - found_trk = true; - break; - } - } - if (!found_trk) { - /* create new segment for this job id and attach to it*/ - attr = pmix_server_create_shared_segment(id.jobid); - } - /* pack proc id into reply buffer */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_NAME))) { - return OPAL_ERROR; - } - /* pack seg info into reply buffer */ - if (NULL != attr) { - rc = opal_dss.pack(reply, &attr->connection_info, 1, OPAL_STRING); - return rc; - } - return OPAL_ERROR; -} void pmix_server_register(void) { @@ -270,15 +194,9 @@ int pmix_server_init(void) ORTE_JOB_FAMILY_PRINT(ORTE_PROC_MY_NAME->jobid), "pmix"); /* add it to our launch environment so our children get it */ -#if 0 - (void)asprintf(&pmix_server_uri, "%s:%s", - OPAL_NAME_PRINT(orte_process_info.my_name), - address.sun_path); -#else (void)asprintf(&pmix_server_uri, "%"PRIu32".%"PRIu32":%s", orte_process_info.my_name.jobid, orte_process_info.my_name.vpid, address.sun_path); -#endif opal_output_verbose(2, pmix_server_output, "%s PMIX server uri: %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), pmix_server_uri); @@ -297,10 +215,6 @@ int pmix_server_init(void) ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); return ORTE_ERR_OUT_OF_RESOURCE; } - if (0 > (opal_dstore_modex = opal_dstore.open("MODEX", "sm,hash", NULL))) { - ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); - return ORTE_ERR_OUT_OF_RESOURCE; - } /* setup recv for collecting local barriers */ orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLL, @@ -314,12 +228,13 @@ int pmix_server_init(void) orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DIRECT_MODEX_RESP, ORTE_RML_PERSISTENT, pmix_server_dmdx_resp, NULL); - /* start listening for connection requests */ - if (ORTE_SUCCESS != (rc = pmix_server_start_listening(&address))) { - OBJ_DESTRUCT(&pmix_server_peers); + /* register for connection requests */ + if (ORTE_SUCCESS != (rc = orte_register_listener((struct sockaddr*)&address, sizeof(struct sockaddr_un), + orte_event_base, connection_handler))) { + ORTE_ERROR_LOG(rc); + /* memory cleanup will occur when finalize is called */ } - OBJ_CONSTRUCT(&meta_segments, opal_list_t); return rc; } @@ -337,10 +252,6 @@ void pmix_server_finalize(void) "%s Finalizing PMIX server", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - /* stop listening */ - if (pmix_server_listener_ev_active) { - opal_event_del(&pmix_server_listener_event); - } /* stop receives */ orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DAEMON_COLL); orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_COLL_RELEASE); @@ -384,138 +295,23 @@ void pmix_server_finalize(void) } } OBJ_RELEASE(pmix_server_peers); - opal_dstore_attr_t *attr; - opal_list_item_t *item; - for (item = opal_list_remove_first(&meta_segments); - NULL != item; - item = opal_list_remove_first(&meta_segments)) { - attr = (opal_dstore_attr_t*) item; - OBJ_RELEASE(attr); - } - OPAL_LIST_DESTRUCT(&meta_segments); -} - -/* - * start listening on our rendezvous file - */ -static int pmix_server_start_listening(struct sockaddr_un *address) -{ - int flags; - opal_socklen_t addrlen; - int sd = -1; - - /* create a listen socket for incoming connection attempts */ - sd = socket(PF_UNIX, SOCK_STREAM, 0); - if (sd < 0) { - if (EAFNOSUPPORT != opal_socket_errno) { - opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno); - } - return ORTE_ERR_IN_ERRNO; - } - /* Set the socket to close-on-exec so that no children inherit - this FD */ - if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) { - opal_output(0, "pmix_server: unable to set the " - "listening socket to CLOEXEC (%s:%d)\n", - strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - - - addrlen = sizeof(struct sockaddr_un); - if (bind(sd, (struct sockaddr*)address, addrlen) < 0) { - opal_output(0, "%s bind() failed on error %s (%d)", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - strerror(opal_socket_errno), - opal_socket_errno ); - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - - /* setup listen backlog to maximum allowed by kernel */ - if (listen(sd, SOMAXCONN) < 0) { - opal_output(0, "pmix_server_component_init: listen(): %s (%d)", - strerror(opal_socket_errno), opal_socket_errno); - - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - - /* set socket up to be non-blocking, otherwise accept could block */ - if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { - opal_output(0, "pmix_server_component_init: fcntl(F_GETFL) failed: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - flags |= O_NONBLOCK; - if (fcntl(sd, F_SETFL, flags) < 0) { - opal_output(0, "pmix_server_component_init: fcntl(F_SETFL) failed: %s (%d)", - strerror(opal_socket_errno), opal_socket_errno); - CLOSE_THE_SOCKET(sd); - return ORTE_ERROR; - } - - /* record this socket */ - pmix_server_listener_socket = sd; - - /* setup to listen via the event lib */ - pmix_server_listener_ev_active = true; - opal_event_set(orte_event_base, &pmix_server_listener_event, - pmix_server_listener_socket, - OPAL_EV_READ|OPAL_EV_PERSIST, - connection_handler, - 0); - opal_event_set_priority(&pmix_server_listener_event, ORTE_MSG_PRI); - opal_event_add(&pmix_server_listener_event, 0); - - opal_output_verbose(2, pmix_server_output, - "%s pmix server listening on socket %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd); - - return ORTE_SUCCESS; } /* * Handler for accepting connections from the event library */ -static void connection_handler(int incoming_sd, short flags, void* cbdata) +static void connection_handler(int fd, short flags, void* cbdata) { - struct sockaddr addr; - opal_socklen_t addrlen = sizeof(struct sockaddr); - int sd, rc; + int rc; pmix_server_hdr_t hdr; pmix_server_peer_t *peer; + orte_pending_connection_t *pending = (orte_pending_connection_t*)cbdata; + int sd; + + sd = pending->fd; + pending->fd = -1; + OBJ_RELEASE(pending); - sd = accept(incoming_sd, (struct sockaddr*)&addr, &addrlen); - opal_output_verbose(2, pmix_server_output, - "%s connection_event_handler: working connection " - "(%d, %d)\n", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - sd, opal_socket_errno); - if (sd < 0) { - if (EINTR == opal_socket_errno) { - return; - } - if (opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { - if (EMFILE == opal_socket_errno) { - /* - * Close incoming_sd so that orte_show_help will have a file - * descriptor with which to open the help file. We will be - * exiting anyway, so we don't need to keep it open. - */ - CLOSE_THE_SOCKET(incoming_sd); - ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS); - orte_show_help("help-orterun.txt", "orterun:sys-limit-sockets", true); - } else { - opal_output(0, "pmix_server_accept: accept() failed: %s (%d).", - strerror(opal_socket_errno), opal_socket_errno); - } - } - return; - } /* Set the socket to close-on-exec so that no subsequent children inherit this FD */ if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) { @@ -529,6 +325,7 @@ static void connection_handler(int incoming_sd, short flags, void* cbdata) /* get the handshake */ if (ORTE_SUCCESS != (rc = pmix_server_recv_connect_ack(NULL, sd, &hdr))) { ORTE_ERROR_LOG(rc); + CLOSE_THE_SOCKET(sd); return; } @@ -540,6 +337,7 @@ static void connection_handler(int incoming_sd, short flags, void* cbdata) peer = OBJ_NEW(pmix_server_peer_t); if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(pmix_server_peers, sd, peer))) { OPAL_ERROR_LOG(rc); + CLOSE_THE_SOCKET(sd); return; } peer->name = hdr.id; @@ -911,32 +709,6 @@ static void pmix_server_release(int status, OPAL_ERROR_LOG(rc); } OBJ_RELEASE(msg); - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(id, reply_short); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_RELEASE(reply_short); - return; - } - opal_value_t kvf; - OBJ_CONSTRUCT(&kvf, opal_value_t); - kvf.key = strdup("finalval"); - kvf.type = OPAL_BYTE_OBJECT; - kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvf.data.bo.size = data->bytes_used; - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &id, &kvf))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&kvf); - return; - } - kvf.data.bo.bytes = NULL; - kvf.data.bo.size = 0; - OBJ_DESTRUCT(&kvf); /* get proc object for the target process */ memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t)); proc = orte_get_proc_object(&name); @@ -1161,11 +933,9 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, pmix_server_dmx_req_t *req, *nxt; int rc, ret; int32_t cnt; - opal_buffer_t *reply, xfer, *bptr, *data, *reply_short; + opal_buffer_t *reply, xfer, *bptr; opal_process_name_t target; opal_value_t kv; - orte_proc_t *proc, *proc_peer; - bool stored; opal_output_verbose(2, pmix_server_output, "%s dmdx:recv response from proc %s", @@ -1179,8 +949,6 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, return; } - proc = orte_get_proc_object(&target); - /* unpack the status */ cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &ret, &cnt, OPAL_INT))) { @@ -1210,119 +978,42 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender, OBJ_DESTRUCT(&xfer); } - stored = false; - data = NULL; /* check ALL reqs to see who requested this target - due to * async behavior, we may have requests from more than one * process */ - reply_short = NULL; OPAL_LIST_FOREACH_SAFE(req, nxt, &pmix_server_pending_dmx_reqs, pmix_server_dmx_req_t) { if (0 == opal_compare_proc(target, req->target)) { - /* get the proc object for the peer */ - proc_peer = orte_get_proc_object(&req->peer->name); - /* check if peer has access to shared memory dstore, - * if not, pack the reply and send. */ - if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { - if (!stored) { - /* prep the reply */ - reply = OBJ_NEW(opal_buffer_t); - /* pack the returned status */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(bptr); - return; - } - - /* pack the hostname blob */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(bptr); - return; - } - - /* pass across any returned blobs */ - opal_dss.copy_payload(reply, buffer); - stored = true; - } - OBJ_RETAIN(reply); - PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); - } else { - /* If peer has an access to shared memory dstore, check - * if we already stored data for the target process. - * If not, pack them into the data buffer. - * So we do it once. */ - if (NULL == reply_short) { - /* reply_short is used when we store all data into shared memory segment */ - reply_short = OBJ_NEW(opal_buffer_t); - /* pack the returned status */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply_short, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - OBJ_RELEASE(bptr); - return; - } - - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(target, reply_short); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - return; - } - } - if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM)) { - /* prepare data buffer to store it in shared memory dstore segment */ - data = OBJ_NEW(opal_buffer_t); - - /* pack the hostname blob */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - OBJ_RELEASE(bptr); - return; - } - - /* pass across any returned blobs */ - opal_dss.copy_payload(data, buffer); - /* create key-value object to store data for target process - * and put it into shared memory dstore */ - opal_value_t kvp; - OBJ_CONSTRUCT(&kvp, opal_value_t); - kvp.key = strdup("finalval"); - kvp.type = OPAL_BYTE_OBJECT; - kvp.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvp.data.bo.size = data->bytes_used; - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &target, &kvp))) { - OBJ_RELEASE(reply_short); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&kvp); - ORTE_ERROR_LOG(rc); - return; - } - kvp.data.bo.bytes = NULL; - kvp.data.bo.size = 0; - OBJ_DESTRUCT(&kvp); - /* mark that we put data for this proc into shared memory dstore */ - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); - } - OBJ_RETAIN(reply_short); - PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply_short); - } - if (NULL != bptr) { + /* prep the reply */ + reply = OBJ_NEW(opal_buffer_t); + /* pack the returned status */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); OBJ_RELEASE(bptr); + return; } - if (NULL != data) { - OBJ_RELEASE(data); + + /* pack the hostname blob */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(bptr); + return; } + + /* pass across any returned blobs */ + opal_dss.copy_payload(reply, buffer); + /* send it */ + OBJ_RETAIN(reply); + PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); + /* request is complete, so remove it */ opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super); OBJ_RELEASE(req); } } + if (NULL != bptr) { + OBJ_RELEASE(bptr); + } } diff --git a/orte/orted/pmix/pmix_server.h b/orte/orted/pmix/pmix_server.h index 0997e0b4d07..b7697ad89bc 100644 --- a/orte/orted/pmix/pmix_server.h +++ b/orte/orted/pmix/pmix_server.h @@ -12,7 +12,7 @@ * Copyright (c) 2006-2013 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -36,8 +36,6 @@ ORTE_DECLSPEC void pmix_server_register(void); /* provide access to the pmix server uri */ ORTE_DECLSPEC extern char *pmix_server_uri; -ORTE_DECLSPEC extern opal_dstore_attr_t *pmix_server_create_shared_segment(orte_jobid_t jid); - END_C_DECLS #endif /* PMIX_SERVER_H_ */ diff --git a/orte/orted/pmix/pmix_server_process_msgs.c b/orte/orted/pmix/pmix_server_process_msgs.c index fd46a7aaf67..a160a5e74a1 100644 --- a/orte/orted/pmix/pmix_server_process_msgs.c +++ b/orte/orted/pmix/pmix_server_process_msgs.c @@ -86,13 +86,11 @@ void pmix_server_process_message(pmix_server_peer_t *peer) int32_t cnt; pmix_cmd_t cmd; opal_buffer_t *reply, xfer, *bptr, buf, save, blocal, bremote; - opal_buffer_t *data; opal_value_t kv, *kvp, *kvp2, *kp; opal_process_name_t id, idreq; orte_process_name_t name; orte_job_t *jdata; orte_proc_t *proc; - orte_proc_t *proc_peer; opal_list_t values; uint32_t tag; opal_pmix_scope_t scope; @@ -100,7 +98,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) pmix_server_dmx_req_t *req, *nextreq; bool found; orte_grpcomm_signature_t *sig; - uint32_t sm_flag; /* xfer the message to a buffer for unpacking */ OBJ_CONSTRUCT(&xfer, opal_buffer_t); @@ -206,13 +203,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) (PMIX_FENCENB_CMD == cmd) ? "FENCE_NB" : "FENCE", tmp); free(tmp); } - /* unpack flag if sm dstore is supported by the client */ - cnt = 1; - if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &sm_flag, &cnt, OPAL_UINT32))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&xfer); - return; - } /* if we are in a group collective mode, then we need to prep * the data as it should be included in the modex */ OBJ_CONSTRUCT(&save, opal_buffer_t); @@ -222,11 +212,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) opal_dss.copy_payload(&save, &xfer); } - /* mark if peer proc has access to shared memory region*/ - if (1 == sm_flag) { - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_SM_ACCESS); - } - /* if data was given, unpack and store it in the pmix dstore - it is okay * if there was no data, it's just a fence */ cnt = 1; @@ -348,8 +333,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) /* yes - deliver a copy */ reply = OBJ_NEW(opal_buffer_t); if (NULL == req->proxy) { - /* get the proc object for the peer */ - proc_peer = orte_get_proc_object(&req->peer->name); /* pack the status */ ret = OPAL_SUCCESS; if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { @@ -358,123 +341,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) OBJ_RELEASE(sig); return; } - /* check if the peer has an access to shared memory dstore segment */ - if (!ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { - /* always pass the hostname */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(PMIX_HOSTNAME); - kv.type = OPAL_STRING; - kv.data.string = strdup(orte_process_info.nodename); - kp = &kv; - if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&kv); - OBJ_RELEASE(sig); - return; - } - OBJ_DESTRUCT(&kv); - /* pack the blob */ - bptr = &buf; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(sig); - return; - } - OBJ_DESTRUCT(&buf); - /* pass the local blob(s) */ - opal_dss.copy_payload(reply, &blocal); - } else { - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(id, reply); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_RELEASE(sig); - return; - } - if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM)) { - data = OBJ_NEW(opal_buffer_t); - /* always pass the hostname */ - OBJ_CONSTRUCT(&buf, opal_buffer_t); - OBJ_CONSTRUCT(&kv, opal_value_t); - kv.key = strdup(PMIX_HOSTNAME); - kv.type = OPAL_STRING; - kv.data.string = strdup(orte_process_info.nodename); - kp = &kv; - if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&buf); - OBJ_DESTRUCT(&kv); - OBJ_RELEASE(sig); - OBJ_DESTRUCT(&xfer); - return; - } - OBJ_DESTRUCT(&kv); - /* pack the blob */ - bptr = &buf; - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(sig); - return; - } - OBJ_DESTRUCT(&buf); - /* pass the local blob(s) */ - opal_dss.copy_payload(data, &blocal); - opal_value_t kvp; - OBJ_CONSTRUCT(&kvp, opal_value_t); - kvp.key = strdup("finalval"); - kvp.type = OPAL_BYTE_OBJECT; - kvp.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvp.data.bo.size = data->bytes_used; - kvp.data.bo.bytes = NULL; - kvp.data.bo.size = 0; - /* store data in the shared memory dstore segment */ - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &id, &kvp))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&kvp); - OBJ_RELEASE(sig); - return; - } - OBJ_DESTRUCT(&kvp); - /* mark that we put data for this proc to shared memory region */ - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); - OBJ_RELEASE(data); - } - } - /* use the PMIX send to return the data */ - PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); - } else { - /* pack the id of the requested proc */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_NAME))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_RELEASE(sig); - return; - } - /* pack the status */ - ret = OPAL_SUCCESS; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - return; - } /* always pass the hostname */ OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&kv, opal_value_t); @@ -487,29 +353,76 @@ void pmix_server_process_message(pmix_server_peer_t *peer) OBJ_RELEASE(reply); OBJ_DESTRUCT(&buf); OBJ_DESTRUCT(&kv); + OBJ_RELEASE(sig); return; } OBJ_DESTRUCT(&kv); - /* pack the hostname blob */ + /* pack the blob */ bptr = &buf; if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(reply); OBJ_DESTRUCT(&xfer); OBJ_DESTRUCT(&buf); + OBJ_RELEASE(sig); return; } OBJ_DESTRUCT(&buf); - /* pass the remote blob(s) */ - opal_dss.copy_payload(reply, &bremote); - /* use RML to send the response */ - orte_rml.send_buffer_nb(&req->proxy->name, reply, - ORTE_RML_TAG_DIRECT_MODEX_RESP, - orte_rml_send_callback, NULL); + /* pass the local blob(s) */ + opal_dss.copy_payload(reply, &blocal); + } + /* use the PMIX send to return the data */ + PMIX_SERVER_QUEUE_SEND(req->peer, req->tag, reply); + } else { + /* pack the id of the requested proc */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &id, 1, OPAL_NAME))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_RELEASE(sig); + return; + } + /* pack the status */ + ret = OPAL_SUCCESS; + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + return; + } + /* always pass the hostname */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + OBJ_CONSTRUCT(&kv, opal_value_t); + kv.key = strdup(PMIX_HOSTNAME); + kv.type = OPAL_STRING; + kv.data.string = strdup(orte_process_info.nodename); + kp = &kv; + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &kp, 1, OPAL_VALUE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&buf); + OBJ_DESTRUCT(&kv); + return; + } + OBJ_DESTRUCT(&kv); + /* pack the hostname blob */ + bptr = &buf; + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + return; } - opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super); - OBJ_RELEASE(req); + OBJ_DESTRUCT(&buf); + /* pass the remote blob(s) */ + opal_dss.copy_payload(reply, &bremote); + /* use RML to send the response */ + orte_rml.send_buffer_nb(&req->proxy->name, reply, + ORTE_RML_TAG_DIRECT_MODEX_RESP, + orte_rml_send_callback, NULL); } + opal_list_remove_item(&pmix_server_pending_dmx_reqs, &req->super); + OBJ_RELEASE(req); } OBJ_DESTRUCT(&blocal); OBJ_DESTRUCT(&bremote); @@ -583,7 +496,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); /* get the proc object for the peer */ memcpy((char*)&name, (char*)&id, sizeof(orte_process_name_t)); - proc_peer = orte_get_proc_object(&name); /* unpack the id of the proc whose data is being requested */ cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(&xfer, &idreq, &cnt, OPAL_NAME))) { @@ -609,34 +521,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) return; } - sm_flag = 0; - if (ORTE_FLAG_TEST(proc_peer, ORTE_PROC_FLAG_SM_ACCESS)) { - sm_flag = 1; - } - /* if we have already stored data for this proc in shared memory region, - * then we just need to send a response */ - if (1 == sm_flag && ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_IN_SM) && ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { - reply = OBJ_NEW(opal_buffer_t); - ret = OPAL_SUCCESS; - /* pack the error status */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_DESTRUCT(&xfer); - OBJ_RELEASE(reply); - return; - } - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(idreq, reply); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_DESTRUCT(&xfer); - OBJ_RELEASE(reply); - return; - } - PMIX_SERVER_QUEUE_SEND(peer, tag, reply); - OBJ_DESTRUCT(&xfer); - return; - } /* if we have not yet received data for this proc, then we just * need to track the request */ if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_DATA_RECVD)) { @@ -710,7 +594,6 @@ void pmix_server_process_message(pmix_server_peer_t *peer) ORTE_NAME_PRINT(&name)); kvp = NULL; kvp2 = NULL; - data = NULL; /* retrieve the local blob for that proc */ OBJ_CONSTRUCT(&values, opal_list_t); if (OPAL_SUCCESS == (ret = opal_dstore.fetch(pmix_server_local_handle, &idreq, "modex", &values))) { @@ -750,28 +633,13 @@ void pmix_server_process_message(pmix_server_peer_t *peer) OBJ_DESTRUCT(&kv); /* pack the blob */ bptr = &buf; - if (0 == sm_flag) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - return; - } - } else { - if (NULL == data) { - data = OBJ_NEW(opal_buffer_t); - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - return; - } + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + return; } - OBJ_DESTRUCT(&buf); /* local blob */ if (NULL != kvp) { @@ -787,29 +655,13 @@ void pmix_server_process_message(pmix_server_peer_t *peer) kvp->data.bo.bytes = NULL; kvp->data.bo.size = 0; bptr = &buf; - if (0 == sm_flag) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp); - return; - } - } else { - if (NULL == data) { - data = OBJ_NEW(opal_buffer_t); - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp); - return; - } + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_DESTRUCT(&xfer); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp); + return; } OBJ_DESTRUCT(&buf); OBJ_RELEASE(kvp); @@ -828,66 +680,16 @@ void pmix_server_process_message(pmix_server_peer_t *peer) kvp2->data.bo.bytes = NULL; kvp2->data.bo.size = 0; bptr = &buf; - if (0 == sm_flag) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp2); - return; - } - } else { - if (NULL == data) { - data = OBJ_NEW(opal_buffer_t); - } - if (OPAL_SUCCESS != (rc = opal_dss.pack(data, &bptr, 1, OPAL_BUFFER))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp2); - return; - } - } - OBJ_DESTRUCT(&buf); - OBJ_RELEASE(kvp2); - } - if (1 == sm_flag) { - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(idreq, reply); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&xfer); - return; - } - opal_value_t kvf; - OBJ_CONSTRUCT(&kvf, opal_value_t); - kvf.key = strdup("finalval"); - kvf.type = OPAL_BYTE_OBJECT; - kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvf.data.bo.size = data->bytes_used; - /* store data in the shared memory dstore segment */ - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &idreq, &kvf))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &bptr, 1, OPAL_BUFFER))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(reply); - OBJ_RELEASE(data); OBJ_DESTRUCT(&xfer); - OBJ_DESTRUCT(&kvf); + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp2); return; } - /* protect the data */ - kvf.data.bo.bytes = NULL; - kvf.data.bo.size = 0; - OBJ_DESTRUCT(&kvf); - /* mark that we put data for this proc to shared memory region */ - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); - OBJ_RELEASE(data); - + OBJ_DESTRUCT(&buf); + OBJ_RELEASE(kvp2); } PMIX_SERVER_QUEUE_SEND(peer, tag, reply); OBJ_DESTRUCT(&xfer); @@ -929,44 +731,8 @@ void pmix_server_process_message(pmix_server_peer_t *peer) /* xfer the data - the blobs are in the buffer, * so don't repack them. They will include the remote * hostname, so don't add it again */ - if (0 == sm_flag) { - opal_dss.copy_payload(reply, &buf); - } else { - data = OBJ_NEW(opal_buffer_t); - opal_dss.copy_payload(data, &buf); - } + opal_dss.copy_payload(reply, &buf); OBJ_DESTRUCT(&buf); - if (1 == sm_flag) { - /* pack reply: info about meta segment for the target process */ - rc = pack_segment_info(idreq, reply); - if (OPAL_SUCCESS != rc) { - OPAL_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - return; - } - opal_value_t kvf; - OBJ_CONSTRUCT(&kvf, opal_value_t); - kvf.key = strdup("finalval"); - kvf.type = OPAL_BYTE_OBJECT; - kvf.data.bo.bytes = (uint8_t*)(data->base_ptr); - kvf.data.bo.size = data->bytes_used; - /* store data into shared memory dstore segment */ - if (OPAL_SUCCESS != (rc = opal_dstore.store(opal_dstore_modex, &idreq, &kvf))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(data); - OBJ_DESTRUCT(&kvf); - return; - } - /* protect the data */ - kvf.data.bo.bytes = NULL; - kvf.data.bo.size = 0; - OBJ_DESTRUCT(&kvf); - /* mark that we put data for this proc to shared memory region */ - ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_DATA_IN_SM); - OBJ_RELEASE(data); - } PMIX_SERVER_QUEUE_SEND(peer, tag, reply); return; } diff --git a/orte/runtime/orte_finalize.c b/orte/runtime/orte_finalize.c index 3a164b6fa3b..9ea60dee211 100644 --- a/orte/runtime/orte_finalize.c +++ b/orte/runtime/orte_finalize.c @@ -12,7 +12,7 @@ * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. * All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -33,6 +33,7 @@ #include "orte/runtime/orte_globals.h" #include "orte/runtime/runtime.h" #include "orte/runtime/orte_locks.h" +#include "orte/util/listener.h" #include "orte/util/name_fns.h" #include "orte/util/show_help.h" @@ -58,6 +59,12 @@ int orte_finalize(void) /* flag that we are finalizing */ orte_finalizing = true; + if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) { + /* stop listening for connections - will + * be ignored if no listeners were registered */ + orte_stop_listening(); + } + /* flush the show_help system */ orte_show_help_finalize(); diff --git a/orte/runtime/orte_init.c b/orte/runtime/orte_init.c index 11d2cbb3cb5..b2e12104785 100644 --- a/orte/runtime/orte_init.c +++ b/orte/runtime/orte_init.c @@ -13,7 +13,7 @@ * reserved. * Copyright (c) 2007-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2007-2008 Sun Microsystems, Inc. All rights reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. * Copyright (c) 2014-2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * @@ -46,6 +46,7 @@ #include "orte/mca/ess/base/base.h" #include "orte/mca/ess/ess.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/util/listener.h" #include "orte/util/name_fns.h" #include "orte/util/proc_info.h" #include "orte/util/error_strings.h" @@ -203,14 +204,6 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags) goto error; } - if (ORTE_PROC_IS_APP) { - if (0 > (opal_dstore_modex = opal_dstore.open("MODEX", "sm,hash", NULL))) { - error = "opal dstore modex"; - ret = ORTE_ERR_FATAL; - goto error; - } - } - if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { /* let the pmix server register params */ pmix_server_register(); @@ -258,6 +251,16 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags) opal_timing_set_jobid(ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); #endif + if (ORTE_PROC_IS_HNP || ORTE_PROC_IS_DAEMON) { + /* start listening - will be ignored if no listeners + * were registered */ + if (ORTE_SUCCESS != (ret = orte_start_listening())) { + ORTE_ERROR_LOG(ret); + error = "orte_start_listening"; + goto error; + } + } + /* All done */ return ORTE_SUCCESS; diff --git a/orte/util/Makefile.am b/orte/util/Makefile.am index d2b34893536..55ed763a12c 100644 --- a/orte/util/Makefile.am +++ b/orte/util/Makefile.am @@ -11,7 +11,7 @@ # All rights reserved. # Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved. # Copyright (c) 2014 Cisco Systems, Inc. All rights reserved. -# Copyright (c) 2014 Intel, Inc. All rights reserved. +# Copyright (c) 2014-2015 Intel, Inc. All rights reserved. # $COPYRIGHT$ # # Additional copyrights may follow @@ -56,7 +56,8 @@ headers += \ util/comm/comm.h \ util/nidmap.h \ util/regex.h \ - util/attr.h + util/attr.h \ + util/listener.h lib@ORTE_LIB_PREFIX@open_rte_la_SOURCES += \ util/error_strings.c \ @@ -74,7 +75,8 @@ lib@ORTE_LIB_PREFIX@open_rte_la_SOURCES += \ util/comm/comm.c \ util/nidmap.c \ util/regex.c \ - util/attr.c + util/attr.c \ + util/listener.c # Remove the generated man pages distclean-local: diff --git a/orte/util/listener.c b/orte/util/listener.c new file mode 100644 index 00000000000..ed66d550238 --- /dev/null +++ b/orte/util/listener.c @@ -0,0 +1,381 @@ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009-2015 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + * In windows, many of the socket functions return an EWOULDBLOCK + * instead of things like EAGAIN, EINPROGRESS, etc. It has been + * verified that this will not conflict with other error codes that + * are returned by these functions under UNIX/Linux environments + */ + +#include "orte_config.h" +#include "orte/types.h" +#include "opal/types.h" + +#ifdef HAVE_UNISTD_H +#include +#endif +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#include +#ifdef HAVE_NETINET_IN_H +#include +#endif +#ifdef HAVE_ARPA_INET_H +#include +#endif +#ifdef HAVE_NETDB_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H +#include +#endif + +#include + +#include "opal/util/error.h" +#include "opal/util/output.h" +#include "opal/opal_socket_errno.h" +#include "opal/util/if.h" +#include "opal/util/net.h" +#include "opal/util/fd.h" +#include "opal/class/opal_list.h" + +#include "orte/mca/errmgr/errmgr.h" +#include "orte/util/name_fns.h" +#include "orte/runtime/orte_globals.h" +#include "orte/util/show_help.h" + +#include "orte/util/listener.h" + +static void* listen_thread_fn(opal_object_t *obj); +static opal_list_t mylisteners; +static bool initialized = false; +static opal_thread_t listen_thread; +static volatile bool listen_thread_active = false; +static struct timeval listen_thread_tv; +static int stop_thread[2]; + +#define CLOSE_THE_SOCKET(socket) \ + do { \ + shutdown(socket, 2); \ + close(socket); \ + socket = -1; \ + } while(0) + + +int orte_register_listener(struct sockaddr* address, opal_socklen_t addrlen, + opal_event_base_t *evbase, + orte_listener_callback_fn_t handler) +{ + orte_listener_t *conn; + int flags; + int sd = -1; + + if (!initialized) { + OBJ_CONSTRUCT(&mylisteners, opal_list_t); + OBJ_CONSTRUCT(&listen_thread, opal_thread_t); + if (0 > pipe(stop_thread)) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + /* Make sure the pipe FDs are set to close-on-exec so that + they don't leak into children */ + if (opal_fd_set_cloexec(stop_thread[0]) != OPAL_SUCCESS || + opal_fd_set_cloexec(stop_thread[1]) != OPAL_SUCCESS) { + close(stop_thread[0]); + close(stop_thread[1]); + ORTE_ERROR_LOG(ORTE_ERR_IN_ERRNO); + return ORTE_ERR_IN_ERRNO; + } + listen_thread_tv.tv_sec = 3600; + listen_thread_tv.tv_usec = 0; + initialized = true; + } + + /* create a listen socket for incoming connection attempts */ + sd = socket(PF_UNIX, SOCK_STREAM, 0); + if (sd < 0) { + if (EAFNOSUPPORT != opal_socket_errno) { + opal_output(0,"pmix_server_start_listening: socket() failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno); + } + return ORTE_ERR_IN_ERRNO; + } + /* Set the socket to close-on-exec so that no children inherit + this FD */ + if (opal_fd_set_cloexec(sd) != OPAL_SUCCESS) { + opal_output(0, "pmix_server: unable to set the " + "listening socket to CLOEXEC (%s:%d)\n", + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + + + if (bind(sd, (struct sockaddr*)address, addrlen) < 0) { + opal_output(0, "%s bind() failed on error %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + strerror(opal_socket_errno), + opal_socket_errno ); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + + /* setup listen backlog to maximum allowed by kernel */ + if (listen(sd, SOMAXCONN) < 0) { + opal_output(0, "orte_listener: listen() failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + + /* set socket up to be non-blocking, otherwise accept could block */ + if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { + opal_output(0, "orte_listener: fcntl(F_GETFL) failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + flags |= O_NONBLOCK; + if (fcntl(sd, F_SETFL, flags) < 0) { + opal_output(0, "orte_listener: fcntl(F_SETFL) failed: %s (%d)", + strerror(opal_socket_errno), opal_socket_errno); + CLOSE_THE_SOCKET(sd); + return ORTE_ERROR; + } + + /* add this port to our connections */ + conn = OBJ_NEW(orte_listener_t); + conn->sd = sd; + conn->evbase = evbase; + conn->handler = handler; + opal_list_append(&mylisteners, &conn->item); + + return ORTE_SUCCESS; +} + +/* + * Component initialization - create a module for each available + * TCP interface and initialize the static resources associated + * with that module. + * + * Also initializes the list of devices that will be used/supported by + * the module, using the if_include and if_exclude variables. This is + * the only place that this sorting should occur -- all other places + * should use the tcp_avaiable_devices list. This is a change from + * previous versions of this component. + */ +int orte_start_listening(void) +{ + int rc; + + /* if we aren't initialized, or have nothing + * registered, or are already listening, then return SUCCESS */ + if (!initialized || 0 == opal_list_get_size(&mylisteners) || + listen_thread_active) { + return ORTE_SUCCESS; + } + + /* start our listener thread */ + listen_thread_active = true; + listen_thread.t_run = listen_thread_fn; + listen_thread.t_arg = NULL; + if (OPAL_SUCCESS != (rc = opal_thread_start(&listen_thread))) { + ORTE_ERROR_LOG(rc); + opal_output(0, "%s Unable to start listen thread", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + } + return rc; +} + +void orte_stop_listening(void) +{ + int i=0; + + if (!listen_thread_active) { + return; + } + + listen_thread_active = false; + /* tell the thread to exit */ + write(stop_thread[1], &i, sizeof(int)); + opal_thread_join(&listen_thread, NULL); + OBJ_DESTRUCT(&listen_thread); + OPAL_LIST_DESTRUCT(&mylisteners); +} + +/* + * The listen thread accepts incoming connections and places them + * in a queue for further processing + * + * Runs until orte_listener_shutdown is set to true. + */ +static void* listen_thread_fn(opal_object_t *obj) +{ + int rc, max, accepted_connections, sd; + opal_socklen_t addrlen = sizeof(struct sockaddr_storage); + orte_pending_connection_t *pending_connection; + struct timeval timeout; + fd_set readfds; + orte_listener_t *listener; + + while (listen_thread_active) { + FD_ZERO(&readfds); + max = -1; + OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) { + FD_SET(listener->sd, &readfds); + max = (listener->sd > max) ? listener->sd : max; + } + /* add the stop_thread fd */ + FD_SET(stop_thread[0], &readfds); + max = (stop_thread[0] > max) ? stop_thread[0] : max; + + /* set timeout interval */ + timeout.tv_sec = listen_thread_tv.tv_sec; + timeout.tv_usec = listen_thread_tv.tv_usec; + + /* Block in a select to avoid hammering the cpu. If a connection + * comes in, we'll get woken up right away. + */ + rc = select(max + 1, &readfds, NULL, NULL, &timeout); + if (!listen_thread_active) { + /* we've been asked to terminate */ + goto done; + } + if (rc < 0) { + if (EAGAIN != opal_socket_errno && EINTR != opal_socket_errno) { + perror("select"); + } + continue; + } + + /* Spin accepting connections until all active listen sockets + * do not have any incoming connections, pushing each connection + * onto its respective event queue for processing + */ + do { + accepted_connections = 0; + OPAL_LIST_FOREACH(listener, &mylisteners, orte_listener_t) { + sd = listener->sd; + + /* according to the man pages, select replaces the given descriptor + * set with a subset consisting of those descriptors that are ready + * for the specified operation - in this case, a read. So we need to + * first check to see if this file descriptor is included in the + * returned subset + */ + if (0 == FD_ISSET(sd, &readfds)) { + /* this descriptor is not included */ + continue; + } + + /* this descriptor is ready to be read, which means a connection + * request has been received - so harvest it. All we want to do + * here is accept the connection and push the info onto the event + * library for subsequent processing - we don't want to actually + * process the connection here as it takes too long, and so the + * OS might start rejecting connections due to timeout. + */ + pending_connection = OBJ_NEW(orte_pending_connection_t); + opal_event_set(listener->evbase, &pending_connection->ev, -1, + OPAL_EV_WRITE, listener->handler, pending_connection); + opal_event_set_priority(&pending_connection->ev, ORTE_MSG_PRI); + pending_connection->fd = accept(sd, + (struct sockaddr*)&(pending_connection->addr), + &addrlen); + if (pending_connection->fd < 0) { + OBJ_RELEASE(pending_connection); + + /* Non-fatal errors */ + if (EAGAIN == opal_socket_errno || + EWOULDBLOCK == opal_socket_errno) { + continue; + } + + /* If we run out of file descriptors, log an extra + warning (so that the user can know to fix this + problem) and abandon all hope. */ + else if (EMFILE == opal_socket_errno) { + CLOSE_THE_SOCKET(sd); + ORTE_ERROR_LOG(ORTE_ERR_SYS_LIMITS_SOCKETS); + orte_show_help("help-oob-tcp.txt", + "accept failed", + true, + opal_process_info.nodename, + opal_socket_errno, + strerror(opal_socket_errno), + "Out of file descriptors"); + goto done; + } + + /* For all other cases, close the socket, print a + warning but try to continue */ + else { + CLOSE_THE_SOCKET(sd); + orte_show_help("help-oob-tcp.txt", + "accept failed", + true, + opal_process_info.nodename, + opal_socket_errno, + strerror(opal_socket_errno), + "Unknown cause; job will try to continue"); + continue; + } + } + + /* activate the event */ + opal_event_active(&pending_connection->ev, OPAL_EV_WRITE, 1); + accepted_connections++; + } + } while (accepted_connections > 0); + } + + done: + close(stop_thread[0]); + close(stop_thread[1]); + return NULL; +} + + +/* INSTANTIATE CLASSES */ +static void lcons(orte_listener_t *p) +{ + p->sd = -1; + p->evbase = NULL; + p->handler = NULL; +} +static void ldes(orte_listener_t *p) +{ + if (0 <= p->sd) { + CLOSE_THE_SOCKET(p->sd); + } +} +OBJ_CLASS_INSTANCE(orte_listener_t, + opal_list_item_t, + lcons, ldes); + +OBJ_CLASS_INSTANCE(orte_pending_connection_t, + opal_object_t, + NULL, + NULL); diff --git a/orte/util/listener.h b/orte/util/listener.h new file mode 100644 index 00000000000..010b26786bf --- /dev/null +++ b/orte/util/listener.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2015 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef ORTE_LISTENER_H +#define ORTE_LISTENER_H + +#include "orte_config.h" + +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H +#include +#endif + +#include "opal/class/opal_list.h" +#include "opal/mca/event/event.h" + +/* callback prototype */ +typedef void (*orte_listener_callback_fn_t)(int sd, short args, void *cbdata); + +/* + * Data structure for accepting connections. + */ +typedef struct orte_listener_t { + opal_list_item_t item; + int sd; + opal_event_base_t *evbase; + orte_listener_callback_fn_t handler; +} orte_listener_t; +OBJ_CLASS_DECLARATION(orte_listener_t); + +typedef struct { + opal_object_t super; + opal_event_t ev; + int fd; + struct sockaddr_storage addr; +} orte_pending_connection_t; +OBJ_CLASS_DECLARATION(orte_pending_connection_t); + +ORTE_DECLSPEC int orte_start_listening(void); +ORTE_DECLSPEC void orte_stop_listening(void); +ORTE_DECLSPEC int orte_register_listener(struct sockaddr* address, opal_socklen_t addrlen, + opal_event_base_t *evbase, + orte_listener_callback_fn_t handler); + +#endif /* ORTE_LISTENER_H */