Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opal/dss/dss_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ int opal_dss_copy_pstat(opal_pstats_t **dest, opal_pstats_t *src,
p->time = src->time;
p->priority = src->priority;
p->num_threads = src->num_threads;
p->pss = src->pss;
p->vsize = src->vsize;
p->rss = src->rss;
p->peak_vsize = src->peak_vsize;
Expand Down
1 change: 1 addition & 0 deletions opal/dss/dss_open_close.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ static void opal_pstat_construct(opal_pstats_t *obj)
obj->time.tv_usec = 0;
obj->priority = -1;
obj->num_threads = -1;
obj->pss = 0.0;
obj->vsize = 0.0;
obj->rss = 0.0;
obj->peak_vsize = 0.0;
Expand Down
3 changes: 3 additions & 0 deletions opal/dss/dss_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ int opal_dss_pack_pstat(opal_buffer_t *buffer, const void *src,
if (OPAL_SUCCESS != (ret = opal_dss_pack_buffer(buffer, &ptr[i]->num_threads, 1, OPAL_INT16))) {
return ret;
}
if (OPAL_SUCCESS != (ret = opal_dss_pack_float(buffer, &ptr[i]->pss, 1, OPAL_FLOAT))) {
return ret;
}
if (OPAL_SUCCESS != (ret = opal_dss_pack_float(buffer, &ptr[i]->vsize, 1, OPAL_FLOAT))) {
return ret;
}
Expand Down
4 changes: 2 additions & 2 deletions opal/dss/dss_print.c
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,10 @@ int opal_dss_print_pstat(char **output, char *prefix, opal_pstats_t *src, opal_d
return OPAL_SUCCESS;
}
asprintf(output, "%sOPAL_PSTATS SAMPLED AT: %ld.%06ld\n%snode: %s rank: %d pid: %d cmd: %s state: %c pri: %d #threads: %d Processor: %d\n"
"%s\ttime: %ld.%06ld cpu: %5.2f VMsize: %8.2f PeakVMSize: %8.2f RSS: %8.2f\n",
"%s\ttime: %ld.%06ld cpu: %5.2f PSS: %8.2f VMsize: %8.2f PeakVMSize: %8.2f RSS: %8.2f\n",
prefx, (long)src->sample_time.tv_sec, (long)src->sample_time.tv_usec,
prefx, src->node, src->rank, src->pid, src->cmd, src->state[0], src->priority, src->num_threads, src->processor,
prefx, (long)src->time.tv_sec, (long)src->time.tv_usec, src->percent_cpu, src->vsize, src->peak_vsize, src->rss);
prefx, (long)src->time.tv_sec, (long)src->time.tv_usec, src->percent_cpu, src->pss, src->vsize, src->peak_vsize, src->rss);
if (prefx != prefix) {
free(prefx);
}
Expand Down
1 change: 1 addition & 0 deletions opal/dss/dss_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ typedef struct {
float percent_cpu;
int32_t priority;
int16_t num_threads;
float pss; /* in MBytes */
float vsize; /* in MBytes */
float rss; /* in MBytes */
float peak_vsize; /* in MBytes */
Expand Down
5 changes: 5 additions & 0 deletions opal/dss/dss_unpack.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,11 @@ int opal_dss_unpack_pstat(opal_buffer_t *buffer, void *dest,
return ret;
}
m=1;
if (OPAL_SUCCESS != (ret = opal_dss_unpack_float(buffer, &ptr[i]->pss, &m, OPAL_FLOAT))) {
OPAL_ERROR_LOG(ret);
return ret;
}
m=1;
if (OPAL_SUCCESS != (ret = opal_dss_unpack_float(buffer, &ptr[i]->vsize, &m, OPAL_FLOAT))) {
OPAL_ERROR_LOG(ret);
return ret;
Expand Down
25 changes: 25 additions & 0 deletions opal/mca/pstat/linux/pstat_linux_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,31 @@ static int query(pid_t pid,
}
}
fclose(fp);

/* now create the smaps filename for this proc */
memset(data, 0, sizeof(data));
numchars = snprintf(data, sizeof(data), "/proc/%d/smaps", pid);
if (numchars >= sizeof(data)) {
return OPAL_ERR_VALUE_OUT_OF_BOUNDS;
}

if (NULL == (fp = fopen(data, "r"))) {
/* ignore this */
return OPAL_SUCCESS;
}

/* parse it to find lines that start with "Pss" */
while (NULL != (dptr = local_getline(fp))) {
if (NULL == (value = local_stripper(dptr))) {
/* cannot process */
continue;
}
/* look for Pss */
if (0 == strncmp(dptr, "Pss", strlen("Pss"))) {
stats->pss += convert_value(value);
}
}
fclose(fp);
}

if (NULL != nstats) {
Expand Down
3 changes: 3 additions & 0 deletions orte/mca/odls/odls_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ typedef uint8_t orte_daemon_cmd_flag_t;
/* for debug purposes, get stack traces from all application procs */
#define ORTE_DAEMON_GET_STACK_TRACES (orte_daemon_cmd_flag_t) 31

/* for memory profiling */
#define ORTE_DAEMON_GET_MEMPROFILE (orte_daemon_cmd_flag_t) 32

/*
* Struct written up the pipe from the child to the parent.
*/
Expand Down
3 changes: 3 additions & 0 deletions orte/mca/rml/rml_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ BEGIN_C_DECLS
/* stacktrace for debug */
#define ORTE_RML_TAG_STACK_TRACE 60

/* memory profile */
#define ORTE_RML_TAG_MEMPROFILE 61

#define ORTE_RML_TAG_MAX 100

#define ORTE_RML_TAG_NTOH(t) ntohl(t)
Expand Down
44 changes: 44 additions & 0 deletions orte/orted/orted_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

#include "opal/mca/event/event.h"
#include "opal/mca/base/base.h"
#include "opal/mca/pstat/pstat.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "opal/util/path.h"
Expand Down Expand Up @@ -115,6 +116,8 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
FILE *fp;
char gscmd[256], path[1035], *pathptr;
char string[256], *string_ptr = string;
float pss;
opal_pstats_t pstat;

/* unpack the command */
n = 1;
Expand Down Expand Up @@ -1151,6 +1154,44 @@ void orte_daemon_recv(int status, orte_process_name_t* sender,
}
break;

case ORTE_DAEMON_GET_MEMPROFILE:
answer = OBJ_NEW(opal_buffer_t);
/* pack our hostname so they know where it came from */
opal_dss.pack(answer, &orte_process_info.nodename, 1, OPAL_STRING);
/* collect my memory usage */
OBJ_CONSTRUCT(&pstat, opal_pstats_t);
opal_pstat.query(orte_process_info.pid, &pstat, NULL);
opal_dss.pack(answer, &pstat.pss, 1, OPAL_FLOAT);
OBJ_DESTRUCT(&pstat);
/* collect the memory usage of all my children */
pss = 0.0;
num_replies = 0;
for (i=0; i < orte_local_children->size; i++) {
if (NULL != (proct = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i)) &&
ORTE_FLAG_TEST(proct, ORTE_PROC_FLAG_ALIVE)) {
/* collect the stats on this proc */
OBJ_CONSTRUCT(&pstat, opal_pstats_t);
if (OPAL_SUCCESS == opal_pstat.query(proct->pid, &pstat, NULL)) {
pss += pstat.pss;
++num_replies;
}
OBJ_DESTRUCT(&pstat);
}
}
/* compute the average value */
if (0 < num_replies) {
pss /= (float)num_replies;
}
opal_dss.pack(answer, &pss, 1, OPAL_FLOAT);
/* send it back */
if (0 > (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, answer,
ORTE_RML_TAG_MEMPROFILE,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(answer);
}
break;

default:
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
}
Expand Down Expand Up @@ -1222,6 +1263,9 @@ static char *get_orted_comm_cmd_str(int command)
case ORTE_DAEMON_GET_STACK_TRACES:
return strdup("ORTE_DAEMON_GET_STACK_TRACES");

case ORTE_DAEMON_GET_MEMPROFILE:
return strdup("ORTE_DAEMON_GET_MEMPROFILE");

default:
return strdup("Unknown Command!");
}
Expand Down
142 changes: 142 additions & 0 deletions orte/orted/orted_submit.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ static orte_std_cntr_t total_num_apps = 0;
static bool want_prefix_by_default = (bool) ORTE_WANT_ORTERUN_PREFIX_BY_DEFAULT;
static opal_pointer_array_t tool_jobs;
static int timeout_seconds;
static orte_timer_t *orte_memprofile_timeout;

