allocator: Less aggressive retry #2021

Merged 1 commit on Mar 9, 2017
Changes from all commits
5 changes: 5 additions & 0 deletions manager/allocator/allocator_test.go
@@ -16,6 +16,11 @@ import (
     "github.com/stretchr/testify/require"
 )
 
+func init() {
+    // set artificially low retry interval for testing
+    retryInterval = 5 * time.Millisecond
+}
+
 func TestAllocator(t *testing.T) {
     s := store.NewMemoryStore(nil)
     assert.NotNil(t, s)
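The override above works because retryInterval is a plain package-level var in the allocator package (declared in network.go below) rather than a constant, so an init() in the test file runs before any test and can shrink it. A minimal sketch of the same pattern in isolation; the package and test names are illustrative and not part of swarmkit:

// interval_test.go (illustrative only)
package retrydemo

import (
    "testing"
    "time"
)

// In the real code the var lives in a production file; tests in the
// same package can reassign it because it is a var, not a const.
var retryInterval = 5 * time.Minute

func init() {
    // Keep tests fast: shrink the interval before any test runs.
    retryInterval = 5 * time.Millisecond
}

func TestRetryIntervalOverridden(t *testing.T) {
    if retryInterval != 5*time.Millisecond {
        t.Fatalf("expected 5ms override, got %v", retryInterval)
    }
}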
69 changes: 53 additions & 16 deletions manager/allocator/network.go
@@ -26,7 +26,11 @@ const (
     allocatedStatusMessage = "pending task scheduling"
 )
 
-var errNoChanges = errors.New("task unchanged")
+var (
+    errNoChanges = errors.New("task unchanged")
+
+    retryInterval = 5 * time.Minute
+)
 
 func newIngressNetwork() *api.Network {
     return &api.Network{
@@ -57,19 +61,28 @@ type networkContext struct {
     // the actual network allocation.
     nwkAllocator *networkallocator.NetworkAllocator
 
-    // A table of unallocated tasks which will be revisited if any thing
+    // A set of tasks which are ready to be allocated as a batch. This is
+    // distinct from "unallocatedTasks" which are tasks that failed to
+    // allocate on the first try, being held for a future retry.
+    pendingTasks map[string]*api.Task
+
+    // A set of unallocated tasks which will be revisited if any thing
     // changes in system state that might help task allocation.
     unallocatedTasks map[string]*api.Task
 
-    // A table of unallocated services which will be revisited if
+    // A set of unallocated services which will be revisited if
     // any thing changes in system state that might help service
     // allocation.
     unallocatedServices map[string]*api.Service
 
-    // A table of unallocated networks which will be revisited if
+    // A set of unallocated networks which will be revisited if
     // any thing changes in system state that might help network
     // allocation.
     unallocatedNetworks map[string]*api.Network
+
+    // lastRetry is the last timestamp when unallocated
+    // tasks/services/networks were retried.
+    lastRetry time.Time
 }
 
 func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
@@ -80,10 +93,12 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) {
 
     nc := &networkContext{
         nwkAllocator: na,
+        pendingTasks: make(map[string]*api.Task),
         unallocatedTasks: make(map[string]*api.Task),
         unallocatedServices: make(map[string]*api.Service),
         unallocatedNetworks: make(map[string]*api.Network),
         ingressNetwork: newIngressNetwork(),
+        lastRetry: time.Now(),
     }
     a.netCtx = nc
     defer func() {
@@ -403,10 +418,20 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) {
     case state.EventCreateTask, state.EventUpdateTask, state.EventDeleteTask:
         a.doTaskAlloc(ctx, ev)
     case state.EventCommit:
-        a.procUnallocatedNetworks(ctx)
-        a.procUnallocatedServices(ctx)
-        a.procUnallocatedTasksNetwork(ctx)
-        return
+        a.procTasksNetwork(ctx, false)
+
+        if time.Since(nc.lastRetry) > retryInterval {
+            a.procUnallocatedNetworks(ctx)
+            a.procUnallocatedServices(ctx)
+            a.procTasksNetwork(ctx, true)
+            nc.lastRetry = time.Now()
+        }
+
+        // Any left over tasks are moved to the unallocated set
+        for _, t := range nc.pendingTasks {
+            nc.unallocatedTasks[t.ID] = t
+        }
+        nc.pendingTasks = make(map[string]*api.Task)
     }
 }
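Taken together with the networkContext changes above, the EventCommit branch now does the cheap work on every commit (allocating the tasks batched into pendingTasks since the last commit) and the expensive work only once per retryInterval (re-walking networks, services, and tasks that already failed once). A compact, self-contained sketch of that throttling pattern; task, netCtx, onCommit, process, and the allocate callback are hypothetical stand-ins, not the allocator's real types:

package main

import (
    "errors"
    "fmt"
    "time"
)

// retryInterval mirrors the allocator's package-level knob.
var retryInterval = 5 * time.Minute

type task struct{ id string }

type netCtx struct {
    pending     map[string]*task // batched since the last commit
    unallocated map[string]*task // failed before; only retried periodically
    lastRetry   time.Time
}

// onCommit mimics the EventCommit branch: always flush the pending batch,
// but only revisit old failures once retryInterval has elapsed.
func (c *netCtx) onCommit(allocate func(*task) error) {
    c.process(c.pending, allocate)

    if time.Since(c.lastRetry) > retryInterval {
        c.process(c.unallocated, allocate)
        c.lastRetry = time.Now()
    }

    // Anything still pending failed this round; park it for the retry pass.
    for id, t := range c.pending {
        c.unallocated[id] = t
    }
    c.pending = make(map[string]*task)
}

// process tries each task once and drops the ones that succeed.
func (c *netCtx) process(set map[string]*task, allocate func(*task) error) {
    for id, t := range set {
        if err := allocate(t); err == nil {
            delete(set, id)
        }
    }
}

func main() {
    c := &netCtx{
        pending:     map[string]*task{"t1": {id: "t1"}},
        unallocated: make(map[string]*task),
        lastRetry:   time.Now(),
    }
    failing := func(*task) error { return errors.New("no addresses available") }
    c.onCommit(failing)
    fmt.Println(len(c.pending), len(c.unallocated)) // 0 1: t1 parked until the retry pass
}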

@@ -579,14 +604,16 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
             }
         }
 
-        // Cleanup any task references that might exist in unallocatedTasks
+        // Cleanup any task references that might exist
+        delete(nc.pendingTasks, t.ID)
         delete(nc.unallocatedTasks, t.ID)
         return
     }
 
     // If we are already in allocated state, there is
     // absolutely nothing else to do.
     if t.Status.State >= api.TaskStatePending {
+        delete(nc.pendingTasks, t.ID)
         delete(nc.unallocatedTasks, t.ID)
         return
     }
@@ -616,7 +643,7 @@ func (a *Allocator) doTaskAlloc(ctx context.Context, ev events.Event) {
     // based on service spec.
     a.taskCreateNetworkAttachments(t, s)
 
-    nc.unallocatedTasks[t.ID] = t
+    nc.pendingTasks[t.ID] = t
 }
 
 func (a *Allocator) allocateNode(ctx context.Context, node *api.Node) error {
@@ -948,15 +975,25 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
     }
 }
 
-func (a *Allocator) procUnallocatedTasksNetwork(ctx context.Context) {
+func (a *Allocator) procTasksNetwork(ctx context.Context, onRetry bool) {
     nc := a.netCtx
-    allocatedTasks := make([]*api.Task, 0, len(nc.unallocatedTasks))
+    quiet := false
+    toAllocate := nc.pendingTasks
+    if onRetry {
+        toAllocate = nc.unallocatedTasks
+        quiet = true
+    }
+    allocatedTasks := make([]*api.Task, 0, len(toAllocate))
 
-    for _, t := range nc.unallocatedTasks {
+    for _, t := range toAllocate {
         if err := a.allocateTask(ctx, t); err == nil {
             allocatedTasks = append(allocatedTasks, t)
         } else if err != errNoChanges {
-            log.G(ctx).WithError(err).Error("task allocation failure")
+            if quiet {
+                log.G(ctx).WithError(err).Debug("task allocation failure")
+            } else {
+                log.G(ctx).WithError(err).Error("task allocation failure")
+            }
         }
     }
 
@@ -978,11 +1015,11 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) {
     })
 
     if err != nil {
-        log.G(ctx).WithError(err).Error("failed a store batch operation while processing unallocated tasks")
+        log.G(ctx).WithError(err).Error("failed a store batch operation while processing tasks")
     }
 
     for _, t := range allocatedTasks[:committed] {
-        delete(nc.unallocatedTasks, t.ID)
+        delete(toAllocate, t.ID)
     }
 }
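Two side effects of the new onRetry flag are easy to miss: failures hit during the periodic retry pass are logged at Debug rather than Error (they were already reported loudly on the first attempt), and only the committed prefix of allocatedTasks is deleted from whichever map was being drained. A rough sketch of the logging half, using logrus directly in place of swarmkit's log package; the helper name is illustrative:

package main

import (
    "errors"

    "github.com/sirupsen/logrus"
)

// logAllocFailure mirrors the quiet/loud split in procTasksNetwork:
// first attempts surface an Error, retry passes only emit Debug noise.
func logAllocFailure(err error, quiet bool) {
    entry := logrus.WithError(err)
    if quiet {
        entry.Debug("task allocation failure")
    } else {
        entry.Error("task allocation failure")
    }
}

func main() {
    logrus.SetLevel(logrus.DebugLevel)
    logAllocFailure(errors.New("no addresses available"), false) // first attempt: loud
    logAllocFailure(errors.New("no addresses available"), true)  // retry pass: quiet
}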
