
Revert revert OBOD pubsub PR #16487

Merged: 63 commits, merged on Jun 29, 2021
Changes shown are from 30 of the 63 commits.

Commits
ab8b025
Revert "Revert "[Pubsub] Use a pubsub module for Ownership based obje…
rkooo567 Jun 16, 2021
3056474
revert the obod problem.
rkooo567 Jun 16, 2021
e98b97c
Merge branch 'master' into rev-obod
rkooo567 Jun 17, 2021
8d1209f
Add stats.
rkooo567 Jun 17, 2021
e77f94a
Fix a possible regression.
rkooo567 Jun 17, 2021
83a4963
in another progress
rkooo567 Jun 17, 2021
a6222b1
Merge branch 'pubsub-stats' into rev-obod
rkooo567 Jun 17, 2021
9df49eb
debugging
rkooo567 Jun 17, 2021
168fe28
Merge branch 'master' into rev-obod
rkooo567 Jun 18, 2021
3a17076
Fix stats bug
rkooo567 Jun 18, 2021
3f30363
update
rkooo567 Jun 18, 2021
d597862
Add more stats.
rkooo567 Jun 18, 2021
27c9ce0
Add stats
rkooo567 Jun 18, 2021
bbb1a85
lint
rkooo567 Jun 18, 2021
744dccc
Fix issue
rkooo567 Jun 18, 2021
ee49e98
remove spammy logs
rkooo567 Jun 18, 2021
0a308b6
lint
rkooo567 Jun 19, 2021
bcc4c1c
better error msg for debugging
rkooo567 Jun 19, 2021
4c4a8ea
Add even more logging
rkooo567 Jun 19, 2021
43d03d4
Remove spammy logs
rkooo567 Jun 19, 2021
51728ad
Fix iterator invalidation issue
rkooo567 Jun 19, 2021
37c68ac
more debugging info
rkooo567 Jun 19, 2021
e321609
fix
rkooo567 Jun 19, 2021
9b736c1
Add more debug logs
rkooo567 Jun 21, 2021
0111438
Merge branch 'master' into rev-obod
rkooo567 Jun 21, 2021
6aed498
add debug logs
rkooo567 Jun 21, 2021
e06901b
Remove the problematic line for confirmation
rkooo567 Jun 21, 2021
2d1f7e9
Merge branch 'master' into rev-obod
rkooo567 Jun 22, 2021
69e2b93
Completed
rkooo567 Jun 22, 2021
a0e3c20
Fixed a broken test.
rkooo567 Jun 22, 2021
78f837c
experiment
rkooo567 Jun 22, 2021
fef4d65
Lint
rkooo567 Jun 22, 2021
2c200c9
Add a better error message
rkooo567 Jun 23, 2021
dc9cd25
try out
rkooo567 Jun 23, 2021
e853591
Merge branch 'master' into rev-obod
rkooo567 Jun 23, 2021
ed33680
revert the build file.
rkooo567 Jun 23, 2021
6825838
In progress again
rkooo567 Jun 23, 2021
111b850
IP
rkooo567 Jun 24, 2021
376d744
Formatting
rkooo567 Jun 24, 2021
074344a
Revert the log level
rkooo567 Jun 24, 2021
c4e81c2
Merge branch 'master' into rev-obod
rkooo567 Jun 24, 2021
28d3ae8
Unskip test array
rkooo567 Jun 24, 2021
42d04d9
final clean up.
rkooo567 Jun 24, 2021
bbd552b
fix a build issue
rkooo567 Jun 24, 2021
c5e4e55
debug logs
rkooo567 Jun 24, 2021
bcfb268
remove
rkooo567 Jun 24, 2021
4b2dd1c
.
rkooo567 Jun 24, 2021
39c054c
Add more critical logs.
rkooo567 Jun 25, 2021
1f9f436
format
rkooo567 Jun 25, 2021
29ee990
tmp
stephanie-wang Jun 25, 2021
376096c
Merge branch 'rev-obod' of github.com:rkooo567/ray into HEAD
stephanie-wang Jun 25, 2021
33130d9
log
stephanie-wang Jun 25, 2021
385572f
log
stephanie-wang Jun 25, 2021
98be8bb
issue fix
rkooo567 Jun 25, 2021
7667c41
Upgrade
rkooo567 Jun 25, 2021
a0b0228
test experiment
rkooo567 Jun 25, 2021
68bc4ea
Merge branch 'master' into rev-obod
rkooo567 Jun 25, 2021
9306650
Fix an issue
rkooo567 Jun 25, 2021
0aacc43
Fix issues.
rkooo567 Jun 25, 2021
446c82c
Lint
rkooo567 Jun 25, 2021
87a5b8e
Merge branch 'master' into rev-obod
rkooo567 Jun 25, 2021
8d81179
remove unnecessary code
rkooo567 Jun 25, 2021
7aa6009
last clean up.
rkooo567 Jun 26, 2021
134 changes: 134 additions & 0 deletions python/ray/tests/test_object_manager.py
@@ -452,6 +452,140 @@ def test_deadlock(get_args, task_args):
print(f"round {i} finished in {time.time() - start}")


def test_object_directory_basic(ray_start_cluster_with_resource):
cluster, num_nodes = ray_start_cluster_with_resource

@ray.remote
def task(x):
pass

# Test a single task.
x_id = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
ray.get(task.options(resources={str(3): 1}).remote(x_id), timeout=10)

# Test multiple tasks on all nodes can find locations properly.
object_refs = []
for _ in range(num_nodes):
object_refs.append(ray.put(np.zeros(1024 * 1024, dtype=np.uint8)))
ray.get([
task.options(resources={
str(i): 1
}).remote(object_refs[i]) for i in range(num_nodes)
])
del object_refs

@ray.remote
class ObjectHolder:
def __init__(self):
self.x = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))

def get_obj(self):
return self.x

def ready(self):
return True

# Test if tasks can find object location properly
# when there are multiple owners
object_holders = [
ObjectHolder.options(num_cpus=0.01, resources={
str(i): 1
}).remote() for i in range(num_nodes)
]
ray.get([o.ready.remote() for o in object_holders])

object_refs = []
for i in range(num_nodes):
object_refs.append(
object_holders[(i + 1) % num_nodes].get_obj.remote())
ray.get([
task.options(num_cpus=0.01, resources={
str(i): 1
}).remote(object_refs[i]) for i in range(num_nodes)
])

# Test a stressful scenario.
object_refs = []
repeat = 10
for _ in range(num_nodes):
for _ in range(repeat):
object_refs.append(ray.put(np.zeros(1024 * 1024, dtype=np.uint8)))
tasks = []
for i in range(num_nodes):
for r in range(repeat):
tasks.append(
task.options(num_cpus=0.01, resources={
str(i): 0.1
}).remote(object_refs[i * r]))
ray.get(tasks)

object_refs = []
for i in range(num_nodes):
object_refs.append(
object_holders[(i + 1) % num_nodes].get_obj.remote())
tasks = []
for i in range(num_nodes):
for _ in range(10):
tasks.append(
task.options(num_cpus=0.01, resources={
str(i): 0.1
}).remote(object_refs[(i + 1) % num_nodes]))


def test_object_directory_failure(ray_start_cluster):
cluster = ray_start_cluster
config = {
"num_heartbeats_timeout": 10,
"raylet_heartbeat_period_milliseconds": 500,
"object_timeout_milliseconds": 200,
}
# Add a head node.
cluster.add_node(_system_config=config)
ray.init(address=cluster.address)

# Add worker nodes.
num_nodes = 5
for i in range(num_nodes):
cluster.add_node(resources={str(i): 100})

# Add a node to be removed
index_killing_node = num_nodes
node_to_kill = cluster.add_node(
resources={str(index_killing_node): 100}, object_store_memory=10**9)

@ray.remote
class ObjectHolder:
def __init__(self):
self.x = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))

def get_obj(self):
return [self.x]

def ready(self):
return True

oh = ObjectHolder.options(
num_cpus=0.01, resources={
str(index_killing_node): 1
}).remote()
obj = ray.get(oh.get_obj.remote())[0]

@ray.remote
def task(x):
pass

tasks = []
repeat = 3
for i in range(num_nodes):
for _ in range(repeat):
tasks.append(task.options(resources={str(i): 1}).remote(obj))
cluster.remove_node(node_to_kill, allow_graceful=False)

for t in tasks:
with pytest.raises(ray.exceptions.RayTaskError):
ray.get(t, timeout=10)


if __name__ == "__main__":
import pytest
import sys
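Note on fixtures: both tests above depend on pytest fixtures from Ray's test conftest (ray_start_cluster_with_resource and ray_start_cluster) that are not part of this diff. The sketch below is only an assumption of what ray_start_cluster_with_resource provides; the real fixture lives in python/ray/tests/conftest.py and its node count and resource amounts may differ. The assumed behavior is a local multi-node cluster where node i advertises a custom resource named str(i), so tasks can be pinned to specific nodes:

import pytest
import ray
from ray.cluster_utils import Cluster


@pytest.fixture
def ray_start_cluster_with_resource():
    # Assumed behavior: a local multi-node cluster where node i advertises a
    # custom resource named str(i). Scheduling a task with
    # resources={str(i): 1} pins it to node i, which the tests above use to
    # force cross-node object transfers through the object directory.
    num_nodes = 5  # assumption; the real fixture may use a different count
    cluster = Cluster()
    for i in range(num_nodes):
        cluster.add_node(num_cpus=4, resources={str(i): 100})
    ray.init(address=cluster.address)
    yield cluster, num_nodes
    ray.shutdown()
    cluster.shutdown()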
113 changes: 59 additions & 54 deletions src/ray/core_worker/core_worker.cc
@@ -80,14 +80,16 @@ ray::JobID GetProcessJobID(const ray::CoreWorkerOptions &options) {
// Helper function converts GetObjectLocationsOwnerReply to ObjectLocation
ObjectLocation CreateObjectLocation(const rpc::GetObjectLocationsOwnerReply &reply) {
std::vector<NodeID> node_ids;
node_ids.reserve(reply.node_ids_size());
for (auto i = 0; i < reply.node_ids_size(); i++) {
node_ids.push_back(NodeID::FromBinary(reply.node_ids(i)));
const auto &object_info = reply.object_location_info();
node_ids.reserve(object_info.node_ids_size());
for (auto i = 0; i < object_info.node_ids_size(); i++) {
node_ids.push_back(NodeID::FromBinary(object_info.node_ids(i)));
}
bool is_spilled = !reply.spilled_url().empty();
return ObjectLocation(NodeID::FromBinary(reply.primary_node_id()), reply.object_size(),
std::move(node_ids), is_spilled, reply.spilled_url(),
NodeID::FromBinary(reply.spilled_node_id()));
bool is_spilled = !object_info.spilled_url().empty();
return ObjectLocation(NodeID::FromBinary(object_info.primary_node_id()),
object_info.object_size(), std::move(node_ids), is_spilled,
object_info.spilled_url(),
NodeID::FromBinary(object_info.spilled_node_id()));
}
} // namespace

@@ -484,12 +486,12 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
core_worker_client_pool_ =
std::make_shared<rpc::CoreWorkerClientPool>(*client_call_manager_);

object_status_publisher_ = std::make_unique<pubsub::Publisher>(
object_info_publisher_ = std::make_unique<pubsub::Publisher>(
/*periodical_runner=*/&periodical_runner_,
/*get_time_ms=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; },
/*subscriber_timeout_ms=*/RayConfig::instance().subscriber_timeout_ms(),
/*publish_batch_size_=*/RayConfig::instance().publish_batch_size());
object_status_subscriber_ = std::make_unique<pubsub::Subscriber>(
object_info_subscriber_ = std::make_unique<pubsub::Subscriber>(
/*subscriber_id=*/GetWorkerID(),
/*subscriber_address=*/rpc_address_.ip_address(),
/*subscriber_port=*/rpc_address_.port(),
@@ -499,8 +501,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_

reference_counter_ = std::make_shared<ReferenceCounter>(
rpc_address_,
/*object_status_publisher=*/object_status_publisher_.get(),
/*object_status_subscriber=*/object_status_subscriber_.get(),
/*object_info_publisher=*/object_info_publisher_.get(),
/*object_info_subscriber=*/object_info_subscriber_.get(),
RayConfig::instance().lineage_pinning_enabled(), [this](const rpc::Address &addr) {
return std::shared_ptr<rpc::CoreWorkerClient>(
new rpc::CoreWorkerClient(addr, *client_call_manager_));
@@ -1383,9 +1385,9 @@ Status CoreWorker::GetLocationFromOwner(
auto owner_address = GetOwnerAddress(object_id);
auto client = core_worker_client_pool_->GetOrConnect(owner_address);
rpc::GetObjectLocationsOwnerRequest request;
request.set_intended_worker_id(owner_address.worker_id());
request.set_object_id(object_id.Binary());
request.set_last_version(-1);
auto object_location_request = request.mutable_object_location_request();
object_location_request->set_intended_worker_id(owner_address.worker_id());
object_location_request->set_object_id(object_id.Binary());
client->GetObjectLocationsOwner(
request,
[object_id, mutex, num_remaining, ready_promise, location_by_id](
@@ -2480,16 +2482,17 @@ void CoreWorker::ProcessSubscribeForObjectEviction(
pub_message.mutable_worker_object_eviction_message()->set_object_id(
object_id.Binary());

object_status_publisher_->Publish(rpc::ChannelType::WORKER_OBJECT_EVICTION,
pub_message, object_id.Binary());
object_info_publisher_->Publish(rpc::ChannelType::WORKER_OBJECT_EVICTION, pub_message,
object_id.Binary());
};

const auto object_id = ObjectID::FromBinary(message.object_id());
const auto intended_worker_id = WorkerID::FromBinary(message.intended_worker_id());
if (intended_worker_id != worker_context_.GetWorkerID()) {
RAY_LOG(INFO) << "The SubscribeForObjectEviction message is for "
<< intended_worker_id << ", but the current worker id is "
<< worker_context_.GetWorkerID() << ". This will be no-op.";
RAY_LOG(DEBUG) << "The SubscribeForObjectEviction message for object id " << object_id
<< " is for " << intended_worker_id
<< ", but the current worker id is " << worker_context_.GetWorkerID()
<< ".";
unpin_object(object_id);
return;
}
@@ -2509,12 +2512,14 @@ void CoreWorker::ProcessSubscribeMessage(const rpc::SubMessage &sub_message,
rpc::ChannelType channel_type,
const std::string &key_id,
const NodeID &subscriber_id) {
object_status_publisher_->RegisterSubscription(channel_type, subscriber_id, key_id);
object_info_publisher_->RegisterSubscription(channel_type, subscriber_id, key_id);

if (sub_message.has_worker_object_eviction_message()) {
ProcessSubscribeForObjectEviction(sub_message.worker_object_eviction_message());
} else if (sub_message.has_worker_ref_removed_message()) {
ProcessSubscribeForRefRemoved(sub_message.worker_ref_removed_message());
} else if (sub_message.has_worker_object_locations_message()) {
ProcessSubscribeObjectLocations(sub_message.worker_object_locations_message());
} else {
RAY_LOG(FATAL)
<< "Invalid command has received: "
@@ -2527,8 +2532,8 @@ void CoreWorker::ProcessPubsubCommands(const Commands &commands,
const NodeID &subscriber_id) {
for (const auto &command : commands) {
if (command.has_unsubscribe_message()) {
object_status_publisher_->UnregisterSubscription(command.channel_type(),
subscriber_id, command.key_id());
object_info_publisher_->UnregisterSubscription(command.channel_type(),
subscriber_id, command.key_id());
} else if (command.has_subscribe_message()) {
ProcessSubscribeMessage(command.subscribe_message(), command.channel_type(),
command.key_id(), subscriber_id);
@@ -2547,8 +2552,8 @@ void CoreWorker::HandlePubsubLongPolling(const rpc::PubsubLongPollingRequest &re
rpc::SendReplyCallback send_reply_callback) {
const auto subscriber_id = NodeID::FromBinary(request.subscriber_address().raylet_id());
RAY_LOG(DEBUG) << "Got a long polling request from a node " << subscriber_id;
object_status_publisher_->ConnectToSubscriber(subscriber_id, reply,
std::move(send_reply_callback));
object_info_publisher_->ConnectToSubscriber(subscriber_id, reply,
std::move(send_reply_callback));
}

void CoreWorker::HandlePubsubCommandBatch(const rpc::PubsubCommandBatchRequest &request,
Expand Down Expand Up @@ -2594,42 +2599,42 @@ void CoreWorker::HandleRemoveObjectLocationOwner(
send_reply_callback(status, nullptr, nullptr);
}

void CoreWorker::ProcessSubscribeObjectLocations(
const rpc::WorkerObjectLocationsSubMessage &message) {
const auto intended_worker_id = WorkerID::FromBinary(message.intended_worker_id());
const auto object_id = ObjectID::FromBinary(message.object_id());

if (intended_worker_id != worker_context_.GetWorkerID()) {
RAY_LOG(ERROR) << "The ProcessSubscribeObjectLocations message is for "
<< intended_worker_id << ", but the current worker id is "
<< worker_context_.GetWorkerID() << ". This will be no-op.";
object_info_publisher_->PublishFailure(
rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL, object_id.Binary());
return;
}

// Publish the first object location snapshot when subscribed for the first time.
auto status = reference_counter_->PublishObjectLocationSnapshot(object_id);
if (!status.ok()) {
object_info_publisher_->PublishFailure(
rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL, object_id.Binary());
}
}

void CoreWorker::HandleGetObjectLocationsOwner(
const rpc::GetObjectLocationsOwnerRequest &request,
rpc::GetObjectLocationsOwnerReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()),
send_reply_callback)) {
auto &object_location_request = request.object_location_request();
if (HandleWrongRecipient(
WorkerID::FromBinary(object_location_request.intended_worker_id()),
send_reply_callback)) {
return;
}
auto object_id = ObjectID::FromBinary(request.object_id());
const auto &callback = [object_id, reply, send_reply_callback](
const absl::flat_hash_set<NodeID> &locations,
int64_t object_size, const std::string &spilled_url,
const NodeID &spilled_node_id, int64_t current_version,
const absl::optional<NodeID> &optional_primary_node_id) {
auto primary_node_id = optional_primary_node_id.value_or(NodeID::Nil());
RAY_LOG(DEBUG) << "Replying to HandleGetObjectLocationsOwner for " << object_id
<< " with location update version " << current_version << ", "
<< locations.size() << " locations, spilled url: " << spilled_url
<< ", spilled node ID: " << spilled_node_id
<< ", and object size: " << object_size
<< ", and primary node ID: " << primary_node_id;
for (const auto &node_id : locations) {
reply->add_node_ids(node_id.Binary());
}
reply->set_object_size(object_size);
reply->set_spilled_url(spilled_url);
reply->set_spilled_node_id(spilled_node_id.Binary());
reply->set_current_version(current_version);
reply->set_primary_node_id(primary_node_id.Binary());
send_reply_callback(Status::OK(), nullptr, nullptr);
};
auto status = reference_counter_->SubscribeObjectLocations(
object_id, request.last_version(), callback);
if (!status.ok()) {
send_reply_callback(status, nullptr, nullptr);
}
auto object_id = ObjectID::FromBinary(object_location_request.object_id());
auto object_info = reply->mutable_object_location_info();
auto status = reference_counter_->FillObjectInformation(object_id, object_info);
send_reply_callback(status, nullptr, nullptr);
}

void CoreWorker::ProcessSubscribeForRefRemoved(
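The object_info_publisher_ / object_info_subscriber_ pair used in the handlers above implements a long-polling pubsub protocol: subscribers send command batches to register or unregister interest in a (channel, key) pair, and the publisher parks one long-poll reply per subscriber and flushes batched messages (or an explicit failure) whenever the state behind a key changes. The snippet below is a self-contained conceptual sketch of that pattern only; every class and method name in it is invented for illustration and is not Ray's actual pubsub API:

from collections import defaultdict, deque


class ToyPublisher:
    """Toy long-polling publisher (illustration only, not Ray's pubsub)."""

    def __init__(self):
        # (channel, key) -> set of subscriber ids.
        self.subscriptions = defaultdict(set)
        # subscriber id -> queue of messages awaiting the next long poll.
        self.mailboxes = defaultdict(deque)

    def register_subscription(self, channel, subscriber_id, key):
        self.subscriptions[(channel, key)].add(subscriber_id)

    def unregister_subscription(self, channel, subscriber_id, key):
        self.subscriptions[(channel, key)].discard(subscriber_id)

    def publish(self, channel, key, message):
        # Fan the message out to every subscriber of this (channel, key).
        for sub_id in self.subscriptions[(channel, key)]:
            self.mailboxes[sub_id].append((channel, key, message))

    def publish_failure(self, channel, key):
        # Used when the publisher cannot serve the key (e.g. wrong owner);
        # subscribers treat this as "the key's owner is gone".
        self.publish(channel, key, ("FAILURE", None))

    def connect_to_subscriber(self, subscriber_id):
        # In Ray this call parks an RPC reply until there is something to
        # send; here we simply drain whatever is queued.
        batch = list(self.mailboxes[subscriber_id])
        self.mailboxes[subscriber_id].clear()
        return batch


# Usage sketch: an owner publishing an object-location update.
pub = ToyPublisher()
pub.register_subscription("WORKER_OBJECT_LOCATIONS_CHANNEL", "raylet-1", "obj-A")
pub.publish("WORKER_OBJECT_LOCATIONS_CHANNEL", "obj-A", {"node_ids": ["node-2"]})
print(pub.connect_to_subscriber("raylet-1"))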
10 changes: 8 additions & 2 deletions src/ray/core_worker/core_worker.h
@@ -1147,6 +1147,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// stops using the reference, the message will be published to the owner.
void ProcessSubscribeForRefRemoved(const rpc::WorkerRefRemovedSubMessage &message);

/// Process a subscribe message for object locations.
/// Since the core worker owns the object directory, various raylets
/// subscribe to this object directory.
void ProcessSubscribeObjectLocations(
const rpc::WorkerObjectLocationsSubMessage &message);

using Commands = ::google::protobuf::RepeatedPtrField<rpc::Command>;

/// Process the subscribe message received from the subscriber.
@@ -1279,10 +1285,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
std::shared_ptr<CoreWorkerDirectActorTaskSubmitter> direct_actor_submitter_;

// A class to publish object status from other raylets/workers.
std::unique_ptr<pubsub::Publisher> object_status_publisher_;
std::unique_ptr<pubsub::Publisher> object_info_publisher_;

// A class to subscribe object status from other raylets/workers.
std::unique_ptr<pubsub::Subscriber> object_status_subscriber_;
std::unique_ptr<pubsub::Subscriber> object_info_subscriber_;

// Interface to submit non-actor tasks directly to leased workers.
std::unique_ptr<CoreWorkerDirectTaskSubmitter> direct_task_submitter_;
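From the application side, the owner-based object directory exercised by this PR is surfaced through ray.experimental.get_object_locations, which asks each object's owner for its current locations (the GetLocationFromOwner / HandleGetObjectLocationsOwner path changed above). A minimal usage sketch, assuming a Ray version around 1.4 where this experimental API exists; the exact keys of the returned dict may vary:

import numpy as np
import ray

ray.init()
ref = ray.put(np.zeros(1024 * 1024, dtype=np.uint8))
# Ask the object's owner (this driver) where the object currently lives.
locations = ray.experimental.get_object_locations([ref])
print(locations[ref])  # expected to include node_ids and object_size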