[manager/orchestrator/reaper] Clean out the task reaper dirty set at the end of tick()

Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
anshulpundir committed Jun 25, 2018
1 parent 5876480 commit 8da0f79
Showing 2 changed files with 227 additions and 3 deletions.
23 changes: 21 additions & 2 deletions manager/orchestrator/taskreaper/task_reaper.go
@@ -184,12 +184,23 @@ func (tr *TaskReaper) tick() {
}

// Check history of dirty tasks for cleanup.
// Note: Clean out the dirty set at the end of this tick iteration
// in all but one scenario (documented below).
// When tick() finishes, the tasks in a slot have either been cleaned
// up, or they were skipped because they didn't meet the criteria for
// cleaning. Either way, we can discard the dirty set, because future
// events on that slot will re-add the task to the dirty set at that
// point.
//
// The only case in which we keep a slot dirty is when more than one
// running task is present for that slot. In that case, the slot must
// stay dirty so that it can be cleaned when tick() is called next and
// one or more of the tasks in that slot have stopped running.
tr.store.View(func(tx store.ReadTx) {
for dirty := range tr.dirty {
service := store.GetService(tx, dirty.ServiceID)
if service == nil {
// If the service can't be found, assume that it was deleted
// and remove the slot from the dirty list.
delete(tr.dirty, dirty)
continue
}
@@ -214,6 +225,7 @@ func (tr *TaskReaper) tick() {

// Negative value for TaskHistoryRetentionLimit is an indication to never clean up task history.
if taskHistory < 0 {
delete(tr.dirty, dirty)
continue
}

@@ -243,6 +255,7 @@ func (tr *TaskReaper) tick() {
}

if int64(len(historicTasks)) <= taskHistory {
delete(tr.dirty, dirty)
continue
}

@@ -273,6 +286,12 @@ func (tr *TaskReaper) tick() {
}
}

// The only case in which we keep the slot dirty at the end of tick()
// is when more than one running task is present for a given slot.
// In that case, we keep the slot dirty so that it can be cleaned
// when tick() is called next and one or more of the tasks in that
// slot have stopped running.
if runningTasks <= 1 {
delete(tr.dirty, dirty)
}
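The comment added to tick() above describes the invariant this commit establishes: every dirty slot is dropped at the end of a tick unless the slot still has more than one running task. A minimal, self-contained sketch of that bookkeeping follows; slotTuple, reaper, and runningTasks are illustrative stand-ins, not the actual swarmkit types.

package main

import "fmt"

// slotTuple identifies a service slot, mirroring the shape of the
// tuples used as keys in the reaper's dirty set.
type slotTuple struct {
	Slot      uint64
	ServiceID string
	NodeID    string
}

// reaper is a stand-in for swarmkit's TaskReaper, reduced to the
// dirty-set bookkeeping that this commit changes.
type reaper struct {
	dirty map[slotTuple]struct{}
	// runningTasks reports how many tasks are still running in a slot.
	runningTasks func(slotTuple) int
}

// tick drops every dirty entry except slots that still have more than
// one running task; those stay dirty so a later tick can clean them
// once the extra tasks stop running.
func (r *reaper) tick() {
	for slot := range r.dirty {
		if r.runningTasks(slot) <= 1 {
			delete(r.dirty, slot)
		}
	}
}

func main() {
	r := &reaper{
		dirty: map[slotTuple]struct{}{
			{Slot: 1, ServiceID: "id1", NodeID: "node1"}: {},
			{Slot: 1, ServiceID: "id2", NodeID: "node1"}: {},
		},
		runningTasks: func(s slotTuple) int {
			if s.ServiceID == "id2" {
				return 2 // e.g. mid-update: old and new task both running
			}
			return 1
		},
	}
	r.tick()
	fmt.Println(len(r.dirty)) // prints 1: only the id2 slot stays dirty
}

This mirrors the runningTasks <= 1 check in the hunk above: the id2 slot survives the tick and will be re-examined on the next one.
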
207 changes: 206 additions & 1 deletion manager/orchestrator/taskreaper/task_reaper_test.go
@@ -2,6 +2,7 @@ package taskreaper

import (
"context"
"github.com/docker/swarmkit/manager/orchestrator"

"testing"

@@ -805,7 +806,7 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
}

// Create a service with one replica specified before the orchestrator is
// started. This should result in two tasks when the orchestrator
// started. This should result in one task when the orchestrator
// starts up.
err := s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, service1))
@@ -871,3 +872,207 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, foundTasks, 1)
}

func setupTaskReaperDirty(tr *TaskReaper) {
tr.dirty[orchestrator.SlotTuple{
Slot: 1,
ServiceID: "id1",
NodeID: "node1",
}] = struct{}{}
tr.dirty[orchestrator.SlotTuple{
Slot: 1,
ServiceID: "id2",
NodeID: "node1",
}] = struct{}{}
}

// TestTick unit-tests the task reaper's tick() function.
// 1. Test that the dirty set is cleaned up when the service can't be found.
// 2. Test that the dirty set is cleaned up when the number of total tasks
// is smaller than the retention limit.
// 3. Test that the dirty set and the excess tasks in the store are cleaned
// up when the number of total tasks is greater than the retention limit.
func TestTick(t *testing.T) {
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

assert.NoError(t, s.Update(func(tx store.Tx) error {
store.CreateCluster(tx, &api.Cluster{
ID: identity.NewID(),
Spec: api.ClusterSpec{
Annotations: api.Annotations{
Name: store.DefaultClusterName,
},
Orchestration: api.OrchestrationConfig{
// Set TaskHistoryRetentionLimit to 1 so that only one task
// per slot is retained and older tasks are cleaned up.
TaskHistoryRetentionLimit: 1,
},
},
})
return nil
}))

// create the task reaper.
taskReaper := New(s)

// Test # 1
// Set up the dirty set with entries to verify that the dirty set
// is cleaned up when the service is not found.
setupTaskReaperDirty(taskReaper)
// Call tick() directly and verify that the dirty set was cleaned up.
taskReaper.tick()
assert.Zero(t, len(taskReaper.dirty))

// Test # 2
// Verify that the dirty set is cleaned up
// when the history limit is set to zero.

// Create a service in the store for the following test cases.
service1 := &api.Service{
ID: "id1",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "name1",
},
Mode: &api.ServiceSpec_Replicated{
Replicated: &api.ReplicatedService{
Replicas: 1,
},
},
Task: api.TaskSpec{
Restart: &api.RestartPolicy{
// Turn off restart to get an accurate count on tasks.
Condition: api.RestartOnNone,
Delay: gogotypes.DurationProto(0),
},
},
},
}

// Create another service in the store for the following test cases.
service2 := &api.Service{
ID: "id2",
Spec: api.ServiceSpec{
Annotations: api.Annotations{
Name: "name2",
},
Mode: &api.ServiceSpec_Replicated{
Replicated: &api.ReplicatedService{
Replicas: 1,
},
},
Task: api.TaskSpec{
Restart: &api.RestartPolicy{
// Turn off restart to get an accurate count on tasks.
Condition: api.RestartOnNone,
Delay: gogotypes.DurationProto(0),
},
},
},
}

// Create a service.
err := s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateService(tx, service1))
assert.NoError(t, store.CreateService(tx, service2))
return nil
})
assert.NoError(t, err)

// Set up the dirty set with entries to verify that the dirty set
// is cleaned up when the history limit is set to zero.
setupTaskReaperDirty(taskReaper)
taskReaper.taskHistory = 0
// Call tick() directly and verify that the dirty set was cleaned up.
taskReaper.tick()
assert.Zero(t, len(taskReaper.dirty))

// Test # 3
// Test that the tasks are cleaned up when the total number of tasks
// is greater than the retention limit.

// Create tasks for both services in the store.
task1 := &api.Task{
ID: "id1task1",
Slot: 1,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
ServiceID: "id1",
ServiceAnnotations: api.Annotations{
Name: "name1",
},
}

task2 := &api.Task{
ID: "id2task1",
Slot: 1,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
ServiceID: "id2",
ServiceAnnotations: api.Annotations{
Name: "name2",
},
}

// Create Tasks.
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateTask(tx, task2))
return nil
})
assert.NoError(t, err)

// Set history to 1 to ensure that the tasks are not cleaned up yet.
// At the same time, we should be able to test that the dirty set was
// cleaned up at the end of tick().
taskReaper.taskHistory = 1
setupTaskReaperDirty(taskReaper)
// Call tick() directly and verify that the dirty set was cleaned up.
taskReaper.tick()
assert.Zero(t, len(taskReaper.dirty))

// Now test that tick() function cleans up the old tasks from the store.

// Create new tasks in the store for the same slots to simulate service update.
task1.Status.State = api.TaskStateNew
task1.DesiredState = api.TaskStateRunning
task1.ID = "id1task2"
task2.Status.State = api.TaskStateNew
task2.DesiredState = api.TaskStateRunning
task2.ID = "id2task2"
err = s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateTask(tx, task1))
assert.NoError(t, store.CreateTask(tx, task2))
return nil
})
assert.NoError(t, err)

watch, cancel := state.Watch(s.WatchQueue() /*api.EventCreateTask{}, api.EventUpdateTask{}*/)
defer cancel()

// Setup the task reaper dirty set.
setupTaskReaperDirty(taskReaper)
// Call tick() directly and verify that the dirty set was cleaned up.
taskReaper.tick()
assert.Zero(t, len(taskReaper.dirty))
// Task reaper should delete the task previously marked for SHUTDOWN.
deletedTask1 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, api.TaskStateShutdown, deletedTask1.Status.State)
assert.Equal(t, api.TaskStateShutdown, deletedTask1.DesiredState)
assert.True(t, deletedTask1.ServiceAnnotations.Name == "name1" ||
deletedTask1.ServiceAnnotations.Name == "name2")

deletedTask2 := testutils.WatchTaskDelete(t, watch)
assert.Equal(t, api.TaskStateShutdown, deletedTask2.Status.State)
assert.Equal(t, api.TaskStateShutdown, deletedTask2.DesiredState)
assert.True(t, deletedTask2.ServiceAnnotations.Name == "name1" ||
deletedTask2.ServiceAnnotations.Name == "name2")
}
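
The assertions at the end of TestTick rely on watching the store's event queue and consuming one task-deletion event per reaped task. A minimal sketch of that consume-and-assert pattern using a plain channel follows; taskDeleted and the event values are illustrative stand-ins, not swarmkit's actual event types.

package main

import "fmt"

// taskDeleted is an illustrative stand-in for the task-deletion event
// that testutils.WatchTaskDelete pulls off the watch channel.
type taskDeleted struct {
	ID           string
	State        string
	DesiredState string
}

func main() {
	// The reaper side publishes one event per reaped task; a buffered
	// channel stands in for the store's watch queue.
	events := make(chan taskDeleted, 2)
	events <- taskDeleted{ID: "id1task1", State: "shutdown", DesiredState: "shutdown"}
	events <- taskDeleted{ID: "id2task1", State: "shutdown", DesiredState: "shutdown"}

	// The test side consumes exactly as many deletions as it expects
	// and asserts that only shut-down tasks were reaped.
	for i := 0; i < 2; i++ {
		ev := <-events
		if ev.State != "shutdown" || ev.DesiredState != "shutdown" {
			panic("reaper deleted a task that was not shut down")
		}
		fmt.Println("deleted:", ev.ID)
	}
}

Checking both Status.State and DesiredState confirms that only tasks already marked for shutdown were removed, which is what the test's two WatchTaskDelete assertions verify.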
