Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pubsub] Pubsub module command batch part 1 #16167

Merged
merged 10 commits into from
Jun 9, 2021

Conversation

rkooo567
Copy link
Contributor

@rkooo567 rkooo567 commented Jun 1, 2021

Why are these changes needed?

Please check this issue for more details; #16048. This implements the pubsub command batch to improve throughput + solve ordering issues. The proposal is at https://docs.google.com/document/d/1OnbmqiXIuyOfJwv_uEShTKrVT9t_vuxNXjX1dc02GW4/edit#heading=h.gi8hbyr0xqg7.

Related issue number

#16048

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

object_status_subscriber_->Subscribe(
rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL, addr.ToProto(),
object_id.Binary(), message_published_callback, publisher_failed_callback);
std::move(sub_message), rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This currently is doing nothing. This PR will only implement the command batch in the cpp layer, but the real code actually doesn't use it.

/// Subscribe
///
message Command {
ChannelType channel_type = 1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably unnecessary as Yi pointed out before. I might remove it in the future PRs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@rkooo567 rkooo567 changed the title [WIP] Pubsub module command batch part 1 [Pubsub] Pubsub module command batch part 1 Jun 1, 2021
Comment on lines 181 to 189
// Update the command in the FIFO order.
int64_t updated_commands = 0;
while (!commands_.empty() && updated_commands < command_max_batch_size_) {
auto &command = commands_.front();
auto *new_command = long_polling_request.add_commands();
new_command->Swap(command.get());
commands_.pop();
updated_commands += 1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some questions here. What do we do for the rest of the commands_? Do you mean that we'll keep calling this function until there is nothing left?

Copy link
Contributor Author

@rkooo567 rkooo567 Jun 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They will be delivered in the next long polling request. But actually, that's a good point. We should reply right away if there are commands left, so that this will be consumed without waiting. I will fix that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, here's the workflow;

Subscriber batches commands in a long polling "request" -> publisher replies right away although there are no pub messages -> in this way, the subscriber keeps sending long polling "request" until all commands are served.

I can probably add this comment here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this will be handled in the next PR where we actually migrate all subscribe to command based subscription.

@fishbone
Copy link
Contributor

fishbone commented Jun 2, 2021

Could you add some comment about what's the definition of command here? Is this for sub/unsub only? And are the messages going through a separate path?

@rkooo567
Copy link
Contributor Author

rkooo567 commented Jun 2, 2021

@iycheng Sure. I will update the comment. Here command is from Redis pubsub terminologies.

@fishbone
Copy link
Contributor

fishbone commented Jun 3, 2021

@iycheng Sure. I will update the comment. Here command is from Redis pubsub terminologies.

Got it. Sorry I'm not very familiar with redis pubsub so I think I lost here.

@fishbone fishbone added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jun 3, 2021
@rkooo567
Copy link
Contributor Author

rkooo567 commented Jun 3, 2021

What are terminologies from other systems? Is it just Sub/unsub messages?

@rkooo567 rkooo567 removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Jun 7, 2021
@rkooo567 rkooo567 requested a review from fishbone June 7, 2021 03:49
Copy link
Contributor

@clarkzinzow clarkzinzow left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but I'd wait for a final pass by @iycheng before merging to confirm that his feedback was fully addressed.

src/ray/common/ray_config_def.h Outdated Show resolved Hide resolved
src/ray/protobuf/pubsub.proto Outdated Show resolved Hide resolved
src/ray/pubsub/subscriber.h Outdated Show resolved Hide resolved
Comment on lines +113 to +115
// ObjectID that contains object_id. This is used when an ObjectID is stored
// inside another object ID that we do not own. Then, we must notify the
// outer ID's owner that the ID contains object_id.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
// ObjectID that contains object_id. This is used when an ObjectID is stored
// inside another object ID that we do not own. Then, we must notify the
// outer ID's owner that the ID contains object_id.
// ID of object that contains `reference`. This is used when an ObjectID is stored
// inside another object that we do not own. Then, we must notify the
// outer object's owner that the object contains `reference`.

// No more request after that.
ASSERT_EQ(owner_client->PopRequest(), nullptr);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice tests!

@fishbone
Copy link
Contributor

fishbone commented Jun 8, 2021

Feel good about this one. But please update the shared ptr to unique ptr (or you can try &command as well).
I feel (not confident) all the objects from grpc are managed by mem arena inside grpc. So even it's a struct, I feel the data beneath it is still allocated in the pool, so swap still works in this case.

@rkooo567
Copy link
Contributor Author

rkooo567 commented Jun 9, 2021

local_object_manager_test is flaky in the master and the first mac build occasionally failed in Travis. This PR doesn't effectively change any external behavior, so it should be safe. Merging it.

@rkooo567 rkooo567 merged commit d9227d8 into ray-project:master Jun 9, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants