Skip to content

Commit

Permalink
Merge logs on task execution event updates (flyteorg#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan committed Oct 22, 2019
1 parent e0e88ef commit 91a82e1
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 9 deletions.
25 changes: 24 additions & 1 deletion pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,29 @@ func CreateTaskExecutionModel(input CreateTaskExecutionModelInput) (*models.Task
return taskExecution, nil
}

// Returns the unique list of logs across an existing list and the latest list sent in a task execution event update.
func mergeLogs(existing, latest []*core.TaskLog) []*core.TaskLog {
if len(latest) == 0 {
return existing
}
if len(existing) == 0 {
return latest
}
existingSet := make(map[string]*core.TaskLog, len(existing))
for _, existingLog := range existing {
existingSet[existingLog.Uri] = existingLog
}
// Copy over the existing logs from the input list so we preserve the original order.
logs := existing
for _, latestLog := range latest {
if _, ok := existingSet[latestLog.Uri]; !ok {
// We haven't seen this log before: add it to the output result list.
logs = append(logs, latestLog)
}
}
return logs
}

func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution) error {
var taskExecutionClosure admin.TaskExecutionClosure
err := proto.Unmarshal(taskExecutionModel.Closure, &taskExecutionClosure)
Expand All @@ -141,7 +164,7 @@ func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExec
taskExecutionModel.PhaseVersion = request.Event.PhaseVersion
taskExecutionClosure.Phase = request.Event.Phase
taskExecutionClosure.UpdatedAt = request.Event.OccurredAt
taskExecutionClosure.Logs = request.Event.Logs
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
if (existingTaskPhase == core.TaskExecution_QUEUED.String() || existingTaskPhase == core.TaskExecution_UNDEFINED.String()) && taskExecutionModel.Phase == core.TaskExecution_RUNNING.String() {
err = addTaskStartedState(request, taskExecutionModel, &taskExecutionClosure)
if err != nil {
Expand Down
121 changes: 113 additions & 8 deletions pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transformers

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -260,6 +261,14 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Logs: []*core.TaskLog{
{
Uri: "uri_a",
},
{
Uri: "uri_b",
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
Expand Down Expand Up @@ -312,12 +321,10 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {
OccurredAt: occuredAtProto,
Logs: []*core.TaskLog{
{
Name: "some_log",
Uri: "some_uri",
Uri: "uri_b",
},
{
Name: "some_log2",
Uri: "some_uri2",
Uri: "uri_c",
},
},
CustomInfo: &customInfo,
Expand All @@ -338,12 +345,13 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {
},
Logs: []*core.TaskLog{
{
Name: "some_log",
Uri: "some_uri",
Uri: "uri_a",
},
{
Name: "some_log2",
Uri: "some_uri2",
Uri: "uri_b",
},
{
Uri: "uri_c",
},
},
CustomInfo: &customInfo,
Expand Down Expand Up @@ -501,3 +509,100 @@ func TestFromTaskExecutionModels(t *testing.T) {
Closure: taskClosure,
}, taskExecutions[0]))
}

func TestMergeLogs(t *testing.T) {
type testCase struct {
existing []*core.TaskLog
latest []*core.TaskLog
expected []*core.TaskLog
name string
}

testCases := []testCase{
{
existing: []*core.TaskLog{
{
Uri: "uri_a",
Name: "name_a",
},
{
Uri: "uri_b",
Name: "name_b",
},
{
Uri: "uri_c",
Name: "name_c",
},
},
latest: []*core.TaskLog{
{
Uri: "uri_b",
Name: "name_b",
},
{
Uri: "uri_d",
Name: "name_d",
},
},
expected: []*core.TaskLog{
{
Uri: "uri_a",
Name: "name_a",
},
{
Uri: "uri_b",
Name: "name_b",
},
{
Uri: "uri_c",
Name: "name_c",
},
{
Uri: "uri_d",
Name: "name_d",
},
},
name: "Merge unique logs",
},
{
latest: []*core.TaskLog{
{
Uri: "uri_b",
Name: "name_b",
},
},
expected: []*core.TaskLog{
{
Uri: "uri_b",
Name: "name_b",
},
},
name: "Empty existing logs",
},
{
existing: []*core.TaskLog{
{
Uri: "uri_b",
Name: "name_b",
},
},
expected: []*core.TaskLog{
{
Uri: "uri_b",
Name: "name_b",
},
},
name: "Empty latest logs",
},
{
name: "Nothing to do",
},
}
for _, mergeTestCase := range testCases {
actual := mergeLogs(mergeTestCase.existing, mergeTestCase.latest)
assert.Equal(t, len(mergeTestCase.expected), len(actual), fmt.Sprintf("%s failed", mergeTestCase.name))
for idx, expectedLog := range mergeTestCase.expected {
assert.True(t, proto.Equal(expectedLog, actual[idx]), fmt.Sprintf("%s failed", mergeTestCase.name))
}
}
}

0 comments on commit 91a82e1

Please sign in to comment.