Finish Allocator
Finishes the top-level allocator object, which passes all of the old
allocator tests (when accounting for some changes in the unexported
behavior of the old allocator that were causing tests to fail).

Signed-off-by: Drew Erny <drew.erny@docker.com>
dperny committed May 3, 2018
1 parent ad5987a commit d4d8c2b
Showing 25 changed files with 3,298 additions and 75 deletions.
129 changes: 115 additions & 14 deletions manager/allocator/allocator_new.go
@@ -138,7 +138,7 @@ func (a *NewAllocator) Run(ctx context.Context) error {
go func() {
select {
case <-ctx.Done():
// cancel the event stream and wake all of the waiting goroutines
log.G(ctx).Info("context done, canceling the event stream")
cancel()
}
}()
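The goroutine above bridges context cancellation into the event stream's cancel function, a standard Go pattern for APIs that expose a cancel func rather than accepting a context (a single-case select like this one is equivalent to a bare receive from ctx.Done()). A minimal self-contained sketch of the same shape, with hypothetical stand-ins — startWatch and its cancel func are not swarmkit APIs:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// startWatch stands in for subscribing to a store event stream: it returns a
// channel of events plus a cancel function that closes the stream.
func startWatch() (<-chan string, func()) {
	events := make(chan string)
	done := make(chan struct{})
	go func() {
		defer close(events)
		for i := 0; ; i++ {
			select {
			case <-done:
				return
			case events <- fmt.Sprintf("event-%d", i):
				time.Sleep(50 * time.Millisecond)
			}
		}
	}()
	var once sync.Once
	return events, func() { once.Do(func() { close(done) }) }
}

func main() {
	ctx, cancelCtx := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancelCtx()

	events, cancelWatch := startWatch()

	// the bridge: once the context is done, cancel the stream so the
	// consumer loop below unblocks and the events channel is closed.
	go func() {
		<-ctx.Done()
		cancelWatch()
	}()

	for ev := range events {
		fmt.Println("got", ev)
	}
	fmt.Println("event loop exited cleanly")
}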
@@ -166,15 +166,18 @@ func (a *NewAllocator) Run(ctx context.Context) error {

a.processPendingAllocations(ctx)
var wg sync.WaitGroup
- wg.Add(2)

+ wg.Add(1)
log.G(ctx).Info("starting event watch loop")
// this goroutine handles incoming store events. all we need from the
// events is the ID of what has been updated. by the time we service the
// allocation, the object may have changed, so we don't save any other
// information. we'll get to it later.
go func() {
defer wg.Done()
// TODO(dperny): there is an outstanding issue with a deadlock that can
// result from closing the store at the same time the callback watch is
// canceled. This can cause the tests to deadlock
for {
select {
case <-ctx.Done():
@@ -200,10 +203,44 @@ func (a *NewAllocator) Run(ctx context.Context) error {
// is already gone, deal with it
if ev.Network != nil {
a.pendingMu.Lock()
// when a network is deleted, reallocate every node to drop its attachment to that network
// TODO(dperny): don't access the store in this event
// loop
a.store.Update(func(tx store.Tx) error {
nodes, err := store.FindNodes(tx, store.All)
if err != nil {
// TODO(dperny): better error handling
return err
}
for _, node := range nodes {
nwids := map[string]struct{}{}
for _, attachment := range node.Attachments {
if attachment.Network.ID != ev.Network.ID {
nwids[attachment.Network.ID] = struct{}{}
}
}
if err := a.network.AllocateNode(node, nwids); err != nil {
if !errors.IsErrAlreadyAllocated(err) {
// TODO(dperny): better error handling
return err
}
}
if err := store.UpdateNode(tx, node); err != nil {
return err
}
}
return nil
})
a.network.DeallocateNetwork(ev.Network)
delete(a.pendingNetworks, ev.Network.ID)
a.pendingMu.Unlock()
}
// case api.EventCreateService, api.EventUpdateService:
// the case for create and update services is not needed.
// we will lazy-allocate services. the reason for this is
// to guarantee that the latest version of the service is
// always allocated, and the task is always using the
// latest version of the service for allocation.
case api.EventDeleteService:
if ev.Service != nil {
a.pendingMu.Lock()
@@ -218,9 +255,14 @@ func (a *NewAllocator) Run(ctx context.Context) error {
t = ev.(api.EventUpdateTask).Task
}
if t != nil {
// updating a task may mean the task has entered a
// terminal state. if it has, we will free its network
// resources just as if we had deleted it.
a.pendingMu.Lock()
if t.Status.State >= api.TaskStateCompleted {
a.network.DeallocateTask(t)
// if the task was pending before now, remove it
// because it no longer is
delete(a.pendingTasks, t.ID)
} else {
a.pendingTasks[t.ID] = struct{}{}
@@ -240,18 +282,21 @@ func (a *NewAllocator) Run(ctx context.Context) error {
a.pendingNodes[ev.Node.ID] = struct{}{}
a.pendingMu.Unlock()
}
// case api.EventUpdateNode:
// we don't need the update node case because nothing about
// a node update will require reallocation.
case api.EventDeleteNode:
if ev.Node != nil {
a.pendingMu.Lock()
a.network.DeallocateNode(ev.Node)
delete(a.pendingNodes, ev.Node.ID)
a.pendingMu.Unlock()
}
}
}
}
}()

wg.Add(1)
log.G(ctx).Info("starting batch processing loop")
// process pending allocations every 500 milliseconds
batchTimer := time.NewTimer(maxBatchInterval)
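Together, the two goroutines form a collect-and-flush design: the watch loop records only the IDs of changed objects (the comment above explains why — the object is re-read at allocation time, so the latest version always wins), and the batch loop drains the pending sets every maxBatchInterval. A condensed sketch of that shape, folded into a single goroutine for brevity; the pending set, allocate function, and retryable-error check are simplified stand-ins for the swarmkit versions:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errRetryable = errors.New("retryable allocation failure")

// collector mirrors the allocator's shape: events only mark IDs dirty, and a
// timer periodically flushes the pending set.
type collector struct {
	mu      sync.Mutex
	pending map[string]struct{}
}

func (c *collector) run(ctx context.Context, events <-chan string, interval time.Duration) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case id := <-events:
			// record only the ID; the object itself is re-read at
			// flush time, so stale event payloads are never used.
			c.mu.Lock()
			c.pending[id] = struct{}{}
			c.mu.Unlock()
		case <-timer.C:
			c.flush()
			timer.Reset(interval)
		}
	}
}

func (c *collector) flush() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for id := range c.pending {
		err := allocate(id)
		// retryable failures stay pending and are retried on the next
		// tick; successes and permanent failures are dropped.
		if !errors.Is(err, errRetryable) {
			delete(c.pending, id)
		}
	}
}

// allocate stands in for the real per-object allocation work.
func allocate(id string) error {
	fmt.Println("allocating", id)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	events := make(chan string, 2)
	events <- "task-1"
	events <- "node-1"

	c := &collector{pending: map[string]struct{}{}}
	c.run(ctx, events, 500*time.Millisecond)
}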
@@ -276,14 +321,17 @@ func (a *NewAllocator) Run(ctx context.Context) error {

log.G(ctx).Info("allocator is up!")
wg.Wait()
log.G(ctx).Info("allocator is finished")
defer log.G(ctx).Info("all defers exited")

return nil
}

func (a *NewAllocator) processPendingAllocations(ctx context.Context) {
ctx = log.WithField(ctx, "method", "(*NewAllocator).processPendingAllocations")
a.pendingMu.Lock()
defer a.pendingMu.Unlock()
- a.store.Batch(func(batch *store.Batch) error {
+ if err := a.store.Batch(func(batch *store.Batch) error {
if len(a.pendingNetworks) > 0 {
log.G(ctx).Infof("allocating %v networks", len(a.pendingNetworks))
}
@@ -316,11 +364,29 @@ func (a *NewAllocator) processPendingAllocations(ctx context.Context) {
}
}
// log.G(ctx).Debug("network allocation succeeded")
- return store.UpdateNetwork(tx, network)
+ if err := store.UpdateNetwork(tx, network); err != nil {
+ // TODO(dperny) better error handling here?
+ return err
+ }
// if this is an overlay network, reallocate all of the nodes
// (to attach them to this overlay)
if network.DriverState != nil && network.DriverState.Name == "overlay" {
log.G(ctx).Info("added an overlay network, reallocating all nodes")
nodes, err := store.FindNodes(tx, store.All)
if err != nil {
// TODO(dperny): again, can we handle errors better?
return err
}
for _, node := range nodes {
a.pendingNodes[node.ID] = struct{}{}
}
}
return nil
}); err != nil {
log.G(ctx).WithError(err).Error("batch update of network allocation failed")
}
}
log.G(ctx).Info("finished allocating networks")

if len(a.pendingTasks) > 0 {
log.G(ctx).Infof("allocating %v tasks", len(a.pendingTasks))
@@ -343,30 +409,65 @@ func (a *NewAllocator) processPendingAllocations(ctx context.Context) {
log.G(ctx).WithError(err).Error("error in batch update")
}
}
log.G(ctx).Info("finished allocating tasks")

// TODO(dperny) skipping nodes for now, except for overlay.
if len(a.pendingNodes) > 0 {
log.G(ctx).Infof("allocating %v nodes", len(a.pendingNodes))
}
// TODO(dperny): this is a really sloppy way of logging this once.
var logged bool
for nodeid := range a.pendingNodes {
ctx := log.WithField(ctx, "node.id", nodeid)
log.G(ctx).Debug("allocating node")
- batch.Update(func(tx store.Tx) error {
+ if err := batch.Update(func(tx store.Tx) error {
networks, err := store.FindNetworks(tx, store.All)
if err != nil {
// TODO(dperny): better error handling
return err
}
nwids := map[string]struct{}{}
// add all overlay networks to the node
for _, network := range networks {
// using the driver state instead of the spec here means
// we'll only request attachments for fully-allocated
// networks
if network.DriverState != nil && network.DriverState.Name == "overlay" {
nwids[network.ID] = struct{}{}
}
}

if !logged {
log.G(ctx).Infof("allocating %v networks for nodes", len(nwids))
logged = true
}

node := store.GetNode(tx, nodeid)
- // TODO(dperny): passing nil will cause us to allocate just the
- // ingress network. can't ship like this.
- err := a.network.AllocateNode(node, nil)
+ err = a.network.AllocateNode(node, nwids)
// if there is any error case other than already allocated, log
// it
if err != nil && !errors.IsErrAlreadyAllocated(err) {
log.G(ctx).WithError(err).Error("error allocating node")
}
if !errors.IsErrRetryable(err) || errors.IsErrAlreadyAllocated(err) {
delete(a.pendingNodes, nodeid)
}
// TODO(dperny): this is too verbose logging
if errors.IsErrAlreadyAllocated(err) {
log.G(ctx).Info("node is already fully allocated")
}
if err == nil {
- store.UpdateNode(tx, node)
+ return store.UpdateNode(tx, node)
}
return nil
- })
+ }); err != nil {
+ log.G(ctx).WithError(err).Error("error in batch update")
+ }
}
log.G(ctx).Info("finished allocating nodes")
log.G(ctx).Info("finished all allocations")
return nil
- })
+ }); err != nil {
+ log.G(ctx).WithError(err).Error("error processing pending allocations batch")
+ }
}
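Both places above that detect overlay networks key off DriverState rather than the network spec: DriverState is only populated once driver allocation has completed, so a network whose spec requests the overlay driver but which has not yet been allocated is deliberately skipped. The check could be factored into a small helper; a sketch against swarmkit's api.Network type (the helper itself is hypothetical, not part of this commit):

// isAllocatedOverlay reports whether a network is an overlay network that has
// already been through driver allocation. Checking DriverState instead of the
// spec means nodes only request attachments for fully-allocated networks.
func isAllocatedOverlay(n *api.Network) bool {
	return n.DriverState != nil && n.DriverState.Name == "overlay"
}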

func (a *NewAllocator) allocateTask(tx store.Tx, taskid string) error {
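The body of allocateTask is elided in this view, but the commented-out service cases in the event loop describe its contract: services are never allocated when their create or update events arrive; they are allocated lazily when one of their tasks is allocated, so the task always binds against the latest version of the service. A sketch of what that contract implies — store.GetTask, store.GetService, and store.UpdateTask are real swarmkit store helpers, while a.network.AllocateService and a.network.AllocateTask are assumed by analogy with the Deallocate methods shown above, and the body below is illustrative rather than the commit's actual implementation:

// lazyAllocateTask sketches the lazy-allocation contract: re-read the task
// and its service at allocation time, allocate the service first (so the
// task binds against the latest service version), then allocate the task.
func (a *NewAllocator) lazyAllocateTask(tx store.Tx, taskid string) error {
	task := store.GetTask(tx, taskid)
	if task == nil {
		// the task was deleted before we serviced it; nothing to do.
		return nil
	}
	if task.ServiceID != "" {
		service := store.GetService(tx, task.ServiceID)
		if service != nil {
			// lazy service allocation: always operate on the current
			// version of the service, never on an event payload.
			if err := a.network.AllocateService(service); err != nil && !errors.IsErrAlreadyAllocated(err) {
				return err
			}
		}
	}
	if err := a.network.AllocateTask(task); err != nil && !errors.IsErrAlreadyAllocated(err) {
		return err
	}
	return store.UpdateTask(tx, task)
}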
