Skip to content

Commit

Permalink
Reduce repetition in jobflow step logging (closes #39)
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed Jan 24, 2018
1 parent 2cfcbb3 commit b8b1a2b
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 43 deletions.
120 changes: 80 additions & 40 deletions src/job_flow_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,19 @@ func InitJobFlowSteps(playbookConfig PlaybookConfig, jobflowID string, isAsync,
}, nil
}

// AddJobFlowSteps builds the parameters and then submits them to
// the running EMR cluster
// AddJobFlowSteps builds the parameters and then submits them to the running EMR cluster
func (jfs JobFlowSteps) AddJobFlowSteps() error {
params, err := jfs.GetJobFlowStepsInput()
if err != nil {
return err
}

done := false
successCount := 0
errorCount := 0
historicalInfoLogs := []string{}
historicalErrorLogs := []string{}

resp, err := jfs.EmrSvc.AddJobFlowSteps(params)
addJobFlowStepsOutput, err := jfs.EmrSvc.AddJobFlowSteps(params)
if err != nil {
return err
}
Expand All @@ -97,54 +97,94 @@ func (jfs JobFlowSteps) AddJobFlowSteps() error {
" steps to the EMR cluster with jobflow id '" + jfs.JobflowID + "'...")

for done == false && jfs.IsBlocking == true {
successCount, errCount, infoLogs, errorLogs, err :=
jfs.RetrieveStepsStates(addJobFlowStepsOutput)
if err != nil {
return err
}
errorCount = errCount

for _, stepID := range resp.StepIds {
params1 := &emr.DescribeStepInput{
ClusterId: aws.String(jfs.JobflowID),
StepId: stepID,
}

resp1, err := jfs.EmrSvc.DescribeStep(params1)
if err != nil {
return err
}

if *resp1.Step.Status.State == "COMPLETED" {
log.Info("Step '" + *resp1.Step.Name + "' with id '" +
*resp1.Step.Id + "' completed successfully")
successCount++
} else if *resp1.Step.Status.State == "CANCELLED" || *resp1.Step.Status.State == "FAILED" {
log.Error("Step '" + *resp1.Step.Name + "' with id '" +
*resp1.Step.Id + "' was " + *resp1.Step.Status.State)
if jfs.LogFailedSteps && *resp1.Step.Status.State == "FAILED" {
logs, err := jfs.GetStepLogs(*resp1.Step.Id)
if err != nil {
log.Error("Couldn't retrieve failed step " + *resp1.Step.Id + " logs: " + err.Error())
} else {
for filename, content := range logs {
log.Error("Content of log file '" + filename + "':")
log.Error(content)
}
}
}
errorCount++
}
for _, l := range Diff(historicalInfoLogs, infoLogs) {
log.Info(l)
}
for _, l := range Diff(historicalErrorLogs, errorLogs) {
log.Error(l)
}
historicalInfoLogs = infoLogs
historicalErrorLogs = errorLogs

if (successCount + errorCount) == len(resp.StepIds) {
if (successCount + errorCount) == len(addJobFlowStepsOutput.StepIds) {
done = true
} else {
time.Sleep(time.Second * 15)
successCount = 0
errorCount = 0
}
}

if errorCount == 0 {
return nil
}
return errors.New("" + strconv.Itoa(errorCount) + "/" + strconv.Itoa(len(resp.StepIds)) +
" steps failed to complete successfully")
return errors.New("" + strconv.Itoa(errorCount) + "/" +
strconv.Itoa(len(addJobFlowStepsOutput.StepIds)) + " steps failed to complete successfully")
}

// RetrieveStepsStates retrieves the state of every step listed in addJobFlowStepsOutput.
//
// It returns, in order: the number of steps reported as successful, the number of steps
// reported as failed, the log lines describing the successful steps, the log lines describing
// the failed steps, and an error. Steps reported as neither successful nor failed are not
// counted and contribute no log lines.
//
// If the state of any step cannot be retrieved, both counts are zero, both slices are nil and
// the underlying error is returned. On success both slices are non-nil, even when empty.
func (jfs JobFlowSteps) RetrieveStepsStates(addJobFlowStepsOutput *emr.AddJobFlowStepsOutput) (int, int, []string, []string, error) {
	successCount := 0
	errorCount := 0
	infoLogs := make([]string, 0)
	errorLogs := make([]string, 0)
	for _, stepID := range addJobFlowStepsOutput.StepIds {
		success, failure, logs, err := jfs.RetrieveStepState(*stepID)
		if err != nil {
			return 0, 0, nil, nil, err
		}
		if success {
			successCount++
			infoLogs = append(infoLogs, logs...)
		}
		if failure {
			errorCount++
			// Accumulate onto errorLogs itself: appending to infoLogs here would drop the
			// lines of previously failed steps and leak success messages into the error logs.
			errorLogs = append(errorLogs, logs...)
		}
	}
	return successCount, errorCount, infoLogs, errorLogs, nil
}

// RetrieveStepState describes the step identified by stepID on the job flow and reports its
// outcome as (success, failure, logs, error):
//   - COMPLETED: success is true and logs holds a single completion message;
//   - CANCELLED or FAILED: failure is true and logs holds a message naming the state, followed,
//     for FAILED steps when jfs.LogFailedSteps is set, by the name and content of each file
//     returned by GetStepLogs;
//   - any other state: both flags are false and logs is nil.
//
// An error is returned when the step cannot be described or when the logs of a failed step
// cannot be retrieved; in both cases the flags are false and logs is nil.
func (jfs JobFlowSteps) RetrieveStepState(stepID string) (bool, bool, []string, error) {
	out, err := jfs.EmrSvc.DescribeStep(&emr.DescribeStepInput{
		ClusterId: aws.String(jfs.JobflowID),
		StepId:    aws.String(stepID),
	})
	if err != nil {
		return false, false, nil, errwrap.Wrapf("Couldn't retrieve step "+stepID+" state: {{err}}", err)
	}

	step := out.Step
	state := *step.Status.State
	switch state {
	case "COMPLETED":
		message := "Step '" + *step.Name + "' with id '" + *step.Id + "' completed successfully"
		return true, false, []string{message}, nil
	case "CANCELLED", "FAILED":
		lines := []string{"Step '" + *step.Name + "' with id '" + *step.Id + "' was " + state}
		if jfs.LogFailedSteps && state == "FAILED" {
			files, err := jfs.GetStepLogs(*step.Id)
			if err != nil {
				return false, false, nil, err
			}
			for name, body := range files {
				lines = append(lines, "Content of log file '"+name+"':", body)
			}
		}
		return false, true, lines, nil
	default:
		// The step has not reached a terminal state that this function reports on.
		return false, false, nil, nil
	}
}

// GetJobFlowStepsInput parses the config given to it and
Expand Down
117 changes: 114 additions & 3 deletions src/job_flow_steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (m *mockEMRAPISteps) DescribeStep(input *emr.DescribeStepInput) (*emr.Descr
return &emr.DescribeStepOutput{
Step: &emr.Step{
Name: aws.String("step"),
Id: aws.String("s-id"),
Id: aws.String("step-id"),
Status: &emr.StepStatus{
State: aws.String(state),
},
Expand All @@ -95,7 +95,7 @@ func (m *mockEMRAPISteps) DescribeCluster(input *emr.DescribeClusterInput) (*emr
return &emr.DescribeClusterOutput{Cluster: &emr.Cluster{LogUri: aws.String("s3://bucket/log")}},
nil
}
if *input.ClusterId == "test-get-bucket-fail" {
if *input.ClusterId == "test-get-bucket-fail" || *input.ClusterId == "j-FAILED-logs" {
return &emr.DescribeClusterOutput{Cluster: &emr.Cluster{LogUri: aws.String("://")}}, nil
}
if *input.ClusterId == "test-get-bucket-empty-log-uri" {
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestAddJobFlowSteps_Fail(t *testing.T) {
jfs.JobflowID = "j-123"
err = jfs.AddJobFlowSteps()
assert.NotNil(err)
assert.Equal("DescribeStep failed", err.Error())
assert.Equal("Couldn't retrieve step 1 state: DescribeStep failed", err.Error())

jfs.JobflowID = "j-FAILED"
err = jfs.AddJobFlowSteps()
Expand Down Expand Up @@ -344,3 +344,114 @@ func TestDownloadLogFiles_Fail(t *testing.T) {
assert.NotNil(err)
assert.Equal("ListObjectsPages failed", err.Error())
}

// TestRetrieveStepsStates checks the counts and log lines returned by RetrieveStepsStates for a
// single step, once against the "j-COMPLETED" mock job flow and once against "j-CANCELLED".
func TestRetrieveStepsStates(t *testing.T) {
	a := assert.New(t)
	output := &emr.AddJobFlowStepsOutput{StepIds: []*string{aws.String("step-id")}}

	cases := []struct {
		jobflowID string
		successes int
		failures  int
		infoLogs  []string
		errorLogs []string
	}{
		{
			jobflowID: "j-COMPLETED",
			successes: 1,
			failures:  0,
			infoLogs:  []string{"Step 'step' with id 'step-id' completed successfully"},
			errorLogs: []string{},
		},
		{
			jobflowID: "j-CANCELLED",
			successes: 0,
			failures:  1,
			infoLogs:  []string{},
			errorLogs: []string{"Step 'step' with id 'step-id' was CANCELLED"},
		},
	}

	for _, c := range cases {
		jfs := mockJobFlowStepsWithoutPlaybook(c.jobflowID)
		successes, failures, infos, errs, err := jfs.RetrieveStepsStates(output)
		a.Equal(c.successes, successes)
		a.Equal(c.failures, failures)
		// Both slices must be non-nil, even when they carry no lines.
		a.NotNil(infos)
		a.Equal(c.infoLogs, infos)
		a.NotNil(errs)
		a.Equal(c.errorLogs, errs)
		a.Nil(err)
	}
}

// TestRetrieveStepsStates_Fail checks that RetrieveStepsStates returns zero counts, nil slices
// and the wrapped error when describing a step fails.
func TestRetrieveStepsStates_Fail(t *testing.T) {
	a := assert.New(t)
	output := &emr.AddJobFlowStepsOutput{StepIds: []*string{aws.String("step-id")}}

	// fails if one DescribeStep fails
	successes, failures, infos, errs, err :=
		mockJobFlowStepsWithoutPlaybook("j-NOTHING").RetrieveStepsStates(output)

	a.Equal(0, successes)
	a.Equal(0, failures)
	a.Nil(infos)
	a.Nil(errs)
	a.NotNil(err)
	a.Equal("Couldn't retrieve step step-id state: DescribeStep failed", err.Error())
}

// TestRetrieveStepState checks the flags and log lines returned by RetrieveStepState for a
// completed, a cancelled, a failed (with a gzipped log file on disk) and a running step.
func TestRetrieveStepState(t *testing.T) {
	a := assert.New(t)
	const id = "step-id"

	// expect runs RetrieveStepState against a fresh mock for jobflowID and checks the outcome;
	// a nil wantLogs means that no log lines at all are expected.
	expect := func(jobflowID string, wantSuccess, wantFailure bool, wantLogs []string) {
		jfs := mockJobFlowStepsWithoutPlaybook(jobflowID)
		success, failure, logs, err := jfs.RetrieveStepState(id)
		a.Equal(wantSuccess, success)
		a.Equal(wantFailure, failure)
		if wantLogs == nil {
			a.Nil(logs)
		} else {
			a.NotNil(logs)
			a.Equal(wantLogs, logs)
		}
		a.Nil(err)
	}

	// log completed steps
	expect("j-COMPLETED", true, false,
		[]string{"Step 'step' with id 'step-id' completed successfully"})

	// log cancelled steps
	expect("j-CANCELLED", false, true,
		[]string{"Step 'step' with id 'step-id' was CANCELLED"})

	// outputs the failed step log
	failedJobflowID := "j-FAILED-gz"
	logDir := filepath.Join("tmp-gz", "log", failedJobflowID, "steps", id)
	os.MkdirAll(logDir, 0755)
	jfs := mockJobFlowStepsWithoutPlaybook(failedJobflowID)
	WriteGzFile("test", logDir, "test.gz")
	success, failure, logs, err := jfs.RetrieveStepState(id)
	a.Equal(false, success)
	a.Equal(true, failure)
	a.NotNil(logs)
	a.Equal([]string{
		"Step 'step' with id 'step-id' was FAILED",
		"Content of log file 'test.gz':",
		"test.gz",
	}, logs)
	a.Nil(err)
	os.RemoveAll(logDir)

	// ignores steps that are running
	expect("j-RUNNING", false, false, nil)
}

// TestRetrieveStepState_Fail checks that RetrieveStepState returns false flags, nil logs and
// the expected error message for each failing mock job flow.
func TestRetrieveStepState_Fail(t *testing.T) {
	a := assert.New(t)
	const id = "step-id"

	cases := []struct {
		jobflowID string
		message   string
	}{
		// fails if DescribeStep fails
		{"j-nothing", "Couldn't retrieve step step-id state: DescribeStep failed"},
		// fails if parsing the log uri fails
		{"j-FAILED-logs", "Couldn't parse LogUri: parse ://: missing protocol scheme"},
	}

	for _, c := range cases {
		jfs := mockJobFlowStepsWithoutPlaybook(c.jobflowID)
		success, failure, logs, err := jfs.RetrieveStepState(id)
		a.Equal(false, success)
		a.Equal(false, failure)
		a.Nil(logs)
		a.NotNil(err)
		a.Equal(c.message, err.Error())
	}
}
15 changes: 15 additions & 0 deletions src/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,18 @@ func ReadGzFiles(dir string) (map[string]string, error) {
}
return m, nil
}

// Diff returns the elements of b that do not appear in a, in the order in which they occur in
// b. Elements of b that are repeated and absent from a are returned once per occurrence. The
// result is never nil: it is an empty slice when every element of b is also in a.
func Diff(a, b []string) []string {
	known := make(map[string]struct{})
	for i := range a {
		known[a[i]] = struct{}{}
	}

	missing := []string{}
	for i := range b {
		if _, found := known[b[i]]; found {
			continue
		}
		missing = append(missing, b[i])
	}
	return missing
}
6 changes: 6 additions & 0 deletions src/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func TestStringInSlice(t *testing.T) {
assert.Equal(false, StringInSlice("a", []string{"b", "c"}))
}

// TestDiff checks that Diff returns the entries of its second argument that are absent from
// its first.
func TestDiff(t *testing.T) {
	reference := []string{"b"}
	candidates := []string{"a", "b"}

	assert.New(t).Equal([]string{"a"}, Diff(reference, candidates))
}

func TestReadGzFile(t *testing.T) {
assert := assert.New(t)
content := "test"
Expand Down

0 comments on commit b8b1a2b

Please sign in to comment.