Fix scheduling for splits with locality requirement in Tardigrade #11581
...ino-main/src/main/java/io/trino/execution/scheduler/FullNodeCapableNodeAllocatorService.java
plugin/trino-tpcds/src/main/java/io/trino/plugin/tpcds/TpcdsNodePartitioningProvider.java
core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java
InternalNode node = bucketNodeMap.getAssignedNode(bucket)
        .orElseThrow(() -> new IllegalStateException("Nodes are expected to be assigned for non dynamic BucketNodeMap"));
Integer partitionId = nodeToPartition.get(node);
if (partitionId == null) {
Would it make sense to have more than one partition on a single node?
Hmm, interesting question. For example, to make tasks smaller for more granular retries?
Yeah, for example. We should probably aim for partitions to be similarly sized. Building partitions on top of the enforced bucket<->node mapping does not necessarily guarantee that. Not something that we need to address here, just a random thought.
HostAddress existingValue = partitionToNodeMap.put(partition, bucketNodeMap.getAssignedNode(split).get().getHostAndPort());
checkState(existingValue == null, "host already assigned for partition %s: %s", partition, existingValue);
HostAddress requiredAddress = bucketNodeMap.getAssignedNode(split).get().getHostAndPort();
Set<HostAddress> existingRequirement = partitionToNodeMap.get(partition);
Seems existingRequirement will have at most one element. Then we don't really need a set?
A split has a list of hosts in its requirement. The set is needed to support split-specific requirements.
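To illustrate why a set rather than a single host is kept per partition, here is a minimal sketch. It is not the code from this PR: the class name, the use of plain strings for hosts, and the rule that every split of a partition must carry the same host list are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: tracks the set of hosts each partition is allowed to run on.
public class PartitionHostRequirementSketch
{
    private final Map<Integer, Set<String>> partitionToHosts = new HashMap<>();

    // Records the hosts required by a split of the given partition.
    // Assumption: all splits of one partition must agree on their host set.
    public void addSplit(int partition, List<String> splitHosts)
    {
        Set<String> required = Set.copyOf(splitHosts);
        Set<String> existing = partitionToHosts.putIfAbsent(partition, required);
        if (existing != null && !existing.equals(required)) {
            throw new IllegalStateException(
                    "conflicting host requirement for partition " + partition + ": " + existing + " vs " + required);
        }
    }

    // Returns the recorded hosts, or an empty set when the partition has no requirement.
    public Set<String> getHosts(int partition)
    {
        return partitionToHosts.getOrDefault(partition, Set.of());
    }

    public static void main(String[] args)
    {
        PartitionHostRequirementSketch sketch = new PartitionHostRequirementSketch();
        sketch.addSplit(0, List.of("host-1", "host-2"));
        sketch.addSplit(0, List.of("host-2", "host-1")); // same set, different order: accepted
        System.out.println(sketch.getHosts(0).size());
    }
}
```

With a single `HostAddress` per partition, a split that lists two acceptable hosts could not be represented; a set keeps the whole list.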
}
}
else {
BiMap<InternalNode, Integer> nodeToPartition = HashBiMap.create();
I think it's beneficial to add a comment explaining the logic, e.g. "make sure all buckets mapped to the same node map to the same partition, such that locality requirements are respected in scheduling".
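The logic described in this comment can be sketched in isolation as follows. This is a simplified illustration, not the PR's implementation: it uses plain `String` node names and a `LinkedHashMap` instead of `InternalNode` and `BiMap`, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: derive a bucket -> partition mapping from a fixed bucket -> node mapping.
public class BucketPartitionSketch
{
    // bucketToNode.get(i) is the node assigned to bucket i.
    // Buckets that share a node receive the same partition id, so a partition
    // never needs to run on more than one node.
    public static List<Integer> assignPartitions(List<String> bucketToNode)
    {
        Map<String, Integer> nodeToPartition = new LinkedHashMap<>();
        List<Integer> bucketToPartition = new ArrayList<>();
        int nextPartitionId = 0;
        for (String node : bucketToNode) {
            Integer partitionId = nodeToPartition.get(node);
            if (partitionId == null) {
                // First bucket seen for this node: allocate a new partition
                partitionId = nextPartitionId++;
                nodeToPartition.put(node, partitionId);
            }
            bucketToPartition.add(partitionId);
        }
        return bucketToPartition;
    }

    public static void main(String[] args)
    {
        // Buckets 0 and 2 live on node-a, so both map to partition 0
        System.out.println(assignPartitions(List.of("node-a", "node-b", "node-a", "node-c")));
        // prints [0, 1, 0, 2]
    }
}
```

Note that this yields exactly one partition per node, which is the point raised earlier in the review: partition sizes then follow the bucket-to-node distribution rather than being balanced.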
Otherwise queries like "SHOW TABLES" won't work
Make it consistent with locality requirements defined in splits
To allow scheduling of coordinator only tasks and splits
Description
Fixes several problems related to scheduling of non remotely accessible splits
Fix
Core engine (Tardigrade)
Fixes scheduling for non remotely accessible splits in certain corner cases. Prior to this fix, some queries scanning non remotely accessible splits might have failed.
Related issues, pull requests, and links
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: