Skip to content

Commit

Permalink
Made various improvements to the AutostartedAgent._ensure_agents meth…
Browse files Browse the repository at this point in the history
…od (Issue #7612, PR #7612)

Fixes bug in autostarted agent manager that occasionally causes agent's sessions to time out, dropping calls, resulting in a "stuck" orchestrator until a redeploy is triggered.

~~The bug has been there forever, but the conditions for it to trigger happen to be set up by #7278.~~

~~The core of the bug is the following. When we need a certain agent to be up, we call `AutostartedAgentManager._ensure_agents`. This makes sure an agent process is running for these agents and waits for them to be up. However, instead of starting a process to track the autostarted agent map and to trust that it would keep up with changes to it, we instead start a process with an explicit list of agent endpoints.~~ If a new call comes in to `_ensure_agents`, and the agent is not yet up, we would kill the current process and start a new one. In killing the process, we would not expire its sessions, letting them time out on the 30s heartbeat timeout, losing any calls made to it in that window.

EDIT: Scratched some of the above: I wrote a test case for the first bullet point below, which I thought to be the root cause of the reason for killing the old agent process. However, turns out the test also succeeds on master. The agent's `on_reconnect` actually updates the process' agent map. So, I think my root cause analysis may have been wrong (at least less likely), but the second and third bullet points should fix the issue anyway (doesn't matter exactly *how* we got in the situation where only part of the agent endpoints were up, as long as we handle it correctly), and even if part of the issue persists, the logging improvements should help future investigation. And the first bullet point is still a good consistency fix imo.

This PR makes the following changes:
- An agent process for autostarted agents (`use_autostart_agent_map` in the config file) now ignores any agent endpoints in the config file. Instead it runs purely in autostarted agent mode, starting instances for each of the endpoints in the agent map. It then tracks any changes made to the agent map. This last part was already in place, and resulted in a small inconsistency where the process would respect agent names in the config at start, and then suddenly move to consider the agent map the authority as soon as a change comes in. This inconsistency is now resolved by considering the agent map the authority for the entire lifetime of the agent process.
- The autostarted agent manager now trusts in its processes to track the agent map. If a process is already running for an environment, but the agent endpoints are not yet up, it waits for them rather than to kill the process and start fresh.
- When an explicit restart is requested, the autostarted agent manager now expires all sessions for the agents in the agent map, i.e. the endpoints for the killed process.
- Improved robustness of wait condition for an agent to be up, specifically make sure we don't consider expired sessions to be up.
- Made some log messages a bit more informative, no major changes.

Strike through any lines that are not applicable (`~~line~~`) then check the box

- [x] ~Attached issue to pull request~
- [x] Changelog entry
- [x] Type annotations are present
- [x] Code is clear and sufficiently documented
- [x] No (preventable) type errors (check using make mypy or make mypy-diff)
- [x] Sufficient test cases (reproduces the bug/tests the requested feature)
- [x] Correct, in line with design
- [x] ~End user documentation is included or an issue is created for end-user documentation (add ref to issue here: )~
- [x] ~If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branche(s) (see [test-fixes](https://internal.inmanta.com/development/core/tasks/build-master.html#test-fixes) for more info)~
  • Loading branch information
sanderr authored and arnaudsjs committed May 23, 2024
1 parent 4223f75 commit ba70131
Show file tree
Hide file tree
Showing 6 changed files with 381 additions and 186 deletions.
10 changes: 10 additions & 0 deletions changelogs/unreleased/ensure-agents-improvements.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
description: "Made various improvements to the AutostartedAgent._ensure_agents method"
sections:
bugfix: "Fixed a race condition where autostarted agents might become unresponsive for 30s when restarted"
issue-nr: 7612
change-type: patch
destination-branches:
- master
- iso7
- iso6

29 changes: 19 additions & 10 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,16 +1168,18 @@ async def _init_agent_map(self) -> None:
self.agent_map = cfg.agent_map.get()

async def _init_endpoint_names(self) -> None:
if self.hostname is not None:
await self.add_end_point_name(self.hostname)
else:
# load agent names from the config file
agent_names = cfg.agent_names.get()
if agent_names is not None:
for name in agent_names:
if "$" in name:
name = name.replace("$node-name", self.node_name)
await self.add_end_point_name(name)
assert self.agent_map is not None
endpoints: Iterable[str] = (
[self.hostname]
if self.hostname is not None
else (
self.agent_map.keys()
if cfg.use_autostart_agent_map.get()
else (name if "$" not in name else name.replace("$node-name", self.node_name) for name in cfg.agent_names.get())
)
)
for endpoint in endpoints:
await self.add_end_point_name(endpoint)

async def stop(self) -> None:
await super().stop()
Expand Down Expand Up @@ -1255,6 +1257,13 @@ async def update_agent_map(self, agent_map: dict[str, str]) -> None:
await self._update_agent_map(agent_map)

async def _update_agent_map(self, agent_map: dict[str, str]) -> None:
if "internal" not in agent_map:
LOGGER.warning(
"Agent received an update_agent_map() trigger without internal agent in the agent_map %s",
agent_map,
)
agent_map = {"internal": "local:", **agent_map}

async with self._instances_lock:
self.agent_map = agent_map
# Add missing agents
Expand Down
10 changes: 6 additions & 4 deletions src/inmanta/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import warnings
from abc import ABC, abstractmethod
from collections import abc, defaultdict
from collections.abc import Awaitable, Callable, Iterable, Sequence
from collections.abc import Awaitable, Callable, Iterable, Sequence, Set
from configparser import RawConfigParser
from contextlib import AbstractAsyncContextManager
from itertools import chain
Expand Down Expand Up @@ -1290,7 +1290,7 @@ def get_connection(
"""
if connection is not None:
return util.nullcontext(connection)
# Make pypi happy
# Make mypy happy
assert cls._connection_pool is not None
return cls._connection_pool.acquire()

Expand Down Expand Up @@ -3415,10 +3415,12 @@ def get_valid_field_names(cls) -> list[str]:
return super().get_valid_field_names() + ["process_name", "status"]

@classmethod
async def get_statuses(cls, env_id: uuid.UUID, agent_names: set[str]) -> dict[str, Optional[AgentStatus]]:
async def get_statuses(
cls, env_id: uuid.UUID, agent_names: Set[str], *, connection: Optional[asyncpg.connection.Connection] = None
) -> dict[str, Optional[AgentStatus]]:
result: dict[str, Optional[AgentStatus]] = {}
for agent_name in agent_names:
agent = await cls.get_one(environment=env_id, name=agent_name)
agent = await cls.get_one(environment=env_id, name=agent_name, connection=connection)
if agent:
result[agent_name] = agent.get_status()
else:
Expand Down

0 comments on commit ba70131

Please sign in to comment.