int orte_debugger_attach_fd = -1;
bool orte_debugger_fifo_active=false;
Expand All @@ -135,6 +136,7 @@ static int parse_locals(orte_job_t *jdata, int argc, char* argv[]);
static void set_classpath_jar_file(orte_app_context_t *app, int index, char *jarfile);
static int parse_appfile(orte_job_t *jdata, char *filename, char ***env);
static void orte_timeout_wakeup(int sd, short args, void *cbdata);
static void orte_profile_wakeup(int sd, short args, void *cbdata);
static void launch_recv(int status, orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag, void *cbdata);
Expand Down Expand Up @@ -902,6 +904,21 @@ int orte_submit_job(char *argv[], int *index,
opal_event_evtimer_add(orte_mpiexec_timeout->ev, &orte_mpiexec_timeout->tv);
}

/* check for diagnostic memory profile */
if (NULL != (param = getenv("OMPI_MEMPROFILE"))) {
timeout_seconds = strtol(param, NULL, 10);
if (NULL == (orte_memprofile_timeout = OBJ_NEW(orte_timer_t))) {
ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
ORTE_UPDATE_EXIT_STATUS(ORTE_ERR_OUT_OF_RESOURCE);
//goto DONE;
}
orte_memprofile_timeout->tv.tv_sec = timeout_seconds;
orte_memprofile_timeout->tv.tv_usec = 0;
opal_event_evtimer_set(orte_event_base, orte_memprofile_timeout->ev,
orte_profile_wakeup, jdata);
opal_event_set_priority(orte_memprofile_timeout->ev, ORTE_ERROR_PRI);
opal_event_evtimer_add(orte_memprofile_timeout->ev, &orte_memprofile_timeout->tv);
}
if (ORTE_PROC_IS_HNP) {
/* get the daemon job object */
daemons = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
Expand Down Expand Up @@ -3055,3 +3072,128 @@ void orte_timeout_wakeup(int sd, short args, void *cbdata)
/* set the global abnormal exit flag */
orte_abnormal_term_ordered = true;
}

