Skip to content

Commit

Permalink
Continue fixing allocator
Browse files Browse the repository at this point in the history
Continues improving the new allocator, after having removed the Networks
field from the Service object

Signed-off-by: Drew Erny <drew.erny@docker.com>
  • Loading branch information
dperny committed Jul 3, 2018
1 parent 99d1021 commit 4a526e7
Show file tree
Hide file tree
Showing 12 changed files with 531 additions and 111 deletions.
40 changes: 33 additions & 7 deletions manager/allocator/allocator_new.go
Expand Up @@ -458,24 +458,34 @@ func (a *NewAllocator) handleEvent(ctx context.Context, event events.Event) {
}
case api.EventCreateTask, api.EventUpdateTask:
var t *api.Task
if e, ok := ev.(api.EventCreateTask); ok {
if e, ok := ev.(api.EventUpdateTask); ok {
// before we do any processing, check the old task. if the previous
// state was a terminal state, then the task must necessarily have
// already been deallocated.
if e.OldTask != nil && !isTaskLive(e.OldTask) {
return
}
t = e.Task
} else {
t = ev.(api.EventUpdateTask).Task
t = ev.(api.EventCreateTask).Task
}

if t == nil {
// if for some reason there is no task, nothing to do
return
}

// updating a task may mean the task has entered a terminal state. if
// it has, we will free its network resources just as if we had deleted
// it.
if t.Status.State >= api.TaskStateCompleted {
if !isTaskLive(t) {
// if the task was already in a terminal state, we should ignore
// this update, not try to deallocate it again
if _, ok := a.deletedObjects[t.ID]; ok {
delete(a.deletedObjects, t.ID)
return
}
log.G(ctx).WithField("task.id", t.ID).Debug("deallocating task")
allocDone := metrics.StartTimer(allocatorActions.WithValues("task", "deallocate"))
err := a.network.DeallocateTask(t)
allocDone()
Expand All @@ -492,7 +502,7 @@ func (a *NewAllocator) handleEvent(ctx context.Context, event events.Event) {
if ev.Task == nil {
return
}
if ev.Task.Status.State >= api.TaskStateCompleted {
if !isTaskLive(ev.Task) {
// if the task is being deleted from a terminal state, we will have
// deallocated it when it moved into that terminal state, and do
// not need to do so now.
Expand Down Expand Up @@ -975,9 +985,7 @@ func (a *NewAllocator) processPendingAllocations(ctx context.Context) {
for _, task := range allocatedTasks {
if err := batch.Update(func(tx store.Tx) error {
currentTask := store.GetTask(tx, task.ID)
if currentTask == nil ||
currentTask.Status.State != api.TaskStateNew ||
currentTask.DesiredState != api.TaskStateRunning {
if !isTaskLive(currentTask) {
log.G(ctx).WithField("task.id", task.ID).Debug("task terminated or removed after allocation but before commit")
a.network.DeallocateTask(task)
a.deletedObjects[task.ID] = struct{}{}
Expand Down Expand Up @@ -1005,3 +1013,21 @@ func (a *NewAllocator) processPendingAllocations(ctx context.Context) {
}
}
}

// isTaskLive returns true if the task is non-nil and in a non-terminal state.
//
// this function, though simple, has been factored out to avoid any errors that
// may arise from slightly different checks for liveness (for example,
// task.Status.State > api.TaskStateRunning vs task.Status.State >= api.TaskStateCompleted
func isTaskLive(t *api.Task) bool {
// a nil task is not live, of course
if t == nil {
return false
}
// a task past the RUNNING state is not live.
if t.Status.State > api.TaskStateRunning {
return false
}
// otherwise, it's live
return true
}
175 changes: 117 additions & 58 deletions manager/allocator/allocator_new_integration_test.go
Expand Up @@ -3,26 +3,21 @@ package allocator
// TODO(dperny): rename this file to allocator_test.go

import (
// testing imports
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/types"

// standard libraries
"context"
"fmt"
"reflect"
"time"

// the packages under in this integration test
// imported for PortsMostlyEqual
"github.com/docker/swarmkit/manager/allocator/network/port"

// external libraries
"github.com/docker/libnetwork/ipamapi"
"github.com/sirupsen/logrus"

// our libraries
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state/store"
Expand All @@ -38,6 +33,16 @@ const (
ingressID = "ingressNetID"
)

func init() {
// set some defaults specific to this test
// because batching is done every 500 ms, a polling interval shorter than
// that is useless
SetDefaultEventuallyPollingInterval(500 * time.Millisecond)
// allocation should be pretty quick, so use a 3 second timeout.
SetDefaultEventuallyTimeout(3 * time.Second)

}

// This suite is an integration test of the allocator, which uses real, live
// components. It should be a definitive test of the allocator's public API. If
// these tests pass, then any changes to the allocator should not require
Expand All @@ -54,6 +59,39 @@ var _ = Describe("allocator.NewAllocator", func() {
allocatorExitResult chan error
)

// define some quick closures for getting a network, service, or task
// these return a function to be passed to Eventually. we define them here
// because the dataStore object only exists in this scope
getNetwork := func(id string) func() *api.Network {
return func() *api.Network {
var network *api.Network
dataStore.View(func(tx store.ReadTx) {
network = store.GetNetwork(tx, id)
})
return network
}
}

getService := func(id string) func() *api.Service {
return func() *api.Service {
var service *api.Service
dataStore.View(func(tx store.ReadTx) {
service = store.GetService(tx, id)
})
return service
}
}

getTask := func(id string) func() *api.Task {
return func() *api.Task {
var task *api.Task
dataStore.View(func(tx store.ReadTx) {
task = store.GetTask(tx, id)
})
return task
}
}

// Before we start the tests, we need to create a store and an allocator.
// We won't start the allocator until just before the tests, so that the
// subspecs can add data to it before initialization
Expand Down Expand Up @@ -195,45 +233,11 @@ var _ = Describe("allocator.NewAllocator", func() {
JustBeforeEach(func() {
// there are a lot of Eventually calls here. these calls poll the
// enclosed function and check the matcher against their result
Eventually(func() *api.Network {
var nw *api.Network
dataStore.View(func(tx store.ReadTx) {
nw = store.GetNetwork(tx, nw1)
})
return nw
}, 3*time.Second, 500*time.Millisecond).Should(BeAllocated())

Eventually(func() *api.Network {
var nw *api.Network
dataStore.View(func(tx store.ReadTx) {
nw = store.GetNetwork(tx, nw2)
})
return nw
}, 3*time.Second, 500*time.Millisecond).Should(BeAllocated())

Eventually(func() *api.Network {
var nw *api.Network
dataStore.View(func(tx store.ReadTx) {
nw = store.GetNetwork(tx, nw3)
})
return nw
}, 3*time.Second, 500*time.Millisecond).Should(BeAllocated())

Eventually(func() *api.Service {
var service *api.Service
dataStore.View(func(tx store.ReadTx) {
service = store.GetService(tx, serviceID)
})
return service
}, 3*time.Second, 500*time.Millisecond).Should(BeAllocated())

Eventually(func() *api.Task {
var task *api.Task
dataStore.View(func(tx store.ReadTx) {
task = store.GetTask(tx, taskID)
})
return task
}, 3*time.Second, 500*time.Millisecond).Should(BeAllocated())
Eventually(getNetwork(nw1)).Should(BeAllocated())
Eventually(getNetwork(nw2)).Should(BeAllocated())
Eventually(getNetwork(nw3)).Should(BeAllocated())
Eventually(getService(serviceID)).Should(BeAllocated())
Eventually(getTask(taskID)).Should(BeAllocated())
})

It("should successfully update the service", func() {
Expand Down Expand Up @@ -278,21 +282,76 @@ var _ = Describe("allocator.NewAllocator", func() {
})
Expect(err).ToNot(HaveOccurred())

Eventually(func() *api.Task {
var t *api.Task
dataStore.View(func(tx store.ReadTx) {
t = store.GetTask(tx, newTaskID)
})
return t
}).Should(BeAllocated())
Eventually(getTask(newTaskID)).Should(BeAllocated())
Eventually(getService(serviceID)).Should(BeAllocated())
})
})

Eventually(func() *api.Service {
var s *api.Service
dataStore.View(func(tx store.ReadTx) {
s = store.GetService(tx, serviceID)
})
return s
}).Should(BeAllocated())
// TODO(dperny): how do we determine that an object was correctly
// _deallocated_? deallocation is an action that is not exposed on the
// public interface of the allocator
PDescribe("Moving a task to a terminal state", func() {
var (
serviceID = "termainalService"
taskID = "terminalTask"
)
BeforeEach(func() {
// create a service
service := &api.Service{
ID: serviceID,
Spec: api.ServiceSpec{
Task: api.TaskSpec{},
// if we just need to attach to some arbitrary network in
// a test, adding a port for ingress does the trick
Endpoint: &api.EndpointSpec{
Ports: []*api.PortConfig{
{TargetPort: 80, PublishedPort: 80},
},
},
},
}

// create a task.
task := &api.Task{
ID: taskID,
ServiceID: serviceID,
Spec: service.Spec.Task,
DesiredState: api.TaskStateRunning,
}

// now add them in the store
err := dataStore.Update(func(tx store.Tx) error {
err := store.CreateService(tx, service)
Expect(err).ToNot(HaveOccurred())

err = store.CreateTask(tx, task)
Expect(err).ToNot(HaveOccurred())
return nil
})
Expect(err).ToNot(HaveOccurred())
})

JustBeforeEach(func() {
// Make sure before we start the test that the service and task are
// allocated
Eventually(getService(serviceID)).Should(BeAllocated())
Eventually(getTask(taskID)).Should(BeAllocated())
})

It("should deallocate a task that moves to the terminal state", func() {
// TODO(dperny): how do we verify that a task has been or has not
// been deallocated...?
dataStore.Update(func(tx store.Tx) error {
// get the task and move it into a terminal state
task := store.GetTask(tx, taskID)
task.Status.State = api.TaskStateFailed
err := store.UpdateTask(tx, task)
Expect(err).ToNot(HaveOccurred())
return nil
})
})

It("should not deallocate a task that is updated when its in a terminal state", func() {
})
})
})
Expand Down

0 comments on commit 4a526e7

Please sign in to comment.