Skip to content

Commit

Permalink
Merge pull request #758 from ystia/feature/GH-757_Use_sacct_to_retrie…
Browse files Browse the repository at this point in the history
…ve_job_status_when_scontrol_does_not_show_it_anymore

Slurm: Use sacct to retrieve job status when scontrol show job does not show the job anymore
  • Loading branch information
loicalbertin committed Jul 15, 2021
2 parents 28b6706 + 604ff1e commit 7bd38d6
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### ENHANCEMENTS

* Slurm: Use sacct to retrieve job status when scontrol show job does not show the job anymore ([GH-757](https://github.com/ystia/yorc/issues/757))
* Add basic support for ssh on Windows ([GH-751](https://github.com/ystia/yorc/issues/751))
* Add the ability to define OpenStack Compute Instance user_data ([GH-735](https://github.com/ystia/yorc/issues/735))

Expand Down
44 changes: 36 additions & 8 deletions prov/slurm/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ import (

"github.com/ystia/yorc/v4/config"
"github.com/ystia/yorc/v4/deployments"
"github.com/ystia/yorc/v4/events"
"github.com/ystia/yorc/v4/helper/sshutil"
"github.com/ystia/yorc/v4/log"
"github.com/ystia/yorc/v4/tosca/types"
)

// reSbatch is a regular expression pattern matching the text
// "Submitted batch job <digits>" and capturing the digits.
// NOTE(review): presumably applied to sbatch output to extract the job ID;
// the caller is not visible here, confirm.
const reSbatch = `Submitted batch job (\d+)`

const invalidJob = "Invalid job id specified"
// errMsgInvalidJob is the substring looked for in the output of a failed
// "scontrol show job" command to recognize a job that scontrol does not list.
const errMsgInvalidJob = "Invalid job id specified"

// errMsgAccountingDisabled is the substring looked for in the output of a
// failed sacct command to recognize that accounting cannot be queried.
const errMsgAccountingDisabled = "Slurm accounting storage is disabled"

// getSSHClient returns a SSH client with slurm credentials from node or job configuration provided by the deployment,
// or by the yorc slurm configuration
Expand Down Expand Up @@ -341,20 +344,45 @@ func parseKeyValue(str string) (bool, string, string) {
return false, "", ""
}

func getJobInfo(client sshutil.Client, jobID string) (map[string]string, error) {
cmd := fmt.Sprintf("scontrol show job %s", jobID)
// getJobStatusUsingAccounting retrieves the state of the given job through the
// sacct command run over the provided SSH client.
//
// The command output is filtered with grep to keep only the line whose first
// '|'-separated field is exactly jobID, and awk prints the second field of that
// line (the State column requested with "-o JobID,State").
//
// It returns:
//   - the trimmed command output when it is not empty;
//   - a noJobFound error when the command fails and its output contains
//     errMsgAccountingDisabled (a message is also logged and registered as a
//     WARN log entry for deploymentID), or when the trimmed output is empty;
//   - the raw command error for any other failure.
//
// NOTE(review): this span comes from a rendered diff. The statements that
// reference invalidJob or return nil as first value do not match this
// function's (string, error) signature and appear to be leftovers of the
// removed getJobInfo body.
func getJobStatusUsingAccounting(ctx context.Context, client sshutil.Client, deploymentID, jobID string) (string, error) {
// Ask sacct for JobID and State only, then keep the State of the exact job ID.
cmd := fmt.Sprintf("sacct -P -n -o JobID,State -j %s | grep \"^%s|\" | awk -F '|' '{print $2;}'", jobID, jobID)
output, err := client.RunCommand(cmd)
// Strip surrounding quotes, whitespace and NUL bytes from the raw output.
out := strings.Trim(output, "\" \t\n\x00")
if err != nil {
if strings.Contains(out, invalidJob) {
return nil, &noJobFound{msg: err.Error()}
if strings.Contains(out, errMsgAccountingDisabled) {
errMsg := fmt.Sprintf("accounting is disabled on Slurm cluster, can't retrieve job status for job %q.", jobID)
log.Print(errMsg)
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelWARN, deploymentID).RegisterAsString(errMsg)
return "", &noJobFound{msg: err.Error()}
}
return nil, errors.Wrap(err, out)
return "", err
}
// A successful command with no output means no line matched the job ID.
if out == "" {
return "", &noJobFound{msg: fmt.Sprintf("no accounting information found for job with id: %q", jobID)}
}
if out != "" {
return out, nil
}

// getMinimalJobInfoUsingAccounting builds a job info map from the accounting
// data alone. The resulting map holds a single "JobState" entry whose value is
// the status returned by getJobStatusUsingAccounting. Any error from that call
// is returned unchanged together with a nil map.
func getMinimalJobInfoUsingAccounting(ctx context.Context, client sshutil.Client, deploymentID, jobID string) (map[string]string, error) {
	jobState, err := getJobStatusUsingAccounting(ctx, client, deploymentID, jobID)
	if err != nil {
		return nil, err
	}

	info := make(map[string]string, 1)
	info["JobState"] = jobState
	return info, nil
}

// getJobInfo returns the properties of the given job as a key/value map.
//
// It first runs "scontrol show job <jobID>" over the provided SSH client and,
// when the command succeeds with a non-empty output, returns the result of
// parseJobInfo on that output.
//
// When the command fails and its output does not contain errMsgInvalidJob, the
// error is returned wrapped with the command output. In the remaining cases
// (empty output, or an error whose output contains errMsgInvalidJob) the
// function falls back to getMinimalJobInfoUsingAccounting.
//
// NOTE(review): this span comes from a rendered diff. The unconditional
// "return nil, &noJobFound{...}" statement in the middle of the body appears
// to be a leftover of the removed implementation.
func getJobInfo(ctx context.Context, client sshutil.Client, deploymentID, jobID string) (map[string]string, error) {
cmd := fmt.Sprintf("scontrol show job %s", jobID)
output, err := client.RunCommand(cmd)
// Strip surrounding quotes, whitespace and NUL bytes from the raw output.
out := strings.Trim(output, "\" \t\n\x00")
if err == nil && out != "" {
return parseJobInfo(strings.NewReader(out))
}
return nil, &noJobFound{msg: fmt.Sprintf("no information found for job with id:%q", jobID)}
// Any failure other than the invalid job message is reported to the caller.
if err != nil && !strings.Contains(out, errMsgInvalidJob) {
return nil, errors.Wrap(err, out)
}
log.Debugf("job %q vanished from \"scontrol show job\". Trying accounting to get the job status.", jobID)
return getMinimalJobInfoUsingAccounting(ctx, client, deploymentID, jobID)
}

func quoteArgs(t []string) string {
Expand Down
62 changes: 57 additions & 5 deletions prov/slurm/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package slurm

import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
Expand Down Expand Up @@ -351,10 +352,61 @@ func TestGetJobInfoWithInvalidJob(t *testing.T) {
t.Parallel()
s := &sshutil.MockSSHClient{
MockRunCommand: func(cmd string) (string, error) {
return "slurm_load_jobs error: Invalid job id specified", errors.New("")
if strings.HasPrefix(cmd, "scontrol") {
return "slurm_load_jobs error: Invalid job id specified", errors.New("")
}
return "", nil
},
}
info, err := getJobInfo(context.Background(), s, "d1", "1234")
require.Nil(t, info, "info should be nil")

require.Equal(t, true, isNoJobFoundError(err), "expected no job found error")
}

// TestGetJobInfoWithEmptyResponseButAccounting checks that getJobInfo returns
// a job info map holding only the job state when the scontrol command yields
// an empty output and the other command issued by getJobInfo answers
// "COMPLETED".
func TestGetJobInfoWithEmptyResponseButAccounting(t *testing.T) {
	t.Parallel()

	// Commands starting with "scontrol" yield nothing, any other one reports a state.
	runCommand := func(cmd string) (string, error) {
		if !strings.HasPrefix(cmd, "scontrol") {
			return "COMPLETED", nil
		}
		return "", nil
	}
	mock := &sshutil.MockSSHClient{MockRunCommand: runCommand}
	want := map[string]string{"JobState": "COMPLETED"}

	got, err := getJobInfo(context.Background(), mock, "d1", "1234")
	require.NotNil(t, got, "info should not be nil")
	require.Nil(t, err)
	require.Equal(t, want, got, "unexpected job info")
}

// TestGetJobInfoWithEmptyResponseAndAccountingError checks that getJobInfo
// returns an error and no job info when the scontrol command yields an empty
// output and the other command issued by getJobInfo fails.
func TestGetJobInfoWithEmptyResponseAndAccountingError(t *testing.T) {
	t.Parallel()

	// Commands starting with "scontrol" yield nothing, any other one fails.
	runCommand := func(cmd string) (string, error) {
		if !strings.HasPrefix(cmd, "scontrol") {
			return "", errors.New("some error")
		}
		return "", nil
	}
	mock := &sshutil.MockSSHClient{MockRunCommand: runCommand}

	got, err := getJobInfo(context.Background(), mock, "d1", "1234")
	require.Nil(t, got, "info should be nil")

	require.NotNil(t, err, "expecting an error")
}

func TestGetJobInfoWithEmptyResponseAndAccountingNotConfigured(t *testing.T) {
t.Parallel()
s := &sshutil.MockSSHClient{
MockRunCommand: func(cmd string) (string, error) {
if strings.HasPrefix(cmd, "scontrol") {
return "", nil
}
return errMsgAccountingDisabled, errors.New("1")
},
}
info, err := getJobInfo(s, "1234")
info, err := getJobInfo(context.Background(), s, "d1", "1234")
require.Nil(t, info, "info should be nil")

require.Equal(t, true, isNoJobFoundError(err), "expected no job found error")
Expand All @@ -374,7 +426,7 @@ func TestGetJobInfo(t *testing.T) {
require.Nil(t, err, "unexpected error while opening test file")
expected, err := parseJobInfo(data)
require.Nil(t, err, "Unexpected error parsing job info")
info, err := getJobInfo(s, "1234")
info, err := getJobInfo(context.Background(), s, "d1", "1234")
require.Nil(t, err, "Unexpected error retrieving job info")
require.NotNil(t, info, "info should not be nil")

Expand All @@ -388,7 +440,7 @@ func TestGetJobInfoWithEmptyResponse(t *testing.T) {
return "", nil
},
}
info, err := getJobInfo(s, "1234")
info, err := getJobInfo(context.Background(), s, "d1", "1234")
require.Nil(t, info, "info should be nil")

require.Equal(t, true, isNoJobFoundError(err), "expected no job found error")
Expand All @@ -401,7 +453,7 @@ func TestGetJobInfoWithError(t *testing.T) {
return "oups, it's bad", errors.New("this is an error !")
},
}
info, err := getJobInfo(s, "1234")
info, err := getJobInfo(context.Background(), s, "d1", "1234")
require.Nil(t, info, "info should be nil")

require.Equal(t, "oups, it's bad: this is an error !", err.Error(), "expected error")
Expand Down
66 changes: 43 additions & 23 deletions prov/slurm/monitoring_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,40 @@ func getMonitoringJobActionData(action *prov.Action) (*actionData, error) {

}

// getCustomLogStream looks up the value registered under streamName.
//
// The action data is consulted first. If it has no such entry, the job info
// map is used instead and, when found there, the value is copied into
// action.Data and passed to scheduling.UpdateActionData for this action ID.
//
// It returns the value and true when found in either place, or an empty
// string and false otherwise.
func getCustomLogStream(cc *api.Client, action *prov.Action, info map[string]string, streamName string) (string, bool) {
	if known, ok := action.Data[streamName]; ok {
		return known, true
	}

	fromInfo, ok := info[streamName]
	if !ok {
		return "", false
	}

	// Remember the value in the action data.
	action.Data[streamName] = fromInfo
	scheduling.UpdateActionData(cc, action.ID, streamName, fromInfo)
	return fromInfo, true
}

// logJob hands the job output files over to o.logFile.
//
// The "StdOut" and "StdErr" locations are resolved with getCustomLogStream:
//   - when both are known and identical, the file is logged once;
//   - when neither is known, the file "slurm-<jobID>.out" is logged instead;
//   - otherwise each known location is logged on its own.
func (o *actionOperator) logJob(ctx context.Context, cc *api.Client, sshClient sshutil.Client, deploymentID, jobID string, action *prov.Action, info map[string]string) {
	outPath, hasOut := getCustomLogStream(cc, action, info, "StdOut")
	errPath, hasErr := getCustomLogStream(cc, action, info, "StdErr")

	switch {
	case hasOut && hasErr && outPath == errPath:
		// Both streams share the same file: log it only once.
		o.logFile(ctx, cc, action, deploymentID, outPath, "StdOut/StdErr", sshClient)
	case !hasOut && !hasErr:
		// Nothing was specified: use the default output file name.
		o.logFile(ctx, cc, action, deploymentID, fmt.Sprintf("slurm-%s.out", jobID), "StdOut/Stderr", sshClient)
	default:
		if hasOut {
			o.logFile(ctx, cc, action, deploymentID, outPath, "StdOut", sshClient)
		}
		if hasErr {
			o.logFile(ctx, cc, action, deploymentID, errPath, "StdErr", sshClient)
		}
	}
}

func (o *actionOperator) analyzeJob(ctx context.Context, cc *api.Client, sshClient sshutil.Client, deploymentID, nodeName string, action *prov.Action, keepArtifacts bool) (bool, error) {
var (
err error
Expand All @@ -129,7 +163,7 @@ func (o *actionOperator) analyzeJob(ctx context.Context, cc *api.Client, sshClie
return true, err
}

info, err := getJobInfo(sshClient, actionData.jobID)
info, err := getJobInfo(ctx, sshClient, deploymentID, actionData.jobID)

// TODO(loicalbertin): This should be improved: the instance name should not be hard-coded (https://github.com/ystia/yorc/issues/670)
instanceName := "0"
Expand All @@ -146,31 +180,17 @@ func (o *actionOperator) analyzeJob(ctx context.Context, cc *api.Client, sshClie
return true, errors.Wrapf(err, "failed to update job attributes with jobID: %q", actionData.jobID)
}

var mess string
if info["Reason"] != "None" {
mess = fmt.Sprintf("Job Name:%s, ID:%s, State:%s, Reason:%s, Execution Time:%s", info["JobName"], info["JobId"], info["JobState"], info["Reason"], info["RunTime"])
} else {
mess = fmt.Sprintf("Job Name:%s, ID:%s, State:%s, Execution Time:%s", info["JobName"], info["JobId"], info["JobState"], info["RunTime"])
}
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelINFO, deploymentID).RegisterAsString(mess)

stdOut, existStdOut := info["StdOut"]
stdErr, existStdErr := info["StdErr"]
if existStdOut && existStdErr && stdOut == stdErr {
o.logFile(ctx, cc, action, deploymentID, stdOut, "StdOut/StdErr", sshClient)
} else {
if existStdOut {
o.logFile(ctx, cc, action, deploymentID, stdOut, "StdOut", sshClient)
}
if existStdErr {
o.logFile(ctx, cc, action, deploymentID, stdErr, "StdErr", sshClient)
if _, ok := info["RunTime"]; ok {
var mess string
if info["Reason"] != "None" {
mess = fmt.Sprintf("Job Name:%s, ID:%s, State:%s, Reason:%s, Execution Time:%s", info["JobName"], info["JobId"], info["JobState"], info["Reason"], info["RunTime"])
} else {
mess = fmt.Sprintf("Job Name:%s, ID:%s, State:%s, Execution Time:%s", info["JobName"], info["JobId"], info["JobState"], info["RunTime"])
}
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelINFO, deploymentID).RegisterAsString(mess)
}

// See default output if nothing is specified here
if !existStdOut && !existStdErr {
o.logFile(ctx, cc, action, deploymentID, fmt.Sprintf("slurm-%s.out", actionData.jobID), "StdOut/Stderr", sshClient)
}
o.logJob(ctx, cc, sshClient, deploymentID, actionData.jobID, action, info)

previousJobState, err := deployments.GetInstanceStateString(ctx, deploymentID, nodeName, instanceName)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion prov/slurm/testdata/scontrol_show_job_completed.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ JobId=6260 JobName=test-salloc-Environment
Command=(null)
WorkDir=/home_nfs/john
StdOut=/home_nfs/john/file.out
StdErr=/home_nfs/john/file.err
StdErr=/home_nfs/john/file.out
Power= SICP=0

0 comments on commit 7bd38d6

Please sign in to comment.