From 60a3f4228616d80660d43c9f3fed706ba9f5016d Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 30 Jun 2023 14:47:46 -0700 Subject: [PATCH] Revert "Change reconcile/container update order on init and waitForHostResources/emitCurrentStatus order (#3747)" This reverts commit 9e77f6f6d612708d4d6707ba06aff196eefb477b. --- agent/api/task/task.go | 4 ++-- agent/engine/docker_task_engine.go | 11 +++-------- agent/engine/host_resource_manager.go | 5 ++--- agent/engine/task_manager.go | 18 +++--------------- 4 files changed, 10 insertions(+), 28 deletions(-) diff --git a/agent/api/task/task.go b/agent/api/task/task.go index 16131294f3..7ea629d5ee 100644 --- a/agent/api/task/task.go +++ b/agent/api/task/task.go @@ -3628,8 +3628,8 @@ func (task *Task) ToHostResources() map[string]*ecs.Resource { "taskArn": task.Arn, "CPU": *resources["CPU"].IntegerValue, "MEMORY": *resources["MEMORY"].IntegerValue, - "PORTS_TCP": aws.StringValueSlice(resources["PORTS_TCP"].StringSetValue), - "PORTS_UDP": aws.StringValueSlice(resources["PORTS_UDP"].StringSetValue), + "PORTS_TCP": resources["PORTS_TCP"].StringSetValue, + "PORTS_UDP": resources["PORTS_UDP"].StringSetValue, "GPU": *resources["GPU"].IntegerValue, }) return resources diff --git a/agent/engine/docker_task_engine.go b/agent/engine/docker_task_engine.go index 9bb5c902f0..69ca5f6ab5 100644 --- a/agent/engine/docker_task_engine.go +++ b/agent/engine/docker_task_engine.go @@ -597,20 +597,15 @@ func (engine *DockerTaskEngine) synchronizeState() { } tasks := engine.state.AllTasks() - // For normal task progress, overseeTask 'consume's resources through waitForHostResources in host_resource_manager before progressing - // For agent restarts (state restore), we pre-consume resources for tasks that had progressed beyond waitForHostResources stage - - // so these tasks do not wait during 'waitForHostResources' call again - do not go through queuing again - // - // Call reconcileHostResources before - // - filterTasksToStartUnsafe which will reconcile container statuses for the duration the agent was stopped - // - starting managedTask's overseeTask goroutines - engine.reconcileHostResources() tasksToStart := engine.filterTasksToStartUnsafe(tasks) for _, task := range tasks { task.InitializeResources(engine.resourceFields) engine.saveTaskData(task) } + // Before starting managedTask goroutines, pre-allocate resources for tasks which + // which have progressed beyond resource check (waitingTaskQueue) stage + engine.reconcileHostResources() for _, task := range tasksToStart { engine.startTask(task) } diff --git a/agent/engine/host_resource_manager.go b/agent/engine/host_resource_manager.go index 9293af933e..98f502ab2a 100644 --- a/agent/engine/host_resource_manager.go +++ b/agent/engine/host_resource_manager.go @@ -23,7 +23,6 @@ import ( "github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs" "github.com/aws/amazon-ecs-agent/ecs-agent/logger" "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" - "github.com/aws/aws-sdk-go/aws" ) const ( @@ -73,8 +72,8 @@ func (h *HostResourceManager) logResources(msg string, taskArn string) { "taskArn": taskArn, "CPU": *h.consumedResource[CPU].IntegerValue, "MEMORY": *h.consumedResource[MEMORY].IntegerValue, - "PORTS_TCP": aws.StringValueSlice(h.consumedResource[PORTSTCP].StringSetValue), - "PORTS_UDP": aws.StringValueSlice(h.consumedResource[PORTSUDP].StringSetValue), + "PORTS_TCP": h.consumedResource[PORTSTCP].StringSetValue, + "PORTS_UDP": h.consumedResource[PORTSUDP].StringSetValue, "GPU": *h.consumedResource[GPU].IntegerValue, }) } diff --git a/agent/engine/task_manager.go b/agent/engine/task_manager.go index beb559547d..45e2f2ab03 100644 --- a/agent/engine/task_manager.go +++ b/agent/engine/task_manager.go @@ -199,17 +199,12 @@ func (mtask *managedTask) overseeTask() { // `desiredstatus`es which are a construct of the engine used only here, // not present on the backend mtask.UpdateStatus() - - // Wait here until enough resources are available on host for the task to progress - // - Waits until host resource manager succesfully 'consume's task resources and returns - // - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately - // - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately - // (resources are later 'release'd on Stopped task emitTaskEvent call) - mtask.waitForHostResources() - // If this was a 'state restore', send all unsent statuses mtask.emitCurrentStatus() + // Wait for host resources required by this task to become available + mtask.waitForHostResources() + // Main infinite loop. This is where we receive messages and dispatch work. for { if mtask.shouldExit() { @@ -277,13 +272,6 @@ func (mtask *managedTask) emitCurrentStatus() { // the task. It will wait for event on this task's consumedHostResourceEvent // channel from monitorQueuedTasks routine to wake up func (mtask *managedTask) waitForHostResources() { - if mtask.GetKnownStatus().Terminal() { - // Task's known status is STOPPED. No need to wait in this case and proceed to cleanup - // This is relevant when agent restarts and a task has stopped - do not attempt - // to consume resources in host resource manager - return - } - if !mtask.IsInternal && !mtask.engine.hostResourceManager.checkTaskConsumed(mtask.Arn) { // Internal tasks are started right away as their resources are not accounted for mtask.engine.enqueueTask(mtask)