[CORE][VL] Add option to limit the memory Gluten can use for each task to N = (memory / task slots) #3101
Conversation
// A decorator to a task memory target, to restrict memory usage of the delegated
// memory target to X, X = free executor memory / task slots.
// Using this to prevent OOMs if the delegated memory target could possibly
// hold large memory blocks that are not spillable.
// See https://github.com/oap-project/gluten/issues/3030
How do we restrict a specific memory consumer's usage to X? Return zero from acquireMemory, or just OOM?
More design work is needed here. But overall, the consumer with the decorator should behave like a consumer registered to a task memory manager with a fixed limit (which is X).
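To make the intent concrete, here is a minimal sketch of that fixed-limit behavior (the FixedLimitSketch name and its methods are hypothetical, not part of this PR):

// Hypothetical sketch: grants at most what fits under a fixed limit X.
// A caller that receives fewer bytes than requested is expected to spill
// or fail, just like a consumer registered to a TaskMemoryManager whose
// capacity is X.
class FixedLimitSketch {
  private final long limit; // X = free executor memory / task slots
  private long used = 0L;

  FixedLimitSketch(long limit) {
    this.limit = limit;
  }

  long acquire(long size) {
    long granted = Math.min(size, limit - used);
    used += granted;
    return granted; // may be less than `size` once the limit is reached
  }

  void release(long size) {
    used -= Math.min(size, used);
  }
}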
Great, I hope the following design answers my question: when a consumer hits the limit, what behavior is expected?
import org.apache.spark.memory.TaskMemoryManager;

// A decorator to a task memory target, to restrict memory usage of the delegated
// memory target to X, X = free executor memory / task slots.
Does "task slot" equal the configured CPU cores of the current Spark executor?
Yes. It is computed as follows:
def getTaskSlots(conf: SparkConf): Int = {
val executorCores = SparkResourceUtil.getExecutorCores(conf)
val taskCores = conf.getInt("spark.task.cpus", 1)
executorCores / taskCores
}
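For example, with spark.executor.cores=8 and spark.task.cpus=1 (illustrative values only), this yields 8 task slots, so each task's limit X would be roughly the free executor memory divided by 8.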
private final TaskMemoryTarget delegated;

public IsolatedByTaskSlot(TaskMemoryTarget delegated) {
I didn't find any usage of this class. Inferring from the name, does it mean we want to introduce the concept of a per-task memory pool?
The PR was not ready at that time. The class was just a placeholder for my initial thoughts on the design.
while (q.peek() != null && remainingBytes > 0) {
  TreeMemoryConsumerNode head = q.remove();
  long spilled = spillTree(head, remainingBytes);
  remainingBytes -= spilled;
}
This logic invokes spill from the smallest consumer to the largest, right?
It's from largest to smallest.
In the future we may want to follow vanilla Spark's rule, which picks the smallest consumer among those larger than the target size.
Still not fully understanding...
We sort the children in descending order and invoke spillTree on the peek element's children recursively, which means we pick the largest consumer and then pass the smaller consumers into spillTree; when a node has no children, we spill it and return.
I think spillTree loops from largest to smallest, but spills from smallest to largest. Please correct me if I'm wrong, thanks!
Another piece of logic I don't follow: does sorting the root not sort all of its descendants? It seems we re-sort a node's children on every spillTree invocation.
Hi, thanks for helping check this code. I haven't verified it carefully, but let's take a simple example:
a 200 (self 80)
|- b 70
|- c 50 (self 10)
   |- d 30
   |- e 10
The code implements a post-order traversal on this tree (children first, self last), which means the visiting order should be
b (70) -> d (30) -> e (10) -> c (10) -> a (80)
That seems aligned with my initial assumption: largest to smallest, but self last.
Is that the same as your understanding?

Another piece of logic I don't follow: does sorting the root not sort all of its descendants? It seems we re-sort a node's children on every spillTree invocation.

Yes, the data structure is not efficient, since we only sort the children of the current node (see the code q.addAll(node.children().values())). So we can probably move to a TreeMap/TreeSet later on.
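For reference, here is a minimal sketch of that traversal (the Node interface and method names are hypothetical, not the actual TreeMemoryConsumer code):

// Hypothetical sketch of the post-order, largest-first spill traversal:
// children are visited in descending size order, each subtree is spilled
// recursively, and the node's own memory is spilled last.
import java.util.Comparator;
import java.util.PriorityQueue;

interface Node {
  Iterable<Node> children();
  long usedBytes();
  long spillSelf(long bytes); // returns bytes actually freed from this node
}

final class SpillSketch {
  static long spillTree(Node node, long bytesNeeded) {
    long remaining = bytesNeeded;
    PriorityQueue<Node> q =
        new PriorityQueue<>(Comparator.comparingLong(Node::usedBytes).reversed());
    for (Node child : node.children()) {
      q.add(child);
    }
    while (q.peek() != null && remaining > 0) {
      remaining -= spillTree(q.remove(), remaining); // largest subtree first
    }
    if (remaining > 0) {
      remaining -= node.spillSelf(remaining); // self last
    }
    return bytesNeeded - remaining; // total bytes freed under this node
  }
}

On the example tree above, this visits b, then d, e, c's own memory, and finally a's own memory, matching the order listed.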
That seems aligned with my initial assumption: largest to smallest, but self last. Is that the same as your understanding?

Thanks, it's the same.
def conservativeOffHeapMemorySize: Long =
  conf.getConf(COLUMNAR_CONSERVATIVE_OFFHEAP_SIZE_IN_BYTES)

def conservativeTaskOffHeapMemorySize: Long =
  conf.getConf(COLUMNAR_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES)
Some questions here:
- What's the difference between these two configs?
- What does "conservative" mean?
- It seems conservativeOffHeapMemorySize is not used?
"Conservative" means the max size Gluten can consider is "safe" to use. The two new options are set in GlutenPlugin.scala
using the following code:
// Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory
// determined by spark.memory.storageFraction was used.
val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d)
val conservativeOffHeapSize = (offHeapSize * fraction).toLong
conf.set(
  GlutenConfig.GLUTEN_CONSERVATIVE_OFFHEAP_SIZE_IN_BYTES_KEY,
  conservativeOffHeapSize.toString)
val conservativeOffHeapPerTask = conservativeOffHeapSize / taskSlots
conf.set(
  GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
  conservativeOffHeapPerTask.toString)
The difference between these options and the "non-conservative" ones is that the "conservative" ones take storage memory into account. Assuming Spark has used 30% of off-heap memory in the storage memory pool, that memory would not be evicted even though a "borrow" is requested from the execution memory pool.
I think for stability we may need to use the "conservative" options by default, since they are safer in most cases. I left the "non-conservative" options unchanged for compatibility reasons.
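For illustration (hypothetical numbers only): with spark.memory.offHeap.size=8g, spark.memory.storageFraction=0.5 and 4 task slots, fraction = 1.0 - 0.5 = 0.5, so the conservative off-heap size would be 4g and the conservative per-task size would be 1g.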
Besides, I don't want users to set these "auto-generated" options, but we haven't developed a general way to guard against that yet.
It seems conservativeOffHeapMemorySize is not used?

Yes, but it's worth keeping for future use.
Thanks for the detailed explanation! Now I understand what "conservative" means.

we may need to use the "conservative" options by default, since they are safer in most cases.

Please keep a switch config for that. Spark already has maybeGrowExecutionPool to shrink the storage pool and grow the execution pool, and I'd prefer to let Spark control this logic. If we use the "conservative" options by default, we may not fully utilize unused storage memory.
I meant to use the "conservative" options only in Gluten's own code that needs to read the off-heap size, for example the shuffle writer and partial aggregation. It's not a goal to touch vanilla Spark's memory management.
Worth noting that Spark's "storage region size", determined by spark.memory.storageFraction, is not evictable while in use. That's why we added the conservative options.
👍
Another big change to the memory component!
Add new option spark.gluten.memory.isolation (false by default).

Description of the option:

The implementation inserts a complete memory management layer, TreeMemoryConsumer, between Spark's memory manager and Gluten. Once the task memory limit is hit, TreeMemoryConsumer first tries calling the child spillers inside its own scope without notifying Spark. After freeing some space, TreeMemoryConsumer continues to acquire memory from Spark.

Users are expected to use this feature to get rid of OOMs caused by pinned, non-spillable Velox memory (#3030) held by an older task that is asked to spill when new tasks arrive. This typically happens when the session is shared by several concurrent queries.
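For example, a user could enable the feature with flags along these lines (illustrative values; the off-heap settings are placeholders, not recommendations):

--conf spark.memory.offHeap.enabled=true
--conf spark.memory.offHeap.size=8g
--conf spark.gluten.memory.isolation=true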