Skip to content

Commit

Permalink
Change reconcile/container update order on init and waitForHostResour…
Browse files Browse the repository at this point in the history
…ces/emitCurrentStatus order (aws#3747)
  • Loading branch information
prateekchaudhry committed Jun 22, 2023
1 parent 2d30365 commit 9e77f6f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
4 changes: 2 additions & 2 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3628,8 +3628,8 @@ func (task *Task) ToHostResources() map[string]*ecs.Resource {
"taskArn": task.Arn,
"CPU": *resources["CPU"].IntegerValue,
"MEMORY": *resources["MEMORY"].IntegerValue,
"PORTS_TCP": resources["PORTS_TCP"].StringSetValue,
"PORTS_UDP": resources["PORTS_UDP"].StringSetValue,
"PORTS_TCP": aws.StringValueSlice(resources["PORTS_TCP"].StringSetValue),
"PORTS_UDP": aws.StringValueSlice(resources["PORTS_UDP"].StringSetValue),
"GPU": *resources["GPU"].IntegerValue,
})
return resources
Expand Down
11 changes: 8 additions & 3 deletions agent/engine/docker_task_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,15 +597,20 @@ func (engine *DockerTaskEngine) synchronizeState() {
}

tasks := engine.state.AllTasks()
// For normal task progress, overseeTask 'consume's resources through waitForHostResources in host_resource_manager before progressing
// For agent restarts (state restore), we pre-consume resources for tasks that had progressed beyond waitForHostResources stage -
// so these tasks do not wait during 'waitForHostResources' call again - do not go through queuing again
//
// Call reconcileHostResources before
// - filterTasksToStartUnsafe which will reconcile container statuses for the duration the agent was stopped
// - starting managedTask's overseeTask goroutines
engine.reconcileHostResources()
tasksToStart := engine.filterTasksToStartUnsafe(tasks)
for _, task := range tasks {
task.InitializeResources(engine.resourceFields)
engine.saveTaskData(task)
}

// Before starting managedTask goroutines, pre-allocate resources for tasks which
// which have progressed beyond resource check (waitingTaskQueue) stage
engine.reconcileHostResources()
for _, task := range tasksToStart {
engine.startTask(task)
}
Expand Down
5 changes: 3 additions & 2 deletions agent/engine/host_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
"github.com/aws/aws-sdk-go/aws"
)

const (
Expand Down Expand Up @@ -72,8 +73,8 @@ func (h *HostResourceManager) logResources(msg string, taskArn string) {
"taskArn": taskArn,
"CPU": *h.consumedResource[CPU].IntegerValue,
"MEMORY": *h.consumedResource[MEMORY].IntegerValue,
"PORTS_TCP": h.consumedResource[PORTSTCP].StringSetValue,
"PORTS_UDP": h.consumedResource[PORTSUDP].StringSetValue,
"PORTS_TCP": aws.StringValueSlice(h.consumedResource[PORTSTCP].StringSetValue),
"PORTS_UDP": aws.StringValueSlice(h.consumedResource[PORTSUDP].StringSetValue),
"GPU": *h.consumedResource[GPU].IntegerValue,
})
}
Expand Down
18 changes: 15 additions & 3 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,17 @@ func (mtask *managedTask) overseeTask() {
// `desiredstatus`es which are a construct of the engine used only here,
// not present on the backend
mtask.UpdateStatus()
// If this was a 'state restore', send all unsent statuses
mtask.emitCurrentStatus()

// Wait for host resources required by this task to become available
// Wait here until enough resources are available on host for the task to progress
// - Waits until host resource manager succesfully 'consume's task resources and returns
// - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
// - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately
// (resources are later 'release'd on Stopped task emitTaskEvent call)
mtask.waitForHostResources()

// If this was a 'state restore', send all unsent statuses
mtask.emitCurrentStatus()

// Main infinite loop. This is where we receive messages and dispatch work.
for {
if mtask.shouldExit() {
Expand Down Expand Up @@ -272,6 +277,13 @@ func (mtask *managedTask) emitCurrentStatus() {
// the task. It will wait for event on this task's consumedHostResourceEvent
// channel from monitorQueuedTasks routine to wake up
func (mtask *managedTask) waitForHostResources() {
if mtask.GetKnownStatus().Terminal() {
// Task's known status is STOPPED. No need to wait in this case and proceed to cleanup
// This is relevant when agent restarts and a task has stopped - do not attempt
// to consume resources in host resource manager
return
}

if !mtask.IsInternal && !mtask.engine.hostResourceManager.checkTaskConsumed(mtask.Arn) {
// Internal tasks are started right away as their resources are not accounted for
mtask.engine.enqueueTask(mtask)
Expand Down

0 comments on commit 9e77f6f

Please sign in to comment.