/* number of memory-profile replies still outstanding: set by
 * orte_profile_wakeup() and counted down by profile_recv() */
static int nreports = 0;
/* backup timer armed by orte_profile_wakeup(); destructed by profile_recv()
 * once the last outstanding reply has been counted */
static orte_timer_t profile_timer;

/*
 * Receive callback for ORTE_RML_TAG_MEMPROFILE replies.
 *
 * The reply buffer is expected to hold, in order: the reporting host's
 * name (OPAL_STRING), the daemon's PSS (OPAL_FLOAT) and the average PSS
 * of that daemon's child procs (OPAL_FLOAT).  Whatever can be unpacked is
 * printed to stderr; a short or malformed reply simply stops the printing.
 *
 * Every invocation - successful or not - counts as one report.  When the
 * last outstanding report has been counted, the backup timer is destructed
 * and the job is driven to ORTE_JOB_STATE_ALL_JOBS_COMPLETE.
 *
 * status, sender, tag and cbdata are not used.
 */
static void profile_recv(int status, orte_process_name_t* sender,
                         opal_buffer_t *buffer, orte_rml_tag_t tag,
                         void* cbdata)
{
    char *nodename;
    float daemon_pss, avg_proc_pss;
    int32_t count = 1;

    /* the reply starts with the name of the host it came from */
    if (OPAL_SUCCESS == opal_dss.unpack(buffer, &nodename, &count, OPAL_STRING)) {
        fprintf(stderr, "Memory profile from host: %s\n", nodename);
        free(nodename);

        /* next comes the PSS of the daemon itself ... */
        count = 1;
        if (OPAL_SUCCESS == opal_dss.unpack(buffer, &daemon_pss, &count, OPAL_FLOAT)) {
            /* ... and finally the average PSS of its child procs */
            count = 1;
            if (OPAL_SUCCESS == opal_dss.unpack(buffer, &avg_proc_pss, &count, OPAL_FLOAT)) {
                fprintf(stderr, "\tDaemon: %8.2fM\tProcs: %8.2fM\n", daemon_pss, avg_proc_pss);
            }
        }
    }

    /* one fewer report to wait for, whether or not it could be decoded */
    if (0 == --nreports) {
        /* everyone has answered: cancel the timeout */
        OBJ_DESTRUCT(&profile_timer);
        /* abort the job */
        ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);
        /* set the global abnormal exit flag */
        orte_abnormal_term_ordered = true;
    }
}

