
Commit

Fix task reaper batching
The batching logic of the task reaper was previously racy because of
its use of timer.Reset. This fixes the logic to guarantee it is not
racy.

Signed-off-by: Drew Erny <drew.erny@docker.com>
dperny committed Jun 25, 2018
1 parent 5876480 commit 2e0c1cc
Showing 2 changed files with 258 additions and 1 deletion.
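For context on the race: in Go, timer.Reset is only safe on a timer that is known to be stopped or expired with its channel drained; calling it blindly after each event can leave a stale expiry buffered in timer.C. The following is a minimal, hypothetical sketch (not the swarmkit code) of the kind of loop this commit moves away from:

package main

import (
	"fmt"
	"time"
)

// racyBatcher is a minimal, hypothetical sketch (not the swarmkit code) of the
// pitfall described in the commit message: timer.Reset is only safe on a timer
// that is stopped or expired with its channel drained, so resetting blindly
// after every event can race with an expiry already buffered in timer.C.
func racyBatcher(events <-chan struct{}, interval time.Duration) {
	timer := time.NewTimer(interval)
	for {
		select {
		case _, ok := <-events:
			if !ok {
				return
			}
			// RACY: if the timer fired between events, the old expiry may
			// still be sitting in timer.C. Reset does not clear it, so the
			// next iteration can flush immediately instead of waiting a full
			// interval after the latest event.
			timer.Reset(interval)
		case <-timer.C:
			fmt.Println("flush batch")
		}
	}
}

func main() {
	events := make(chan struct{})
	go racyBatcher(events, 250*time.Millisecond)
	for i := 0; i < 3; i++ {
		events <- struct{}{}
		time.Sleep(300 * time.Millisecond) // long enough for the timer to fire in between
	}
	close(events)
}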
65 changes: 64 additions & 1 deletion manager/orchestrator/taskreaper/task_reaper.go
@@ -40,6 +40,12 @@ type TaskReaper struct {
cleanup []string
stopChan chan struct{}
doneChan chan struct{}

// tickSignal is a channel that, if non-nil and available, will be written
// to in order to signal that a tick has occurred. its sole purpose is for
// testing code, to verify that task cleanup attempts are happening when
// they should be.
tickSignal chan struct{}
}

// New creates a new TaskReaper.
@@ -115,7 +121,34 @@ func (tr *TaskReaper) Run(ctx context.Context) {

// Clean up when we hit TaskHistoryRetentionLimit or when the timer expires,
// whichever happens first.
//
// Specifically, the way this should work:
// - Create a timer and immediately stop it. We don't want to fire the
// cleanup routine yet, because we just did a cleanup as part of the
// initialization above.
// - Launch into an event loop
// - When we receive an event, handle the event as needed
// - After receiving the event:
// - If the batch threshold (maxDirty) is exceeded by the combined size of
// dirty + cleanup, then immediately launch into the cleanup routine
// - Otherwise, if the timer is stopped, start it (reset).
// - If the timer expires and the timer channel is signaled, then Stop the
// timer (so that it will be ready to be started again as needed), and
// execute the cleanup routine (tick)
timer := time.NewTimer(reaperBatchingInterval)
timer.Stop()

// If Stop is somehow called AFTER the timer has expired, there will be a
// value in the timer.C channel. If there is such a value, we should drain
// it out. This select statement allows us to drain that value if it's
// present, or continue straight through otherwise.
select {
case <-timer.C:
default:
}

// track with a boolean whether the timer is currently stopped
isTimerStopped := true

// Watch for:
// 1. EventCreateTask for cleaning slots, which is the best time to clean up that node/slot.
@@ -153,23 +186,53 @@ func (tr *TaskReaper) Run(ctx context.Context) {
}

if len(tr.dirty)+len(tr.cleanup) > maxDirty {
// stop the timer, so we don't fire it. if we get another event
// after we do this cleaning, we will reset the timer then
timer.Stop()
// if the timer had fired, drain out the value.
select {
case <-timer.C:
default:
}
isTimerStopped = true
tr.tick()
} else {
timer.Reset(reaperBatchingInterval)
if isTimerStopped {
timer.Reset(reaperBatchingInterval)
isTimerStopped = false
}
}
case <-timer.C:
// there is no need to drain the timer channel here: receiving from
// timer.C is what brought us into this case, so its value has already
// been consumed.
timer.Stop()
isTimerStopped = true
tr.tick()
case <-tr.stopChan:
// even though this doesn't really matter in this context, it's
// good hygiene to drain the value.
timer.Stop()
select {
case <-timer.C:
default:
}
return
}
}
}

// tick performs task history cleanup.
func (tr *TaskReaper) tick() {
// this signals that a tick has occurred. it exists solely for testing.
if tr.tickSignal != nil {
// try writing to this channel, but if it's full, fall straight through
// and ignore it.
select {
case tr.tickSignal <- struct{}{}:
default:
}
}

if len(tr.dirty) == 0 && len(tr.cleanup) == 0 {
return
}
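Distilled from the diff above, the fix keeps the timer stopped by default, tracks that state in a boolean, only calls Reset on a timer known to be stopped, and drains timer.C after any Stop that could have raced with an expiry. A condensed, self-contained sketch of that discipline (hypothetical names, not the actual TaskReaper code):

package main

import (
	"fmt"
	"time"
)

// batcher is a condensed, hypothetical sketch of the discipline used in the
// diff above: the timer starts stopped, Reset is only called when the timer is
// known to be stopped, and timer.C is drained whenever Stop may have raced
// with an expiry.
func batcher(events <-chan struct{}, threshold int, interval time.Duration) {
	timer := time.NewTimer(interval)
	timer.Stop()
	// drain any value that landed in timer.C before Stop took effect
	select {
	case <-timer.C:
	default:
	}
	stopped := true
	pending := 0

	flush := func() {
		fmt.Printf("flushing %d item(s)\n", pending)
		pending = 0
	}

	for {
		select {
		case _, ok := <-events:
			if !ok {
				return
			}
			pending++
			if pending > threshold {
				// flush immediately; Stop and drain so the timer is back in
				// a known-stopped state for the next batch
				timer.Stop()
				select {
				case <-timer.C:
				default:
				}
				stopped = true
				flush()
			} else if stopped {
				// only Reset a timer we know is stopped with a drained channel
				timer.Reset(interval)
				stopped = false
			}
		case <-timer.C:
			// this receive consumed the expiry, so nothing is left to drain
			stopped = true
			flush()
		}
	}
}

func main() {
	events := make(chan struct{})
	go batcher(events, 2, 250*time.Millisecond)
	for i := 0; i < 5; i++ {
		events <- struct{}{}
	}
	time.Sleep(time.Second) // let the timer flush the remainder
	close(events)
}

The key invariant is that Reset is never called unless the timer is known to be stopped with an empty channel, which is the condition Go's documentation requires for a safe Reset.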
194 changes: 194 additions & 0 deletions manager/orchestrator/taskreaper/task_reaper_test.go
@@ -2,6 +2,8 @@ package taskreaper

import (
"context"
"fmt"
"time"

"testing"

@@ -871,3 +873,195 @@ func TestServiceRemoveUnassignedTasks(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, foundTasks, 1)
}

// TestTaskReaperBatching tests that the batching logic for the task reaper
// runs correctly.
func TestTaskReaperBatching(t *testing.T) {
// create a canned context and store to use with this task reaper
ctx := context.Background()
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

var (
task1, task2, task3 *api.Task
tasks []*api.Task
)

// set up all of the test fixtures
assert.NoError(t, s.Update(func(tx store.Tx) error {
// we need a cluster object, because we need to set the retention limit
// to a low value
assert.NoError(t, store.CreateCluster(tx, &api.Cluster{
ID: identity.NewID(),
Spec: api.ClusterSpec{
Annotations: api.Annotations{
Name: store.DefaultClusterName,
},
Orchestration: api.OrchestrationConfig{
TaskHistoryRetentionLimit: 1,
},
},
}))

task1 = &api.Task{
ID: "foo",
ServiceID: "bar",
Slot: 0,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
}
// we need to create all of the tasks used in this test, because we'll
// be using task update events to trigger reaper behavior.
assert.NoError(t, store.CreateTask(tx, task1))

task2 = &api.Task{
ID: "foo2",
ServiceID: "bar",
Slot: 1,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
}
assert.NoError(t, store.CreateTask(tx, task2))

tasks = make([]*api.Task, maxDirty+1)
for i := 0; i < maxDirty+1; i++ {
tasks[i] = &api.Task{
ID: fmt.Sprintf("baz%v", i),
ServiceID: "bar",
// every task in a different slot, so they don't get cleaned up
// based on exceeding the retention limit
Slot: uint64(i),
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
}
if err := store.CreateTask(tx, tasks[i]); err != nil {
return err
}
}

task3 = &api.Task{
ID: "foo3",
ServiceID: "bar",
Slot: 2,
DesiredState: api.TaskStateShutdown,
Status: api.TaskStatus{
State: api.TaskStateShutdown,
},
}
assert.NoError(t, store.CreateTask(tx, task3))
return nil
}))

// now create the task reaper
taskReaper := New(s)
taskReaper.tickSignal = make(chan struct{}, 1)
defer taskReaper.Stop()
go taskReaper.Run(ctx)

// None of the tasks we've created are eligible for deletion. We should
// see no task delete events. Wait for a tick signal, or for two batching
// intervals to pass, to verify that no tick will occur.
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper ticked when it should not have")
case <-time.After(reaperBatchingInterval * 2):
// ok, looks good, moving on
}

// update task1 to die
assert.NoError(t, s.Update(func(tx store.Tx) error {
task1.DesiredState = api.TaskStateRemove
return store.UpdateTask(tx, task1)
}))

// the task should be added to the cleanup map and a tick should occur
// shortly. give it an extra 50ms for overhead
select {
case <-taskReaper.tickSignal:
case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
t.Fatalf("the taskreaper should have ticked but did not")
}

// now wait and make sure the task reaper does not tick again
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper should not have ticked but did")
case <-time.After(reaperBatchingInterval * 2):
}

// now make sure we'll tick again if we update another task to die
assert.NoError(t, s.Update(func(tx store.Tx) error {
task2.DesiredState = api.TaskStateRemove
return store.UpdateTask(tx, task2)
}))

select {
case <-taskReaper.tickSignal:
case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
t.Fatalf("the taskreaper should have ticked by now but did not")
}

// again, now wait and make sure the task reaper does not tick again
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper should not have ticked but did")
case <-time.After(reaperBatchingInterval * 2):
}

// now update a large batch of tasks (the maxDirty+1 created above). this
// should trigger a tick immediately, with no waiting: all of these events
// should easily be processed within the batching interval, so we expect
// exactly one tick right after the update and no more.
assert.NoError(t, s.Update(func(tx store.Tx) error {
for _, task := range tasks {
task.DesiredState = api.TaskStateRemove
assert.NoError(t, store.UpdateTask(tx, task))
}
return nil
}))

select {
case <-taskReaper.tickSignal:
case <-time.After(reaperBatchingInterval):
// tight bound on how long it should take to tick. we should tick
// before the reaper batching interval. this should only POSSIBLY fail
// on a really slow system, where processing the 1000+ incoming events
// takes longer than the reaperBatchingInterval. if this test flakes
// here, that's probably why.
t.Fatalf("we should have immediately ticked already, but did not")
}

// again again, wait and make sure the task reaper does not tick again
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper should not have ticked but did")
case <-time.After(reaperBatchingInterval * 2):
}

// now before we wrap up, make sure the task reaper still works off the
// timer
assert.NoError(t, s.Update(func(tx store.Tx) error {
task3.DesiredState = api.TaskStateRemove
return store.UpdateTask(tx, task3)
}))

select {
case <-taskReaper.tickSignal:
case <-time.After(reaperBatchingInterval + (50 * time.Millisecond)):
t.Fatalf("the taskreaper should have ticked by now but did not")
}

// again, now wait and make sure the task reaper does not tick again
select {
case <-taskReaper.tickSignal:
t.Fatalf("the taskreaper should not have ticked but did")
case <-time.After(reaperBatchingInterval * 2):
}
}
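The wait-for-tick and expect-no-tick select blocks above repeat several times; they could be factored into small helpers. A hypothetical sketch (not part of this commit), using only the standard testing and time packages and taking the signal channel as a parameter:

package taskreaper

import (
	"testing"
	"time"
)

// expectTick fails the test if no tick signal arrives within the given window,
// e.g. expectTick(t, taskReaper.tickSignal, reaperBatchingInterval+50*time.Millisecond).
func expectTick(t *testing.T, tick <-chan struct{}, within time.Duration) {
	t.Helper()
	select {
	case <-tick:
	case <-time.After(within):
		t.Fatalf("expected a tick within %v, but none occurred", within)
	}
}

// expectNoTick fails the test if a tick signal arrives during the given window.
func expectNoTick(t *testing.T, tick <-chan struct{}, during time.Duration) {
	t.Helper()
	select {
	case <-tick:
		t.Fatalf("unexpected tick within %v", during)
	case <-time.After(during):
	}
}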
