Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add possibility of resuming multinode jobs #1955

Merged
merged 2 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/include/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@ struct job {
int ji_stderr; /* socket for stderr */
int ji_ports[2]; /* ports for stdout/err */
enum bg_hook_request ji_hook_running_bg_on; /* set when hook starts in the background*/
int ji_msconnected; /* 0 - not connected, 1 - connected */
pbs_list_head ji_multinodejobs; /* links to recovered multinode jobs */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lease initialize ji_msconnected in server/job_func.c:job_alloc() under #ifdef PBS_MOM section.
Initialize also ji_multinodejobs in server/job_func.c:job_alloc() by calling CLEAR_HEAD(ji_multinodejobs).
and also freeing memory allocated to ji_multinodejobs in server/job_func.c:job_free() under the #ifdef PBS_MOM section.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I'm already initializing these variables in job_func.c:355,356, should I do something more than that?

Since ji_multinodejobs only contains pointers to jobs that are already managed, will CLEAR_HEAD(ji_multinodejobs) suffice in job_free()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed that you were already initializing them in job_func.c under job_alloc() so that's good. For the job_free(), yes, just do the CLEAR_HEAD.

#else /* END Mom ONLY - start Server ONLY */
struct batch_request *ji_pmt_preq; /* outstanding preempt job request for deleting jobs */
int ji_discarding; /* discarding job */
Expand Down Expand Up @@ -592,6 +594,8 @@ struct job {
#ifdef PBS_MOM
tm_host_id ji_nodeidx; /* my node id */
tm_task_id ji_taskidx; /* generate task id's for job */
int ji_stdout;
int ji_stderr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialize these in server/job_func.c:job_alloc() under #ifdef PBS_MOM section.

#if MOM_ALPS
long ji_reservation;
/* ALPS reservation identifier */
Expand Down Expand Up @@ -745,6 +749,8 @@ typedef struct infoent {
#define IM_EXEC_PROLOGUE 24
#define IM_CRED 25
#define IM_PMIX 26
#define IM_RECONNECT_TO_MS 27
#define IM_JOIN_RECOV_JOB 28

#define IM_ERROR 99
#define IM_ERROR2 100
Expand Down
2 changes: 1 addition & 1 deletion src/include/mom_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ extern pid_t fork_me(int sock);
extern ssize_t readpipe(int pfd, void *vptr, size_t nbytes);
extern ssize_t writepipe(int pfd, void *vptr, size_t nbytes);
extern int get_la(double *);
extern void init_abort_jobs(int);
extern void init_abort_jobs(int, pbs_list_head *);
extern void checkret(char **spot, int len);
extern void mom_nice(void);
extern void mom_unnice(void);
Expand Down
23 changes: 20 additions & 3 deletions src/resmom/catch_child.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ scan_for_exiting(void)
for (pjob = (job *)GET_NEXT(svr_alljobs); pjob; pjob = nxjob) {
nxjob = (job *)GET_NEXT(pjob->ji_alljobs);

if (pjob->ji_numnodes > 1 && !pjob->ji_msconnected && pjob->ji_nodeid) /* assume that MS has a connection to itself at all times */
continue;

/*
** If a restart is active, skip this job since
** not all of the tasks may have started yet.
Expand Down Expand Up @@ -938,11 +941,12 @@ send_hellosvr(int stream)
* terminated and requeued.
*
* @param [in] recover - Specify recovering mode for MoM.
* @param [in] multinode_jobs - Pointer to list of pointers to recovered multinode jobs
*
*/

void
init_abort_jobs(int recover)
init_abort_jobs(int recover, pbs_list_head *multinode_jobs)
{
DIR *dir;
int i, sisters;
Expand All @@ -958,6 +962,8 @@ init_abort_jobs(int recover)
extern char *path_checkpoint;
extern char *path_spool;

CLEAR_HEAD((*multinode_jobs));

dir = opendir(path_jobs);
if (dir == NULL) {
log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ALERT,
Expand Down Expand Up @@ -1021,8 +1027,10 @@ init_abort_jobs(int recover)
*/
if ((pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
/* I am sister, junk the job files */
mom_deljob(pj);
continue;
if( recover != 2 ) {
mom_deljob(pj);
continue;
}
}

sisters = pj->ji_numnodes - 1;
Expand Down Expand Up @@ -1121,6 +1129,15 @@ init_abort_jobs(int recover)

if (mom_do_poll(pj))
append_link(&mom_polljobs, &pj->ji_jobque, pj);

if (sisters > 0)
append_link(multinode_jobs, &pj->ji_multinodejobs, pj);

if (pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) {
/* I am MS */
pj->ji_stdout = pj->ji_ports[0] = pj->ji_extended.ji_ext.ji_stdout;
pj->ji_stderr = pj->ji_ports[1] = pj->ji_extended.ji_ext.ji_stdout;
}
}
}
if (errno != 0 && errno != ENOENT) {
Expand Down
33 changes: 32 additions & 1 deletion src/resmom/mom_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>

#include <unistd.h>
#include <dirent.h>
Expand Down Expand Up @@ -117,6 +118,7 @@ extern char *msg_err_malloc;
extern int
write_pipe_data(int upfds, void *data, int data_size);
char task_fmt[] = "/%8.8X";
extern void resume_multinode(job *pjob);


/* Function pointers
Expand Down Expand Up @@ -2315,7 +2317,7 @@ term_job(job *pjob)
int num;

for (num=0, np = pjob->ji_hosts;
num<pjob->ji_numnodes;
num < pjob->ji_numnodes;
num++, np++) {
if (np->hn_stream >= 0) {
np->hn_stream = -1;
Expand Down Expand Up @@ -2380,6 +2382,7 @@ im_eof(int stream, int ret)
np->hn_stream = -1;
if (np->hn_eof_ts == 0)
np->hn_eof_ts = time(0);
pjob->ji_msconnected = 0;

/*
** In case connection to pbs_comm is down/recently established, do not kill a job that is actually running.
Expand Down Expand Up @@ -2493,6 +2496,7 @@ check_ms(int stream, job *pjob)
np->hn_stream = stream;
}
np->hn_eof_ts = 0;
pjob->ji_msconnected = 1;
return FALSE;
}

Expand Down Expand Up @@ -3020,6 +3024,27 @@ im_request(int stream, int version)
BAIL("fromtask")
switch (command) {

case IM_JOIN_RECOV_JOB:
reply = 1;

hnodenum = disrsi(stream, &ret);
BAIL("JOINJOB nodenum")

np = NULL;
/* job should already exist */
pjob = find_job(jobid);
if( pjob == NULL ) {
SEND_ERR(PBSE_SYSTEM)
goto done;
}
pjob->ji_stdout = disrsi(stream, &ret);
BAIL("JOINJOB stdout")
pjob->ji_stderr = disrsi(stream, &ret);
BAIL("JOINJOB stderr")
pjob->ji_qs.ji_un.ji_momt.ji_exuid = pjob->ji_grpcache->gc_uid;
pjob->ji_qs.ji_un.ji_momt.ji_exgid = pjob->ji_grpcache->gc_gid;
pjob->ji_msconnected = 1;
goto done;
case IM_JOIN_JOB:
/*
** Sender is mom superior sending a job structure to me.
Expand Down Expand Up @@ -3064,6 +3089,7 @@ im_request(int stream, int version)
info = disrcs(stream, &len, &ret);
BAIL("JOINJOB credential")
}
pjob->ji_msconnected = 1;

pjob->ji_numnodes = hnodenum;
CLEAR_HEAD(lhead);
Expand Down Expand Up @@ -4030,6 +4056,7 @@ im_request(int stream, int version)
op->oe_u.oe_tm.oe_node = pvnodeid;
op->oe_u.oe_tm.oe_event = event;
op->oe_u.oe_tm.oe_taskid = fromtask;
task_save(ptask);
reply = 0;
}
break;
Expand Down Expand Up @@ -5403,6 +5430,10 @@ im_request(int stream, int version)
if (ret != DIS_SUCCESS)
goto err;
break;
case IM_RECONNECT_TO_MS:
if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE)
resume_multinode(pjob);
break;

default:
sprintf(log_buffer, "unknown command %d sent", command);
Expand Down
28 changes: 23 additions & 5 deletions src/resmom/mom_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ char *mom_domain;
#endif /* WIN32 */

extern void mom_vnlp_report(vnl_t *vnl, char *header);
extern void resume_multinode(job *pjob);

int alien_attach = 0; /* attach alien procs */
int alien_kill = 0; /* kill alien procs */
Expand Down Expand Up @@ -637,7 +638,7 @@ extern void dep_cleanup(void);

/* External Functions */
extern void catch_child(int);
extern void init_abort_jobs(int);
extern void init_abort_jobs(int, pbs_list_head *);
extern void scan_for_exiting(void);
#ifdef NAS /* localmod 015 */
extern int to_size(char *, struct size_value *);
Expand Down Expand Up @@ -6973,6 +6974,7 @@ mom_over_limit(job *pjob)
* check attr value limits of job
*
* @param[in] pjob - pointer to job
* @param[in] recover - recovering mode for MoM
*
* @return int
* @retval 0 Failure
Expand All @@ -6981,7 +6983,7 @@ mom_over_limit(job *pjob)
*/

int
job_over_limit(job *pjob)
job_over_limit(job *pjob, int recover)
{
attribute *attr;
attribute *used;
Expand Down Expand Up @@ -7014,7 +7016,7 @@ job_over_limit(job *pjob)

/* special case EOF */
if (pnode->hn_sister == SISTER_EOF) {
if ((reliable_job_node_find(&pjob->ji_failed_node_list,pnode->hn_host) != NULL) || (do_tolerate_node_failures(pjob))) {
if ((reliable_job_node_find(&pjob->ji_failed_node_list,pnode->hn_host) != NULL) || (do_tolerate_node_failures(pjob)) || recover == 2) {
snprintf(log_buffer, sizeof(log_buffer), "ignoring node EOF %d from failed mom %s as job is tolerant of node failures", pjob->ji_nodekill, pnode->hn_host?pnode->hn_host:"");
log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
return 0;
Expand Down Expand Up @@ -9217,8 +9219,10 @@ main(int argc, char *argv[])
log_err(c, msg_daemonname, "unable to recover vnode to host mapping");
}

pbs_list_link multinode_jobs;

/* recover & abort Jobs which were under MOM's control */
init_abort_jobs(recover);
init_abort_jobs(recover, &multinode_jobs);

/* deploy periodic hooks */
mom_hook_input_init(&hook_input);
Expand Down Expand Up @@ -9272,6 +9276,20 @@ main(int argc, char *argv[])
if (time_now > time_next_hello) {
send_hellosvr(server_stream);
time_next_hello = time_now + time_delta_hellosvr(MOM_DELTA_NORMAL);
if (server_stream != -1) {
job *m_job;
for (m_job = (job *)GET_NEXT(multinode_jobs); m_job;
m_job = (job *)GET_NEXT(m_job->ji_multinodejobs)) {
if (m_job->ji_qs.ji_svrflags & JOB_SVFLG_HERE) {
/* I am MS */
resume_multinode(m_job);
} else {
/* I am sister */
send_sisters(m_job, IM_RECONNECT_TO_MS, NULL);
}
}
CLEAR_HEAD(multinode_jobs);
}
}
} else
send_pending_updates();
Expand Down Expand Up @@ -9661,7 +9679,7 @@ main(int argc, char *argv[])
if (c & (JOB_SVFLG_OVERLMT1 | JOB_SVFLG_OVERLMT2 | JOB_SVFLG_TERMJOB))
continue;

if (job_over_limit(pjob)) {
if (job_over_limit(pjob, recover)) {

char *kill_msg;
log_event(PBSEVENT_JOB | PBSEVENT_FORCE,
Expand Down
48 changes: 48 additions & 0 deletions src/resmom/start_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -5927,6 +5927,52 @@ job_nodes(struct job *pjob)
return job_nodes_inner(pjob, NULL);
}

/**
* @brief
* Resume multinode job after one or more sisters has been restarted
*
* @param[in] pjob - job pointer
*
* @return Void
*
*/

void resume_multinode(job *pjob)
{
if (pjob->ji_hosts == NULL)
return;

int com = IM_JOIN_RECOV_JOB;
hnodent *np = NULL;
eventent *ep = NULL;
int i;
for(i = 1; i < pjob->ji_numnodes; i++) {
np = &pjob->ji_hosts[i];

if( i == 1 )
ep = event_alloc(pjob, com, -1, np, TM_NULL_EVENT, TM_NULL_TASK);
else
ep = event_dup(ep, pjob, np);

if (ep == NULL) {
exec_bail(pjob, JOB_EXEC_FAIL1, NULL);
return;
}

int stream = np->hn_stream;
im_compose(stream, pjob->ji_qs.ji_jobid,
pjob->ji_wattr[(int)JOB_ATR_Cookie].at_val.at_str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zv0n sorry but one very small change: (as now things have changed in mainline about how we access job attributes using getters/setters):
Can you please repalce this line with get_jattr_str(pjob, JOB_ATR_Cookie)?

com, ep->ee_event, TM_NULL_TASK, IM_OLD_PROTOCOL_VER);
(void)diswsi(stream, pjob->ji_numnodes);
(void)diswsi(stream, pjob->ji_ports[0]);
(void)diswsi(stream, pjob->ji_ports[1]);
dis_flush(stream);
#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
send_cred_sisters(pjob);
#endif
}
}

/**
* @brief
* start_exec() - start execution of a job
Expand Down Expand Up @@ -6189,6 +6235,8 @@ start_exec(job *pjob)
}
pjob->ji_stdout = socks[0];
pjob->ji_stderr = socks[1];
pjob->ji_extended.ji_ext.ji_stdout = pjob->ji_ports[0];
pjob->ji_extended.ji_ext.ji_stderr = pjob->ji_ports[1];
}

for (i = 1; i < nodenum; i++) {
Expand Down
6 changes: 6 additions & 0 deletions src/server/job_func.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ job_alloc(void)
pj->ji_stderr = 0;
pj->ji_setup = NULL;
pj->ji_momsubt = 0;
pj->ji_msconnected = 0;
CLEAR_HEAD(pj->ji_multinodejobs);
pj->ji_extended.ji_ext.ji_stdout = 0;
pj->ji_extended.ji_ext.ji_stderr = 0;
#else /* SERVER */
pj->ji_discarding = 0;
pj->ji_prunreq = NULL;
Expand Down Expand Up @@ -575,6 +579,8 @@ job_free(job *pj)
if (job_free_extra != NULL)
job_free_extra(pj);

CLEAR_HEAD(pj->ji_multinodejobs);

#ifdef WIN32
if (pj->ji_hJob) {
CloseHandle(pj->ji_hJob);
Expand Down
4 changes: 2 additions & 2 deletions test/fw/ptl/lib/pbs_testlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -13340,14 +13340,14 @@ def stop(self, sig=None):
raise PbsServiceError(rc=e.rc, rv=e.rv, msg=e.msg)
return True

def restart(self):
def restart(self, args=None):
"""
Restart the PBS mom
"""
if self.isUp():
if not self.stop():
return False
return self.start()
return self.start(args=args)

def log_match(self, msg=None, id=None, n=50, tail=True, allmatch=False,
regexp=False, max_attempts=None, interval=None,
Expand Down