diff --git a/opal/dss/dss_copy.c b/opal/dss/dss_copy.c index 839ddc648b9..a39798bd46a 100644 --- a/opal/dss/dss_copy.c +++ b/opal/dss/dss_copy.c @@ -219,6 +219,7 @@ int opal_dss_copy_pstat(opal_pstats_t **dest, opal_pstats_t *src, p->time = src->time; p->priority = src->priority; p->num_threads = src->num_threads; + p->pss = src->pss; p->vsize = src->vsize; p->rss = src->rss; p->peak_vsize = src->peak_vsize; diff --git a/opal/dss/dss_open_close.c b/opal/dss/dss_open_close.c index 366cf2586a9..baf58143efe 100644 --- a/opal/dss/dss_open_close.c +++ b/opal/dss/dss_open_close.c @@ -156,6 +156,7 @@ static void opal_pstat_construct(opal_pstats_t *obj) obj->time.tv_usec = 0; obj->priority = -1; obj->num_threads = -1; + obj->pss = 0.0; obj->vsize = 0.0; obj->rss = 0.0; obj->peak_vsize = 0.0; diff --git a/opal/dss/dss_pack.c b/opal/dss/dss_pack.c index cd7b035bb18..23c9d3b31bc 100644 --- a/opal/dss/dss_pack.c +++ b/opal/dss/dss_pack.c @@ -499,6 +499,9 @@ int opal_dss_pack_pstat(opal_buffer_t *buffer, const void *src, if (OPAL_SUCCESS != (ret = opal_dss_pack_buffer(buffer, &ptr[i]->num_threads, 1, OPAL_INT16))) { return ret; } + if (OPAL_SUCCESS != (ret = opal_dss_pack_float(buffer, &ptr[i]->pss, 1, OPAL_FLOAT))) { + return ret; + } if (OPAL_SUCCESS != (ret = opal_dss_pack_float(buffer, &ptr[i]->vsize, 1, OPAL_FLOAT))) { return ret; } diff --git a/opal/dss/dss_print.c b/opal/dss/dss_print.c index e95a40f8141..98deb25fe70 100644 --- a/opal/dss/dss_print.c +++ b/opal/dss/dss_print.c @@ -654,10 +654,10 @@ int opal_dss_print_pstat(char **output, char *prefix, opal_pstats_t *src, opal_d return OPAL_SUCCESS; } asprintf(output, "%sOPAL_PSTATS SAMPLED AT: %ld.%06ld\n%snode: %s rank: %d pid: %d cmd: %s state: %c pri: %d #threads: %d Processor: %d\n" - "%s\ttime: %ld.%06ld cpu: %5.2f VMsize: %8.2f PeakVMSize: %8.2f RSS: %8.2f\n", + "%s\ttime: %ld.%06ld cpu: %5.2f PSS: %8.2f VMsize: %8.2f PeakVMSize: %8.2f RSS: %8.2f\n", prefx, (long)src->sample_time.tv_sec, 
(long)src->sample_time.tv_usec, prefx, src->node, src->rank, src->pid, src->cmd, src->state[0], src->priority, src->num_threads, src->processor, - prefx, (long)src->time.tv_sec, (long)src->time.tv_usec, src->percent_cpu, src->vsize, src->peak_vsize, src->rss); + prefx, (long)src->time.tv_sec, (long)src->time.tv_usec, src->percent_cpu, src->pss, src->vsize, src->peak_vsize, src->rss); if (prefx != prefix) { free(prefx); } diff --git a/opal/dss/dss_types.h b/opal/dss/dss_types.h index d3096964e25..23d2f08dcae 100644 --- a/opal/dss/dss_types.h +++ b/opal/dss/dss_types.h @@ -182,6 +182,7 @@ typedef struct { float percent_cpu; int32_t priority; int16_t num_threads; + float pss; /* in MBytes */ float vsize; /* in MBytes */ float rss; /* in MBytes */ float peak_vsize; /* in MBytes */ diff --git a/opal/dss/dss_unpack.c b/opal/dss/dss_unpack.c index 9e86ec98b88..be9993983cd 100644 --- a/opal/dss/dss_unpack.c +++ b/opal/dss/dss_unpack.c @@ -643,6 +643,11 @@ int opal_dss_unpack_pstat(opal_buffer_t *buffer, void *dest, return ret; } m=1; + if (OPAL_SUCCESS != (ret = opal_dss_unpack_float(buffer, &ptr[i]->pss, &m, OPAL_FLOAT))) { + OPAL_ERROR_LOG(ret); + return ret; + } + m=1; if (OPAL_SUCCESS != (ret = opal_dss_unpack_float(buffer, &ptr[i]->vsize, &m, OPAL_FLOAT))) { OPAL_ERROR_LOG(ret); return ret; diff --git a/opal/mca/pstat/linux/pstat_linux_module.c b/opal/mca/pstat/linux/pstat_linux_module.c index 0ae3da6a33e..41aa9b0f5d1 100644 --- a/opal/mca/pstat/linux/pstat_linux_module.c +++ b/opal/mca/pstat/linux/pstat_linux_module.c @@ -310,6 +310,31 @@ static int query(pid_t pid, } } fclose(fp); + + /* now create the smaps filename for this proc */ + memset(data, 0, sizeof(data)); + numchars = snprintf(data, sizeof(data), "/proc/%d/smaps", pid); + if (numchars >= sizeof(data)) { + return OPAL_ERR_VALUE_OUT_OF_BOUNDS; + } + + /* if we cannot open the smaps file, just skip the PSS + * measurement - do NOT return early here, as that would + * skip the disk/network stats collected below */ + if (NULL != (fp = fopen(data, "r"))) { + /* parse it to find lines that start with "Pss" */ + while (NULL != (dptr = local_getline(fp))) { + if (NULL == (value = local_stripper(dptr))) { + /* cannot process */ + continue; + } + /* look for Pss */ + if (0 == strncmp(dptr, "Pss", strlen("Pss"))) { + stats->pss += convert_value(value); + } + } + fclose(fp); + } } if (NULL != nstats) { diff --git a/orte/mca/odls/odls_types.h b/orte/mca/odls/odls_types.h index 51c9afc3634..436fc31f75a 100644 --- a/orte/mca/odls/odls_types.h +++ b/orte/mca/odls/odls_types.h @@ -83,6 +83,9 @@ typedef uint8_t orte_daemon_cmd_flag_t; /* for debug purposes, get stack traces from all application procs */ #define ORTE_DAEMON_GET_STACK_TRACES (orte_daemon_cmd_flag_t) 31 +/* for memory profiling */ +#define ORTE_DAEMON_GET_MEMPROFILE (orte_daemon_cmd_flag_t) 32 + /* * Struct written up the pipe from the child to the parent. */ diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h index a8714ad24a3..c021db97782 100644 --- a/orte/mca/rml/rml_types.h +++ b/orte/mca/rml/rml_types.h @@ -166,6 +166,9 @@ BEGIN_C_DECLS /* stacktrace for debug */ #define ORTE_RML_TAG_STACK_TRACE 60 +/* memory profile */ +#define ORTE_RML_TAG_MEMPROFILE 61 + #define ORTE_RML_TAG_MAX 100 #define ORTE_RML_TAG_NTOH(t) ntohl(t) diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c index 64146d5cde1..35e75e56af7 100644 --- a/orte/orted/orted_comm.c +++ b/orte/orted/orted_comm.c @@ -45,6 +45,7 @@ #include "opal/mca/event/event.h" #include "opal/mca/base/base.h" +#include "opal/mca/pstat/pstat.h" #include "opal/util/output.h" #include "opal/util/opal_environ.h" #include "opal/util/path.h" @@ -115,6 +116,8 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, FILE *fp; char gscmd[256], path[1035], *pathptr; char string[256], *string_ptr = string; + float pss; + opal_pstats_t pstat; /* unpack the command */ n = 1; @@ -1151,6 +1154,44 @@ void orte_daemon_recv(int status, orte_process_name_t* sender, } break; + case ORTE_DAEMON_GET_MEMPROFILE: + answer = OBJ_NEW(opal_buffer_t); + /* pack our hostname so they 
know where it came from */ + opal_dss.pack(answer, &orte_process_info.nodename, 1, OPAL_STRING); + /* collect my memory usage */ + OBJ_CONSTRUCT(&pstat, opal_pstats_t); + opal_pstat.query(orte_process_info.pid, &pstat, NULL); + opal_dss.pack(answer, &pstat.pss, 1, OPAL_FLOAT); + OBJ_DESTRUCT(&pstat); + /* collect the memory usage of all my children */ + pss = 0.0; + num_replies = 0; + for (i=0; i < orte_local_children->size; i++) { + if (NULL != (proct = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) && + ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_ALIVE)) { + /* collect the stats on this proc */ + OBJ_CONSTRUCT(&pstat, opal_pstats_t); + if (OPAL_SUCCESS == opal_pstat.query(proct->pid, &pstat, NULL)) { + pss += pstat.pss; + ++num_replies; + } + OBJ_DESTRUCT(&pstat); + } + } + /* compute the average value */ + if (0 < num_replies) { + pss /= (float)num_replies; + } + opal_dss.pack(answer, &pss, 1, OPAL_FLOAT); + /* send it back */ + if (0 > (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, answer, + ORTE_RML_TAG_MEMPROFILE, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(ret); + OBJ_RELEASE(answer); + } + break; + default: ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); } @@ -1222,6 +1263,9 @@ static char *get_orted_comm_cmd_str(int command) case ORTE_DAEMON_GET_STACK_TRACES: return strdup("ORTE_DAEMON_GET_STACK_TRACES"); + case ORTE_DAEMON_GET_MEMPROFILE: + return strdup("ORTE_DAEMON_GET_MEMPROFILE"); + default: return strdup("Unknown Command!"); } diff --git a/orte/orted/orted_submit.c b/orte/orted/orted_submit.c index ec5ad04b6ff..f7bf6281d4b 100644 --- a/orte/orted/orted_submit.c +++ b/orte/orted/orted_submit.c @@ -117,6 +117,7 @@ static orte_std_cntr_t total_num_apps = 0; static bool want_prefix_by_default = (bool) ORTE_WANT_ORTERUN_PREFIX_BY_DEFAULT; static opal_pointer_array_t tool_jobs; static int timeout_seconds; +static orte_timer_t *orte_memprofile_timeout; int orte_debugger_attach_fd = -1; bool orte_debugger_fifo_active=false; @@ -135,6 +136,7 @@ 
static int parse_locals(orte_job_t *jdata, int argc, char* argv[]); static void set_classpath_jar_file(orte_app_context_t *app, int index, char *jarfile); static int parse_appfile(orte_job_t *jdata, char *filename, char ***env); static void orte_timeout_wakeup(int sd, short args, void *cbdata); +static void orte_profile_wakeup(int sd, short args, void *cbdata); static void launch_recv(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tag, void *cbdata); @@ -902,6 +904,21 @@ int orte_submit_job(char *argv[], int *index, opal_event_evtimer_add(orte_mpiexec_timeout->ev, &orte_mpiexec_timeout->tv); } + /* check for diagnostic memory profile */ + if (NULL != (param = getenv("OMPI_MEMPROFILE"))) { + timeout_seconds = strtol(param, NULL, 10); + if (NULL == (orte_memprofile_timeout = OBJ_NEW(orte_timer_t))) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + ORTE_UPDATE_EXIT_STATUS(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + orte_memprofile_timeout->tv.tv_sec = timeout_seconds; + orte_memprofile_timeout->tv.tv_usec = 0; + opal_event_evtimer_set(orte_event_base, orte_memprofile_timeout->ev, + orte_profile_wakeup, jdata); + opal_event_set_priority(orte_memprofile_timeout->ev, ORTE_ERROR_PRI); + opal_event_evtimer_add(orte_memprofile_timeout->ev, &orte_memprofile_timeout->tv); + } if (ORTE_PROC_IS_HNP) { /* get the daemon job object */ daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); @@ -3055,3 +3072,128 @@ void orte_timeout_wakeup(int sd, short args, void *cbdata) /* set the global abnormal exit flag */ orte_abnormal_term_ordered = true; } + +static int nreports = 0; +static orte_timer_t profile_timer; + +static void profile_recv(int status, orte_process_name_t* sender, + opal_buffer_t *buffer, orte_rml_tag_t tag, + void* cbdata) +{ + int32_t cnt; + char *hostname; + float dpss, pss; + + /* unpack the hostname where this came from */ + cnt = 1; + if (OPAL_SUCCESS != opal_dss.unpack(buffer, &hostname, &cnt, OPAL_STRING)) { + goto 
done; + } + /* print the hostname */ + fprintf(stderr, "Memory profile from host: %s\n", hostname); + free(hostname); + + /* get the PSS of the daemon */ + cnt = 1; + if (OPAL_SUCCESS != opal_dss.unpack(buffer, &dpss, &cnt, OPAL_FLOAT)) { + goto done; + } + /* get the average PSS of the child procs */ + cnt = 1; + if (OPAL_SUCCESS != opal_dss.unpack(buffer, &pss, &cnt, OPAL_FLOAT)) { + goto done; + } + + fprintf(stderr, "\tDaemon: %8.2fM\tProcs: %8.2fM\n", dpss, pss); + + done: + --nreports; + if (nreports == 0) { + /* cancel the timeout */ + OBJ_DESTRUCT(&profile_timer); + /* abort the job */ + ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE); + /* set the global abnormal exit flag */ + orte_abnormal_term_ordered = true; + } +} + +static void profile_timeout(int sd, short args, void *cbdata) +{ + /* abort the job */ + ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE); + /* set the global abnormal exit flag */ + orte_abnormal_term_ordered = true; +} + +void orte_profile_wakeup(int sd, short args, void *cbdata) +{ + orte_job_t *jdata = (orte_job_t*)cbdata; + orte_job_t *dmns; + orte_proc_t *dmn; + int i; + int rc; + orte_daemon_cmd_flag_t command = ORTE_DAEMON_GET_MEMPROFILE; + opal_buffer_t *buffer; + orte_process_name_t name; + + /* this function gets called when the job execution time + * has hit a specified limit - collect profile data and + * abort this job, but timeout if we cannot do so + */ + + /* set the recv */ + nreports = 1; // always get a report from ourselves + orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MEMPROFILE, + ORTE_RML_PERSISTENT, profile_recv, NULL); + + /* setup the buffer */ + buffer = OBJ_NEW(opal_buffer_t); + /* pack the command */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &command, 1, ORTE_DAEMON_CMD))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buffer); + goto giveup; + } + /* pack the jobid in question */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &jdata->jobid, 1, ORTE_JOBID))) { + 
ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buffer); + goto giveup; + } + + /* goes to just the first daemon beyond ourselves - no need to get it from everyone */ + dmns = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); + if (NULL != (dmn = (orte_proc_t*)opal_pointer_array_get_item(dmns->procs, 1))) { + ++nreports; + } + + /* send it out */ + name.jobid = ORTE_PROC_MY_NAME->jobid; + for (i=0; i < nreports; i++) { + OBJ_RETAIN(buffer); + name.vpid = i; + if (0 > (rc = orte_rml.send_buffer_nb(&name, buffer, + ORTE_RML_TAG_DAEMON, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(buffer); + } + } + OBJ_RELEASE(buffer); // maintain accounting + + /* we will terminate after we get the profile, but set a timeout + * just in case we never hear back */ + OBJ_CONSTRUCT(&profile_timer, orte_timer_t); + opal_event_evtimer_set(orte_event_base, + profile_timer.ev, profile_timeout, NULL); + opal_event_set_priority(profile_timer.ev, ORTE_ERROR_PRI); + profile_timer.tv.tv_sec = 30; + opal_event_evtimer_add(profile_timer.ev, &profile_timer.tv); + return; + + giveup: + /* abort the job */ + ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE); +} +