[dataset] Reduce memory usage during .to_pandas() #20921
Conversation
For my understanding, in the flow from the network through Plasma to building the DataFrame, where was the large memory inflation happening? My understanding of the flow is:

1. `ray.get()`: Plasma buffers are allocated and filled as objects are pulled, with the Arrow tables being deserialized directly into the Plasma buffers.
2. `builder.add_block()`: The block builder holds pointers to those Plasma buffers, so no new copies are made while adding blocks to the builder.
3. `builder.build().to_pandas()`: The `.build()` and `.to_pandas()` calls create a copy or two during the Arrow table concatenation and the DataFrame construction, respectively.

This PR deals with stages (1) and (2) rather than the copy-heavy stage (3), and AFAICT wouldn't result in a lower peak during those two stages. Am I missing the creation of an extra copy or two in (1) and (2), maybe receiver-side queueing of byte chunks during pulls, or something else?
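To make stage (3) concrete, here is a rough sketch using plain pyarrow calls; it only labels where the concatenation and DataFrame-construction work happens, and is not the actual builder code:

```python
import pyarrow as pa

# Two small blocks, standing in for the Arrow tables the builder has collected.
t1 = pa.table({"x": [1, 2]})
t2 = pa.table({"x": [3, 4]})

# Stage (3a), the .build() step: concatenate the collected tables into one.
combined = pa.concat_tables([t1, t2])

# Stage (3b), the .to_pandas() step: convert the Arrow data into pandas/NumPy
# memory on the worker heap.
df = combined.to_pandas()
```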
It's mostly about reducing peak Plasma memory usage, which is much more constrained than heap memory usage; streaming through Plasma solves that bottleneck.
Ah, so I think what I was forgetting is that deserializing the Arrow tables out of Plasma is zero-copy, so `ray.get()` itself doesn't inflate worker heap memory.
Hmm, actually, is that true? Are serialized Arrow tables currently zero-copy? You've asserted as much elsewhere: #20242 (comment)
So it looks like all Arrow buffers implement the Pickle5 out-of-band protocol, so my understanding of this code path is:

```python
output = DelegatingArrowBlockBuilder()
for block in blocks:
    output.add_block(ray.get(block))  # <-- Block deserialized with zero-copy; pointers to Plasma
                                      #     buffers are added to output._tables, so very little
                                      #     memory is moved to the worker heap.
# <-- All buffers underlying all blocks should still be in Plasma here.
return output.build().to_pandas()  # <-- .build() concatenates all tables into worker heap memory,
                                   #     releasing the Plasma buffers.
```

Am I missing something here? Just making sure that I understand this.
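For reference, a minimal standalone sketch of the Pickle5 out-of-band behavior assumed above; this is not Ray's serialization path, and it assumes pyarrow's `Buffer` honors protocol 5 as asserted:

```python
import pickle
import pyarrow as pa

buf = pa.py_buffer(b"some column data")

# With protocol 5, pickle can hand large buffers to a callback instead of
# embedding them in the pickle stream ("out-of-band" transfer).
out_of_band = []
payload = pickle.dumps(buf, protocol=5, buffer_callback=out_of_band.append)

# Reconstruct the buffer, supplying the out-of-band data explicitly.
restored = pickle.loads(payload, buffers=out_of_band)
assert restored.to_pybytes() == b"some column data"
```

If the out-of-band path is taken, `out_of_band` ends up non-empty and the raw bytes never pass through the pickle stream, which, as I understand it, is what lets Ray deserialize tables directly on top of Plasma allocations.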
Yes, this is a good point. I think we'd need to copy in that case to actually reduce Plasma memory usage. But maybe this isn't that important, since `.to_pandas()` isn't a performance-critical path anyway.
Ok cool, I'll be sure to remember this if we do this kind of streaming optimization on more critical paths.
Why are these changes needed?
We shouldn't ray.get() all the blocks immediately during the to_pandas() call; it's better to fetch them one by one. That's a little slower, but to_pandas() isn't expected to be fast anyway.
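A hypothetical before/after sketch of the fetch pattern described above; the function names and `block_refs` are illustrative only, not the actual Dataset internals:

```python
import ray
import pandas as pd


def to_pandas_eager(block_refs):
    # Before: ray.get() every block ref up front, so every block must be
    # available in the object store at the same time before conversion starts.
    tables = ray.get(block_refs)
    return pd.concat([t.to_pandas() for t in tables], ignore_index=True)


def to_pandas_incremental(block_refs):
    # After (the pattern this PR describes): pull one block at a time and
    # convert it before fetching the next, so all blocks don't have to be
    # fetched at once, at the cost of a little extra latency.
    frames = []
    for ref in block_refs:
        frames.append(ray.get(ref).to_pandas())
    return pd.concat(frames, ignore_index=True)
```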