Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions orte/mca/iof/base/iof_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ static void orte_iof_base_sink_construct(orte_iof_sink_t* ptr)
static void orte_iof_base_sink_destruct(orte_iof_sink_t* ptr)
{
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
"%s iof: closing sink for process %s",
"%s iof: closing sink for process %s on fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&ptr->name)));
ORTE_NAME_PRINT(&ptr->name), ptr->wev->fd));
if (NULL != ptr->wev && 0 <= ptr->wev->fd) {
OBJ_RELEASE(ptr->wev);
}
Expand Down Expand Up @@ -303,7 +303,6 @@ static void orte_iof_base_write_event_destruct(orte_iof_write_event_t* wev)
return;
}
}

if (2 < wev->fd) {
OPAL_OUTPUT_VERBOSE((20, orte_iof_base_framework.framework_output,
"%s iof: closing fd %d for write event",
Expand Down
19 changes: 18 additions & 1 deletion orte/mca/iof/hnp/iof_hnp.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ static int hnp_output(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg);

static void hnp_complete(const orte_job_t *jdata);

static int finalize(void);

static int hnp_ft_event(int state);
Expand All @@ -88,6 +90,7 @@ orte_iof_base_module_t orte_iof_hnp_module = {
.pull = hnp_pull,
.close = hnp_close,
.output = hnp_output,
.complete = hnp_complete,
.finalize = finalize,
.ft_event = hnp_ft_event
};
Expand Down Expand Up @@ -176,7 +179,8 @@ static int hnp_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_tag,
return ORTE_ERR_NOT_FOUND;
}
/* setup any requested output files */
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jdata, proct, &stdoutsink, &stderrsink, &stddiagsink))) {
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jdata, proct,
&stdoutsink, &stderrsink, &stddiagsink))) {
ORTE_ERROR_LOG(rc);
return rc;
}
Expand Down Expand Up @@ -422,6 +426,19 @@ static int hnp_close(const orte_process_name_t* peer,
return ORTE_SUCCESS;
}

static void hnp_complete(const orte_job_t *jdata)
{
orte_iof_proc_t *proct, *next;

/* cleanout any lingering sinks */
OPAL_LIST_FOREACH_SAFE(proct, next, &mca_iof_hnp_component.procs, orte_iof_proc_t) {
if (jdata->jobid == proct->name.jobid) {
opal_list_remove_item(&mca_iof_hnp_component.procs, &proct->super);
OBJ_RELEASE(proct);
}
}
}

static int finalize(void)
{
orte_iof_write_event_t *wev;
Expand Down
21 changes: 19 additions & 2 deletions orte/mca/iof/orted/iof_orted.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ static int orted_output(const orte_process_name_t* peer,
orte_iof_tag_t source_tag,
const char *msg);

static void orted_complete(const orte_job_t *jdata);

static int finalize(void);

static int orted_ft_event(int state);
Expand All @@ -91,6 +93,7 @@ orte_iof_base_module_t orte_iof_orted_module = {
.pull = orted_pull,
.close = orted_close,
.output = orted_output,
.complete = orted_complete,
.finalize = finalize,
.ft_event = orted_ft_event
};
Expand Down Expand Up @@ -126,7 +129,7 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
orte_job_t *jobdat=NULL;
orte_ns_cmp_bitmask_t mask;

OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
OPAL_OUTPUT_VERBOSE((1, orte_iof_base_framework.framework_output,
"%s iof:orted pushing fd %d for process %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
fd, ORTE_NAME_PRINT(dst_name)));
Expand Down Expand Up @@ -164,7 +167,8 @@ static int orted_push(const orte_process_name_t* dst_name, orte_iof_tag_t src_ta
return ORTE_ERR_NOT_FOUND;
}
/* setup any requested output files */
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jobdat, proct, &stdoutsink, &stderrsink, &stddiagsink))) {
if (ORTE_SUCCESS != (rc = orte_iof_base_setup_output_files(dst_name, jobdat, proct,
&stdoutsink, &stderrsink, &stddiagsink))) {
ORTE_ERROR_LOG(rc);
return rc;
}
Expand Down Expand Up @@ -312,6 +316,19 @@ static int orted_close(const orte_process_name_t* peer,
return ORTE_SUCCESS;
}

static void orted_complete(const orte_job_t *jdata)
{
orte_iof_proc_t *proct, *next;

/* cleanout any lingering sinks */
OPAL_LIST_FOREACH_SAFE(proct, next, &mca_iof_orted_component.procs, orte_iof_proc_t) {
if (jdata->jobid == proct->name.jobid) {
opal_list_remove_item(&mca_iof_orted_component.procs, &proct->super);
OBJ_RELEASE(proct);
}
}
}

static int finalize(void)
{
orte_iof_proc_t *proct;
Expand Down
15 changes: 14 additions & 1 deletion orte/mca/state/orted/state_orted.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "opal/util/output.h"
#include "opal/dss/dss.h"
#include "opal/mca/pmix/pmix.h"

#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/base/base.h"
Expand Down Expand Up @@ -440,11 +441,23 @@ static void track_procs(int fd, short argc, void *cbdata)
OBJ_RELEASE(pptr); // maintain accounting
}
}
/* tell the IOF that the job is complete */
if (NULL != orte_iof.complete) {
orte_iof.complete(jdata);
}

/* tell the PMIx subsystem the job is complete */
if (NULL != opal_pmix.server_deregister_nspace) {
opal_pmix.server_deregister_nspace(jdata->jobid, NULL, NULL);
}

/* cleanup the job info */
opal_hash_table_set_value_uint32(orte_job_data, jdata->jobid, NULL);
OBJ_RELEASE(jdata);
}
}

cleanup:
cleanup:
OBJ_RELEASE(caddy);
}

Expand Down