Skip to content

Commit

Permalink
Allow mom restart during multinode job
Browse files Browse the repository at this point in the history
  • Loading branch information
zv0n committed Aug 11, 2020
1 parent 277fe0e commit d48f11b
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 14 deletions.
6 changes: 6 additions & 0 deletions src/include/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ struct job {
int ji_stderr; /* socket for stderr */
int ji_ports[2]; /* ports for stdout/err */
enum bg_hook_request ji_hook_running_bg_on; /* set when hook starts in the background*/
int ji_msconnected; /* 0 - not connected, 1 - connected */
pbs_list_head ji_multinodejobs; /* links to recovered multinode jobs */
#else /* END Mom ONLY - start Server ONLY */
int ji_discarding; /* discarding job */
struct batch_request *ji_prunreq; /* outstanding runjob request */
Expand Down Expand Up @@ -612,6 +614,8 @@ struct job {
#ifdef PBS_MOM
tm_host_id ji_nodeidx; /* my node id */
tm_task_id ji_taskidx; /* generate task id's for job */
int ji_stdout;
int ji_stderr;
#if MOM_ALPS
long ji_reservation;
/* ALPS reservation identifier */
Expand Down Expand Up @@ -765,6 +769,8 @@ typedef struct infoent {
#define IM_EXEC_PROLOGUE 24
#define IM_CRED 25
#define IM_PMIX 26
#define IM_RECONNECT_TO_MS 27
#define IM_JOIN_RECOV_JOB 28

#define IM_ERROR 99
#define IM_ERROR2 100
Expand Down
2 changes: 1 addition & 1 deletion src/include/mom_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ extern pid_t fork_me(int sock);
extern ssize_t readpipe(int pfd, void *vptr, size_t nbytes);
extern ssize_t writepipe(int pfd, void *vptr, size_t nbytes);
extern int get_la(double *);
extern void init_abort_jobs(int);
extern void init_abort_jobs(int, pbs_list_head*);
extern void checkret(char **spot, int len);
extern void mom_nice(void);
extern void mom_unnice(void);
Expand Down
22 changes: 19 additions & 3 deletions src/resmom/catch_child.c
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,9 @@ scan_for_exiting(void)
for (pjob = (job *)GET_NEXT(svr_alljobs); pjob; pjob = nxjob) {
nxjob = (job *)GET_NEXT(pjob->ji_alljobs);

if (pjob->ji_numnodes > 1 && !pjob->ji_msconnected && pjob->ji_nodeid) /* assume that MS has a connection to itself at all times */
continue;

/*
** If a restart is active, skip this job since
** not all of the tasks may have started yet.
Expand Down Expand Up @@ -1876,11 +1879,12 @@ send_hellosvr(int stream)
* terminated and requeued.
*
* @param [in] recover - Specify recovering mode for MoM.
* @param [in] multinode_jobs - Pointer to list of pointers to recovered multinode jobs
*
*/

void
init_abort_jobs(int recover)
init_abort_jobs(int recover, pbs_list_head *multinode_jobs)
{
DIR *dir;
int i, sisters;
Expand All @@ -1896,6 +1900,8 @@ init_abort_jobs(int recover)
extern char *path_checkpoint;
extern char *path_spool;

CLEAR_HEAD((*multinode_jobs));

dir = opendir(path_jobs);
if (dir == NULL) {
log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ALERT,
Expand Down Expand Up @@ -1959,8 +1965,10 @@ init_abort_jobs(int recover)
*/
if ((pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
/* I am sister, junk the job files */
mom_deljob(pj);
continue;
if( recover != 2 ) {
mom_deljob(pj);
continue;
}
}

sisters = pj->ji_numnodes - 1;
Expand Down Expand Up @@ -2060,6 +2068,14 @@ init_abort_jobs(int recover)

if (mom_do_poll(pj))
append_link(&mom_polljobs, &pj->ji_jobque, pj);

append_link(multinode_jobs, &pj->ji_multinodejobs, pj);

if (pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) {
/* I am MS */
pj->ji_stdout = pj->ji_ports[0] = pj->ji_extended.ji_ext.ji_stdout;
pj->ji_stderr = pj->ji_ports[1] = pj->ji_extended.ji_ext.ji_stdout;
}
}
}
if (errno != 0 && errno != ENOENT) {
Expand Down
115 changes: 112 additions & 3 deletions src/resmom/mom_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>

#include <unistd.h>
#include <dirent.h>
Expand Down Expand Up @@ -116,6 +117,7 @@ extern char *msg_err_malloc;
extern int
write_pipe_data(int upfds, void *data, int data_size);
char task_fmt[] = "/%8.8X";
extern void resume_multinode(job *pjob);


/* Function pointers
Expand Down Expand Up @@ -217,6 +219,41 @@ pbs_jobnodevoid_t job_free_node = NULL;

eventent * event_dup(eventent *ep, job *pjob, hnodent *pnode);

/**
* @brief
* Write to fd and check if enough was written
*
* @param[in] fd - file descriptor
* @param[in] write_ptr - pointer to data to be written
* @param[in] size - size of data which should be written
*
* @return Error code
* @retval 0 Success
* @retval -1 Failure
*
*/
int
ensure_write(int fd, char *write_ptr, int size)
{
int i = 0;
while ((i = write(fd, write_ptr, size)) != size) {
if ((i < 0) && (errno == EINTR)) { /* retry the write */
if (lseek(fd, (off_t)0, SEEK_SET) < 0) {
log_err(errno, __func__, "lseek");
(void)close(fd);
return (-1);
}
continue;
}
else {
log_err(errno, __func__, "quickwrite");
(void)close(fd);
return (-1);
}
}
return 0;
}

/**
* @brief
* Save the critical information associated with a task to disk.
Expand Down Expand Up @@ -278,8 +315,26 @@ task_save(pbs_task *ptask)
return (-1);
}
}
obitent *pobit = (obitent *)GET_NEXT(ptask->ti_obits);
if(!pobit)
goto closefile;
if( ensure_write(fds, (char *)&pobit->oe_type, sizeof(pobit->oe_type)) != 0 )
goto failedwrite;
if( ensure_write(fds, (char *)&pobit->oe_u.oe_tm.oe_fd, sizeof(pobit->oe_u.oe_tm.oe_fd)) != 0 )
goto failedwrite;
if( ensure_write(fds, (char *)&pobit->oe_u.oe_tm.oe_node, sizeof(pobit->oe_u.oe_tm.oe_node)) != 0 )
goto failedwrite;
if( ensure_write(fds, (char *)&pobit->oe_u.oe_tm.oe_event, sizeof(pobit->oe_u.oe_tm.oe_event)) != 0 )
goto failedwrite;
if( ensure_write(fds, (char *)&pobit->oe_u.oe_tm.oe_taskid, sizeof(pobit->oe_u.oe_tm.oe_taskid)) != 0 )
goto failedwrite;
closefile:
(void)close(fds);
return (0);
failedwrite:
log_err(errno, __func__, "quickwrite");
(void)close(fds);
return (-1);
}

/**
Expand Down Expand Up @@ -681,6 +736,32 @@ task_recov(job *pjob)
continue;
}
pt->ti_qs = task_save;

obitent *op = (obitent *)malloc(sizeof(obitent));
assert(op);
CLEAR_LINK(op->oe_next);
if(read(fds, (char*)&op->oe_type, sizeof(op->oe_type)) != sizeof(op->oe_type)) {
free(op);
goto close;
}
if(read(fds, (char*)&op->oe_u.oe_tm.oe_fd, sizeof(op->oe_u.oe_tm.oe_fd)) != sizeof(op->oe_u.oe_tm.oe_fd)) {
free(op);
goto close;
}
if(read(fds, (char*)&op->oe_u.oe_tm.oe_node, sizeof(op->oe_u.oe_tm.oe_node)) != sizeof(op->oe_u.oe_tm.oe_node)) {
free(op);
goto close;
}
if(read(fds, (char*)&op->oe_u.oe_tm.oe_event, sizeof(op->oe_u.oe_tm.oe_event)) != sizeof(op->oe_u.oe_tm.oe_event)) {
free(op);
goto close;
}
if(read(fds, (char*)&op->oe_u.oe_tm.oe_taskid, sizeof(op->oe_u.oe_tm.oe_taskid)) != sizeof(op->oe_u.oe_tm.oe_taskid)) {
free(op);
goto close;
}
append_link(&pt->ti_obits, &op->oe_next, op);
close:
(void)close(fds);
}
if (errno != 0 && errno != ENOENT) {
Expand Down Expand Up @@ -2331,7 +2412,7 @@ term_job(job *pjob)
int num;

for (num=0, np = pjob->ji_hosts;
num<pjob->ji_numnodes;
num < pjob->ji_numnodes;
num++, np++) {
if (np->hn_stream >= 0) {
np->hn_stream = -1;
Expand Down Expand Up @@ -2396,6 +2477,7 @@ im_eof(int stream, int ret)
np->hn_stream = -1;
if (np->hn_eof_ts == 0)
np->hn_eof_ts = time(0);
pjob->ji_msconnected = 0;

/*
** In case connection to pbs_comm is down/recently established, do not kill a job that is actually running.
Expand Down Expand Up @@ -2509,6 +2591,7 @@ check_ms(int stream, job *pjob)
np->hn_stream = stream;
}
np->hn_eof_ts = 0;
pjob->ji_msconnected = 1;
return FALSE;
}

Expand Down Expand Up @@ -3036,6 +3119,25 @@ im_request(int stream, int version)
BAIL("fromtask")
switch (command) {

case IM_JOIN_RECOV_JOB:
reply = 1;

hnodenum = disrsi(stream, &ret);
BAIL("JOINJOB nodenum")

np = NULL;
/* job should already exist */
pjob = find_job(jobid);
if( pjob == NULL ) {
SEND_ERR(PBSE_SYSTEM)
goto done;
}
pjob->ji_stdout = disrsi(stream, &ret);
BAIL("JOINJOB stdout")
pjob->ji_stderr = disrsi(stream, &ret);
BAIL("JOINJOB stderr")
pjob->ji_msconnected = 1;
goto done;
case IM_JOIN_JOB:
/*
** Sender is mom superior sending a job structure to me.
Expand All @@ -3052,8 +3154,6 @@ im_request(int stream, int version)
** )
*/
reply = 1;
if (check_ms(stream, NULL))
goto fini;

hnodenum = disrsi(stream, &ret);
BAIL("JOINJOB nodenum")
Expand All @@ -3080,6 +3180,7 @@ im_request(int stream, int version)
info = disrcs(stream, &len, &ret);
BAIL("JOINJOB credential")
}
pjob->ji_msconnected = 1;

pjob->ji_numnodes = hnodenum;
CLEAR_HEAD(lhead);
Expand Down Expand Up @@ -3186,6 +3287,9 @@ im_request(int stream, int version)

/* np is set from job_nodes_inner */

if (check_ms(stream, pjob))
goto fini;


/*
* NULL value passed to hook_input.vnl
Expand Down Expand Up @@ -4051,6 +4155,7 @@ im_request(int stream, int version)
op->oe_u.oe_tm.oe_node = pvnodeid;
op->oe_u.oe_tm.oe_event = event;
op->oe_u.oe_tm.oe_taskid = fromtask;
task_save(ptask);
reply = 0;
}
break;
Expand Down Expand Up @@ -5424,6 +5529,10 @@ im_request(int stream, int version)
if (ret != DIS_SUCCESS)
goto err;
break;
case IM_RECONNECT_TO_MS:
if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE)
resume_multinode(pjob);
break;

default:
sprintf(log_buffer, "unknown command %d sent", command);
Expand Down
28 changes: 23 additions & 5 deletions src/resmom/mom_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ char *mom_domain;
#endif /* WIN32 */

extern void mom_vnlp_report(vnl_t *vnl, char *header);
extern void resume_multinode(job *pjob);

int alien_attach = 0; /* attach alien procs */
int alien_kill = 0; /* kill alien procs */
Expand Down Expand Up @@ -650,7 +651,7 @@ extern void dep_cleanup(void);
/* External Functions */

extern void catch_child(int);
extern void init_abort_jobs(int);
extern void init_abort_jobs(int, pbs_list_head*);
extern void scan_for_exiting(void);
#ifdef NAS /* localmod 015 */
extern int to_size(char *, struct size_value *);
Expand Down Expand Up @@ -7276,6 +7277,7 @@ mom_over_limit(job *pjob)
* check attr value limits of job
*
* @param[in] pjob - pointer to job
* @param[in] recover - recovering mode for MoM
*
* @return int
* @retval 0 Failure
Expand All @@ -7284,7 +7286,7 @@ mom_over_limit(job *pjob)
*/

int
job_over_limit(job *pjob)
job_over_limit(job *pjob, int recover)
{
attribute *attr;
attribute *used;
Expand Down Expand Up @@ -7317,7 +7319,7 @@ job_over_limit(job *pjob)

/* special case EOF */
if (pnode->hn_sister == SISTER_EOF) {
if ((reliable_job_node_find(&pjob->ji_failed_node_list,pnode->hn_host) != NULL) || (do_tolerate_node_failures(pjob))) {
if ((reliable_job_node_find(&pjob->ji_failed_node_list,pnode->hn_host) != NULL) || (do_tolerate_node_failures(pjob)) || recover == 2) {
snprintf(log_buffer, sizeof(log_buffer), "ignoring node EOF %d from failed mom %s as job is tolerant of node failures", pjob->ji_nodekill, pnode->hn_host?pnode->hn_host:"");
log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
return 0;
Expand Down Expand Up @@ -9552,8 +9554,10 @@ main(int argc, char *argv[])
log_err(c, msg_daemonname, "unable to recover vnode to host mapping");
}

pbs_list_link multinode_jobs;

/* recover & abort Jobs which were under MOM's control */
init_abort_jobs(recover);
init_abort_jobs(recover, &multinode_jobs);

/* deploy periodic hooks */
mom_hook_input_init(&hook_input);
Expand Down Expand Up @@ -9607,6 +9611,20 @@ main(int argc, char *argv[])
if (time_now > time_next_hello) {
send_hellosvr(server_stream);
time_next_hello = time_now + time_delta_hellosvr(MOM_DELTA_NORMAL);
if (server_stream != -1) {
job *m_job;
for (m_job = (job *)GET_NEXT(multinode_jobs); m_job;
m_job = (job *)GET_NEXT(m_job->ji_multinodejobs)) {
if (m_job->ji_qs.ji_svrflags & JOB_SVFLG_HERE) {
/* I am MS */
resume_multinode(m_job);
} else {
/* I am sister */
send_sisters(m_job, IM_RECONNECT_TO_MS, NULL);
}
}
CLEAR_HEAD(multinode_jobs);
}
}
}

Expand Down Expand Up @@ -9993,7 +10011,7 @@ main(int argc, char *argv[])
if (c & (JOB_SVFLG_OVERLMT1 | JOB_SVFLG_OVERLMT2 | JOB_SVFLG_TERMJOB))
continue;

if (job_over_limit(pjob)) {
if (job_over_limit(pjob, recover)) {

char *kill_msg;
log_event(PBSEVENT_JOB | PBSEVENT_FORCE,
Expand Down

0 comments on commit d48f11b

Please sign in to comment.