Merge pull request #668 from ystia/feature/GH-664_expose_slurm_jobs_attributes

Expose slurm jobs attributes
loicalbertin committed Aug 17, 2020
2 parents 803a9ef + 0eae4bb commit 3e8ea79
Showing 11 changed files with 419 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
### FEATURES

* Added an ElasticSearch store for events and logs ([GH-658](https://github.com/ystia/yorc/issues/658))
* [Slurm] Expose Slurm scontrol show job results as job attributes ([GH-664](https://github.com/ystia/yorc/issues/664))

### SECURITY FIXES

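Note: GH-664 surfaces the fields reported by `scontrol show job` (JobState, Reason, StdOut, StdErr, ...) as TOSCA instance attributes. Below is a minimal sketch of reading one of them back through the `deployments` API used throughout this commit; the helper name is hypothetical, and instance "0" is hard-coded to match the TODOs in the hunks below (GH-670).

```go
package example

import (
	"context"
	"fmt"

	"github.com/ystia/yorc/v4/deployments"
)

// printJobState is a hypothetical consumer of the attributes exposed by this
// change: it reads back the "JobState" attribute that job monitoring now
// keeps in sync with scontrol output.
func printJobState(ctx context.Context, deploymentID, nodeName string) error {
	// Instance "0" is assumed, matching the hard-coded instance in this commit.
	value, err := deployments.GetInstanceAttributeValue(ctx, deploymentID, nodeName, "0", "JobState")
	if err != nil {
		return err
	}
	if value != nil {
		fmt.Printf("Slurm job state: %s\n", value.RawString())
	}
	return nil
}
```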
5 changes: 3 additions & 2 deletions prov/ansible/monitoring_job.go
@@ -17,9 +17,10 @@ package ansible
import (
"context"
"encoding/json"
"github.com/ystia/yorc/v4/deployments"
"strings"

"github.com/ystia/yorc/v4/deployments"

"github.com/pkg/errors"

"github.com/ystia/yorc/v4/config"
@@ -59,7 +60,7 @@ func (o *actionOperator) ExecAction(ctx context.Context, cfg config.Configuratio
opErr := o.executor.ExecOperation(ctx, cfg, originalTaskID, deploymentID, nodeName, operation)

if strings.ToLower(operation.Name) == tosca.RunnableRunOperationName {
// for now we consider only instance 0
// TODO(loicalbertin) for now we consider only instance 0 (https://github.com/ystia/yorc/issues/670)
dataName := nodeName + "-0-TOSCA_JOB_STATUS"
status, err := tasks.GetTaskData(originalTaskID, dataName)
if err != nil {
1 change: 1 addition & 0 deletions prov/kubernetes/execution_job.go
@@ -103,6 +103,7 @@ func (e *execution) cancelJob(ctx context.Context, clientset kubernetes.Interfac
return err
}
// Not cancelling within the same task, try to get jobID from attribute
// TODO(loicalbertin) for now we consider only instance 0 (https://github.com/ystia/yorc/issues/670)
jobIDValue, err := deployments.GetInstanceAttributeValue(ctx, e.deploymentID, e.nodeName, "0", "job_id")
if err != nil {
return errors.Wrap(err, "failed to retrieve job id to cancel, found neither in task context nor as instance attribute")
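The hunk above relies on a lookup pattern this commit repeats in the Kubernetes and Slurm providers: read the job ID from the task's data and, when cancellation runs in a different task than submission, fall back to the `job_id` instance attribute. Here is a hedged sketch of that two-step lookup; the task-data key and helper name are illustrative, and instance "0" is assumed (GH-670).

```go
package example

import (
	"context"

	"github.com/pkg/errors"

	"github.com/ystia/yorc/v4/deployments"
	"github.com/ystia/yorc/v4/tasks"
)

// resolveJobID is a hypothetical helper: it first checks the task data
// (the key "<nodeName>-jobID" is illustrative, not the repository's actual
// key), then falls back to the "job_id" attribute of instance "0".
func resolveJobID(ctx context.Context, taskID, deploymentID, nodeName string) (string, error) {
	jobID, err := tasks.GetTaskData(taskID, nodeName+"-jobID")
	if err == nil {
		return jobID, nil
	}
	if !tasks.IsTaskDataNotFoundError(err) {
		return "", err
	}
	// Not cancelling within the same task, try to get jobID from attribute
	jobIDValue, err := deployments.GetInstanceAttributeValue(ctx, deploymentID, nodeName, "0", "job_id")
	if err != nil {
		return "", errors.Wrap(err, "failed to retrieve job id to cancel, found neither in task context nor as instance attribute")
	}
	if jobIDValue == nil {
		return "", errors.Errorf("no job_id attribute found for node %q", nodeName)
	}
	return jobIDValue.RawString(), nil
}
```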
2 changes: 1 addition & 1 deletion prov/kubernetes/monitoring_job.go
@@ -167,7 +167,7 @@ func (o *actionOperator) monitorJob(ctx context.Context, cfg config.Configuratio
if job.Status.Active == 0 && job.Status.Succeeded == 0 && job.Status.Failed == 0 {
jobState = "No pods created"
}

// TODO(loicalbertin) for now we consider only instance 0 (https://github.com/ystia/yorc/issues/670)
// Get previous node status and avoid setting err to nil if no error occurs in the get
previousState, err1 := deployments.GetInstanceStateString(ctx, deploymentID, action.Data["nodeName"], "0")
if err1 != nil {
6 changes: 5 additions & 1 deletion prov/slurm/consul_test.go
@@ -15,10 +15,11 @@
package slurm

import (
"github.com/stretchr/testify/require"
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/ystia/yorc/v4/config"
"github.com/ystia/yorc/v4/locations"
"github.com/ystia/yorc/v4/testutil"
@@ -76,5 +77,8 @@ func TestRunConsulSlurmPackageTests(t *testing.T) {
t.Run("ExecutionCommonPrepareAndSubmitJob", func(t *testing.T) {
testExecutionCommonPrepareAndSubmitJob(t)
})
t.Run("ActionOperatorAnalyzeJob", func(t *testing.T) {
testActionOperatorAnalyzeJob(t, srv, cfg)
})
})
}
1 change: 1 addition & 0 deletions prov/slurm/execution.go
@@ -217,6 +217,7 @@ func (e *executionCommon) execute(ctx context.Context) error {
if !tasks.IsTaskDataNotFoundError(err) {
return err
}
// TODO(loicalbertin) for now we consider only instance 0 (https://github.com/ystia/yorc/issues/670)
// Not cancelling within the same task, try to get jobID from attribute
id, err := deployments.GetInstanceAttributeValue(ctx, e.deploymentID, e.NodeName, "0", "job_id")
if err != nil {
131 changes: 88 additions & 43 deletions prov/slurm/monitoring_jobs.go
@@ -21,6 +21,7 @@ import (
"strconv"
"strings"

"github.com/hashicorp/consul/api"
"github.com/pkg/errors"

"github.com/ystia/yorc/v4/config"
@@ -66,69 +67,84 @@ func (o *actionOperator) ExecAction(ctx context.Context, cfg config.Configuratio
return true, errors.Errorf("Unsupported actionType %q", action.ActionType)
}

func (o *actionOperator) monitorJob(ctx context.Context, cfg config.Configuration, deploymentID string, action *prov.Action) (bool, error) {
var (
err error
deregister bool
ok bool
)
func (o *actionOperator) updateJobAttributes(ctx context.Context, deploymentID, nodeName, instanceName string, jobInfo map[string]string) error {
for k, v := range jobInfo {
value, err := deployments.GetInstanceAttributeValue(ctx, deploymentID, nodeName, instanceName, k)
if err != nil {
return err
}
if value == nil || value.RawString() != v {
err = deployments.SetInstanceAttributeComplex(ctx, deploymentID, nodeName, instanceName, k, v)
if err != nil {
return err
}
}
}

return nil
}

func getMonitoringJobActionData(action *prov.Action) (*actionData, error) {
var ok bool

actionData := &actionData{}
// Check jobID
actionData.jobID, ok = action.Data["jobID"]
if !ok {
return true, errors.Errorf("Missing mandatory information jobID for actionType:%q", action.ActionType)
return nil, errors.Errorf("Missing mandatory information jobID for actionType:%q", action.ActionType)
}
// Check stepName
actionData.stepName, ok = action.Data["stepName"]
if !ok {
return true, errors.Errorf("Missing mandatory information stepName for actionType:%q", action.ActionType)
return nil, errors.Errorf("Missing mandatory information stepName for actionType:%q", action.ActionType)
}
// Check workingDir
actionData.workingDir, ok = action.Data["workingDir"]
if !ok {
return true, errors.Errorf("Missing mandatory information workingDir for actionType:%q", action.ActionType)
return nil, errors.Errorf("Missing mandatory information workingDir for actionType:%q", action.ActionType)
}
// Check taskID
actionData.taskID, ok = action.Data["taskID"]
if !ok {
return true, errors.Errorf("Missing mandatory information taskID for actionType:%q", action.ActionType)
return nil, errors.Errorf("Missing mandatory information taskID for actionType:%q", action.ActionType)
}
// Check artifacts (optional)
artifactsStr, ok := action.Data["artifacts"]
if ok {
actionData.artifacts = strings.Split(artifactsStr, ",")
}

nodeName := action.Data["nodeName"]
return actionData, nil

var locationProps config.DynamicMap
locationMgr, err := locations.GetManager(cfg)
if err == nil {
locationProps, err = locationMgr.GetLocationPropertiesForNode(ctx, deploymentID, nodeName, infrastructureType)
}
if err != nil {
return true, err
}
}

credentials, err := getUserCredentials(ctx, locationProps, deploymentID, nodeName, "")
if err != nil {
return true, err
}
// Get an SSH client to connect to the Slurm client node, and execute Slurm commands such as squeue, or system commands such as cp, mv, mkdir, etc.
sshClient, err := getSSHClient(cfg, credentials, locationProps)
func (o *actionOperator) analyzeJob(ctx context.Context, cc *api.Client, sshClient sshutil.Client, deploymentID, nodeName string, action *prov.Action, keepArtifacts bool) (bool, error) {
var (
err error
deregister bool
)

actionData, err := getMonitoringJobActionData(action)
if err != nil {
return true, err
}

info, err := getJobInfo(sshClient, actionData.jobID)

// TODO(loicalbertin): This should be improved instance name should not be hard-coded (https://github.com/ystia/yorc/issues/670)
instanceName := "0"

if err != nil {
if isNoJobFoundError(err) {
// the job was not found in the Slurm database (it has probably been purged): set its status to "UNKNOWN"
deployments.SetInstanceStateStringWithContextualLogs(ctx, deploymentID, nodeName, "0", "UNKNOWN")
deployments.SetInstanceStateStringWithContextualLogs(ctx, deploymentID, nodeName, instanceName, "UNKNOWN")
}
return true, errors.Wrapf(err, "failed to get job info with jobID:%q", actionData.jobID)
}
err = o.updateJobAttributes(ctx, deploymentID, nodeName, instanceName, info)
if err != nil {
return true, errors.Wrapf(err, "failed to update job attributes with jobID: %q", actionData.jobID)
}

var mess string
if info["Reason"] != "None" {
@@ -141,27 +157,27 @@ func (o *actionOperator) monitorJob(ctx context.Context, cfg config.Configuratio
stdOut, existStdOut := info["StdOut"]
stdErr, existStdErr := info["StdErr"]
if existStdOut && existStdErr && stdOut == stdErr {
o.logFile(ctx, cfg, action, deploymentID, stdOut, "StdOut/StdErr", sshClient)
o.logFile(ctx, cc, action, deploymentID, stdOut, "StdOut/StdErr", sshClient)
} else {
if existStdOut {
o.logFile(ctx, cfg, action, deploymentID, stdOut, "StdOut", sshClient)
o.logFile(ctx, cc, action, deploymentID, stdOut, "StdOut", sshClient)
}
if existStdErr {
o.logFile(ctx, cfg, action, deploymentID, stdErr, "StdErr", sshClient)
o.logFile(ctx, cc, action, deploymentID, stdErr, "StdErr", sshClient)
}
}

// Fall back to the default output file if nothing is specified here
if !existStdOut && !existStdErr {
o.logFile(ctx, cfg, action, deploymentID, fmt.Sprintf("slurm-%s.out", actionData.jobID), "StdOut/Stderr", sshClient)
o.logFile(ctx, cc, action, deploymentID, fmt.Sprintf("slurm-%s.out", actionData.jobID), "StdOut/Stderr", sshClient)
}

previousJobState, err := deployments.GetInstanceStateString(ctx, deploymentID, nodeName, "0")
previousJobState, err := deployments.GetInstanceStateString(ctx, deploymentID, nodeName, instanceName)
if err != nil {
return true, errors.Wrapf(err, "failed to get instance state for job %q", actionData.jobID)
}
if previousJobState != info["JobState"] {
deployments.SetInstanceStateStringWithContextualLogs(ctx, deploymentID, nodeName, "0", info["JobState"])
deployments.SetInstanceStateStringWithContextualLogs(ctx, deploymentID, nodeName, instanceName, info["JobState"])
}

// See if monitoring must be continued and set job state if terminated
Expand All @@ -182,14 +198,50 @@ func (o *actionOperator) monitorJob(ctx context.Context, cfg config.Configuratio

// clean up, except if an error occurred or if keeping artifacts is explicitly requested in the configuration
if deregister && err == nil {
if !locationProps.GetBool("keep_job_remote_artifacts") {
if !keepArtifacts {
o.removeArtifacts(actionData, sshClient)
}
}
return deregister, err
}

func (o *actionOperator) removeArtifacts(actionData *actionData, sshClient *sshutil.SSHClient) {
func (o *actionOperator) monitorJob(ctx context.Context, cfg config.Configuration, deploymentID string, action *prov.Action) (bool, error) {
var (
err error
)

nodeName := action.Data["nodeName"]

var locationProps config.DynamicMap
locationMgr, err := locations.GetManager(cfg)
if err == nil {
locationProps, err = locationMgr.GetLocationPropertiesForNode(ctx, deploymentID, nodeName, infrastructureType)
}
if err != nil {
return true, err
}

credentials, err := getUserCredentials(ctx, locationProps, deploymentID, nodeName, "")
if err != nil {
return true, err
}
// Get an SSH client to connect to the Slurm client node, and execute Slurm commands such as squeue, or system commands such as cp, mv, mkdir, etc.
sshClient, err := getSSHClient(cfg, credentials, locationProps)
if err != nil {
return true, err
}

cc, err := cfg.GetConsulClient()
if err != nil {
log.Debugf("failed to retrieve consul client due to error: %+v", err)
return true, err
}

return o.analyzeJob(ctx, cc, sshClient, deploymentID, nodeName, action, locationProps.GetBool("keep_job_remote_artifacts"))

}

func (o *actionOperator) removeArtifacts(actionData *actionData, sshClient sshutil.Client) {
for _, art := range actionData.artifacts {
if art != "" {
p := path.Join(actionData.workingDir, art)
@@ -203,7 +255,7 @@ func (o *actionOperator) removeArtifacts(actionData *actionData, sshClient *sshu
}
}

func (o *actionOperator) logFile(ctx context.Context, cfg config.Configuration, action *prov.Action, deploymentID, filePath, fileType string, sshClient *sshutil.SSHClient) {
func (o *actionOperator) logFile(ctx context.Context, cc *api.Client, action *prov.Action, deploymentID, filePath, fileType string, sshClient sshutil.Client) {
fileTypeKey := fmt.Sprintf("lastIndex%s", strings.Replace(fileType, "/", "", -1))
// Get the log last index
lastInd, err := o.getLogLastIndex(action, fileTypeKey)
@@ -220,17 +272,10 @@ func (o *actionOperator) logFile(ctx context.Context, cfg config.Configuration,
}
if strings.TrimSpace(output) != "" {
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelDEBUG, deploymentID).RegisterAsString(fmt.Sprintf("Run the command: %q", cmd))
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelINFO, deploymentID).RegisterAsString(fmt.Sprintf("%s %s:", fileType, filePath))
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelINFO, deploymentID).RegisterAsString("\n" + output)
events.WithContextOptionalFields(ctx).NewLogEntry(events.LogLevelINFO, deploymentID).RegisterAsString(fmt.Sprintf("%s %s:\n%s", fileType, filePath, output))
}

// Update the last index
cc, err := cfg.GetConsulClient()
if err != nil {
log.Debugf("failed to retrieve consul client due to error: %+v", err)
return
}

newInd := strconv.Itoa(lastInd + strings.Count(output, "\n"))
err = scheduling.UpdateActionData(cc, action.ID, fileTypeKey, newInd)
if err != nil {
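The new `updateJobAttributes` above persists every entry of the map returned by `getJobInfo` as an instance attribute, skipping values that have not changed. `getJobInfo` itself is not part of this diff; assuming it wraps `scontrol show job`, whose output is whitespace-separated Key=Value pairs, a rough sketch of the parsing side could look like this (illustrative, not the repository's implementation).

```go
package example

import "strings"

// parseScontrolOutput naively parses `scontrol show job <id>` output such as
// "JobId=42 JobName=test JobState=RUNNING Reason=None ..." into the kind of
// map consumed by updateJobAttributes. Note this simple split would mangle
// values containing spaces (e.g. some Command or Comment fields).
func parseScontrolOutput(output string) map[string]string {
	info := make(map[string]string)
	for _, field := range strings.Fields(output) {
		if i := strings.Index(field, "="); i > 0 {
			info[field[:i]] = field[i+1:]
		}
	}
	return info
}
```

With attributes kept in sync this way, a change of the `JobState` value is also what drives the state transition recorded by `SetInstanceStateStringWithContextualLogs` in the hunks above.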
