Abstract Airlift Discovery from InternalNodeManager #26083
Thanks @dain for doing that!
ff703ad to 2f93c9d
The testing HTTP endpoint was invalid so state was always active
When new nodes are discovered, the initial state is set to ACTIVE, but a node must be probed to get the initial state. It appears this bug was introduced when worker reactivation was added. Additionally, nodes without the correct version are not tracked at all.
Create InternalNode directly for current node. Add bound CurrentNodeState, which is a supplier of NodeState for the current node. This requires changes to NodeStateManager to avoid circular Guice dependencies.
Instead of relying on discovery, node information can be pulled from the node when state is being fetched.
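The `CurrentNodeState` supplier mentioned in the commits above could look roughly like the sketch below. This is an illustration only: `CurrentNodeStateSketch`, `StateReporter`, and the two-value `NodeState` enum are hypothetical names and shapes, not the classes in this PR. The point is that consumers depend on a plain `Supplier<NodeState>` rather than on `NodeStateManager`, which is one common way to avoid a circular injection dependency.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Assumption: only the two states named in this conversation; the real enum may have more.
enum NodeState { ACTIVE, INACTIVE }

// Hypothetical holder for the current node's state, exposed as a Supplier.
final class CurrentNodeStateSketch
        implements Supplier<NodeState>
{
    private final AtomicReference<NodeState> state = new AtomicReference<>(NodeState.ACTIVE);

    @Override
    public NodeState get()
    {
        return state.get();
    }

    // Atomically moves from expected to next; returns false if the state changed concurrently.
    boolean compareAndSet(NodeState expected, NodeState next)
    {
        return state.compareAndSet(expected, next);
    }
}

// Hypothetical consumer that needs only read access to the state.
final class StateReporter
{
    private final Supplier<NodeState> currentState;

    StateReporter(Supplier<NodeState> currentState)
    {
        this.currentState = currentState;
    }

    String describe()
    {
        return "state=" + currentState.get();
    }
}
```

A component that only reads the state, such as the hypothetical `StateReporter`, can be constructed without any reference to the manager that performs transitions.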
2f93c9d to 5e2de86
Pull Request Overview
This PR abstracts Airlift-based discovery behind a new `NodeInventory` API, migrating node-related types from the `io.trino.metadata` package into `io.trino.node`, and updating consumers to use the new interfaces. It also refactors `NodeStateManager` to use the new `CurrentNodeState` supplier and enhances tests to wait for asynchronous node updates.
- Introduce `io.trino.node` package with `NodeInventory`, `InternalNode`, `NodeState`, `DiscoveryNodeManager`, and related classes.
- Refactor `NodeStateManager` to delegate state/version tracking to `CurrentNodeState`.
- Update imports across the codebase and add polling loops in tests to stabilize asynchronous node visibility.
Reviewed Changes
Copilot reviewed 108 out of 108 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
testing/trino-tests/src/test/java/io/trino/tests/TestMinWorkerRequirement.java | Add @Timeout and polling loop to wait for node drop before assert |
testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java | Replace immediate verify with a polling loop and timeout |
core/trino-main/src/main/java/io/trino/node/NodeInventory.java | New interface to abstract node discovery |
core/trino-main/src/main/java/io/trino/node/DiscoveryNodeManager.java | New discovery manager using NodeInventory and HTTP polling |
core/trino-main/src/main/java/io/trino/server/NodeStateManager.java | Refactor to remove internal VersionedState and use CurrentNodeState |
Comments suppressed due to low confidence (2)
core/trino-main/src/main/java/io/trino/server/NodeStateManager.java:316
- [nitpick] This nested `VersionedState` class duplicates logic also found in the outer `NodeStateManager` refactor. Consider extracting it into a shared static class or moving it into `CurrentNodeState` as a static nested class to reduce duplication and improve clarity.
public static class CurrentNodeState
core/trino-main/src/main/java/io/trino/node/AirliftNodeInventory.java:49
- [nitpick] Consider adding unit tests for `AirliftNodeInventory.getNodes()` to verify that it correctly filters out failed services and extracts URIs based on the `httpsRequired` flag.
public Set<URI> getNodes()
// If the result is null, mark the node as INACTIVE to prevent work from being scheduled on it
NodeState nodeState;
if (result == null || !result.hasValue()) {
    nodeState = ACTIVE;
The comment above indicates marking the node as INACTIVE when the result is null, but the code sets it to ACTIVE. It should set `nodeState = INACTIVE` to match the intended logic.
- nodeState = ACTIVE;
+ nodeState = INACTIVE;
I think that it is right :)
@@ -121,7 +123,10 @@ public void testInsufficientWorkerNodesAfterDrop()
assertThat(queryRunner.getCoordinator().refreshNodes().getActiveNodes()).hasSize(4);

queryRunner.getServers().get(0).close();
assertThat(queryRunner.getCoordinator().refreshNodes().getActiveNodes()).hasSize(3);
while (queryRunner.getCoordinator().refreshNodes().getActiveNodes().size() > 3) {
[nitpick] Instead of a manual polling loop, consider using a testing utility (e.g., Awaitility) to await the expected active node count. This can make the test clearer and reduce boilerplate.
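For reference, a deadline-bounded polling helper in plain Java might look like the sketch below. It is not the code in this PR and not Awaitility; `PollingSketch` and `waitUntil` are hypothetical names, and the helper only illustrates bounding the wait so that a node that never drops fails the test instead of hanging it.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of this PR or of any Trino module.
final class PollingSketch
{
    private PollingSketch() {}

    // Re-checks the condition until it holds or the timeout elapses.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
    {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

In the test above, the condition would be something like `() -> queryRunner.getCoordinator().refreshNodes().getActiveNodes().size() <= 3`, followed by the existing `hasSize(3)` assertion.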
This PR encapsulates Airlift discovery behind a new NodeInventory API. This is pre-work to replace discovery with a simpler system and to support K8s discovery directly.
The only significant change is that information about nodes is now loaded directly from the node itself instead of relying on discovery announcements. This information is fetched in the same request that fetches the node state, so it does not place any additional burden on the nodes.
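A rough sketch of the idea, under stated assumptions: the only signature visible in this conversation is `public Set<URI> getNodes()` on `AirliftNodeInventory`, so the `NodeInventory` interface below is an assumed shape, and `StaticNodeInventory`, `NodeStateProbe`, and the string-valued state are hypothetical stand-ins. Nodes that do not answer are simply skipped here, which is a simplification and not the behavior implemented in the PR.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

// Assumed shape: the inventory only lists node URIs; it carries no node details.
interface NodeInventory
{
    Set<URI> getNodes();
}

// Hypothetical fixed-list implementation, e.g. for tests or a static cluster.
final class StaticNodeInventory
        implements NodeInventory
{
    private final Set<URI> nodes;

    StaticNodeInventory(Set<URI> nodes)
    {
        this.nodes = Set.copyOf(nodes);
    }

    @Override
    public Set<URI> getNodes()
    {
        return nodes;
    }
}

// Hypothetical consumer: details come from probing each node, not from announcements.
final class NodeStateProbe
{
    private NodeStateProbe() {}

    // The probe function stands in for the HTTP call that returns a node's state.
    static Map<URI, String> probeAll(NodeInventory inventory, Function<URI, Optional<String>> probe)
    {
        Map<URI, String> states = new LinkedHashMap<>();
        for (URI uri : inventory.getNodes()) {
            // Simplification: unreachable nodes are omitted from the result
            probe.apply(uri).ifPresent(state -> states.put(uri, state));
        }
        return states;
    }
}
```

Because the consumer sees only a set of URIs, swapping the Airlift-backed inventory for another source (for example a K8s-backed one) would not change the probing code.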
Release notes
(X) This is not user-visible or is docs only, and no release notes are required.