-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Pubsub] Add memory buffer limit in publisher for each subscribed entity #23707
Conversation
src/ray/pubsub/publisher.h
Outdated
class SubscriberState; | ||
|
||
/// State for an entity / topic in a pub/sub channel. | ||
struct EntityState { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can convert this struct to a class if anyone prefers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mwtian really sorry for this, but I changed my mind after reviewing this PR again. :( :(
I previously thought the member of this class will be used in other places.
It seems that pending_messages/message_sizes/total_size are actually only used within this structure. Why don't we just make it a class and make them private and move the PublishToEntity
into this one?
cc @scv119 who has comment about PublishToEntity
src/ray/pubsub/publisher.h
Outdated
|
||
/// Publishes the message to subscribers. | ||
/// Returns true if there are subscribers, returns false otherwise. | ||
bool Publish(const rpc::PubMessage &pub_message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move the code out of the struct in some way?
src/ray/pubsub/publisher.h
Outdated
|
||
/// Publishes the message to subscribers of the entity. | ||
/// Returns true if there are subscribers, returns false otherwise. | ||
bool PublishToEntity(const rpc::PubMessage &pub_message, EntityState *entity); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be a member function of EntityState?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should remove this function in .h
file. It seems like a private function for the module.
@scv119 this function was in EntityState before. My concern is that this makes EntityState a state with a lot of public members. Moving it out and making it a helper function makes the struct a data struct only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I was planning to remove this function from the header when addressing the next batch of comments. The style we follow recommends using a class here: https://google.github.io/styleguide/cppguide.html#Structs_vs._Classes. I can make the conversion later.
src/ray/pubsub/publisher.h
Outdated
/// State for an entity / topic in a pub/sub channel. | ||
struct EntityState { | ||
// Tracks inflight messages. | ||
std::queue<std::weak_ptr<rpc::PubMessage>> pending_messages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding some inline comment about who actually owns the message.
Co-authored-by: Yi Cheng <74173148+iycheng@users.noreply.github.com>
@scv119 do you want to take another look? |
// to implement inflight message tracking across subscribers with non-atomic | ||
// ref-counting or with a LRU-like data structure tracking the range of buffered | ||
// messages for each subscriber. | ||
auto front_msg = pending_messages_.front().lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we store weak_ref in pending_messages_, the total_size_ might be over-counting the actual memory usage. is the understanding correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah total_size_
is approximate since we do not count memory used for shared_ptr etc. FWIW, total_size_
is only used after subtracting the sizes of destroyed messages.
Why are these changes needed?
We want to limit the maximum memory used for each subscribed entity, e.g. to avoid having GCS run out of memory if some workers publish a huge amount of logs and subscribers cannot keep up.
After this change, Ray publishers maintain one message buffer for all subscribers of an entity, and one message buffer for all subscribers of all entities in the channel.
The limit can be configured with
publisher_entity_buffer_max_bytes
. The default value is 10MiB.Related issue number
Closes #22339
Checks
scripts/format.sh
to lint the changes in this PR.