/*
 * Backup timer callback: fires if the memory-profile replies have not all
 * been counted before the timer armed in orte_profile_wakeup() expires.
 * Stops waiting and terminates the job.
 *
 * None of the event arguments are used.
 */
static void profile_timeout(int sd, short args, void *cbdata)
{
    (void)sd;
    (void)args;
    (void)cbdata;

    /* give up on the outstanding reports and abort the job */
    ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);

    /* set the global abnormal exit flag */
    orte_abnormal_term_ordered = true;
}

/*
 * Timer callback invoked when the memory-profile interval has elapsed.
 *
 * Posts a persistent receive for ORTE_RML_TAG_MEMPROFILE, then sends an
 * ORTE_DAEMON_GET_MEMPROFILE command (followed by the jobid of the job
 * passed in cbdata) to ourselves and, if one exists, to the daemon at
 * index 1 of the daemon job's proc array.  The replies are consumed by
 * profile_recv(), which terminates the job once all of them have been
 * counted.  A 30 second backup timer (profile_timeout) terminates the job
 * if the replies never arrive - this also covers a send that failed.
 *
 * If the command buffer cannot be allocated or packed, the job is
 * terminated immediately.
 *
 * @param sd      unused
 * @param args    unused
 * @param cbdata  the orte_job_t whose jobid is packed into the command
 */
void orte_profile_wakeup(int sd, short args, void *cbdata)
{
    orte_job_t *jdata = (orte_job_t*)cbdata;
    orte_job_t *dmns;
    int i;
    int ntargets;
    int rc;
    orte_daemon_cmd_flag_t command = ORTE_DAEMON_GET_MEMPROFILE;
    opal_buffer_t *buffer;
    orte_process_name_t name;

    /* this function gets called when the job execution time
     * has hit a specified limit - collect profile data and
     * abort this job, but timeout if we cannot do so
     */

    /* set the recv */
    nreports = 1;  // always get a report from ourselves
    orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_MEMPROFILE,
                            ORTE_RML_PERSISTENT, profile_recv, NULL);

    /* setup the buffer */
    buffer = OBJ_NEW(opal_buffer_t);
    if (NULL == buffer) {
        /* nothing to pack into and nothing to release - just terminate */
        goto giveup;
    }
    /* pack the command */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &command, 1, ORTE_DAEMON_CMD))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buffer);
        goto giveup;
    }
    /* pack the jobid in question */
    if (ORTE_SUCCESS != (rc = opal_dss.pack(buffer, &jdata->jobid, 1, ORTE_JOBID))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(buffer);
        goto giveup;
    }

    /* goes to just the first daemon beyond ourselves - no need to get it
     * from everyone.  Guard against a missing daemon job object rather
     * than dereferencing it blindly; in that case only we report. */
    dmns = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
    if (NULL != dmns &&
        NULL != opal_pointer_array_get_item(dmns->procs, 1)) {
        ++nreports;
    }

    /* send it out - iterate over a snapshot of the count, since nreports
     * is the shared counter that profile_recv() decrements */
    ntargets = nreports;
    name.jobid = ORTE_PROC_MY_NAME->jobid;
    for (i = 0; i < ntargets; i++) {
        /* each send consumes one reference to the buffer */
        OBJ_RETAIN(buffer);
        name.vpid = i;
        if (0 > (rc = orte_rml.send_buffer_nb(&name, buffer,
                                              ORTE_RML_TAG_DAEMON,
                                              orte_rml_send_callback, NULL))) {
            ORTE_ERROR_LOG(rc);
            /* the send did not take the reference - drop it ourselves */
            OBJ_RELEASE(buffer);
        }
    }
    OBJ_RELEASE(buffer);  // maintain accounting

    /* we will terminate after we get the profile, but set a timeout
     * just in case we never hear back */
    OBJ_CONSTRUCT(&profile_timer, orte_timer_t);
    opal_event_evtimer_set(orte_event_base,
                           profile_timer.ev, profile_timeout, NULL);
    opal_event_set_priority(profile_timer.ev, ORTE_ERROR_PRI);
    profile_timer.tv.tv_sec = 30;
    profile_timer.tv.tv_usec = 0;
    opal_event_evtimer_add(profile_timer.ev, &profile_timer.tv);
    return;

 giveup:
    /* abort the job */
    ORTE_ACTIVATE_JOB_STATE(NULL, ORTE_JOB_STATE_ALL_JOBS_COMPLETE);
}