From c1050bc01ed411a53d88f7afdd599cdb372bb6f1 Mon Sep 17 00:00:00 2001
From: Ralph Castain
Date: Wed, 31 Aug 2016 09:32:07 -0700
Subject: [PATCH] Provide a mechanism for obtaining memory profiles of daemons
 and application procs for use in studying our memory footprint.

Setting OMPI_MEMPROFILE=N causes mpirun to set a timer for N seconds. When
the timer fires, mpirun will query each daemon in the job to report its own
memory usage plus the average memory usage of its child processes. The
Proportional Set Size (PSS) is used for this purpose.
---
 opal/dss/dss_copy.c                       |   1 +
 opal/dss/dss_open_close.c                 |   1 +
 opal/dss/dss_pack.c                       |   3 +
 opal/dss/dss_print.c                      |   4 +-
 opal/dss/dss_types.h                      |   1 +
 opal/dss/dss_unpack.c                     |   5 +
 opal/mca/pstat/linux/pstat_linux_module.c |  25 ++++
 orte/mca/odls/odls_types.h                |   3 +
 orte/mca/rml/rml_types.h                  |   3 +
 orte/orted/orted_comm.c                   |  44 +++++++
 orte/orted/orted_submit.c                 | 142 ++++++++++++++++++++++
 11 files changed, 230 insertions(+), 2 deletions(-)

diff --git a/opal/dss/dss_copy.c b/opal/dss/dss_copy.c
index 839ddc648b9..a39798bd46a 100644
--- a/opal/dss/dss_copy.c
+++ b/opal/dss/dss_copy.c
@@ -219,6 +219,7 @@ int opal_dss_copy_pstat(opal_pstats_t **dest, opal_pstats_t *src,
     p->time = src->time;
     p->priority = src->priority;
     p->num_threads = src->num_threads;
+    p->pss = src->pss;
     p->vsize = src->vsize;
     p->rss = src->rss;
     p->peak_vsize = src->peak_vsize;
diff --git a/opal/dss/dss_open_close.c b/opal/dss/dss_open_close.c
index 366cf2586a9..baf58143efe 100644
--- a/opal/dss/dss_open_close.c
+++ b/opal/dss/dss_open_close.c
@@ -156,6 +156,7 @@ static void opal_pstat_construct(opal_pstats_t *obj)
     obj->time.tv_usec = 0;
     obj->priority = -1;
     obj->num_threads = -1;
+    obj->pss = 0.0;
     obj->vsize = 0.0;
     obj->rss = 0.0;
     obj->peak_vsize = 0.0;
diff --git a/opal/dss/dss_pack.c b/opal/dss/dss_pack.c
index cd7b035bb18..23c9d3b31bc 100644
--- a/opal/dss/dss_pack.c
+++ b/opal/dss/dss_pack.c
@@ -499,6 +499,9 @@ int opal_dss_pack_pstat(opal_buffer_t *buffer, const void *src,
         if (OPAL_SUCCESS != (ret = opal_dss_pack_buffer(buffer, &ptr[i]->num_threads, 1, OPAL_INT16))) {
             return ret;
         }
+        if (OPAL_SUCCESS != (ret = opal_dss_pack_float(buffer, &ptr[i]->pss, 1, OPAL_FLOAT))) {
+            return ret;
+        }
         if (OPAL_SUCCESS != (ret = opal_dss_pack_float(buffer, &ptr[i]->vsize, 1, OPAL_FLOAT))) {
             return ret;
         }
diff --git a/opal/dss/dss_print.c b/opal/dss/dss_print.c
index e95a40f8141..98deb25fe70 100644
--- a/opal/dss/dss_print.c
+++ b/opal/dss/dss_print.c
@@ -654,10 +654,10 @@ int opal_dss_print_pstat(char **output, char *prefix, opal_pstats_t *src, opal_d
         return OPAL_SUCCESS;
     }
     asprintf(output, "%sOPAL_PSTATS SAMPLED AT: %ld.%06ld\n%snode: %s rank: %d pid: %d cmd: %s state: %c pri: %d #threads: %d Processor: %d\n"
-             "%s\ttime: %ld.%06ld cpu: %5.2f VMsize: %8.2f PeakVMSize: %8.2f RSS: %8.2f\n",
+             "%s\ttime: %ld.%06ld cpu: %5.2f PSS: %8.2f VMsize: %8.2f PeakVMSize: %8.2f RSS: %8.2f\n",
             prefx, (long)src->sample_time.tv_sec, (long)src->sample_time.tv_usec,
             prefx, src->node, src->rank, src->pid, src->cmd, src->state[0], src->priority, src->num_threads, src->processor,
-             prefx, (long)src->time.tv_sec, (long)src->time.tv_usec, src->percent_cpu, src->vsize, src->peak_vsize, src->rss);
+             prefx, (long)src->time.tv_sec, (long)src->time.tv_usec, src->percent_cpu, src->pss, src->vsize, src->peak_vsize, src->rss);
     if (prefx != prefix) {
         free(prefx);
     }
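A note on the metric before the struct change below: PSS (Proportional Set
Size) charges each shared page 1/N to each of the N processes mapping it, so
per-process numbers can be summed without double-counting shared regions the
way RSS does. A minimal illustration with hypothetical numbers (two
processes, 50 MB private each, one 100 MB region shared between them):

#include <stdio.h>

int main(void)
{
    float private_mb = 50.0f;   /* private to each of two processes */
    float shared_mb = 100.0f;   /* one region mapped by both */
    int nmappers = 2;

    float rss_each = private_mb + shared_mb;                    /* 150 MB */
    float pss_each = private_mb + shared_mb / (float)nmappers;  /* 100 MB */

    printf("summed RSS: %.0f MB (counts the shared region twice)\n", nmappers * rss_each);
    printf("summed PSS: %.0f MB (matches the actual footprint)\n", nmappers * pss_each);
    return 0;
}

This is why the commit samples PSS for the footprint study rather than relying
on the RSS/VMsize fields that opal_pstats_t already carried.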
diff --git a/opal/dss/dss_types.h b/opal/dss/dss_types.h
index d3096964e25..23d2f08dcae 100644
--- a/opal/dss/dss_types.h
+++ b/opal/dss/dss_types.h
@@ -182,6 +182,7 @@ typedef struct {
     float percent_cpu;
     int32_t priority;
     int16_t num_threads;
+    float pss;   /* in MBytes */
     float vsize;  /* in MBytes */
     float rss;  /* in MBytes */
     float peak_vsize;  /* in MBytes */
diff --git a/opal/dss/dss_unpack.c b/opal/dss/dss_unpack.c
index 9e86ec98b88..be9993983cd 100644
--- a/opal/dss/dss_unpack.c
+++ b/opal/dss/dss_unpack.c
@@ -643,6 +643,11 @@ int opal_dss_unpack_pstat(opal_buffer_t *buffer, void *dest,
             return ret;
         }
         m=1;
+        if (OPAL_SUCCESS != (ret = opal_dss_unpack_float(buffer, &ptr[i]->pss, &m, OPAL_FLOAT))) {
+            OPAL_ERROR_LOG(ret);
+            return ret;
+        }
+        m=1;
         if (OPAL_SUCCESS != (ret = opal_dss_unpack_float(buffer, &ptr[i]->vsize, &m, OPAL_FLOAT))) {
             OPAL_ERROR_LOG(ret);
             return ret;
diff --git a/opal/mca/pstat/linux/pstat_linux_module.c b/opal/mca/pstat/linux/pstat_linux_module.c
index 0ae3da6a33e..41aa9b0f5d1 100644
--- a/opal/mca/pstat/linux/pstat_linux_module.c
+++ b/opal/mca/pstat/linux/pstat_linux_module.c
@@ -310,6 +310,31 @@ static int query(pid_t pid,
             }
         }
         fclose(fp);
+
+        /* now create the smaps filename for this proc */
+        memset(data, 0, sizeof(data));
+        numchars = snprintf(data, sizeof(data), "/proc/%d/smaps", pid);
+        if (numchars >= sizeof(data)) {
+            return OPAL_ERR_VALUE_OUT_OF_BOUNDS;
+        }
+
+        if (NULL == (fp = fopen(data, "r"))) {
+            /* ignore this */
+            return OPAL_SUCCESS;
+        }
+
+        /* parse it to find lines that start with "Pss" */
+        while (NULL != (dptr = local_getline(fp))) {
+            if (NULL == (value = local_stripper(dptr))) {
+                /* cannot process */
+                continue;
+            }
+            /* look for Pss */
+            if (0 == strncmp(dptr, "Pss", strlen("Pss"))) {
+                stats->pss += convert_value(value);
+            }
+        }
+        fclose(fp);
     }
 
     if (NULL != nstats) {
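local_getline(), local_stripper(), and convert_value() are pre-existing
helpers in this module. For checking the numbers by hand, a self-contained
sketch of the same scan (standard C plus the documented Linux
/proc/<pid>/smaps format, where every mapping contributes a "Pss: <n> kB"
line) would be:

#include <stdio.h>
#include <string.h>
#include <unistd.h>

static float total_pss_mb(int pid)
{
    char path[64], line[256];
    long kb, total_kb = 0;
    FILE *fp;

    snprintf(path, sizeof(path), "/proc/%d/smaps", pid);
    if (NULL == (fp = fopen(path, "r"))) {
        return 0.0f;  /* mirror the module: a missing smaps is not an error */
    }
    /* sum the per-mapping "Pss:" entries, which the kernel reports in kB */
    while (NULL != fgets(line, sizeof(line), fp)) {
        if (0 == strncmp(line, "Pss:", 4) && 1 == sscanf(line + 4, "%ld", &kb)) {
            total_kb += kb;
        }
    }
    fclose(fp);
    return (float)total_kb / 1024.0f;  /* kB -> MB, the units opal_pstats_t uses */
}

int main(void)
{
    printf("my PSS: %.2f MB\n", total_pss_mb((int)getpid()));
    return 0;
}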
diff --git a/orte/mca/odls/odls_types.h b/orte/mca/odls/odls_types.h
index 51c9afc3634..436fc31f75a 100644
--- a/orte/mca/odls/odls_types.h
+++ b/orte/mca/odls/odls_types.h
@@ -83,6 +83,9 @@ typedef uint8_t orte_daemon_cmd_flag_t;
 /* for debug purposes, get stack traces from all application procs */
 #define ORTE_DAEMON_GET_STACK_TRACES  (orte_daemon_cmd_flag_t) 31
 
+/* for memory profiling */
+#define ORTE_DAEMON_GET_MEMPROFILE    (orte_daemon_cmd_flag_t) 32
+
 /*
  * Struct written up the pipe from the child to the parent.
  */
diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h
index a8714ad24a3..c021db97782 100644
--- a/orte/mca/rml/rml_types.h
+++ b/orte/mca/rml/rml_types.h
@@ -166,6 +166,9 @@ BEGIN_C_DECLS
 /* stacktrace for debug */
 #define ORTE_RML_TAG_STACK_TRACE        60
 
+/* memory profile */
+#define ORTE_RML_TAG_MEMPROFILE         61
+
 #define ORTE_RML_TAG_MAX                100
 
 #define ORTE_RML_TAG_NTOH(t) ntohl(t)
diff --git a/orte/orted/orted_comm.c b/orte/orted/orted_comm.c
index 64146d5cde1..35e75e56af7 100644
--- a/orte/orted/orted_comm.c
+++ b/orte/orted/orted_comm.c
@@ -45,6 +45,7 @@
 
 #include "opal/mca/event/event.h"
 #include "opal/mca/base/base.h"
+#include "opal/mca/pstat/pstat.h"
 #include "opal/util/output.h"
 #include "opal/util/opal_environ.h"
 #include "opal/util/path.h"
@@ -115,6 +116,8 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
     FILE *fp;
     char gscmd[256], path[1035], *pathptr;
     char string[256], *string_ptr = string;
+    float pss;
+    opal_pstats_t pstat;
 
     /* unpack the command */
     n = 1;
@@ -1151,6 +1154,44 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
         }
         break;
 
+    case ORTE_DAEMON_GET_MEMPROFILE:
+        answer = OBJ_NEW(opal_buffer_t);
+        /* pack our hostname so they know where it came from */
+        opal_dss.pack(answer, &orte_process_info.nodename, 1, OPAL_STRING);
+        /* collect my memory usage */
+        OBJ_CONSTRUCT(&pstat, opal_pstats_t);
+        opal_pstat.query(orte_process_info.pid, &pstat, NULL);
+        opal_dss.pack(answer, &pstat.pss, 1, OPAL_FLOAT);
+        OBJ_DESTRUCT(&pstat);
+        /* collect the memory usage of all my children */
+        pss = 0.0;
+        num_replies = 0;
+        for (i=0; i < orte_local_children->size; i++) {
+            if (NULL != (proct = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
+                ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_ALIVE)) {
+                /* collect the stats on this proc */
+                OBJ_CONSTRUCT(&pstat, opal_pstats_t);
+                if (OPAL_SUCCESS == opal_pstat.query(proct->pid, &pstat, NULL)) {
+                    pss += pstat.pss;
+                    ++num_replies;
+                }
+                OBJ_DESTRUCT(&pstat);
+            }
+        }
+        /* compute the average value */
+        if (0 < num_replies) {
+            pss /= (float)num_replies;
+        }
+        opal_dss.pack(answer, &pss, 1, OPAL_FLOAT);
+        /* send it back */
+        if (0 > (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, answer,
+                                               ORTE_RML_TAG_MEMPROFILE,
+                                               orte_rml_send_callback, NULL))) {
+            ORTE_ERROR_LOG(ret);
+            OBJ_RELEASE(answer);
+        }
+        break;
+
     default:
         ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
     }
@@ -1222,6 +1263,9 @@ static char *get_orted_comm_cmd_str(int command)
     case ORTE_DAEMON_GET_STACK_TRACES:
         return strdup("ORTE_DAEMON_GET_STACK_TRACES");
 
+    case ORTE_DAEMON_GET_MEMPROFILE:
+        return strdup("ORTE_DAEMON_GET_MEMPROFILE");
+
     default:
         return strdup("Unknown Command!");
     }
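The pack calls in the ORTE_DAEMON_GET_MEMPROFILE case above define the wire
format of the reply; DSS buffers must be unpacked in exactly the order they
were packed, so the receiver on the mpirun side (profile_recv() in
orted_submit.c, below) has to mirror this field for field. A summary of the
contract, for reference:

/* ORTE_RML_TAG_MEMPROFILE reply, in pack order:
 *   1. OPAL_STRING  nodename    - host the report came from
 *   2. OPAL_FLOAT   daemon PSS  - the orted's own PSS, in MB
 *   3. OPAL_FLOAT   average PSS - mean PSS over live local children, in MB
 * Any change here must be made on both ends, or every later field is
 * misread.
 */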
diff --git a/orte/orted/orted_submit.c b/orte/orted/orted_submit.c
index ec5ad04b6ff..f7bf6281d4b 100644
--- a/orte/orted/orted_submit.c
+++ b/orte/orted/orted_submit.c
@@ -117,6 +117,7 @@ static orte_std_cntr_t total_num_apps = 0;
 static bool want_prefix_by_default = (bool) ORTE_WANT_ORTERUN_PREFIX_BY_DEFAULT;
 static opal_pointer_array_t tool_jobs;
 static int timeout_seconds;
+static orte_timer_t *orte_memprofile_timeout;
 
 int orte_debugger_attach_fd = -1;
 bool orte_debugger_fifo_active=false;
@@ -135,6 +136,7 @@ static int parse_locals(orte_job_t *jdata, int argc, char* argv[]);
 static void set_classpath_jar_file(orte_app_context_t *app, int index, char *jarfile);
 static int parse_appfile(orte_job_t *jdata, char *filename, char ***env);
 static void orte_timeout_wakeup(int sd, short args, void *cbdata);
+static void orte_profile_wakeup(int sd, short args, void *cbdata);
 static void launch_recv(int status, orte_process_name_t* sender,
                         opal_buffer_t *buffer,
                         orte_rml_tag_t tag, void *cbdata);
@@ -902,6 +904,21 @@ int orte_submit_job(char *argv[], int *index,
         opal_event_evtimer_add(orte_mpiexec_timeout->ev, &orte_mpiexec_timeout->tv);
     }
 
+    /* check for diagnostic memory profile */
+    if (NULL != (param = getenv("OMPI_MEMPROFILE"))) {
+        timeout_seconds = strtol(param, NULL, 10);
+        if (NULL == (orte_memprofile_timeout = OBJ_NEW(orte_timer_t))) {
+            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
+            ORTE_UPDATE_EXIT_STATUS(ORTE_ERR_OUT_OF_RESOURCE);
+            //goto DONE;
+        }
+        orte_memprofile_timeout->tv.tv_sec = timeout_seconds;
+        orte_memprofile_timeout->tv.tv_usec = 0;
+        opal_event_evtimer_set(orte_event_base, orte_memprofile_timeout->ev,
+                               orte_profile_wakeup, jdata);
+        opal_event_set_priority(orte_memprofile_timeout->ev, ORTE_ERROR_PRI);
+        opal_event_evtimer_add(orte_memprofile_timeout->ev, &orte_memprofile_timeout->tv);
+    }
     if (ORTE_PROC_IS_HNP) {
         /* get the daemon job object */
         daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
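One caveat in the hunk above: strtol() with a NULL endptr silently turns a
malformed OMPI_MEMPROFILE value into 0, which arms a timer that fires
immediately. If tighter validation were wanted, a small helper along these
lines could gate the setup (parse_memprofile_seconds() is hypothetical, not
part of this patch):

#include <errno.h>
#include <limits.h>
#include <stdlib.h>

/* hypothetical helper: accept only a positive decimal integer
 * with no trailing junk */
static int parse_memprofile_seconds(const char *s, int *out)
{
    char *end;
    long v;

    errno = 0;
    v = strtol(s, &end, 10);
    if (0 != errno || end == s || '\0' != *end || v <= 0 || v > INT_MAX) {
        return -1;
    }
    *out = (int)v;
    return 0;
}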
@@ -3055,3 +3072,128 @@ void orte_timeout_wakeup(int sd, short args, void *cbdata)
     /* set the global abnormal exit flag */
     orte_abnormal_term_ordered = true;
 }
+
+static int nreports = 0;
+static orte_timer_t profile_timer;
+
+static void profile_recv(int status, orte_process_name_t* sender,
+                         opal_buffer_t *buffer, orte_rml_tag_t tag,
+                         void* cbdata)
+{
+    int32_t cnt;
+    char *hostname;
+    float dpss, pss;
+
+    /* unpack the hostname where this came from */
+    cnt = 1;
+    if (OPAL_SUCCESS != opal_dss.unpack(buffer, &hostname, &cnt, OPAL_STRING)) {
+        goto done;
+    }
+    /* print the hostname */
+    fprintf(stderr, "Memory profile from host: %s\n", hostname);
+    free(hostname);
+
+    /* get the PSS of the daemon */
+    cnt = 1;
+    if (OPAL_SUCCESS != opal_dss.unpack(buffer, &dpss, &cnt, OPAL_FLOAT)) {
+        goto done;
+    }
+    /* get the average PSS of the child procs */
+    cnt = 1;
+    if (OPAL_SUCCESS != opal_dss.unpack(buffer, &pss, &cnt, OPAL_FLOAT)) {
+        goto done;
+    }
+
+    fprintf(stderr, "\tDaemon: %8.2fM\tProcs: %8.2fM\n", dpss, pss);
+
+  done:
+    --nreports;
+    if (nreports == 0) {
+        /* cancel the timeout */
+        OBJ_DESTRUCT(&profile_timer);
+        /* abort the job */
+        ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);
+        /* set the global abnormal exit flag */
+        orte_abnormal_term_ordered = true;
+    }
+}
+
+static void profile_timeout(int sd, short args, void *cbdata)
+{
+    /* abort the job */
+    ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);
+    /* set the global abnormal exit flag */
+    orte_abnormal_term_ordered = true;
+}
+
+void orte_profile_wakeup(int sd, short args, void *cbdata)
+{
+    orte_job_t *jdata = (orte_job_t*)cbdata;
+    orte_job_t *dmns;
+    orte_proc_t *dmn;
+    int i;
+    int rc;
+    orte_daemon_cmd_flag_t command = ORTE_DAEMON_GET_MEMPROFILE;
+    opal_buffer_t *buffer;
+    orte_process_name_t name;
+
+    /* this function gets called when the job execution time
+     * has hit a specified limit - collect profile data and
+     * abort this job, but timeout if we cannot do so
+     */
+
+    /* set the recv */
+    nreports = 1;  // always get a report from ourselves
+    orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MEMPROFILE,
+                            ORTE_RML_PERSISTENT, profile_recv, NULL);
+
+    /* setup the buffer */
+    buffer = OBJ_NEW(opal_buffer_t);
+    /* pack the command */
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &command, 1, ORTE_DAEMON_CMD))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_RELEASE(buffer);
+        goto giveup;
+    }
+    /* pack the jobid in question */
+    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &jdata->jobid, 1, ORTE_JOBID))) {
+        ORTE_ERROR_LOG(rc);
+        OBJ_RELEASE(buffer);
+        goto giveup;
+    }
+
+    /* goes to just the first daemon beyond ourselves - no need to get it from everyone */
+    dmns = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
+    if (NULL != (dmn = (orte_proc_t*)opal_pointer_array_get_item(dmns->procs, 1))) {
+        ++nreports;
+    }
+
+    /* send it out */
+    name.jobid = ORTE_PROC_MY_NAME->jobid;
+    for (i=0; i < nreports; i++) {
+        OBJ_RETAIN(buffer);
+        name.vpid = i;
+        if (0 > (rc = orte_rml.send_buffer_nb(&name, buffer,
+                                              ORTE_RML_TAG_DAEMON,
+                                              orte_rml_send_callback, NULL))) {
+            ORTE_ERROR_LOG(rc);
+            OBJ_RELEASE(buffer);
+        }
+    }
+    OBJ_RELEASE(buffer);  // maintain accounting
+
+    /* we will terminate after we get the profile, but set a timeout
+     * just in case we never hear back */
+    OBJ_CONSTRUCT(&profile_timer, orte_timer_t);
+    opal_event_evtimer_set(orte_event_base,
+                           profile_timer.ev, profile_timeout, NULL);
+    opal_event_set_priority(profile_timer.ev, ORTE_ERROR_PRI);
+    profile_timer.tv.tv_sec = 30;
+    opal_event_evtimer_add(profile_timer.ev, &profile_timer.tv);
+    return;
+
+  giveup:
+    /* abort the job */
+    ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);
+}
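To exercise the feature, something like

    OMPI_MEMPROFILE=30 mpirun -np 4 ./a.out

arms the 30-second timer; when it fires, stderr carries one report per
responding daemon in the format produced by profile_recv() above (the
numbers here are made up):

    Memory profile from host: node01
            Daemon:    12.34M   Procs:    56.78M

Note that a profiling run is purely diagnostic: once the expected reports
arrive, or the 30-second fallback timer expires, the job is deliberately
driven to completion rather than left running.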